亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

MaxCompute Tunnel SDK數(shù)據(jù)上傳利器——BufferedWriter使用指南

nanfeiyan / 1632人閱讀

摘要:會(huì)盡最大可能容錯(cuò),保證數(shù)據(jù)上傳上去。多線程上傳示例多線程上傳時(shí),每個(gè)線程只需要打開一個(gè)往里面寫數(shù)據(jù)就行了。多個(gè)進(jìn)程共享由于一個(gè)的上傳狀態(tài)是通過(guò)維護(hù)一個(gè)實(shí)現(xiàn)的,對(duì)于多線程程序來(lái)講,通過(guò)鎖很容易實(shí)現(xiàn)資源的分配。

摘要: MaxCompute 的數(shù)據(jù)上傳接口(Tunnel)定義了數(shù)據(jù) block 的概念:一個(gè) block 對(duì)應(yīng)一個(gè) http request,多個(gè) block 的上傳可以并發(fā)而且是原子的,一次同步請(qǐng)求要么成功要么失敗,不會(huì)污染其他的 block。這種設(shè)計(jì)對(duì)于服務(wù)端來(lái)講十分簡(jiǎn)潔,但是也把記錄狀態(tài)做 fa.

本文用到的

阿里云數(shù)加-大數(shù)據(jù)計(jì)算服務(wù)MaxCompute產(chǎn)品地址:https://www.aliyun.com/produc...

MaxCompute 的數(shù)據(jù)上傳接口(Tunnel)定義了數(shù)據(jù) block 的概念:一個(gè) block 對(duì)應(yīng)一個(gè) http request,多個(gè) block 的上傳可以并發(fā)而且是原子的,一次同步請(qǐng)求要么成功要么失敗,不會(huì)污染其他的 block。這種設(shè)計(jì)對(duì)于服務(wù)端來(lái)講十分簡(jiǎn)潔,但是也把記錄狀態(tài)做 failover 的工作交給了客戶端。

用戶在使用 Tunnel SDK 編程時(shí),需要對(duì) block 這一層的語(yǔ)義進(jìn)行認(rèn)知,并且驅(qū)動(dòng)數(shù)據(jù)上傳的整個(gè)過(guò)程[1],并且自己進(jìn)行容錯(cuò),畢竟『網(wǎng)絡(luò)錯(cuò)誤是正常而不是異常』。由于用戶文檔中并沒有強(qiáng)調(diào)這一點(diǎn)的重要性,導(dǎo)致很多用戶踩了坑,一種常見的出錯(cuò)場(chǎng)景是,當(dāng)客戶端寫數(shù)據(jù)的速度過(guò)慢,兩次 write 的間隔超時(shí)[2],導(dǎo)致整個(gè) block 上傳失敗。

High Level API

MaxCompute Java SDK 在 0.21.3-public 之后新增了 BufferredWriter 這個(gè)更高層的 API,簡(jiǎn)化了數(shù)據(jù)上傳的過(guò)程,并且提供了容錯(cuò)的功能。 BufferedWriter 對(duì)用戶隱藏了 block 這個(gè)概念,從用戶角度看,就是在 session 上打開一個(gè) writer 然后往里面寫記錄即可:

RecordWriter writer = null;

try {
  int i = 0;  
  writer = uploadSession.openBufferedWriter();
  Record product = uploadSession.newRecord();

  for (String item : items) {
    product.setString("name", item);
    product.setBigint("id", i);
    writer.write(product);
    i += 1;
  }
} finally {
  if (writer != null) {
    writer.close();
  }
}
uploadSession.commit();

具體實(shí)現(xiàn)時(shí) BufferedWriter 先將記錄緩存在客戶端的緩沖區(qū)中,并在緩沖區(qū)填滿之后打開一個(gè) http 連接進(jìn)行上傳。BufferedWriter 會(huì)盡最大可能容錯(cuò),保證數(shù)據(jù)上傳上去。

由于屏蔽了底層細(xì)節(jié),這個(gè)接口可能并不適合數(shù)據(jù)預(yù)劃分、斷點(diǎn)續(xù)傳、分批次上傳等需要細(xì)粒度控制的場(chǎng)景。

多線程上傳示例

多線程上傳時(shí),每個(gè)線程只需要打開一個(gè) writer 往里面寫數(shù)據(jù)就行了。

class UploadThread extends Thread {
  private UploadSession session;
  private static int RECORD_COUNT = 1200;

  public UploadThread(UploadSession session) {
    this.session = session;
  }

  @Override
  public void run() {
    RecordWriter writer = up.openBufferedWriter();
    Record r = up.newRecord();
    for (int i = 0; i < RECORD_COUNT; i++) {
      r.setBigint(0, i);
      writer.write(r);
    }
    writer.close();
  }
};

