摘要:作為消息隊(duì)列的一個典型實(shí)踐,完全實(shí)現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個隊(duì)列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費(fèi)者。消費(fèi)者接受并消費(fèi)消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊(duì)列來的消息。
推廣
https://segmentfault.com/l/15...
我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項(xiàng)目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面
前言消息隊(duì)列想必大家都有一定了解:用來解耦,上級模塊不用關(guān)心下級模塊是否執(zhí)行成功,最常見的比如說日志,核心系統(tǒng)并不關(guān)心日志是否成功,日志什么時候記錄。這種情形就可以用消息隊(duì)列來解耦。
RabbitMQ作為消息隊(duì)列的一個典型實(shí)踐,完全實(shí)現(xiàn)了AMQ標(biāo)準(zhǔn),與Kafka的快快快不同,它追求的穩(wěn)定、可靠。下面就來幾篇文章來詳細(xì)介紹下,均翻譯至RabbitMQ的官方文檔。
RabbitMQ是一個消息的中介(用來接受以及轉(zhuǎn)發(fā)消息),就像是一個非常可靠的郵局,當(dāng)信件放到郵局時,信件就確保能到達(dá),所以,RabbitMQ可以看成是郵箱、郵局、以及郵遞員的合體
RabbitMQ的一些重要概念 produceing(生產(chǎn)者):生產(chǎn)數(shù)據(jù) queue(隊(duì)列):類似于郵箱,存在于RabbitMQ服務(wù)器的內(nèi)部,用來存儲消息,并且消息只能存儲在隊(duì)列里面。隊(duì)列的大小只受RabbitMQ主機(jī)內(nèi)存和硬盤的影響。同一個隊(duì)列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費(fèi)者。
Consuming(消費(fèi)者):接受并消費(fèi)消息。 Hello World下面我們來寫我們的第一個“Hello World”,我們會使用Java的API來編寫一個生產(chǎn)者來生產(chǎn)消息,以及一個消費(fèi)者來消費(fèi)消息
P是我們的生產(chǎn)者,而C是我們的消費(fèi)者。中間的box是我們的queue:作為消息緩沖,是RabbitMQ用來存儲轉(zhuǎn)發(fā)消息給消費(fèi)者的。
Java客戶端庫RabbitMQ支持多重協(xié)議,這里我們會用AMQP 0-9-1來說明,它是一個消息隊(duì)列的通用協(xié)議。RabbitMQ同時也有多種語言的客戶端,我們在這里用Java來做說明。
首先請下載Java客戶端包以及它所依賴的SLF4J和SLF4J SIMPLE,將它們拷貝到自己的工作區(qū)。
引入RabbitMQ同樣也可以使用Maven來做依賴管理, groupId是com.rabbitmq 以及artifactId amqp-client
發(fā)送請求生產(chǎn)者會發(fā)送消息到MQ,然后退出
在Send.java中,首先我們import一些類
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
設(shè)置我們的主類
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
創(chuàng)建Connection
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
這里我們連接的是本地,你當(dāng)然也可以連接到另一個服務(wù)器上,只需要指明服務(wù)器的名稱和ip地址。
下面我們要創(chuàng)建一個Channel,大家可以想象一些,消息的產(chǎn)生和發(fā)送都是通過這個Channel完成的。
當(dāng)然,我們還需要頂一個一個Queue來接受消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
對于Queue的定義是冥等的,如果不存在才會創(chuàng)建,如果存在則不會再建新的。消息會被格式化成byte的數(shù)組,方便進(jìn)行任意的轉(zhuǎn)換。
最后,我們關(guān)閉通道
channel.close(); connection.close();
完整的代碼可以看這個地方:send.java
接受請求消費(fèi)者會從RabbitMQ接收到請求,消息是被推到消費(fèi)者,而且消費(fèi)者會一直監(jiān)聽著消息隊(duì)列,一旦有有新的消息就會打印出來。
Recv.java幾乎于Send完全類似
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
Defaultconsumer是一個繼承了Consumer接口的類,方便我們來存儲消息隊(duì)列來的消息。建立消費(fèi)者與我們建立生產(chǎn)者非常類似:
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }
你可以注意到我們在消費(fèi)者定義了一個Queue,因此我們是需要在生產(chǎn)者之前啟動消費(fèi)者的,我們要確保我們在消費(fèi)消息之前這個隊(duì)列是已經(jīng)存在的。
然后我們需要告訴mq服務(wù)可以推送消息給我們。因?yàn)檫@個推送是異步的,因此我們可以提供一個回調(diào)方法,DefaultConsumer會暫時存儲這個消息,直到消費(fèi)者以及準(zhǔn)備好來處理接受到的消息了(消息會存儲在消費(fèi)者中直到消費(fèi)者有能力來消費(fèi)它,可以想象一下數(shù)據(jù)庫等高IO操作)
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
完整的Recv.java地址
跑起來我們可以先用javac來編譯程序
javac -cp amqp-client-4.0.2.jar Send.java Recv.java
而后來運(yùn)行它,這需要我們在路徑加上它的依賴包,我們首先啟動的是消費(fèi)者
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv
而后啟動發(fā)送者
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send
消費(fèi)者會持續(xù)等待,并打印從生產(chǎn)者哪里來的消息,你可以用(Ctrl-C)來停止它。所以你要另外開啟一個命令行窗口來運(yùn)行生產(chǎn)者。
查看隊(duì)列也許你想知道RabbitMQ中到底有多少個消息,你可以使用rabbitmqctl工具:
sudo rabbitmqctl list_queues
在Windows中:
rabbitmqctl.bat list_queues
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/70889.html
摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費(fèi)者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊(duì)列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:推廣專題講座開源項(xiàng)目我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機(jī)也可以當(dāng)成其它交換機(jī)來使用,假如隊(duì)列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達(dá)路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性...
摘要:任務(wù)隊(duì)列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費(fèi)大量資源。我們將任務(wù)封裝成一個消息發(fā)送給隊(duì)列,后臺的任務(wù)進(jìn)程會得到這個任務(wù)并執(zhí)行它,而且可以配置多個任務(wù)進(jìn)程,進(jìn)一步加大吞吐率。為了確保消息不丟失,支持消息確認(rèn)。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的...
摘要:可以參考源碼,項(xiàng)目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費(fèi)者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊(duì)列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決...
摘要:因?yàn)橄M(fèi)消息是在另外一個進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊(duì)列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項(xiàng)目 我們利用消息隊(duì)列實(shí)現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖?..
閱讀 3115·2021-10-14 09:42
閱讀 3732·2021-10-11 10:59
閱讀 3030·2019-08-30 11:25
閱讀 3164·2019-08-29 16:25
閱讀 3296·2019-08-26 17:40
閱讀 1354·2019-08-26 13:30
閱讀 1236·2019-08-26 11:46
閱讀 1407·2019-08-23 15:22