亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專(zhuān)欄INFORMATION COLUMN

RabbitMQ發(fā)布訂閱實(shí)戰(zhàn)-實(shí)現(xiàn)延時(shí)重試隊(duì)列

Heier / 3057人閱讀

摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。

RabbitMQ是一款使用Erlang開(kāi)發(fā)的開(kāi)源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來(lái)做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門(mén)教程開(kāi)始學(xué)習(xí)。

本文將會(huì)講解如何使用RabbitMQ實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。在這里我會(huì)帶領(lǐng)大家一步一步的實(shí)現(xiàn)一個(gè)帶有失敗重試功能的發(fā)布訂閱組件,使用該組件后可以非常簡(jiǎn)單的實(shí)現(xiàn)消息的發(fā)布訂閱,在進(jìn)行業(yè)務(wù)開(kāi)發(fā)的時(shí)候,業(yè)務(wù)開(kāi)發(fā)人員可以將主要精力放在業(yè)務(wù)邏輯實(shí)現(xiàn)上,而不需要花費(fèi)時(shí)間去理解RabbitMQ的一些復(fù)雜概念。

本文將會(huì)持續(xù)修正和更新,最新內(nèi)容請(qǐng)參考我的 GITHUB 上的 程序猿成長(zhǎng)計(jì)劃 項(xiàng)目,歡迎 Star,更多精彩內(nèi)容請(qǐng) follow me。

概要

我們將會(huì)實(shí)現(xiàn)如下功能

結(jié)合RabbitMQ的Topic模式和Work Queue模式實(shí)現(xiàn)生產(chǎn)方產(chǎn)生消息,消費(fèi)方按需訂閱,消息投遞到消費(fèi)方的隊(duì)列之后,多個(gè)worker同時(shí)對(duì)消息進(jìn)行消費(fèi)

結(jié)合RabbitMQ的 Message TTL 和 Dead Letter Exchange 實(shí)現(xiàn)消息的延時(shí)重試功能

消息達(dá)到最大重試次數(shù)之后,將其投遞到失敗隊(duì)列,等待人工介入處理bug后,重新將其加入隊(duì)列消費(fèi)

具體流程見(jiàn)下圖

生產(chǎn)者發(fā)布消息到主Exchange

主Exchange根據(jù)Routing Key將消息分發(fā)到對(duì)應(yīng)的消息隊(duì)列

多個(gè)消費(fèi)者的worker進(jìn)程同時(shí)對(duì)隊(duì)列中的消息進(jìn)行消費(fèi),因此它們之間采用“競(jìng)爭(zhēng)”的方式來(lái)爭(zhēng)取消息的消費(fèi)

消息消費(fèi)后,不管成功失敗,都要返回ACK消費(fèi)確認(rèn)消息給隊(duì)列,避免消息消費(fèi)確認(rèn)機(jī)制導(dǎo)致重復(fù)投遞,同時(shí),如果消息處理成功,則結(jié)束流程,否則進(jìn)入重試階段

如果重試次數(shù)小于設(shè)定的最大重試次數(shù)(3次),則將消息重新投遞到Retry Exchange的重試隊(duì)列

重試隊(duì)列不需要消費(fèi)者直接訂閱,它會(huì)等待消息的有效時(shí)間過(guò)期之后,重新將消息投遞給Dead Letter Exchange,我們?cè)谶@里將其設(shè)置為主Exchange,實(shí)現(xiàn)延時(shí)后重新投遞消息,這樣消費(fèi)者就可以重新消費(fèi)消息

如果三次以上都是消費(fèi)失敗,則認(rèn)為消息無(wú)法被處理,直接將消息投遞給Failed Exchange的Failed Queue,這時(shí)候應(yīng)用可以觸發(fā)報(bào)警機(jī)制,以通知相關(guān)責(zé)任人處理

等待人工介入處理(解決bug)之后,重新將消息投遞到主Exchange,這樣就可以重新消費(fèi)了