public class Example {
  public static void main(String args[]) {

   // 初始化 MaxCompute 和 tunnel 的代碼

   TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(projectName, tableName);
   UploadThread t1 = new UploadThread(up);
   UploadThread t2 = new UploadThread(up);

   t1.start();
   t2.start();
   t1.join();
   t2.join();

   uploadSession.commit();
 }

更多控制

重試策略

由于底層在上傳出錯(cuò)時(shí)會(huì)回避一段固定的時(shí)間并進(jìn)行重試,但如果你的程序不想花太多時(shí)間在重試上,或者你的程序位于一個(gè)極其惡劣的網(wǎng)絡(luò)環(huán)境中,為此 TunnelBufferedWriter 允許用戶配置重試策略。

用戶可以選擇三種重試回避策略:指數(shù)回避(EXPONENTIAL_BACKOFF)、線性時(shí)間回避(LINEAR_BACKOFF)、常數(shù)時(shí)間回避(CONSTANT_BACKOFF)。

例如下面這段代碼可以將,write 的重試次數(shù)調(diào)整為 6,每一次重試之前先分別回避 4s、8s、16s、32s、64s 和 128s(從 4 開始的指數(shù)遞增的序列)。

RetryStrategy retry 
  = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF)

writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter();
writer.setRetryStrategy(retry);

緩沖區(qū)控制

如果你的程序?qū)?JVM 的內(nèi)存有嚴(yán)格的要求,可以通過(guò)下面這個(gè)接口修改緩沖區(qū)占內(nèi)存的字節(jié)數(shù)(bytes):

writer.setBufferSize(1024*1024);

默認(rèn)配置每一個(gè) Writer 的 BufferSize 是 10 MiB。TunnelBufferedWriter 一次 flush buffer 的操作上傳一個(gè) block 的數(shù)據(jù)[3]。

多個(gè)進(jìn)程共享 Session

由于一個(gè) Session 的上傳狀態(tài)是通過(guò)維護(hù)一個(gè) block list 實(shí)現(xiàn)的,對(duì)于多線程程序來(lái)講,通過(guò)鎖很容易實(shí)現(xiàn)資源的分配。但對(duì)于兩個(gè)進(jìn)程空間里的程序想要復(fù)用一個(gè) Session 時(shí),必須通過(guò)一種機(jī)制對(duì)資源進(jìn)行隔離。

具體地,在 getUploadSession 的時(shí)候,必須指定這個(gè)共享這個(gè) Session 的進(jìn)程數(shù)目,以及一個(gè)用來(lái)區(qū)分進(jìn)程的 global id:

//程序1:這個(gè) session 將被兩個(gè) writer 共享,我是其中第 0 個(gè)
TableTunnel.UploadSession up 
  = tunnel.getUploadSession(projectName, tableName, sid, 2, 0); 
writer = session.openBufferedWriter();

//程序1:這個(gè) session 將被兩個(gè) writer 共享,我是其中第 1 個(gè)
TableTunnel.UploadSession up 
  = tunnel.getUploadSession(projectName, tableName, sid, 2, 1); 
writer = session.openBufferedWriter();

Notes

[1] 一次完整的上傳流程通常包括以下步驟:

先對(duì)數(shù)據(jù)進(jìn)行劃分
為每個(gè)數(shù)據(jù)塊指定 block id,即調(diào)用 openRecordWriter(id)
然后用一個(gè)或多個(gè)線程分別將這些 block 上傳上去
并在某個(gè) block 上傳失敗以后,需要對(duì)整個(gè) block 進(jìn)行重傳
在所有 block 都上傳以后,向服務(wù)端提供上傳成功的 blockid list 進(jìn)行校驗(yàn),即調(diào)用 session.commit([1,2,3,...])
[2] 因?yàn)槭褂瞄L(zhǎng)連接,服務(wù)端有計(jì)時(shí)器判斷是否客戶端是否 alive

[3] block 在服務(wù)端有 20000 個(gè)的數(shù)量上限,如果 BufferSize 設(shè)得太小會(huì)導(dǎo)致 20000 個(gè) block 很快被用光

[4] Session的有效期為24小時(shí),超過(guò)24小時(shí)會(huì)導(dǎo)致數(shù)據(jù)上傳失敗

原文鏈接

閱讀更多干貨好文,請(qǐng)關(guān)注掃描以下二維碼:

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/71017.html

相關(guān)文章

  • MaxCompute Studio使用心得系列6——一個(gè)工具完成整個(gè)Python UDF開發(fā)

    摘要:摘要北京云棲大會(huì)上阿里云發(fā)布了最新的功能,萬(wàn)眾期待的功能終于支持啦,我怎么能不一試為快,今天就分享如何通過(guò)進(jìn)行開發(fā)。注冊(cè)函數(shù)在腳本中編輯試用好了,一個(gè)簡(jiǎn)單完整的通過(guò)開發(fā)實(shí)踐分享完成。 摘要: 2017/12/20 北京云棲大會(huì)上阿里云MaxCompute發(fā)布了最新的功能Python UDF,萬(wàn)眾期待的功能終于支持啦,我怎么能不一試為快,今天就分享如何通過(guò)Studio進(jìn)行Python u...

    張遷 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<