亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

RabbitMQ AMQP 消息模型攻略

xietao3 / 1412人閱讀

摘要:近期對消息隊列比較感興趣因此特意看了一下相關(guān)的知識不過在學(xué)時對的消息模型總是理解的不透徹于是在官網(wǎng)上找了一篇介紹消息模型的文章詳細(xì)地看了一下還是要感嘆一下啊官網(wǎng)的文章果然是最權(quán)威的看了以后有了不小的收獲下面是我學(xué)習(xí)消息模型時的記錄其內(nèi)容大部

近期對消息隊列比較感興趣, 因此特意看了一下 RabbitMQ 相關(guān)的知識, 不過在學(xué) RabbitMQ 時, 對 AMQP 的消息模型總是理解的不透徹, 于是在官網(wǎng)上找了一篇介紹 AMQP 消息模型的文章, 詳細(xì)地看了一下.
還是要感嘆一下啊, 官網(wǎng)的文章果然是最權(quán)威的, 看了以后有了不小的收獲.
下面是我學(xué)習(xí) AMQP 消息模型時的記錄, 其內(nèi)容大部分是翻譯自官網(wǎng), 部分添加了自己的理解.

原文

https://www.rabbitmq.com/tuto...

AMQP 消息模型簡介

AMQP 的消息模型如下圖所示:

通過此圖我們可以知道, 一個消息的發(fā)送流程有如下幾個步驟:

消息生產(chǎn)者將消息發(fā)布(Public)到 Exchange 中.

Exchange 根據(jù)隊列的綁定關(guān)系將消息分發(fā)到不同的 Queue 中.

AMQP broker 根據(jù)訂閱規(guī)則將消息發(fā)送給消費者 或 消費者自行根據(jù)需要從消息隊列中獲取消息.

Exchange 和 Exchange 類型

Exchange 的主要任務(wù)是接收消息并將消息路由到0個或多個 Queue 中, 而路由的算法受 Exchange 類型和綁定(binding) 關(guān)系的影響. AMQP 0-9-1 broker 提供如下四個 exchange 類型:

類型 默認(rèn)預(yù)定義的名字
Direct Exchange 空字符串和 amq.direct
Fanout Exchange amq.fanout
Topic Exchange amq.topic
Headers Exchange amq.match(在 RabbitMQ 中, 額外提供amq.headers)

每個 Exchange 都有如下幾個屬性:

Name, Exchange 的 名字

Durability, 是否是持久的 Exchange, 當(dāng)為真時, broker 重啟后也會保留此 Exchange

Auto-delete, 當(dāng)為真時, 如果所有綁定的的 Queue 都不再使用時, 此 Exchange 會自動刪除

關(guān)于默認(rèn) Exchange

默認(rèn)的 exchange 是一個由 broker 預(yù)創(chuàng)建的匿名的(即名字是空字符串) direct exchagne. 對于簡單的程序來說, 默認(rèn)的 exchange 有一個實用的屬性: 如果沒有顯示地綁定 Exchnge, 那么創(chuàng)建的每個 queue 都會自動綁定到這個默認(rèn)的 exchagne 中, 并且此時這個 queue 的 route key 就是這個queue 的名字.

例如當(dāng)我們聲明了一個名為 "search-indexing-online" 的 queue, 那么 AMQP broker 會以 "search-indexing-online" 作為 route key 將此 queue 綁定到默認(rèn)的 exchange 中. 因此當(dāng)一個消息以 route key 為 "search-indexing-online" 投遞到默認(rèn)的 exchange 中時, 此消息就會被路由到這個 queue 中去. 換句話說, 由于有默認(rèn)的 exchagne 的存在, 我們就好像可以直接將消息投遞到指定的 queue 中去而不需要經(jīng)過 exchange 一樣.
例如:
Send:

public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent "" + message + """);
        
        channel.close();
        connection.close();
    }
}

Recv:

public class Recv {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received "" + message + """);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