技術(shù)實(shí)現(xiàn)

Linus Torvalds 曾經(jīng)說(shuō)過(guò)

Talk is cheap. Show me the code

我分別用Java和PHP實(shí)現(xiàn)了本文所講述的方案,讀者可以通過(guò)參考代碼以及本文中的基本步驟來(lái)更好的理解

rabbitmq-pubsub-php

rabbitmq-pubsub-java

創(chuàng)建Exchange

為了實(shí)現(xiàn)消息的延時(shí)重試和失敗存儲(chǔ),我們需要?jiǎng)?chuàng)建三個(gè)Exchange來(lái)處理消息。

master 主Exchange,發(fā)布消息時(shí)發(fā)布到該Exchange

master.retry 重試Exchange,消息處理失敗時(shí)(3次以?xún)?nèi)),將消息重新投遞給該Exchange

master.failed 失敗Exchange,超過(guò)三次重試失敗后,消息投遞到該Exchange

所有的Exchange聲明(declare)必須使用以下參數(shù)

參數(shù) 說(shuō)明
exchange - Exchange名稱(chēng)
type topic Exchange 類(lèi)型
passive false 如果Exchange已經(jīng)存在,則返回成功,不存在則創(chuàng)建
durable true 持久化存儲(chǔ)Exchange,這里僅僅是Exchange本身持久化,消息和隊(duì)列需要多帶帶指定其持久化
no-wait false 該方法需要應(yīng)答確認(rèn)

Java代碼

// 聲明Exchange:主體,失敗,重試
channel.exchangeDeclare("master", "topic", true);
channel.exchangeDeclare("master.retry", "topic", true);
channel.exchangeDeclare("master.failed", "topic", true);

PHP代碼

// 普通交換機(jī)
$this->channel->exchange_declare("master", "topic", false, true, false);
// 重試交換機(jī)
$this->channel->exchange_declare("master.retry", "topic", false, true, false);
// 失敗交換機(jī)
$this->channel->exchange_declare("master.failed", "topic", false, true, false);

在RabbitMQ的管理界面中,我們可以看到創(chuàng)建的三個(gè)Exchange

消息發(fā)布

消息發(fā)布時(shí),使用basic_publish方法,參數(shù)如下

參數(shù) 說(shuō)明
message - 發(fā)布的消息對(duì)象
exchange master 消息發(fā)布到的Exchange
routing-key - 路由KEY,用于標(biāo)識(shí)消息類(lèi)型
mandatory false 是否強(qiáng)制路由,指定了該選項(xiàng)后,如果沒(méi)有訂閱該消息,則會(huì)返回路由不可達(dá)錯(cuò)誤
immediate false 指定了當(dāng)消息無(wú)法直接路由給消費(fèi)者時(shí)如何處理

發(fā)布消息時(shí),對(duì)于message對(duì)象,其內(nèi)容建議使用json編碼后的字符串,同時(shí)消息需要標(biāo)識(shí)以下屬性

"delivery_mode"=> 2 // 1為非持久化,2為持久化

Java代碼

channel.basicPublish(
    "master", 
    routingKey, 
    MessageProperties.PERSISTENT_BASIC, // delivery_mode
    message.getBytes()
);

PHP代碼

