亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專(zhuān)欄INFORMATION COLUMN

Kafka學(xué)習(xí)筆記

aikin / 2289人閱讀

摘要:學(xué)習(xí)筆記使用一個(gè)叫的文學(xué)家的名字用來(lái)命名的。引入,正式升級(jí)為分布式流處理平臺(tái)。主要還是針對(duì)組成員數(shù)量減少的情況。當(dāng)所有成員都退出組后,消費(fèi)者組狀態(tài)變更為。自動(dòng)定期刪除過(guò)期位移的條件就是,組要處于狀態(tài)。減少下游系統(tǒng)一次性消費(fèi)的消息總數(shù)。

Kafka 學(xué)習(xí)筆記

Kafka使用一個(gè)叫Franz Kafka的文學(xué)家的名字用來(lái)命名的。

Kafka是一款開(kāi)源的消息引擎系統(tǒng)。也是一個(gè)分布式流處理平臺(tái)。

Kafka同時(shí)支持點(diǎn)對(duì)點(diǎn)模型以及發(fā)布/訂閱模型。

為什么要使用Kakfa?四個(gè)字:削峰填谷!

Kafka 術(shù)語(yǔ)

Record:消息,指Kafka處理對(duì)象

Topic:主題,用來(lái)承載消息的容器

Partition:分區(qū),一個(gè)有序不變的消息隊(duì)列,一個(gè)主題下可以有多個(gè)分區(qū)

Offset:消息位移,表示分區(qū)中每條信息的位置,是一個(gè)單調(diào)遞增不變的值

Replica,副本,數(shù)據(jù)冗余。

領(lǐng)導(dǎo)者副本:對(duì)外提供服務(wù),與客戶端進(jìn)行交互

追隨者副本:不能與外界進(jìn)行交互,只是被動(dòng)地追隨領(lǐng)導(dǎo)者副本

Producer:生產(chǎn)者,向主題發(fā)布新消息的應(yīng)用程序

Consumer:消費(fèi)者,向主題訂閱新消息的應(yīng)用程序

Consumer Offset:消費(fèi)者位移,表示消費(fèi)者消費(fèi)進(jìn)度

Consumer Group:消費(fèi)者組,多個(gè)消費(fèi)者實(shí)例共同組成的一個(gè)組,同時(shí)消費(fèi)多個(gè)分區(qū)來(lái)實(shí)現(xiàn)高吞吐。

Rebalance:重平衡,消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過(guò)程。它是Kafka消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。

Kafka 種類(lèi)

Apache Kafka: 也稱(chēng)社區(qū)版Kafka,迭代速度快,社區(qū)響應(yīng)度高,使用它可以讓你有更高的把控度;缺陷在于僅僅提供基礎(chǔ)核心組件,缺失一些高級(jí)特性

Confluent Kafka: 優(yōu)勢(shì)在于集成了很多高級(jí)特性且由Kafka原班人馬打造,質(zhì)量保證;缺陷在于國(guó)內(nèi)相關(guān)資料不全,普及率較低,沒(méi)有太多可參考的范例。

CDH/HDP Kafka: 優(yōu)勢(shì)在于操作簡(jiǎn)單,節(jié)省運(yùn)維成本;缺陷在于把控度低,演進(jìn)速度慢

Kafka 版本號(hào) 一個(gè)題外話

Kafka新版本客戶端代碼開(kāi)始完全由java語(yǔ)言編寫(xiě),于是有些人開(kāi)始“JAVA VS SCALA”的大討論。并從語(yǔ)言特性上分析為什么社區(qū)擯棄Scala轉(zhuǎn)而投向Java的懷抱。

其實(shí)事情沒(méi)有那么復(fù)雜,僅僅是因?yàn)樯鐓^(qū)來(lái)了一批Java程序猿,而以前老的scala程序猿隱退了罷了。

版本演進(jìn)

Kafka總共演進(jìn)了7個(gè)大版本

0.7版本: 上古版本,一旦有人向你推薦這個(gè)版本,懟他。

0.8版本: 開(kāi)始引入副本機(jī)制,另外老版本需要制定zookeeper地址而不是Broker地址。在0.8.2.0版本社區(qū)引入了新版本Producer API,即指定Broker地址的Producer。

0.9版本: 重量級(jí)的大版本更迭。增加了基礎(chǔ)的安全認(rèn)證/權(quán)限功能,引入了Kafka Connect,新版本Producer API穩(wěn)定。

0.10.0.0: 里程碑的大版本。該版本又有兩個(gè)小版本,0.10.1和0.10.2。引入Kafka streams,正式升級(jí)為分布式流處理平臺(tái)。0.10.2.2 新Consumer API穩(wěn)定。

