亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

kafka

W4n9Hu1 / 2148人閱讀

摘要:生產(chǎn)者發(fā)送消息到指定的下,消息者從這個(gè)下消費(fèi)消息。消費(fèi)組,用于歸組同類消費(fèi)者。中的消息序列是有序的消息序列。在使用偏移量來(lái)指定消息的位置。

什么是Kafka
Kafka是一個(gè)分布式流處理系統(tǒng),流處理系統(tǒng)使它可以像消息隊(duì)列一樣publish或者subscribe消息,分布式提供了容錯(cuò)性,并發(fā)處理消息的機(jī)制。

Kafka的基本概念

kafka運(yùn)行在集群上,集群包含一個(gè)或多個(gè)服務(wù)器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時(shí)間戳(timestamp)。

kafka有以下一些基本概念:

Producer

消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。

Consumer

消息消費(fèi)者,是消息的使用方,負(fù)責(zé)消費(fèi)Kafka服務(wù)器上的消息。

Topic

主題,由用戶定義并配置在Kafka服務(wù)器,用于建立Producer和Consumer之間的訂閱關(guān)系。生產(chǎn)者發(fā)送消息到指定的Topic下,消息者從這個(gè)Topic下消費(fèi)消息。

Partition

消息分區(qū),一個(gè)topic可以分為多個(gè) partition,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息都會(huì)被分配一個(gè)有序的id(offset)。

Broker

一臺(tái)kafka服務(wù)器就是一個(gè)broker。一個(gè)集群由多個(gè)broker組成。一個(gè)broker可以容納多個(gè)topic。

Consumer Group

消費(fèi)組,用于歸組同類消費(fèi)者。每個(gè)consumer屬于一個(gè)特定的consumer group,多個(gè)消費(fèi)組可以共同消息一個(gè)Topic下的消息,每消費(fèi)組中的消費(fèi)者消費(fèi)Topic的部分消息,這些消費(fèi)者就組成了一個(gè)分組。

Offset

消息在partition中的偏移量。每一條消息在partition都有唯一的偏移量,消息者可以指定偏移量來(lái)指定要消費(fèi)的消息。
Kafka分布式架構(gòu)

如上圖所示,kafka將topic中的消息存在不同的partition中。如果存在鍵值(key),消息按照鍵值(key)做分類存在不同的partiition中,如果不存在鍵值(key),消息按照輪詢(Round Robin)機(jī)制存在不同的partition中。默認(rèn)情況下,鍵值(key)決定了一條消息會(huì)被存在哪個(gè)partition中。

partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)來(lái)指定消息的位置。一個(gè)topic的一個(gè)partition只能被一個(gè)consumer group中的一個(gè)consumer消費(fèi),多個(gè)consumer消費(fèi)同一個(gè)partition中的數(shù)據(jù)是不允許的,但是一個(gè)consumer可以消費(fèi)多個(gè)partition中的數(shù)據(jù)。

kafka將partition的數(shù)據(jù)復(fù)制到不同的broker,提供了partition數(shù)據(jù)的備份。每一個(gè)partition都有一個(gè)broker作為leader,若干個(gè)broker作為follower。所有的數(shù)據(jù)讀寫都通過(guò)leader所在的服務(wù)器進(jìn)行,并且leader在不同broker之間復(fù)制數(shù)據(jù)。

上圖中,對(duì)于Partition 0,broker 1是它的leader,broker 2和broker 3是follower。對(duì)于Partition 1,broker 2是它的leader,broker 1和broker 3是follower。

在上圖中,當(dāng)有Client(也就是Producer)要寫入數(shù)據(jù)到Partition 0時(shí),會(huì)寫入到leader Broker 1,Broker 1再將數(shù)據(jù)復(fù)制到follower Broker 2和Broker 3。

在上圖中,Client向Partition 1中寫入數(shù)據(jù)時(shí),會(huì)寫入到Broker 2,因?yàn)锽roker 2是Partition 1的Leader,然后Broker 2再將數(shù)據(jù)復(fù)制到follower Broker 1和Broker 3中。

上圖中的topic一共有3個(gè)partition,對(duì)每個(gè)partition的讀寫都由不同的broker處理,因此總的吞吐量得到了提升。