$msg = new AMQPMessage($message->serialize(), [
    "delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$this->channel->basic_publish($msg, "master", $routingKey);
消息訂閱

消息訂閱的實(shí)現(xiàn)相對(duì)復(fù)雜一些,需要完成隊(duì)列的聲明以及隊(duì)列和Exchange的綁定。

Declare Queue

對(duì)于每一個(gè)訂閱消息的服務(wù),都必須創(chuàng)建一個(gè)該服務(wù)對(duì)應(yīng)的隊(duì)列,將該隊(duì)列綁定到關(guān)注的路由規(guī)則,這樣之后,消息生產(chǎn)者將消息投遞給Exchange之后,就會(huì)按照路由規(guī)則將消息分發(fā)到對(duì)應(yīng)的隊(duì)列供消費(fèi)者消費(fèi)了。

消費(fèi)服務(wù)需要declare三個(gè)隊(duì)列

[queue_name] 隊(duì)列名稱(chēng),格式符合 [服務(wù)名稱(chēng)]@訂閱服務(wù)標(biāo)識(shí)

[queue_name]@retry 重試隊(duì)列

[queue_name]@failed 失敗隊(duì)列

訂閱服務(wù)標(biāo)識(shí)是客戶(hù)端自己對(duì)訂閱的分類(lèi)標(biāo)識(shí)符,比如用戶(hù)中心服務(wù)(服務(wù)名稱(chēng)ucenter),包含兩個(gè)訂閱:user和enterprise,這里兩個(gè)訂閱的隊(duì)列名稱(chēng)就為 ucenter@userucenter@enterprise,其對(duì)應(yīng)的重試隊(duì)列為 ucenter@user@retryucenter@enterprise@retry。

Declare隊(duì)列時(shí),參數(shù)規(guī)定規(guī)則如下

參數(shù) 說(shuō)明
queue - 隊(duì)列名稱(chēng)
passive false 隊(duì)列不存在則創(chuàng)建,存在則直接成功
durable true 隊(duì)列持久化
exclusive false 排他,指定該選項(xiàng)為true則隊(duì)列只對(duì)當(dāng)前連接有效,連接斷開(kāi)后自動(dòng)刪除
no-wait false 該方法需要應(yīng)答確認(rèn)
auto-delete false 當(dāng)不再使用時(shí),是否自動(dòng)刪除

對(duì)于@retry重試隊(duì)列,需要指定額外參數(shù)

"x-dead-letter-exchange"    => "master"
"x-dead-letter-routing-key" => [queue_name],
"x-message-ttl"              => 30 * 1000 // 重試時(shí)間設(shè)置為30s
這里的兩個(gè)header字段的含義是,在隊(duì)列中延遲30s后,將該消息重新投遞到x-dead-letter-exchange對(duì)應(yīng)的Exchange中,并且routing key指定為消費(fèi)隊(duì)列的名稱(chēng),這樣就可以實(shí)現(xiàn)消息只投遞給原始出錯(cuò)時(shí)的隊(duì)列,避免消息重新投遞給所有關(guān)注當(dāng)前routing key的消費(fèi)者了。

Java代碼

// 聲明監(jiān)聽(tīng)隊(duì)列
channel.queueDeclare(
    queueName, // 隊(duì)列名稱(chēng)
    true,      // durable
    false,     // exclusive
    false,     // autoDelete
    null       // arguments
);
channel.queueDeclare(queueName + "@failed", true, false, false, null);

Map arguments = new HashMap();
arguments.put("x-dead-letter-exchange", exchangeName());
arguments.put("x-message-ttl", 30 * 1000);
arguments.put("x-dead-letter-routing-key", queueName);
channel.queueDeclare(queueName + "@retry", true, false, false, arguments);

PHP代碼

$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_declare($failedQueueName, false, true, false, false, false);
$this->channel->queue_declare(
    $retryQueueName, // 隊(duì)列名稱(chēng)
    false,           // passive
    true,            // durable
    false,           // exclusive
    false,           // auto_delete
    false,           // nowait
    new AMQPTable([
        "x-dead-letter-exchange" => "master",
        "x-dead-letter-routing-key" => $queueName,
        "x-message-ttl"          => 30 * 1000,
    ])
);

在RabbitMQ的管理界面中,Queues部分可以看到我們創(chuàng)建的三個(gè)隊(duì)列

查看隊(duì)列的詳細(xì)信息,我們可以看到 queueName@retry 隊(duì)列與其它兩個(gè)隊(duì)列的不同

Bind Exchange & Queue

創(chuàng)建完隊(duì)列之后,需要將隊(duì)列與Exchange綁定(bind),不同隊(duì)列需要綁定到之前創(chuàng)建的對(duì)應(yīng)的Exchange上面

Queue Exchange
[queue_name] master
[queue_name]@retry master.retry
[queue_name]@failed master.failed

綁定時(shí),需要提供訂閱的路由KEY,該路由KEY與消息發(fā)布時(shí)的路由KEY對(duì)應(yīng),區(qū)別是這里可以使用通配符同時(shí)訂閱多種類(lèi)型的消息。

參數(shù) 說(shuō)明
queue - 綁定的隊(duì)列
exchange - 綁定的Exchange
routing-key - 訂閱的消息路由規(guī)則
no-wait false 該方法需要應(yīng)答確認(rèn)

Java代碼

// 綁定監(jiān)聽(tīng)隊(duì)列到Exchange
channel.queueBind(queueName, "master", routingKey);
channel.queueBind(queueName, exchangeName(), queueName);
channel.queueBind(queueName + "@failed", "master.failed", queueName);
channel.queueBind(queueName + "@retry", "master.retry", queueName);

PHP代碼

$this->channel->queue_bind($queueName, "master", $routingKey);
$this->channel->queue_bind($queueName, "master", $queueName);
$this->channel->queue_bind($retryQueueName, "master.retry", $queueName);
$this->channel->queue_bind($failedQueueName, "master.failed", $queueName);

在RabbitMQ的管理界面中,我們可以看到該隊(duì)列與Exchange和routing-key的綁定關(guān)系

消息消費(fèi)實(shí)現(xiàn)

使用 basic_consume 對(duì)消息進(jìn)行消費(fèi)的時(shí)候,需要注意下面參數(shù)

參數(shù) 說(shuō)明
queue - 消費(fèi)的隊(duì)列名稱(chēng)
consumer-tag - 消費(fèi)者標(biāo)識(shí),留空即可
no_local false 如果設(shè)置了該字段,服務(wù)器將不會(huì)發(fā)布消息到 發(fā)布它的客戶(hù)端
no_ack false 需要消費(fèi)確認(rèn)應(yīng)答
exclusive false 排他訪(fǎng)問(wèn),設(shè)置后只允許當(dāng)前消費(fèi)者訪(fǎng)問(wèn)該隊(duì)列
nowait false 該方法需要應(yīng)答確認(rèn)

消費(fèi)端在消費(fèi)消息時(shí),需要從消息中獲取消息被消費(fèi)的次數(shù),以此判斷該消息處理失敗時(shí)重試還是發(fā)送到失敗隊(duì)列。

Java代碼

protected Long getRetryCount(AMQP.BasicProperties properties) {
    Long retryCount = 0L;
    try {
        Map headers = properties.getHeaders();
        if (headers != null) {
            if (headers.containsKey("x-death")) {
                List> deaths = (List>) headers.get("x-death");
                if (deaths.size() > 0) {
                    Map death = deaths.get(0);
                    retryCount = (Long) death.get("count");
                }
            }
        }
    } catch (Exception e) {}

    return retryCount;
}

PHP代碼

protected function getRetryCount(AMQPMessage $msg): int
{
    $retry = 0;
    if ($msg->has("application_headers")) {
        $headers = $msg->get("application_headers")->getNativeData();
        if (isset($headers["x-death"][0]["count"])) {
            $retry = $headers["x-death"][0]["count"];
        }
    }

    return (int)$retry;
}

消息消費(fèi)完成后,需要發(fā)送消費(fèi)確認(rèn)消息給服務(wù)端,使用basic_ack方法

ack(delivery-tag=消息的delivery-tag標(biāo)識(shí))

Java代碼

// 消息消費(fèi)處理
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        ...
        // 注意,由于使用了basicConsume的autoAck特性,因此這里就不需要手動(dòng)執(zhí)行
        // channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
// 執(zhí)行消息消費(fèi)處理
channel.basicConsume(
    queueName, 
    true, // autoAck
    consumer
);

PHP代碼

$this->channel->basic_consume(
    $queueName,
    "",    // customer_tag
    false, // no_local
    false, // no_ack
    false, // exclusive
    false, // nowait
    function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) {
        ...
        $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
    }
);

如果消息處理中出現(xiàn)異常,應(yīng)該將該消息重新投遞到重試Exchange,等待下次重試

basic_publish(msg, "master.retry", queueName)
ack(delivery-tag) // 不要忘記了應(yīng)答消費(fèi)成功消息

如果判斷重試次數(shù)大于3次,仍然處理失敗,則應(yīng)該講消息投遞到失敗Exchange,等待人工處理

basic_publish(msg, "master.failed", queueName)
ack(delivery-tag) // 不要忘記了應(yīng)答消費(fèi)成功消息
一定不要忘記ack消息,因?yàn)橹卦?、失敗都是通過(guò)將消息重新投遞到重試、失敗Exchange來(lái)實(shí)現(xiàn)的,如果忘記ack,則該消息在超時(shí)或者連接斷開(kāi)后,會(huì)重新被重新投遞給消費(fèi)者,如果消費(fèi)者依舊無(wú)法處理,則會(huì)造成死循環(huán)。