0.11.0.0: 目前最主流的版本之一。引入兩個(gè)重量級(jí)功能變更:一個(gè)是提供冪等性Producer API以及事務(wù) API, 另一個(gè)是對(duì)Kafka消息格式做了重構(gòu)。

1.0和2.0: 如果你是Kafka Stream用戶,至少選擇2.0.0版本吧。

最后還有個(gè)建議,不論你使用的是哪個(gè)版本,都請(qǐng)盡量保持服務(wù)端版本和客戶端版本一致,否則你將損失很多Kafka為你提供的性能優(yōu)化收益。

江湖經(jīng)驗(yàn):不要輕易成為新版本的小白鼠。

集群部署

磁盤(pán)容量舉例:

假設(shè)公司有個(gè)業(yè)務(wù)需要每天向Kafka集群發(fā)送 1 億條信息。每條消息保存兩份來(lái)防止數(shù)據(jù)丟失。消息默認(rèn)保存兩周時(shí)間。并假設(shè)消息的平均大小是1KB。問(wèn)你的Kafka集群需要為這個(gè)業(yè)務(wù)預(yù)留多少磁盤(pán)空間?

總大小:1億 1KB 2備份 * 14 ~= 2800G
加上Kafka的一些索引數(shù)據(jù),為它預(yù)留10%,那么總大小變?yōu)?2800 * (1 + 10%) ~= 3TB

Kafka支持?jǐn)?shù)據(jù)壓縮,壓縮比0.75的話,那么應(yīng)該預(yù)留的存儲(chǔ)空間為2.25TB左右。

帶寬舉例

與其說(shuō)是帶寬資源的規(guī)劃,其實(shí)真正要規(guī)劃的是Kafka服務(wù)器的數(shù)量。

假設(shè)公司機(jī)房環(huán)境1Gbps,現(xiàn)有個(gè)業(yè)務(wù),需要在1小時(shí)內(nèi)處理1TB的業(yè)務(wù)數(shù)據(jù)。

一般單臺(tái)服務(wù)器 規(guī)劃使用70%的帶寬資源的1/3 ~= 240Mbps。

1TB需要1小時(shí)處理,則每秒差不多需要處理2336Mbps的數(shù)據(jù),除 240Mbps,則差不多需要10臺(tái)機(jī)器。如果消息還需要額外復(fù)制的話,那么還要對(duì)應(yīng)乘上備份數(shù)。

集群配置參數(shù)
配置名稱(chēng) 示例 建議值
log.dirs /home/kafka1,/home/kafka2 kafka寫(xiě)日志多路徑,不僅能提升寫(xiě)性能,在1.1版本中還能支持故障轉(zhuǎn)移功能。
zookeeper.connect zk1:2181,zk2:2181,zk3:2181/kafka1
listens listeners=PLAINTEXT://dn1.ambari:6667
auto.create.topics.enable true false,不建議可以自動(dòng)創(chuàng)建主題
unclean.leader.election.enable false false,如果設(shè)置為true有丟數(shù)據(jù)風(fēng)險(xiǎn)
auto.leader.rebalance.enable false false,不定期進(jìn)行l(wèi)eader副本的選舉
log.retention.hours 168 默認(rèn)保持7天數(shù)據(jù)
log.retention.bytes -1 保存多少數(shù)據(jù)都可以
message.max.bytes 1000000 默認(rèn)值建議調(diào)大。該值代表Broker能處理的最大消息大小
生產(chǎn)者分區(qū)策略 輪詢策略

隨機(jī)策略

按消息保存鍵策略

自定義策略 生產(chǎn)者壓縮

壓縮配置

compression.type

壓縮算法

總結(jié)一下壓縮和解壓縮,Producer端壓縮,Broker端保持,Consumer端解壓縮。

無(wú)消息丟失最佳實(shí)踐

不要使用producer.send(msg),而要使用producer.send(msg,callback)

設(shè)置acks=all,表明所有副本Broker都要接受消息,該消息才算是“已提交”

設(shè)置retries>0,表明Producer自動(dòng)重試,當(dāng)網(wǎng)絡(luò)順斷時(shí),防止消息丟失。

設(shè)置unclean.leader.election.enable=false

設(shè)置replication.factor >=3,增加副本數(shù),保證數(shù)據(jù)冗余

設(shè)置min.insync.replicas > 1,控制的是消息至少要被寫(xiě)入多少個(gè)副本才算是 已提交。

確保replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)就無(wú)法正常工作了。推薦設(shè)置replication.factor = min.insync.replicas + 1