在這個例子中, 我們并沒有定義 exchange, 也沒有顯示地將 queue 綁定到 exchange 中, 因此 queue "hello" 就自動綁定到默認(rèn)的 exchange 中了, 并且在默認(rèn)的 exchange 中, 其 route key 和 queue 名一致, 即 "hello".
由于這個原因, 我們就可以使用:

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

來發(fā)送消息. 調(diào)用 channel.basicPublish 時, 第一個參數(shù)是 exchange 名, 為空就是默認(rèn)的 exchange, 第二個參數(shù)是 route key, 和 queue 名相同.

direct exchange

direct exchange 可以使用如下圖表示:

direct exchange 根據(jù)消息的 route key 來將消息分發(fā)到不同的 queue 中. direct exchange 適合用于消息的單播發(fā)送. direct exchange 的工作流程如下:

一個 queue 使用 K 作為 route key 綁定到 direct exchange 中.

當(dāng)direct exchange 收到一個 route key 為 R 的消息時, 如果 R == K, 則此 exchange 會將此消息路由到此 queue 中.

direct exchange 經(jīng)常用于在多個 worker 中分配任務(wù)(即一個 Master 和多個相同的 Slave). 當(dāng)使用這個模型時, 需要注意的是:

When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.

即 AMQP 0-9-1 的負(fù)載均衡是以consumer為單位的, 而不是以 queue 為單位.

fanout exchange

一個 fanout exchange 會將消息分發(fā)給所有綁定到此 exchange 的 queue 中, 而不會考慮 queue 的 route key. 即如果有 N 個 Queue 綁定到一個 fanout exchange 時, 那么當(dāng)此 exchange 收到消息時, 會將此消息分發(fā)到這 N 個 queue 中. 由于此性質(zhì), fanout exchange 也常用消息的廣播(broadcast).
fanout 可以使用下圖表示:

topic exchange

topic exchange 會根據(jù) route key 將消息分發(fā)到與此消息的 route key 相匹配的并且綁定到此 exchagne 中的 queue 中(如果有多個 queue 使用了相同的 route key 綁定到此 exchange, 那么這些 queue 都會收到消息). 根據(jù)此性質(zhì), topic exchange 經(jīng)常用于實現(xiàn) publish/subscribe 模型, 即消息的多播模型.

header exchange

header exchange 不使用 route key 作為路由的依據(jù), 而是使用消息頭屬性來路由消息.

Queue

AMQP 中的 隊列 的概念和其他消息隊列中 隊列 的概念類似, 它有如下幾個重要的概念:

Name, 名字

Durable, 是否是持久的. 當(dāng)為真時, 即使 broker 重啟時, 此 queue 也不會被刪除.

Exclusive, 是否是獨占的, 當(dāng)為真時, 表示此 queue 只能有一個消費者, 并且當(dāng)此消費者的連接斷開時, 此 queue 會被刪除.

Auto-delete, 當(dāng)為真時, 此 隊列 會在最后一個消費者取消訂閱時被刪除.

在使用一個 隊列 時, 需要先進(jìn)行聲明. 如果我們聲明的隊列不存在, 那么 broker 就會自動創(chuàng)建它. 不過如果此隊列已經(jīng)存在時, 我們就需要注意了, 若我們聲明的隊列的屬性和已存在的隊列的屬性一致, 則不會有任何的問題, 但是如果先后兩次聲明的隊列的屬性不一致, 則會有 PRECONDITION_FAILED 錯誤(錯誤碼為406).

關(guān)于隊列名

AMQP 的隊列名不能以 "amq." 開頭, 因為這樣的隊列名是 AMQP broker 內(nèi)部所使用的. 當(dāng)我們使用了這樣的隊列名時, 那么會有一個 ACCESS_REFUSED 錯誤(錯誤碼為 403)

關(guān)于持久隊列