Java代碼

try {
    String message = new String(body, "UTF-8");
    // 消息處理函數(shù)
    handler.handle(message, envelope.getRoutingKey());

} catch (Exception e) {
    long retryCount = getRetryCount(properties);
    if (retryCount > 3) {
        // 重試次數(shù)大于3次,則自動(dòng)加入到失敗隊(duì)列
        Map headers = new HashMap<>();
        headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey()));
        channel.basicPublish("master.failed", queueName, createOverrideProperties(properties, headers), body);
    } else {
        // 重試次數(shù)小于3,則加入到重試隊(duì)列,30s后再重試
        Map headers = properties.getHeaders();
        if (headers == null) {
            headers = new HashMap<>();
        }

        headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey()));
        channel.basicPublish("master.retry", queueName, createOverrideProperties(properties, headers), body);
    }
}

在消息發(fā)送到重試隊(duì)列和失敗隊(duì)列時(shí),我們?cè)谙⒌膆eaders中添加了一個(gè)名為x-orig-routing-key的字段,該字段是實(shí)現(xiàn)消息重試的關(guān)鍵字段,由于我們的消息需要在不同的Exchange,Queue之間流轉(zhuǎn),為了避免消息在重新投遞到主Exchange時(shí),被所有的消費(fèi)者隊(duì)列重新消費(fèi),在重試過(guò)程中,我們將消息的routing-key修改為隊(duì)列名稱(chēng),直接投遞給原始消費(fèi)消息的隊(duì)列。x-orig-routing-key用于在之后能夠重新獲取到最開(kāi)始的routing-key。