確保消息消費(fèi)完再提交。設(shè)置enable.aoto.commit=false

Kafka 攔截器

分為生產(chǎn)者攔截器和消費(fèi)者攔截器。

典型的應(yīng)用場(chǎng)景可以應(yīng)用于客戶端監(jiān)控、端到端系統(tǒng)性能測(cè)試、消息審計(jì)等多種功能在內(nèi)的場(chǎng)景。

Kafka是如何管理TCP連接的 java生產(chǎn)者是如何管理TCP連接的

KafkaProducer實(shí)例創(chuàng)建時(shí)啟動(dòng)Sender線程,從而創(chuàng)建與bootstrap.servers中所有的Broker的TCP連接。

KafkaProducer實(shí)例首次更新元數(shù)據(jù)信息之后,還會(huì)再次創(chuàng)建與集群中所有Broker的TCP連接

如果Producer端發(fā)送信息到某臺(tái)Broker時(shí),發(fā)現(xiàn)沒(méi)有與該Broker的TCP連接,那么也會(huì)創(chuàng)建連接

如果設(shè)置connections.max.idle.ms > 0,則步驟一中的TCP連接會(huì)被自動(dòng)關(guān)閉;如果設(shè)置該參數(shù)-1,那么步驟一中創(chuàng)建的連接無(wú)法被關(guān)閉,會(huì)成為僵尸進(jìn)程。

Java消費(fèi)者是如何管理TCP連接的

創(chuàng)建的3個(gè)時(shí)機(jī)

發(fā)起FindCoordinator請(qǐng)求時(shí)

連接協(xié)調(diào)者時(shí)

消費(fèi)數(shù)據(jù)時(shí)

消費(fèi)者程序會(huì)創(chuàng)建3類(lèi)TCP連接

確定協(xié)調(diào)者和獲取集群元數(shù)據(jù)

連接協(xié)調(diào)者,令其執(zhí)行組成員管理操作

執(zhí)行實(shí)際的消息獲取

冪等生產(chǎn)者和事務(wù)生產(chǎn)者

消息交付可靠性保障,常見(jiàn)的承諾有以下三種

最多一次:消息可能會(huì)丟失,但絕不會(huì)重復(fù)發(fā)送

至少一次:消息不會(huì)丟失,但有可能被重復(fù)發(fā)送

精確一次:消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送

Kafka默認(rèn)是最少一次

要保證精確一次,就需要冪等和事務(wù)。不過(guò)性能會(huì)想對(duì)較差。

冪等生產(chǎn)者

冪等性有很多好處。其最大的優(yōu)勢(shì)在于我們可以安全地重試任何冪等性操作,反正它們不會(huì)破壞我們的系統(tǒng)狀態(tài)。

在0.11.0.0版本引入了冪等生產(chǎn)者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。

使用冪等生產(chǎn)者要注意

它只能保證單分區(qū)的冪等,多分區(qū)無(wú)法實(shí)現(xiàn)

只能實(shí)現(xiàn)單會(huì)話上的冪等,重啟之后冪等消失

事務(wù)生產(chǎn)者

設(shè)置事務(wù)型Producer

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)

設(shè)置producer端參數(shù)transctional.id。最好為其設(shè)置一個(gè)有意義的名字

此外代碼也要做一些調(diào)整變化。

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}
重平衡 怎么避免Rebalance

Rebalance發(fā)生的時(shí)機(jī)有三個(gè)

組成員數(shù)據(jù)量發(fā)生變化

訂閱主題數(shù)量發(fā)生變化

訂閱主題的分區(qū)數(shù)發(fā)生變化

后面兩個(gè)通常是運(yùn)維的主動(dòng)操作,無(wú)法避免。主要還是針對(duì)組成員數(shù)量減少的情況。增加一般也是人為主動(dòng)的。

那么避免因?yàn)閰?shù)或邏輯不合理而導(dǎo)致的成員退出,與之相關(guān)的主要參數(shù)

session.timeout.ms,推薦設(shè)置6s

heartbeat.interval.ms,推薦設(shè)置2s

max.poll.interval.ms,推薦設(shè)置比你的業(yè)務(wù)邏輯處理要長(zhǎng)

GC參數(shù),避免頻繁的FULL GC

重平衡通知

重平衡過(guò)程是通過(guò) 消費(fèi)者端的心跳線程來(lái)通知到其他消費(fèi)者實(shí)例的。

