摘要:一關(guān)鍵字和之間的連接關(guān)系實際存儲消息。生產(chǎn)者進(jìn)行接受應(yīng)答,用來確定這條消息是否正常的發(fā)送到了,這種方式也是消息的可靠性投遞的核心保障。支持消息的過期時間,在消息發(fā)送時可以進(jìn)行指定??梢员O(jiān)聽這個隊列中消息做相應(yīng)的處理。
一、rabbitmq關(guān)鍵字
Binding:Exchange和Exchange、Queue之間的連接關(guān)系二、rabbitmq高級特性
Queue:實際存儲消息。
Durability:是否持久化,Durable:是,Transient:否。
Auto delete:如選yes,代表當(dāng)最后一個監(jiān)聽被移除之后,該Queue會自動刪除。
Message:服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)。本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成。常用屬性:delivery mode、headers(自定義屬性)
Virtual host:虛擬主機,用于進(jìn)行邏輯隔離,最上層的消息路由。一個Virtual Host里面可以有若干個Exchange和Queue,同一個Virtual Host里面不能有相同名稱的Exchange或者Queue。
消息如何保證100%的投遞成功?
生產(chǎn)端可靠性投遞
保障消息的成功發(fā)出
保障MQ節(jié)點的成功接收
發(fā)送端收到MQ節(jié)點(Broker)確認(rèn)應(yīng)答
完善的消息進(jìn)行補償機制
可靠性投遞解決方案
消息落庫,對消息狀態(tài)進(jìn)行打標(biāo)
消息的延遲投遞,做二次確認(rèn),回調(diào)檢查
消息集群鏡像隊列:rabbitmq集群模式非常經(jīng)典的就是Mirror鏡像模式,保證100%數(shù)據(jù)不丟失,在實際工作中也是用的最多的。并且實現(xiàn)集群非常簡單,一般互聯(lián)網(wǎng)公司都會構(gòu)建這種鏡像集群模式。https://segmentfault.com/a/11...
冪等性概念詳解
在海量訂單產(chǎn)生的業(yè)務(wù)高峰期,如何避免消息的重復(fù)消費問題?
Confirm確認(rèn)消息、Return返回消息
理解Confirm消息確認(rèn)機制:
消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會給我們生產(chǎn)者一個應(yīng)答。
生產(chǎn)者進(jìn)行接受應(yīng)答,用來確定這條消息是否正常的發(fā)送到了Broker,這種方式也是消息的可靠性投遞的核心保障。
如何實現(xiàn)Confirm確認(rèn)消息?
第一步:在channel上開啟確認(rèn)模式:channel.confirmSelect()
第二步:在channel上添加監(jiān)聽:addConfirmListener,監(jiān)聽成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對消息進(jìn)行重新發(fā)送、或者記錄日志等后續(xù)處理。
Return消息機制
Return Listener用于處理一些不可路由的消息
我們的消息生產(chǎn)者,通過制定一個Exchange和Routing Key,把消息送達(dá)到某一個隊列中去,然后我們的消費者監(jiān)聽隊列,進(jìn)行消費處理。
但是在某寫情況下,如果我們在發(fā)送消息的時候,當(dāng)前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監(jiān)聽這種不可達(dá)的消息,就要使用Return Listener。
Mandatory:如果為true,則監(jiān)聽器會接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)處理,如果為false,那么broker端自動刪除該消息。
自定義消費者
public class MyConsumer extends DefaultConsumer implements Consumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { } }
消息的ACK與重回隊列
消費端手工ACK(應(yīng)答成功)和NACK(應(yīng)答失?。?/b>
消費端進(jìn)行消費的時候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補償。
由于服務(wù)器宕機等嚴(yán)重問題,我們就要手工進(jìn)行ACK保障消費端消費成功。比如:消費一半的時候宕機了,broker端沒有收到應(yīng)答,重發(fā)消息。
消費端重回隊列
消費端重回隊列是為了對沒有處理成功的消息,把消息重新回遞給broker。
一般我們在實際應(yīng)用中,都會關(guān)閉重回隊列,也就是設(shè)置為false。
channel.basciNack(envelope.getDeliveryTag(),false,true) // 為true的話,在消費失敗的情況下會重回隊列放入隊列末端。
消息的限流
假設(shè)有一個場景,RabbitMq服務(wù)器上有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現(xiàn)下面的情況: 巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數(shù)據(jù)。RabbitMq提供了一種qos(服務(wù)質(zhì)量保證)功能,即在非自動確認(rèn)消息的前提下,如果一定數(shù)目的消息(通過基于consume或者channel設(shè)置Qos的值)未被確認(rèn)前,不進(jìn)行消費新的消息。
void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0
prefetchCount: 會告訴RabbitMq不要同時給一個消費者推送多于N個消息,即一旦有N個消息還沒有被ack,則該consumer將block掉,知道有消息ack。
global: true/false,是否將上面設(shè)置應(yīng)用于channel,簡答點說,就是上面限制是channel級別的還是consumer級別。
// 限流方式,第一件事就是autoAck設(shè)置為false,關(guān)閉自動簽收,必須手動簽收 channel.basicQos(0,3,false); // 3表示如果消息積壓了1000條,先給我推3條,這三條消費結(jié)束后,我會給你一個ack表示這三條我已經(jīng)處理完了,然后再給我推送3條... channel.basicConsume(queueName,false,new MyConsumer(channel)) //在MyConsumer中對消息進(jìn)行簽收ack channel.basicAck(envelope,getDeliveryTag(), false);
TTL消息
TTL:是Time To Live。就是生存時間。
RabbitMQ支持消息的過期時間,在消息發(fā)送時可以進(jìn)行指定。
RabbitMQ支持隊列的過期時間,從消息入隊開始計算,只要超過了隊列的超時時間配置,那么消息會自動清除。
死信隊列
死信隊列:DLX,Dead-Letter-Exchange。
利用DLX,當(dāng)消息在一個隊列中變成了死信(dead message:這條消息沒有消費者去消費)之后,他能被重新publish到另外一個Exchange,這個Exchange就是DLX。
進(jìn)入死信隊列(進(jìn)入死信的三種方式)1.消息被拒絕(basic.reject or basic.nack)并且requeue=false
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
2.消息TTL過期
3.隊列達(dá)到最大長度
備注說明
DLX也是一個正常的Exchange,和一般的Exchange沒區(qū)別,他能在任何的隊列上被指定,實際上就是設(shè)置某個隊列的屬性。
當(dāng)這個隊列中有死信時,RabbitMQ會自動將這個消息重新發(fā)送到已經(jīng)設(shè)置了的Exchange上去,進(jìn)而被路由到另外一個隊列上去。
可以監(jiān)聽這個隊列中消息做相應(yīng)的處理。
死信隊列設(shè)置
首先需要設(shè)置死信隊列的exchange和queue,然后進(jìn)行綁定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#
然后進(jìn)行正常聲明交換機、隊列、綁定,只不過我們需要在隊列加上一個參數(shù)即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/77668.html
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:第一步安裝因為是語言編寫的,所以我們首先需要安裝第二步安裝官網(wǎng)提供的安裝方式本人安裝成功的方式第三步查看是否已經(jīng)安裝好了,能查到說明已經(jīng)安裝完成了。 第一步:安裝Erlang 因為rabbitMQ是Erlang語言編寫的,所以我們首先需要安裝Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
摘要:本文基于的插件,針對進(jìn)行簡單的測試。包括協(xié)議的介紹,的安裝配置開啟插件及基于進(jìn)行的測試。協(xié)議是基于發(fā)布訂閱模型的物聯(lián)網(wǎng)消息傳遞協(xié)議。對傳輸消息有三種服務(wù)質(zhì)量最多一次,這一級別會發(fā)生消息丟失或重復(fù),消息發(fā)布依賴于底層網(wǎng)絡(luò)。 ...
閱讀 2273·2023-04-25 14:50
閱讀 3012·2021-11-17 09:33
閱讀 2697·2019-08-30 13:07
閱讀 2933·2019-08-29 16:57
閱讀 998·2019-08-29 15:26
閱讀 3655·2019-08-29 13:08
閱讀 2076·2019-08-29 12:32
閱讀 3490·2019-08-26 13:57