實(shí)驗(yàn)一:kafka-python實(shí)現(xiàn)生產(chǎn)者消費(fèi)者

kafka-python是一個(gè)python的Kafka客戶端,可以用來(lái)向kafka的topic發(fā)送消息、消費(fèi)消息。

這個(gè)實(shí)驗(yàn)會(huì)實(shí)現(xiàn)一個(gè)producer和一個(gè)consumer,producer向kafka發(fā)送消息,consumer從topic中消費(fèi)消息。結(jié)構(gòu)如下圖

producer代碼

#-*- coding: utf-8 -*-

from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers="localhost:9092")

i = 1000
while True:
    ts = int(time.time() * 1000)
    producer.send(topic="py_test", value=str(i), key=str(i), timestamp_ms=ts)
    producer.flush()
    print i
    i += 1
    time.sleep(1)

consumer代碼

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer("py_test", bootstrap_servers=["localhost:9092"])
for message in consumer:
    print message

接下來(lái)創(chuàng)建test topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

打開兩個(gè)窗口中,我們?cè)趙indow1中運(yùn)行producer,如下:

在window2中運(yùn)行consumer,如下:

實(shí)驗(yàn)二:消費(fèi)組實(shí)現(xiàn)容錯(cuò)性機(jī)制

這個(gè)實(shí)驗(yàn)將展示消費(fèi)組的容錯(cuò)性的特點(diǎn)。這個(gè)實(shí)驗(yàn)中將創(chuàng)建一個(gè)有2個(gè)partition的topic,和2個(gè)consumer,這2個(gè)consumer共同消費(fèi)同一個(gè)topic中的數(shù)據(jù)。結(jié)構(gòu)如下所示

producer部分代碼和實(shí)驗(yàn)一相同,這里不再重復(fù)。consumer需要指定所屬的consumer group,代碼如下

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer("py_test", group_id="testgt", bootstrap_servers=["localhost:9092"])
for message in consumer:
    print message

接下來(lái)我們創(chuàng)建topic,名字test,設(shè)置partition數(shù)量為2

打開三個(gè)窗口,一個(gè)窗口運(yùn)行producer,還有兩個(gè)窗口運(yùn)行consumer。
運(yùn)行consumer的兩個(gè)窗口的輸出如下:

可以看到兩個(gè)consumer同時(shí)運(yùn)行的情況下,它們分別消費(fèi)不同partition中的數(shù)據(jù)。window1中的consumer消費(fèi)partition 0中的數(shù)據(jù),window2中的consumer消費(fèi)parition 1中的數(shù)據(jù)。
我們嘗試關(guān)閉window1中的consumer,可以看到如下結(jié)果

剛開始window2中的consumer只消費(fèi)partition1中的數(shù)據(jù),當(dāng)window1中的consumer退出后,window2中的consumer中也開始消費(fèi)partition 0中的數(shù)據(jù)了。

實(shí)驗(yàn)三:offset管理

kafka允許consumer將當(dāng)前消費(fèi)的消息的offset提交到kafka中,這樣如果consumer因異常退出后,下次啟動(dòng)仍然可以從上次記錄的offset開始向后繼續(xù)消費(fèi)消息。

這個(gè)實(shí)驗(yàn)的結(jié)構(gòu)和實(shí)驗(yàn)一的結(jié)構(gòu)是一樣的,使用一個(gè)producer,一個(gè)consumer,test topic的partition數(shù)量設(shè)為1。

producer的代碼和實(shí)驗(yàn)一中的一樣,這里不再重復(fù)。consumer的代碼稍作修改,這里consumer中打印出下一個(gè)要被消費(fèi)的消息的offset。consumer代碼如下

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("py_test", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test_g", auto_offset_reset="earliest", enable_auto_commit=False)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    # pass
    print message.value
auto.offset.reset值含義解釋
earliest 
當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),從頭開始消費(fèi) 
latest 
當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù) 
none 
topic各分區(qū)都存在已提交的offset時(shí),從offset后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常

在一個(gè)窗口中啟動(dòng)producer,在另一個(gè)窗口并且啟動(dòng)consumer。consumer的輸出如下


可以嘗試退出consumer,再啟動(dòng)consumer。每一次重新啟動(dòng),consumer都是從offset=98的消息開始消費(fèi)的。
修改consumer的代碼如下 在consumer消費(fèi)每一條消息后將offset提交回kafka