0.10.1.0版本之前,發(fā)送心跳請(qǐng)求是在消費(fèi)者主線程完成的,也就是kafkaConsumer.poll方法的那個(gè)線程。這樣做有諸多弊端,因?yàn)橄⑻幚硪彩窃谶@個(gè)線程中完成的。因此當(dāng)業(yè)務(wù)邏輯處理消耗了較長(zhǎng)時(shí)間,心跳請(qǐng)求就無(wú)法及時(shí)發(fā)送到協(xié)調(diào)者那邊了。導(dǎo)致協(xié)調(diào)者 錯(cuò)誤地認(rèn)為該消費(fèi)者已經(jīng)死了。

0.10.1.0版本開(kāi)始,社區(qū)引入了一個(gè)多帶帶的線程來(lái)專(zhuān)門(mén)執(zhí)行心跳發(fā)送。

消費(fèi)者組狀態(tài)機(jī)

定義了5種狀態(tài)

各個(gè)狀態(tài)的流轉(zhuǎn)

一個(gè)消費(fèi)者組最開(kāi)始是Empty狀態(tài),當(dāng)重平衡過(guò)程開(kāi)啟后,它會(huì)被置為PreparingRebalance狀態(tài)等待成員加入,之后變更到CompletingRebalance狀態(tài)等待分配方案,最后流轉(zhuǎn)到Stable狀態(tài)完成重平衡。

當(dāng)有新成員或已有成員退出時(shí),消費(fèi)者組的狀態(tài)從Stable直接跳到PreparingRebalance狀態(tài),此時(shí),所有現(xiàn)存成員就必須重新申請(qǐng)加入組。

當(dāng)所有成員都退出組后,消費(fèi)者組狀態(tài)變更為Empty。

Kafka自動(dòng)定期刪除過(guò)期位移的條件就是,組要處于Empty狀態(tài)。

重平衡流程 消費(fèi)者端重平衡流程

JoinGroup請(qǐng)求

SyncGroup請(qǐng)求

Broker端重平衡場(chǎng)景分析

新成員入組

組成員主動(dòng)離組

組成員崩潰離組

重平衡時(shí)協(xié)調(diào)者對(duì)組內(nèi)成員提交位移的處理

位移提交

CommitFailedException怎么處理?

縮短消息處理的時(shí)間,該方法優(yōu)先處理

增加Consumer端允許下游系統(tǒng)消費(fèi)一批數(shù)據(jù)的最大時(shí)長(zhǎng)。設(shè)置參數(shù)max.poll.interval.ms,新版本默認(rèn)是5分鐘。

減少下游系統(tǒng)一次性消費(fèi)的消息總數(shù)。max.poll.records

下游系統(tǒng)使用多線程來(lái)加速消費(fèi)

多消費(fèi)者實(shí)例

鑒于KafkaConsumer不是線程安全的事實(shí),制定兩套多線程方案。

每個(gè)線程維護(hù)專(zhuān)屬的KafkaConsumer實(shí)例,負(fù)責(zé)完整的消息獲取、消息處理流程

核心代碼

```
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;


     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
            ConsumerRecords records = 
                consumer.poll(Duration.ofMillis(10000));
                 //  執(zhí)行消息處理邏輯
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }


     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
```

消費(fèi)者程序使用單或多線程獲取消息,創(chuàng)建多個(gè)消費(fèi)者線程執(zhí)行消息處理邏輯

核心代碼

```
private final KafkaConsumer consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
    workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000), 
    new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true)  {
    ConsumerRecords records = 
        consumer.poll(Duration.ofSeconds(1));
    for (final ConsumerRecord record : records) {
        executors.submit(new Worker(record));
    }
}

```

兩種方案各有特點(diǎn)。

監(jiān)控消費(fèi)進(jìn)度的3種方法

使用Kafka自帶命令行工具kafka-consumer-groups腳本

使用Kafka Consumer API

使用Kafka自帶的JMX監(jiān)控指標(biāo)

Kafka副本詳解

副本機(jī)制的好處:

提供數(shù)據(jù)冗余

提供高伸縮性

改善數(shù)據(jù)局部性

但Kafka只有第一種好處,原因是這樣的設(shè)計(jì),Kafka有兩點(diǎn)好處

方便實(shí)現(xiàn) Read-your-writes

指當(dāng)你用生產(chǎn)者API向Kafka成功寫(xiě)入消息后,馬上使用消費(fèi)者API去讀取剛才生產(chǎn)的消息

方便實(shí)現(xiàn)單調(diào)讀(Monotonic Reads)

在多次消費(fèi)信息時(shí),不會(huì)看到該消息一會(huì)存在一會(huì)不存在的情況。

判斷Follower副本與Leader副本是否同步的標(biāo)準(zhǔn),Broker參數(shù)replia.lag.time.max.ms的參數(shù)值。Kafka有一個(gè)in-sync Replicas(ISR)集合的概念。

