摘要:可以參考源碼,項目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面
前言在訂閱/發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將log消息廣播給一些消費者。這章我們會在此基礎(chǔ)上加入一些新的特性-我們將有針對性的進(jìn)行消息分發(fā),比如,只把錯誤(error)消息保存到磁盤,與此同時,打印出所有的消息。
綁定我們在前面的例子中,綁定是這么來做的
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定是建立交換機(jī)和隊列之間的一種聯(lián)系:隊列會接受交換機(jī)中的消息。綁定可以用一個路由鍵來指明,為了與basic_publish區(qū)分開,我們稱之為綁定鍵(binding key):
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵跟路由器類型也有關(guān)系,我們之前用的廣播路由器,會忽略掉這個值
直達(dá)交換機(jī)(Direct Exchange)之前我們用的是廣播交換機(jī),會將消息發(fā)送給所有的消費者。這里我們希望通過log的嚴(yán)重程度進(jìn)行過濾,例如只有嚴(yán)重的錯誤才會寫入到磁盤,而warn和info消息就不用了,以此來節(jié)省磁盤空間
而廣播交換機(jī)沒法滿足這個需求-它只是無腦的發(fā)送消息。所以我們會使用直達(dá)交換機(jī)(Direct Exchange)- 消息會通過所綁定的鍵來發(fā)送給對應(yīng)的隊列,可以看如下這幅圖
如上圖所示,直達(dá)交換機(jī)X綁定了兩個隊列,C1是通過orange來綁定,而C2是通過black和green綁定。因此,發(fā)送到路由鍵orange的消息會發(fā)送給隊列Q1,發(fā)送到路由鍵black或者green的消息會發(fā)送給Q2,其它的消息將被丟棄。
多項綁定
當(dāng)然,多個隊列綁定到一個鍵上也是合法的,在這種情況下,直達(dá)交換機(jī)將會將消息發(fā)送給所有的隊列,就像廣播交換機(jī)一樣,如上圖所示,一個鍵為black的消息將會同時被發(fā)送給C1和C2.
我們首先需要創(chuàng)建一個直達(dá)路由器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
并發(fā)送消息到這個路由器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
上面我們是發(fā)送給"severity",簡單起見,假設(shè)有下列幾種日志類型"severity" ,"info", "warning", "error".
訂閱消息(Subscribing)接受消息跟之前一樣,但有一點不同,我們提供了一個binding key,
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }整合
將上面的所有代碼整合到一起
EmitLogDirect.java
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent "" + severity + "":"" + message + """); channel.close(); connection.close(); } //.. }
ReceiveLogsDirect.java
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
只保存warning和error的消息到磁盤上
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
將所有的消息打印到頻幕上
java -cp $CP ReceiveLogsDirect info warning error # => [*] Waiting for logs. To exit press CTRL+C
最后,發(fā)送error消息
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." # => [x] Sent "error":"Run. Run. Or it will explode."
好了,這一章就到這兒,下一章我們將講述如何基于特定模式進(jìn)行監(jiān)聽
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/68120.html
摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機(jī)也可以當(dāng)成其它交換機(jī)來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達(dá)路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性...
摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:作為消息隊列的一個典型實踐,完全實現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個隊列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的...
摘要:因為消費消息是在另外一個進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。可以參考...
摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統(tǒng)。更形象的表示,如對中的感興趣。為了進(jìn)行說明,像下圖這么來設(shè)置如圖,可以看到有兩個綁到了類型為的上。如圖的設(shè)置中,一個為的就會同時發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級別。 路由模式 在之前的文章中我們建立了一個簡單的日志系統(tǒng)。我們可以通過這個系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個新的功...
閱讀 1344·2021-10-18 13:32
閱讀 2506·2021-09-24 09:47
閱讀 1401·2021-09-23 11:22
閱讀 2525·2019-08-30 14:06
閱讀 659·2019-08-30 12:48
閱讀 2068·2019-08-30 11:03
閱讀 604·2019-08-29 17:09
閱讀 2543·2019-08-29 14:10