亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

python3連接kafka模塊pykafka生產(chǎn)者簡(jiǎn)單封裝

fizz / 2321人閱讀

摘要:安裝模塊基本使用生產(chǎn)者簡(jiǎn)單封裝初始化實(shí)例連接地址設(shè)置獲取當(dāng)前所有獲取當(dāng)前生產(chǎn)者對(duì)象發(fā)送數(shù)據(jù)需要傳入的可迭代對(duì)象連接切換設(shè)置新的獲取當(dāng)前設(shè)置的獲取所有要發(fā)送的可迭代對(duì)象引用來(lái)源博客園測(cè)試集群知乎使用生成器把寫入效率提高倍

1.1安裝模塊
pip install pykafka
1.2基本使用
  # -* coding:utf8 *-  
  from pykafka import KafkaClient  
  host = "IP:9092, IP:9092, IP:9092"
  client = KafkaClient(hosts = host)  
  # 生產(chǎn)者  
  topicdocu = client.topics["my-topic"]  
  producer = topicdocu.get_producer()  
  for i in range(100):  
      print i  
      producer.produce("test message " + str(i ** 2))  
  producer.stop()
1.3簡(jiǎn)單封裝
  class KafkaProduct():

      def __init__(self,hosts,topic):
          """
          初始化實(shí)例
          :param hosts: 連接地址
          :param topic:
          """
          self.__client = KafkaClient(hosts=hosts)
          self.__topic = self.__client.topics[topic.encode()]
  
      def __set_topic(self, topic):
          self.__topic = self.__client.topics[topic.encode()]
  
      def set_topic(self, topic):
          """
          設(shè)置topic
          :param topic:
          :return:
          """
          self.__set_topic(topic)
  
      def get_topics(self):
          """
          獲取當(dāng)前所有topic
          :return:
          """
          return self.__client.topics
  
      def get_topic(self):
          """
          獲取當(dāng)前topic
          :return:
          """
          return self.__topic
  
      def Producer(self):
          """
          生產(chǎn)者對(duì)象
          :return:
          """
          with self.__topic.get_producer(delivery_reports=True) as producer:
              next_data = ""
              while True:
                  if next_data:
                      producer.produce(str(next_data).encode())
                  next_data = yield True
  
      def send_data(self,datas):
          """
          發(fā)送數(shù)據(jù)
          :param datas:需要傳入的可迭代對(duì)象
          :return:
          """
          c = self.Producer()
          next(c)
          for i in datas:
              c.send(i)

if __name__ == "__main__":

    hosts = "1.2.3.4:9999,2.3.4.5:9090" #連接hosts
    topic = "test_523"
    K = KafkaProduct(hosts=hosts, topic=topic)  #
    #K.set_topic("test")  #切換設(shè)置新的topic
    K.get_topic()  #獲取當(dāng)前設(shè)置的topic
    #K.get_topics() #獲取所有topic
    data = range(10000)  #要發(fā)送的可迭代對(duì)象
    K.send_data(data)
1.4引用來(lái)源

博客園:Python測(cè)試Kafka集群(pykafka)

知乎:使用生成器把Kafka寫入效率提高1000倍

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/43910.html

相關(guān)文章

  • FAQs Kafka消息隊(duì)列 UKafka

    摘要:大量的和分區(qū)會(huì)嚴(yán)重影響集群性能。介紹可參考收到離線分區(qū)總數(shù)異常告警一般是某個(gè)節(jié)點(diǎn)宕機(jī)或者服務(wù)異常導(dǎo)致。若服務(wù)卡住,可在評(píng)估后在控制臺(tái)重啟該節(jié)點(diǎn)服務(wù)。若想了解當(dāng)前請(qǐng)求延時(shí)情況,建議關(guān)注平均請(qǐng)求延時(shí)監(jiān)控項(xiàng)。 FAQs本篇目錄一個(gè)UKafka集群可以創(chuàng)建多少個(gè)Topic?如何增加Topic的副本數(shù)量(ReplicationFactor)?收到離線分區(qū)總數(shù)>=10.0個(gè)告警,離線分區(qū)總數(shù)是什么,怎么...

    ernest.wang 評(píng)論0 收藏2407
  • Kafka學(xué)習(xí)筆記之掃盲

    摘要:相關(guān)概念協(xié)議高級(jí)消息隊(duì)列協(xié)議是一個(gè)標(biāo)準(zhǔn)開放的應(yīng)用層的消息中間件協(xié)議。可以用命令與不同,不是線程安全的。手動(dòng)提交執(zhí)行相關(guān)邏輯提交注意點(diǎn)將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當(dāng)就能最恰當(dāng)?shù)陌l(fā)揮好的作用。 本文使用的Kafka版本0.11 先思考些問(wèn)題: 我想分析一下用戶行為(pageviews),以便我能設(shè)計(jì)出更好的廣告位 我想對(duì)用戶的搜索關(guān)鍵詞進(jìn)行統(tǒng)計(jì),...

    GT 評(píng)論0 收藏0
  • Kafka】《Kafka權(quán)威指南》入門

    摘要:主題和分區(qū)的悄息通過(guò)主題進(jìn)行分類。在給定的分區(qū)里,每個(gè)悄息的偏移量都是唯一的。消費(fèi)者把每個(gè)分區(qū)最后讀取的悄息偏移量保存在或上,如果悄費(fèi)者關(guān)閉或重啟,它的讀取狀態(tài)不會(huì)丟失。主題可以配置自己的保留策略,可以將悄息保留到不再使用它們?yōu)橹埂0l(fā)布與訂閱消息系統(tǒng) 在正式討論Apache Kafka (以下簡(jiǎn)稱Kafka)之前,先來(lái)了解發(fā)布與訂閱消息系統(tǒng)的概念, 并認(rèn)識(shí)這個(gè)系統(tǒng)的重要性。數(shù)據(jù)(消息)的發(fā)送...

    番茄西紅柿 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<