Kafka控制器

控制器組件(Controller),是Kafka的核心組件,它的主要作用是在Apache Zookeeper的幫助下管理和協(xié)調(diào)整個(gè)Kafka集群。

控制器是怎么被選出來(lái)的

每臺(tái)Broker都能充當(dāng)控制器,在Broker啟動(dòng)時(shí),會(huì)嘗試去Zookeeper中創(chuàng)建/controller節(jié)點(diǎn)。Kafka當(dāng)前選舉規(guī)則,第一個(gè)成功創(chuàng)建/controller節(jié)點(diǎn)的Broker會(huì)被指定為控制器。

控制器能做什么?

主題管理

分區(qū)重分配

Prefered領(lǐng)導(dǎo)者選舉

集群成員管理

數(shù)據(jù)服務(wù),控制器上保存最全的集群元數(shù)據(jù)信息

控制器保存了什么數(shù)據(jù)?

這些數(shù)據(jù)其實(shí)也在Zookeeper中存儲(chǔ)了一份。

控制器的故障轉(zhuǎn)移

總結(jié)

小竅門(mén)分享:當(dāng)你覺(jué)得控制器出現(xiàn)問(wèn)題時(shí),比如主題無(wú)法刪除了,重分區(qū)hang住了,你可以不用重啟broker或者控制器,快速簡(jiǎn)便的方法,直接去Zookeeper手動(dòng)刪除/controller節(jié)點(diǎn)。

這樣做的好處是,既可以引發(fā)控制器的重選舉,又可以避免重啟Broker導(dǎo)致的消息中斷。

Kafka請(qǐng)求處理 請(qǐng)求方案

Kafka方案類(lèi)似于Reactor模式

那么Kafka類(lèi)似的方案是這樣的。網(wǎng)絡(luò)線程池默認(rèn)參數(shù)num.network.threads=3

好了,客戶端發(fā)來(lái)的請(qǐng)求會(huì)被Aceptor線程分發(fā)到任意一個(gè)網(wǎng)絡(luò)線程中,由他們進(jìn)行處理。你可能會(huì)認(rèn)為,網(wǎng)絡(luò)線程池是順序處理不就好了?實(shí)際上,Kafka在這個(gè)環(huán)節(jié)上又做了一層異步線程池的處理。

IO線程池執(zhí)行真正的處理。如果是PRODUCER生產(chǎn)請(qǐng)求,則將消息寫(xiě)入到底層的磁盤(pán)日志中;如果是FETCH請(qǐng)求,則從磁盤(pán)或頁(yè)緩存中讀取消息。當(dāng)IO請(qǐng)求處理完請(qǐng)求后,會(huì)將生成的響應(yīng)放入網(wǎng)絡(luò)線程池的響應(yīng)隊(duì)列中,并由對(duì)應(yīng)的網(wǎng)絡(luò)線程負(fù)責(zé)將Response反還給客戶端。

請(qǐng)求隊(duì)列是所有網(wǎng)絡(luò)線程共享的,而響應(yīng)隊(duì)列則是每個(gè)網(wǎng)絡(luò)線程專(zhuān)屬的。

IO線程池默認(rèn)參數(shù)num.io.threads=8

圖中還有一個(gè)Purgatory的組件,這是Kafka中著名的“煉獄”組件。

它是用來(lái)緩存延時(shí)請(qǐng)求的,所謂延時(shí)請(qǐng)求,就是那些一時(shí)未滿足條件的不可立刻處理的請(qǐng)求。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/75734.html

相關(guān)文章

  • 極客時(shí)間《Kafka核心技術(shù)與實(shí)戰(zhàn)》返現(xiàn) + 腦圖 + 送學(xué)習(xí)筆記

    摘要:作者胡夕人人貸計(jì)算平臺(tái)部總監(jiān),將在這篇專(zhuān)欄中一步一步的教你填平這些坑,全面提升你的實(shí)戰(zhàn)能力搭配掘金小冊(cè)圖解之核心原理學(xué)習(xí)效果更佳哦送學(xué)習(xí)筆記 showImg(https://segmentfault.com/img/bVbsg9O?w=258&h=258);關(guān)注有課學(xué)微信公眾號(hào),回復(fù)暗號(hào) kafka 獲取購(gòu)買(mǎi)《Kafka核心技術(shù)與實(shí)戰(zhàn)》極客時(shí)間專(zhuān)欄地址,購(gòu)買(mǎi)成功后提交購(gòu)買(mǎi)截圖即可獲得返...

    yvonne 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

aikin

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<