亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

白話RabbitMQ(四): 建立路由

CoderStudy / 2180人閱讀

摘要:可以參考源碼,項目支持網(wǎng)站,最新文章或?qū)崿F(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。

推廣
RabbitMQ專題講座

https://segmentfault.com/l/15...

CoolMQ開源項目

我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項目支持網(wǎng)站: http://rabbitmq.org.cn,最新文章或?qū)崿F(xiàn)會更新在上面

前言

在訂閱/發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將log消息廣播給一些消費者。這章我們會在此基礎(chǔ)上加入一些新的特性-我們將有針對性的進(jìn)行消息分發(fā),比如,只把錯誤(error)消息保存到磁盤,與此同時,打印出所有的消息。

綁定

我們在前面的例子中,綁定是這么來做的

channel.queueBind(queueName, EXCHANGE_NAME, "");

綁定是建立交換機(jī)和隊列之間的一種聯(lián)系:隊列會接受交換機(jī)中的消息。綁定可以用一個路由鍵來指明,為了與basic_publish區(qū)分開,我們稱之為綁定鍵(binding key):

channel.queueBind(queueName, EXCHANGE_NAME, "black");

綁定鍵跟路由器類型也有關(guān)系,我們之前用的廣播路由器,會忽略掉這個值

直達(dá)交換機(jī)(Direct Exchange)

之前我們用的是廣播交換機(jī),會將消息發(fā)送給所有的消費者。這里我們希望通過log的嚴(yán)重程度進(jìn)行過濾,例如只有嚴(yán)重的錯誤才會寫入到磁盤,而warn和info消息就不用了,以此來節(jié)省磁盤空間

而廣播交換機(jī)沒法滿足這個需求-它只是無腦的發(fā)送消息。所以我們會使用直達(dá)交換機(jī)(Direct Exchange)- 消息會通過所綁定的鍵來發(fā)送給對應(yīng)的隊列,可以看如下這幅圖

如上圖所示,直達(dá)交換機(jī)X綁定了兩個隊列,C1是通過orange來綁定,而C2是通過black和green綁定。因此,發(fā)送到路由鍵orange的消息會發(fā)送給隊列Q1,發(fā)送到路由鍵black或者green的消息會發(fā)送給Q2,其它的消息將被丟棄。

多項綁定


當(dāng)然,多個隊列綁定到一個鍵上也是合法的,在這種情況下,直達(dá)交換機(jī)將會將消息發(fā)送給所有的隊列,就像廣播交換機(jī)一樣,如上圖所示,一個鍵為black的消息將會同時被發(fā)送給C1和C2.

我們首先需要創(chuàng)建一個直達(dá)路由器

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

并發(fā)送消息到這個路由器

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

上面我們是發(fā)送給"severity",簡單起見,假設(shè)有下列幾種日志類型"severity" ,"info", "warning", "error".

訂閱消息(Subscribing)

接受消息跟之前一樣,但有一點不同,我們提供了一個binding key,

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
整合

將上面的所有代碼整合到一起

EmitLogDirect.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
        System.out.println(" [x] Sent "" + severity + "":"" + message + """);

        channel.close();
        connection.close();
    }
    //..
}

ReceiveLogsDirect.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1){
      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
      System.exit(1);
    }

    for(String severity : argv){
      channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """);
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

編譯

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

只保存warning和error的消息到磁盤上

java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

將所有的消息打印到頻幕上

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

最后,發(fā)送error消息

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent "error":"Run. Run. Or it will explode."

好了,這一章就到這兒,下一章我們將講述如何基于特定模式進(jìn)行監(jiān)聽

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/68120.html

相關(guān)文章

  • 白話RabbitMQ(五): 主題路由器(Topic Exchange)

    摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。主題交換機(jī)也可以當(dāng)成其它交換機(jī)來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達(dá)路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性...

    Gilbertat 評論0 收藏0
  • 白話RabbitMQ(三):發(fā)布/訂閱

    摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應(yīng)的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...

    Ververica 評論0 收藏0
  • 白話rabbitmq(一): HelloWorld

    摘要:作為消息隊列的一個典型實踐,完全實現(xiàn)了標(biāo)準(zhǔn),與的快快快不同,它追求的穩(wěn)定可靠。同一個隊列不僅可以綁定多個生產(chǎn)者,而且能夠發(fā)送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的...

    garfileo 評論0 收藏0
  • 白話RabbitMQ(六): RPC

    摘要:因為消費消息是在另外一個進(jìn)程中,我們需要阻塞我們的進(jìn)程直到結(jié)果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務(wù)的最終一致性解決方案,請大家圍觀。可以參考...

    KevinYan 評論0 收藏0
  • 【譯】RabbitMQ系列() - 路由模式

    摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統(tǒng)。更形象的表示,如對中的感興趣。為了進(jìn)行說明,像下圖這么來設(shè)置如圖,可以看到有兩個綁到了類型為的上。如圖的設(shè)置中,一個為的就會同時發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級別。 路由模式 在之前的文章中我們建立了一個簡單的日志系統(tǒng)。我們可以通過這個系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個新的功...

    liuchengxu 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<