這里的重復(fù)消費(fèi)是指 某個(gè)消息被兩個(gè)消費(fèi)方A和B消費(fèi)了,其中A消費(fèi)失敗,B成功,這時(shí)候,消息由A消費(fèi)者重新投遞到主Exchange后,B消費(fèi)隊(duì)列也會(huì)獲取到該消息,因此就會(huì)導(dǎo)致B消費(fèi)者重復(fù)消費(fèi)已經(jīng)消費(fèi)國(guó)的消息
失敗任務(wù)重試

如果任務(wù)重試三次仍未成功,則會(huì)被投遞到失敗隊(duì)列,這時(shí)候需要人工處理程序異常,處理完畢后,需要將消息重新投遞到隊(duì)列進(jìn)行處理,這里唯一需要做的就是從失敗隊(duì)列訂閱消息,然后獲取到消息后,清空其application_headers頭信息,然后重新投遞到master這個(gè)Exchange即可。

Java代碼

channel.basicPublish(
    "master", 
    envelope.getRoutingKey(),
    MessageProperties.PERSISTENT_BASIC,
    body
);

PHP代碼

$msg->set("application_headers", new AMQPTable([]));
$this->channel->basic_publish(
    $msg,
    "master",
    $msg->get("routing_key")
);
怎么使用