持久隊列會被持久化到磁盤中, 因此即使 broker 重啟了, 持久隊列也依然存在.
不過需要注意的是, 不要將持久隊列和消息的持久化混淆. 當(dāng) broker 重啟時, 持久隊列會自動重新聲明, 然而只有隊列中的持久化消息(persistent message)才會被恢復(fù).

隊列的綁定

隊列的綁定關(guān)系是 exchagne 用于消息路由的規(guī)則, 即一個 exchange 能夠?qū)⑾⒙酚傻侥硞€隊列的前提是此隊列已經(jīng)綁定到這個 exchange 中了. 當(dāng)隊列綁定到一個 exchange 中時, 我們還可以設(shè)置一個額外的參數(shù), 即 route key, 這個 key 會被 direct exchange 和 topic exchange 作為額外的路由信息而使用, 換句話說, route key 扮演著過濾器的角色.
當(dāng)一個消息沒有被路由到任意的隊列時(例如此 exchange 沒有任何的 queue 綁定著), 那么此時會根據(jù)消息的屬性來決定是將此消息丟棄還是返回給生產(chǎn)者.

消費者

AMQP 0-9-1 支持兩種消息分發(fā)模式:

push 模式, 即 broker 主動推送消息給消費者

pull 模式, 即消費者主動從 broker 中拉取消息.

在 push 模式時, 應(yīng)用程序需要告知 broker 它對哪些消息感興趣, 即也就是我們所說的訂閱一個消息主題. 每個消費者都有一個惟一的標(biāo)識符, 即consumer tag, 我們可以用這個 tag 來取消一個消費者對某個主題的訂閱(unsubscribe).

消息的 ACK

AMQP 0-9-1 有兩種消息 ACK 模式:

自動 ACK 模式

手動 ACK 模式

在自動 ACK 模式下, 當(dāng) broker 發(fā)送消息成功后, 會立即將此消息從消息隊列中刪除, 而不會等待消費者的 ACK 回復(fù). 而在手動 ACK 模式下, 當(dāng) broker 發(fā)送消息給消費者時, 不會立即將此消息刪除, 而是需要等待消費者的 ACK 回復(fù)后才會刪除消息. 因此在手動 ACK 模式下, 當(dāng)消費者收到消息并處理完成后, 需要向 broker 顯示地發(fā)送 ACK 指令.
在手動 ACK 模式下, 如果消費者因為意外的 crash 而沒有發(fā)送 ACK 給 broker, 那么此時 broker 會將此消息轉(zhuǎn)發(fā)給其他的消費者(如果此時沒有消費者了, 那么 broker 會緩存此消息, 直到有新的消費者注冊).

拒絕消息

當(dāng)一個消費者處理消息失敗或此時不能處理消息時, 那么可以給 broker 發(fā)送一個拒絕消息的指令, 并且可以要求 broker 丟棄或重新分發(fā)此消息.
不過需要注意的是, 如果此時只有一個消費者, 那么當(dāng)此消費者拒收消息并要求 broker 重新分發(fā)此消息時, 那么就會造成了此消息不斷地分發(fā)和拒收, 形成了死循環(huán).

預(yù)取消息

通過預(yù)取消息機制, 消費者可以一次性批量取出消息, 然后在處理后對這些批量消息進(jìn)行統(tǒng)一的 ACK 回復(fù), 這樣可以提高消息的吞吐量.
不過, 需要注意的時, RabbitMQ 僅支持 channel 級別的預(yù)取消息的數(shù)量配置, 不支持基于連接的預(yù)取消息數(shù)量配置.

連接

AMQP 的連接是長連接, 它是一個使用 TCP 作為可靠傳輸?shù)膽?yīng)用層協(xié)議.

通道(Channel)

AMQP 不推薦一個應(yīng)用程序發(fā)起多個對 broker 的連接, 因為這樣會消耗系統(tǒng)資源并且也不利于防火墻的配置. 但是如果應(yīng)用程序確實需要有多個不互相干擾的連接來進(jìn)行不同的操作時該怎么辦呢? 為了解決這個問題, AMQP 引入了 Channel 的 概念. 在 AMQP 0-9-1 中, 一個與 broker 的連接是被多個 Channel 復(fù)用的, 因此我們可以將 channel 理解為: 一個共享同一個 TCP 連接的輕量級的連接.

