摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊列了。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面
前言在第二章中我們描述了任務(wù)隊列,在任務(wù)隊列中一個消息只會發(fā)送給一個消費者。而在這一章中我們將消息發(fā)送給許多個消費者,我們稱之為“發(fā)布/訂閱”
為了更好的闡述這個模式,我們會建立一個新的簡單的logging系統(tǒng),包含2個步驟-第一步發(fā)送log信息,第二步能夠接受并將信息打印出來,而且在第二步中所有的消費者都會接受到同樣的消息,比如一個消費者用來將log信息寫到磁盤,另外一個接受信息并顯示在屏幕上。因此一旦有有消息,消息會廣播到所有的消費者。
交換機(Exchanges)前面的章節(jié)中我們是直接通過queue來處理消息,現(xiàn)在我們來介紹一種更完善的模式
讓我們迅速瀏覽一遍前面的主題:
生產(chǎn)者是一個客戶端程序,用來發(fā)送消息
隊列是一個緩沖,用來存儲消息
消費者是一個客戶端程序,用來接受消息
RabbitMQ的核心思想是生產(chǎn)者不會將消息直接發(fā)送給隊列,意味著生產(chǎn)者是完全看不到隊列的。反之,生產(chǎn)者只能將消息發(fā)送給路由器(Exchange),再由路由器來決定該如何來處理消息,是將消息發(fā)送給一個隊列呢,還是發(fā)送給許多個隊列,或者直接無視,具體的規(guī)則是根據(jù)路由器的類型而定的。
路由器的類型有這樣幾種:直連路由器(dirct), 主題路由器(topic),頭部路由器(headers),以及多廣播路由器(fanout)
channel.exchangeDeclare("logs", "fanout");
廣播路由器聽起來就很簡單,它會將消息廣播到所有的它所知道的隊列,而這正是我們所需要的。
默認(rèn)路由器在前面的章節(jié)中雖然沒有設(shè)置任何路由器,但依然能夠?qū)⑾l(fā)送到隊列,這是因為我們的是默認(rèn)路由器:使用空字符串("")來做的定義:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數(shù)是exchange的名稱,在這里是空字符串,消息會通過路由健(routingKey)發(fā)送到該鍵所對應(yīng)的隊列。
然而現(xiàn)在,我們有了確認(rèn)的路由器
channel.basicPublish( "logs", "", null, message.getBytes());
臨時隊列
我們之前隊列都有名字(Hello隊列和task_queue隊列),給隊列起名字非常重要-需要將消費者綁定到特定的queue上面,以及需要把消息從生產(chǎn)者發(fā)送給特定的消費者。
但對于日志來說,消息會發(fā)送到所有的消費者,而并非個別,We"re also interested only in currently flowing messages not in the old ones.為了滿足當(dāng)前需求我們可以做兩件事
一旦連接上RabbitMQ,需要一個新的空隊列來接受消息,我們可以隨機起個名字,甚至根本不起名,而讓RabbitMQ來命名它。
一旦消費者斷開連接,這個隊列就能被刪除掉
我們可以這樣定義一個不需要持久化、獨立的、能夠被自動刪除的隊列
String queueName = channel.queueDeclare().getQueue();
這個名稱是RabbitMQ隨機分配的,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg.
綁定我們已經(jīng)聲明了一個廣播路由器,現(xiàn)在需要告訴這個路由器需要把信息發(fā)送給哪些隊列,路由器和隊列間的這個關(guān)系就稱之為綁定。
channel.queueBind(queueName, "logs", "");
如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊列了。
整合發(fā)送者與我們之前的代碼基本相同,最重大的區(qū)別我們現(xiàn)在是發(fā)送給帶名稱的路由器了,同時我們也需要一個路由鍵,但這里也不需要,因為廣播路由器會忽略這個值,這是我們EmitLog.java的代碼
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } //... }
可以看到,一旦我們建立的連接立即定義了一個路由器,這個步驟對我們非常重要,因為是嚴(yán)禁將消息發(fā)送給并不存在的路由的。
同時,如果路由器沒有綁定隊列,消息也會丟失掉,但這對于我們來說是ok的:如果并沒有消費者在監(jiān)聽,我們可以直接丟棄掉這個消息。
ReciveLogs.java代碼如下:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯代碼
javac -cp $CP EmitLog.java ReceiveLogs.java
如果你希望將log存儲到本機上
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果你希望在屏幕上顯示log信息,打開一個新的終端:
java -cp $CP ReceiveLogs
發(fā)送消息
java -cp $CP EmitLog
如此一來,就能夠存儲消息的同時進行打印了。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/68123.html
摘要:可以參考源碼,項目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決...
摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實我們是用到了默認(rèn)的,用空字符串來標(biāo)識。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個就會將推向我們的隊列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個task都會派發(fā)給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...
摘要:本文將會講解如何使用實現(xiàn)延時重試和失敗消息隊列,實現(xiàn)可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊列。本文假設(shè)讀者對RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:本文將會講解如何使用實現(xiàn)延時重試和失敗消息隊列,實現(xiàn)可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發(fā)的開源消息隊列。本文假設(shè)讀者對RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門...
摘要:性能調(diào)優(yōu)筆記避免雷區(qū)要避免流控機制觸發(fā)服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到,磁盤空閑空間小于,即啟動內(nèi)存報警,磁盤報警報警后服務(wù)端觸發(fā)流控機制。最佳線程生產(chǎn)者使用多線程發(fā)送數(shù)據(jù)到三到五個線程性能發(fā)送最佳,超過它也不能提高生產(chǎn)的發(fā)送速率。 RabbitMq 性能調(diào)優(yōu)筆記 [TOC] 避免雷區(qū) 要避免流控機制觸發(fā) 服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到40%,磁盤空閑空間小于50M,即啟動內(nèi)存報警,磁...
閱讀 1690·2021-11-23 09:51
閱讀 1232·2019-08-30 13:57
閱讀 2327·2019-08-29 13:12
閱讀 2069·2019-08-26 13:57
閱讀 1260·2019-08-26 11:32
閱讀 1040·2019-08-23 15:08
閱讀 790·2019-08-23 14:42
閱讀 3139·2019-08-23 11:41