隊(duì)列和Exchange以及發(fā)布訂閱的關(guān)系我們就說(shuō)完了,那么使用起來(lái)是什么效果呢?這里我們以Java代碼為例

// 發(fā)布消息
Publisher publisher = new Publisher(factory.newConnection(), "master");
publisher.publish("{"id":121, "name":"guanyiyao"}", "user.create");

// 訂閱消息
new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME)
    .init("user-monitor", "user.*")
    .subscribe((message, routingKey) -> {
        // TODO 業(yè)務(wù)邏輯
        System.out.printf("    <%s> message consumed: %s
", routingKey, message);
    }
);
總結(jié)

使用RabbitMQ時(shí),實(shí)現(xiàn)延時(shí)重試和失敗隊(duì)列的方式并不僅僅局限于本文中描述的方法,如果讀者有更好的實(shí)現(xiàn)方案,歡迎拍磚,在這里我也只是拋磚引玉了。本文中講述的方法還有很多優(yōu)化空間,讀者也可以試著去改進(jìn)其實(shí)現(xiàn)方案,比如本文中使用了三個(gè)Exchagne,是否只使用一個(gè)Exchange也能實(shí)現(xiàn)本文中所講述的功能。

本文將會(huì)持續(xù)修正和更新,最新內(nèi)容請(qǐng)參考我的 GITHUB 上的 程序猿成長(zhǎng)計(jì)劃 項(xiàng)目,歡迎 Star,更多精彩內(nèi)容請(qǐng) follow me。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/30771.html

相關(guān)文章

  • RabbitMQ發(fā)布訂閱實(shí)戰(zhàn)-實(shí)現(xiàn)延時(shí)重試隊(duì)列

    摘要:本文將會(huì)講解如何使用實(shí)現(xiàn)延時(shí)重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動(dòng)延時(shí)將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。 RabbitMQ是一款使用Erlang開(kāi)發(fā)的開(kāi)源消息隊(duì)列。本文假設(shè)讀者對(duì)RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來(lái)做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門(mén)...

    vslam 評(píng)論0 收藏0
  • 新手也能看懂,消息隊(duì)列其實(shí)很簡(jiǎn)單

    摘要:通過(guò)以上分析我們可以得出消息隊(duì)列具有很好的削峰作用的功能即通過(guò)異步處理,將短時(shí)間高并發(fā)產(chǎn)生的事務(wù)消息存儲(chǔ)在消息隊(duì)列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開(kāi)源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類(lèi)項(xiàng)目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導(dǎo)圖:showImg(ht...

    Clect 評(píng)論0 收藏0
  • nsq 優(yōu)秀的消息隊(duì)列

    摘要:簡(jiǎn)介是語(yǔ)言編寫(xiě)的,開(kāi)源的分布式消息隊(duì)列中間件,其設(shè)計(jì)的目的是用來(lái)大規(guī)模地處理每天數(shù)以十億計(jì)級(jí)別的消息。 簡(jiǎn)介 NSQ是Go語(yǔ)言編寫(xiě)的,開(kāi)源的分布式消息隊(duì)列中間件,其設(shè)計(jì)的目的是用來(lái)大規(guī)模地處理每天數(shù)以十億計(jì)級(jí)別的消息。NSQ 具有分布式和去中心化拓?fù)浣Y(jié)構(gòu),該結(jié)構(gòu)具有無(wú)單點(diǎn)故障、故障容錯(cuò)、高可用性以及能夠保證消息的可靠傳遞的特征,是一個(gè)成熟的、已在大規(guī)模生成環(huán)境下應(yīng)用的產(chǎn)品。 NSQ在國(guó)...

    Atom 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<