摘要:安裝模塊基本使用生產(chǎn)者簡(jiǎn)單封裝初始化實(shí)例連接地址設(shè)置獲取當(dāng)前所有獲取當(dāng)前生產(chǎn)者對(duì)象發(fā)送數(shù)據(jù)需要傳入的可迭代對(duì)象連接切換設(shè)置新的獲取當(dāng)前設(shè)置的獲取所有要發(fā)送的可迭代對(duì)象引用來(lái)源博客園測(cè)試集群知乎使用生成器把寫入效率提高倍
1.1安裝模塊
pip install pykafka1.2基本使用
# -* coding:utf8 *- from pykafka import KafkaClient host = "IP:9092, IP:9092, IP:9092" client = KafkaClient(hosts = host) # 生產(chǎn)者 topicdocu = client.topics["my-topic"] producer = topicdocu.get_producer() for i in range(100): print i producer.produce("test message " + str(i ** 2)) producer.stop()1.3簡(jiǎn)單封裝
class KafkaProduct(): def __init__(self,hosts,topic): """ 初始化實(shí)例 :param hosts: 連接地址 :param topic: """ self.__client = KafkaClient(hosts=hosts) self.__topic = self.__client.topics[topic.encode()] def __set_topic(self, topic): self.__topic = self.__client.topics[topic.encode()] def set_topic(self, topic): """ 設(shè)置topic :param topic: :return: """ self.__set_topic(topic) def get_topics(self): """ 獲取當(dāng)前所有topic :return: """ return self.__client.topics def get_topic(self): """ 獲取當(dāng)前topic :return: """ return self.__topic def Producer(self): """ 生產(chǎn)者對(duì)象 :return: """ with self.__topic.get_producer(delivery_reports=True) as producer: next_data = "" while True: if next_data: producer.produce(str(next_data).encode()) next_data = yield True def send_data(self,datas): """ 發(fā)送數(shù)據(jù) :param datas:需要傳入的可迭代對(duì)象 :return: """ c = self.Producer() next(c) for i in datas: c.send(i) if __name__ == "__main__": hosts = "1.2.3.4:9999,2.3.4.5:9090" #連接hosts topic = "test_523" K = KafkaProduct(hosts=hosts, topic=topic) # #K.set_topic("test") #切換設(shè)置新的topic K.get_topic() #獲取當(dāng)前設(shè)置的topic #K.get_topics() #獲取所有topic data = range(10000) #要發(fā)送的可迭代對(duì)象 K.send_data(data)1.4引用來(lái)源
博客園:Python測(cè)試Kafka集群(pykafka)
知乎:使用生成器把Kafka寫入效率提高1000倍
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/43910.html
摘要:大量的和分區(qū)會(huì)嚴(yán)重影響集群性能。介紹可參考收到離線分區(qū)總數(shù)異常告警一般是某個(gè)節(jié)點(diǎn)宕機(jī)或者服務(wù)異常導(dǎo)致。若服務(wù)卡住,可在評(píng)估后在控制臺(tái)重啟該節(jié)點(diǎn)服務(wù)。若想了解當(dāng)前請(qǐng)求延時(shí)情況,建議關(guān)注平均請(qǐng)求延時(shí)監(jiān)控項(xiàng)。 FAQs本篇目錄一個(gè)UKafka集群可以創(chuàng)建多少個(gè)Topic?如何增加Topic的副本數(shù)量(ReplicationFactor)?收到離線分區(qū)總數(shù)>=10.0個(gè)告警,離線分區(qū)總數(shù)是什么,怎么...
摘要:相關(guān)概念協(xié)議高級(jí)消息隊(duì)列協(xié)議是一個(gè)標(biāo)準(zhǔn)開放的應(yīng)用層的消息中間件協(xié)議。可以用命令與不同,不是線程安全的。手動(dòng)提交執(zhí)行相關(guān)邏輯提交注意點(diǎn)將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當(dāng)就能最恰當(dāng)?shù)陌l(fā)揮好的作用。 本文使用的Kafka版本0.11 先思考些問(wèn)題: 我想分析一下用戶行為(pageviews),以便我能設(shè)計(jì)出更好的廣告位 我想對(duì)用戶的搜索關(guān)鍵詞進(jìn)行統(tǒng)計(jì),...
摘要:主題和分區(qū)的悄息通過(guò)主題進(jìn)行分類。在給定的分區(qū)里,每個(gè)悄息的偏移量都是唯一的。消費(fèi)者把每個(gè)分區(qū)最后讀取的悄息偏移量保存在或上,如果悄費(fèi)者關(guān)閉或重啟,它的讀取狀態(tài)不會(huì)丟失。主題可以配置自己的保留策略,可以將悄息保留到不再使用它們?yōu)橹埂0l(fā)布與訂閱消息系統(tǒng) 在正式討論Apache Kafka (以下簡(jiǎn)稱Kafka)之前,先來(lái)了解發(fā)布與訂閱消息系統(tǒng)的概念, 并認(rèn)識(shí)這個(gè)系統(tǒng)的重要性。數(shù)據(jù)(消息)的發(fā)送...
閱讀 1016·2019-08-30 14:24
閱讀 1066·2019-08-30 14:13
閱讀 1866·2019-08-29 17:21
閱讀 2828·2019-08-29 13:44
閱讀 1727·2019-08-29 11:04
閱讀 536·2019-08-26 10:44
閱讀 2649·2019-08-23 14:04
閱讀 964·2019-08-23 12:08