基于同一個 TCP 連接的兩個不同的 channel 直接是不會有任何的干擾的(在邏輯上可以等效地理解為兩個獨立的連接), 因此客戶端和 broker 之間交互時, 需要附帶上 channel id.
通常來說, 在一個多線程消費消息的模型中, 每個線程多帶帶打開一個 channel 是一個推薦的做法, 而最好不要在各個線程中共享一個 channel.

Virtual host

為了在一個 broker 中實現(xiàn)不同的相互隔離的環(huán)境(例如每個環(huán)境中有不同的用戶, 不同的 exchange, 不同的隊列等), AMQP 引入了一個叫做 virtual host(vhost) 的概念. 在連接 broker 時, 客戶端可以指定需要使用哪個 vhost.

本文由 yongshun 發(fā)表于個人博客, 采用 署名-相同方式共享 3.0 中國大陸許可協(xié)議.
Email: yongshun1228@gmail.com
本文標(biāo)題為: RabbitMQ AMQP 消息模型攻略
本文鏈接為: https://segmentfault.com/a/1190000007123977

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/66128.html

相關(guān)文章

  • 消息中間件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP協(xié)議!

    摘要:后續(xù)介紹交換機,生產(chǎn)者直接將消息投遞到中。消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由和組成。也稱為消息隊列,保存消息并將它們轉(zhuǎn)發(fā)給消費者。主要是應(yīng)為和有一個綁定的關(guān)系。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif); showImg(https://img-blog.csdnimg.cn/20190731191914...

    sihai 評論0 收藏0
  • 新手也能看懂,消息隊列其實很簡單

    摘要:通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能即通過異步處理,將短時間高并發(fā)產(chǎn)生的事務(wù)消息存儲在消息隊列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導(dǎo)圖:showImg(ht...

    Clect 評論0 收藏0
  • 轉(zhuǎn): RabbitMQ與PHP(一)

    摘要:需要特別明確的概念交換機的持久化,并不等于消息的持久化。消息的處理,是有兩種方式,一次性。在上述示例中,使用的,意味著接收全部的消息。注意與是兩個不同的隊列。后端處理,可以針對每一個啟動一個或多個,以提高消息處理的實時性。 RabbitMQ與PHP(一) 項目中使用RabbitMQ作為隊列處理用戶消息通知,消息由前端PHP代碼產(chǎn)生,處理消息使用Python,這就導(dǎo)致代碼一致性問題,調(diào)...

    wpw 評論0 收藏0
  • 慕課網(wǎng)_《RabbitMQ消息中間件極速入門與實戰(zhàn)》學(xué)習(xí)總結(jié)

    摘要:慕課網(wǎng)消息中間件極速入門與實戰(zhàn)學(xué)習(xí)總結(jié)時間年月日星期三說明本文部分內(nèi)容均來自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實戰(zhàn)》學(xué)習(xí)總結(jié) 時間:2018年09月05日星期三 說明:本文部分內(nèi)容均來自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評論0 收藏0
  • RabbitMQ學(xué)習(xí)筆記

    摘要:消息持久化控制的屬性就是消息的持久化。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時,兩個消費者都會收到消息并處理當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時,只有消費者可以接收到消息。八的消息確認(rèn)機制在中,可以通過持久化數(shù)據(jù)解決服務(wù)器異常的數(shù)據(jù)丟失問題。 一、內(nèi)容大綱&使用場景 1. 消息隊列解決了什么問題? 異步處理 應(yīng)用解耦 流量削鋒 日志處理 ...... 2. rabbitMQ安裝與配置 3. Java操...

    zacklee 評論0 收藏0

發(fā)表評論

0條評論

xietao3

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<