#-*- coding: utf-8 -*-
from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("py_test", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test", auto_offset_reset="earliest", enable_auto_commit=True)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    print message.offset
    # consumer.commit() 也可以主動(dòng)提交offset

啟動(dòng)consumer

可以看到consumer從offset=98的消息開始消費(fèi),到offset=829時(shí),我們Ctrl+C退出consumer。

我們?cè)俅螁?dòng)consumer

可以看到重新啟動(dòng)后,consumer從上一次記錄的offset開始繼續(xù)消費(fèi)消息。之后每一次consumer重新啟動(dòng),consumer都會(huì)從上一次停止的地方繼續(xù)開始消費(fèi)。

不同的消費(fèi)組有不同的offset管理,相互不影響
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer, TopicPartition

tp = TopicPartition("py_test", 0)
consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test_1", auto_offset_reset="earliest", enable_auto_commit=False)
consumer.assign([tp])
print "starting offset is", consumer.position(tp)
for message in consumer:
    print message.offset
    # consumer.commit()

換一個(gè)group_id test_1,會(huì)從starting offset is 0開始輸出:

starting offset is 0
0

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/42705.html

相關(guān)文章

  • U大使獎(jiǎng)勵(lì)規(guī)則全新發(fā)布

    U大使重要風(fēng)控規(guī)則提醒——一、以下行為嚴(yán)令禁止,一旦發(fā)現(xiàn)虛假推廣行為,將對(duì)月結(jié)傭金進(jìn)行凍結(jié)扣除并終身終止推廣合作:1. U大使利用發(fā)現(xiàn)的活動(dòng)規(guī)則漏洞等增加推薦業(yè)績(jī)、獲得不合理的服務(wù)費(fèi)用;2. 鏈接劫持、強(qiáng)制捆綁、違反法律法規(guī)等的非正當(dāng)方式推廣方式;3. U大使私自承諾向新用戶返利;4. 與 UCloud 銷售人員、其他U大使、或被推薦用戶相互串通,弄虛作假,騙取服務(wù)費(fèi)用;5. 將UCloud發(fā)放的...

    UCloud小助手 評(píng)論0 收藏1
  • U大使推廣獎(jiǎng)勵(lì)規(guī)則

    新用戶通過(guò)點(diǎn)擊U大使的邀請(qǐng)鏈接注冊(cè)UCloud賬戶,并在注冊(cè)90日內(nèi)購(gòu)買指定范圍內(nèi)的產(chǎn)品,UCloud將按照新用戶自首日訂單起90日內(nèi)現(xiàn)金支付金額乘以約定獎(jiǎng)勵(lì)比例進(jìn)行現(xiàn)金獎(jiǎng)勵(lì)。一、推廣資格本活動(dòng)U大使僅限UCloud已實(shí)名的個(gè)人用戶,如推廣賬號(hào)由個(gè)人認(rèn)證變更為企業(yè)認(rèn)證,未發(fā)放的推廣傭金將不再發(fā)放;UCloud(前)員工及其家屬、與UCloud有合作關(guān)系的銷售工作人員及代理商,不能參加本活動(dòng)。立即加...

    UCloud小助手 評(píng)論0 收藏0
  • 開源組件Flink性能優(yōu)化之實(shí)時(shí)計(jì)算延遲填坑記

    開源組件Flink性能優(yōu)化之實(shí)時(shí)計(jì)算延遲填坑記 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%; ...

    IT那活兒 評(píng)論0 收藏1513
  • GoldenGate間斷性休眠的troubleshooting

    GoldenGate間斷性休眠的troubleshooting img{ display:block; margin:0 auto !important; width:100%; } body{ width:75...

    IT那活兒 評(píng)論0 收藏1099
  • GreenPlumn數(shù)據(jù)采集踩坑事記

    GreenPlumn數(shù)據(jù)采集踩坑事記 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%; ...

    IT那活兒 評(píng)論0 收藏2508
  • 奧卡姆剃刀原則解決flink日志延時(shí)問題

    奧卡姆剃刀原則解決flink日志延時(shí)問題 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%; ...

    IT那活兒 評(píng)論0 收藏1466

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<