亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

用redis實(shí)現(xiàn)消息隊(duì)列(實(shí)時(shí)消費(fèi)+ack機(jī)制)

KevinYan / 1502人閱讀

摘要:但是如何實(shí)現(xiàn)即時(shí)消費(fèi)如何實(shí)現(xiàn)機(jī)制這些是實(shí)現(xiàn)的關(guān)鍵所在。如何實(shí)現(xiàn)機(jī)制,即消息確認(rèn)機(jī)制。實(shí)現(xiàn)方案該方案主要解決掛掉的情況維護(hù)兩個(gè)隊(duì)列隊(duì)列和表表??偨Y(jié)作為消息隊(duì)列是有很大局限性的。因?yàn)槠渲饕匦约坝猛緵Q定它只能實(shí)現(xiàn)輕量級(jí)的消息隊(duì)列。

消息隊(duì)列

首先做簡(jiǎn)單的引入。

MQ主要是用來(lái):

解耦應(yīng)用、

異步化消息

流量削峰填谷

目前使用的較多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
網(wǎng)上的資源對(duì)各種情況都有詳細(xì)的解釋,在此不做過(guò)多贅述。本文
僅介紹如何使用Redis實(shí)現(xiàn)輕量級(jí)MQ的過(guò)程。

為什么要用Redis實(shí)現(xiàn)輕量級(jí)MQ?

在業(yè)務(wù)的實(shí)現(xiàn)過(guò)程中,就算沒(méi)有大量的流量,解耦和異步化幾乎也是處處可用,此時(shí)MQ就顯得尤為重要。但與此同時(shí)MQ也是一個(gè)蠻重的組件,例如我們?nèi)绻肦abbitMQ就必須為它搭建一個(gè)服務(wù)器,同時(shí)如果要考慮可用性,就要為服務(wù)端建立一個(gè)集群,而且在生產(chǎn)如果有問(wèn)題也需要查找功能。在中小型業(yè)務(wù)的開(kāi)發(fā)過(guò)程中,可能業(yè)務(wù)的其他整個(gè)實(shí)現(xiàn)都沒(méi)這個(gè)重。過(guò)重的組件服務(wù)會(huì)成倍增加工作量。
所幸的是,Redis提供的list數(shù)據(jù)結(jié)構(gòu)非常適合做消息隊(duì)列。
但是如何實(shí)現(xiàn)即時(shí)消費(fèi)?如何實(shí)現(xiàn)ack機(jī)制?這些是實(shí)現(xiàn)的關(guān)鍵所在。

如何實(shí)現(xiàn)即時(shí)消費(fèi)?

網(wǎng)上所流傳的方法是使用Redis中l(wèi)ist的操作BLPOP或BRPOP,即列表的阻塞式(blocking)彈出。
讓我們來(lái)看看阻塞式彈出的使用方式:

BRPOP key [key ...] timeout

此命令的說(shuō)明是:

1、當(dāng)給定列表內(nèi)沒(méi)有任何元素可供彈出的時(shí)候,連接將被 BRPOP 命令阻塞,直到等待超時(shí)或發(fā)現(xiàn)可彈出元素為止。 
2、當(dāng)給定多個(gè)key參數(shù)時(shí),按參數(shù) key 的先后順序依次檢查各個(gè)列表,彈出第一個(gè)非空列表的尾部元素。

另外,BRPOP 除了彈出元素的位置和 BLPOP 不同之外,其他表現(xiàn)一致。

以此來(lái)看,列表的阻塞式彈出有兩個(gè)特點(diǎn):

1、如果list中沒(méi)有任務(wù)的時(shí)候,該連接將會(huì)被阻塞
2、連接的阻塞有一個(gè)超時(shí)時(shí)間,當(dāng)超時(shí)時(shí)間設(shè)置為0時(shí),即可無(wú)限等待,直到彈出消息

由此看來(lái),此方式是可行的,但此為傳統(tǒng)的觀察者模式,業(yè)務(wù)簡(jiǎn)單則可使用,如A的任務(wù)只由B去執(zhí)行。但如果A和Z的任務(wù),B和C都能執(zhí)行,那使用這種方式就相形見(jiàn)肘。這個(gè)時(shí)候就應(yīng)該使用訂閱/發(fā)布模式,使業(yè)務(wù)系統(tǒng)更加清晰。
好在Redis也支持Pub/Sub(發(fā)布/訂閱)。在消息A入隊(duì)list的同時(shí)發(fā)布(PUBLISH)消息B到頻道channel,此時(shí)已經(jīng)訂閱channel的worker就接收到了消息B,知道了list中有消息A進(jìn)入,即可循環(huán)lpop或rpop來(lái)消費(fèi)list中的消息。流程如下:

其中的worker可以是多帶帶的線程,也可以是獨(dú)立的服務(wù),其充當(dāng)了Consumer和業(yè)務(wù)處理者角色。下面做實(shí)例說(shuō)明。

即時(shí)消費(fèi)實(shí)例

示例場(chǎng)景為:worker要做同步文件功能,等到有文件生成時(shí)立馬同步。

首先開(kāi)啟一個(gè)線程代表worker,來(lái)訂閱頻道channel:

@Service
public class SubscribeService {

    @Resource
    private RedisService redisService;
    @Resource
    private SynListener synListener;//訂閱者
    
    @PostConstruct
    public void subscribe() {
        new Thread(new Runnable() {

            @Override
            public void run() {
                LogCvt.info("服務(wù)已訂閱頻道:{}", channel);
                redisService.subscribe(synListener, channel);
            }
        }).start();

    }
}

代碼中的SynListener即為所聲明的訂閱者,channel為訂閱的頻道名稱,具體的訂閱邏輯如下:

@Service
public class SynListener extends JedisPubSub {

    @Resource
    private DispatchMessageHandler dispatchMessageHandler;

    @Override
    public void onMessage(String channel, String message) {
        LogCvt.info("channel:{},receives message:{}",channel,message);
        try {
            //處理業(yè)務(wù)(同步文件)
            dispatchMessageHandler.synFile();
        } catch (Exception e) {
            LogCvt.error(e.getMessage(),e);
        }
    }
}

處理業(yè)務(wù)的時(shí)候,就去list中去消費(fèi)消息:

@Service
public class DispatchMessageHandler {
    
    @Resource
    private RedisService redisService;
    @Resource
    private MessageHandler messageHandler;
    
    public void synFile(){
        while(true){
            try {
                String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());
                if (null == message){
                    break;
                }
                Thread.currentThread().setName(Tools.uuid());
                // 隊(duì)列數(shù)據(jù)處理
                messageHandler.synfile(message);
            } catch (Exception e) {
                LogCvt.error(e.getMessage(),e);
            }
        }
    }

}

這樣我們就達(dá)到了消息的實(shí)時(shí)消費(fèi)的目的。

如何實(shí)現(xiàn)ack機(jī)制?

ack,即消息確認(rèn)機(jī)制(Acknowledge)。

首先來(lái)看RabbitMQ的ack機(jī)制:

Publisher把消息通知給Consumer,如果Consumer已處理完任務(wù),那么它將向Broker發(fā)送ACK消息,告知某條消息已被成功處理,可以從隊(duì)列中移除。如果Consumer沒(méi)有發(fā)送回ACK消息,那么Broker會(huì)認(rèn)為消息處理失敗,會(huì)將此消息及后續(xù)消息分發(fā)給其他Consumer進(jìn)行處理(redeliver flag置為true)。

這種確認(rèn)機(jī)制和TCP/IP協(xié)議確立連接類似。不同的是,TCP/IP確立連接需要經(jīng)過(guò)三次握手,而RabbitMQ只需要一次ACK。

值的注意的是,RabbitMQ當(dāng)且僅當(dāng)檢測(cè)到ACK消息未發(fā)出且Consumer的連接終止時(shí)才會(huì)將消息重新分發(fā)給其他Consumer,因此不需要擔(dān)心消息處理時(shí)間過(guò)長(zhǎng)而被重新分發(fā)的情況。

那么在我們用Redis實(shí)現(xiàn)消息隊(duì)列的ack機(jī)制的時(shí)候該怎么做呢?
需要注意兩點(diǎn):

work處理失敗后,要回滾消息到原始pending隊(duì)列

假如worker掛掉,也要回滾消息到原始pending隊(duì)列

上面第一點(diǎn)可以在業(yè)務(wù)中完成,即失敗后執(zhí)行回滾消息。

實(shí)現(xiàn)方案

(該方案主要解決worker掛掉的情況)

維護(hù)兩個(gè)隊(duì)列:pending隊(duì)列和doing表(hash表)。

workers定義為ThreadPool。

由pending隊(duì)列出隊(duì)后,workers分配一個(gè)線程(單個(gè)worker)去處理消息——給目標(biāo)消息append一個(gè)當(dāng)前時(shí)間戳和當(dāng)前線程名稱,將其寫(xiě)入doing表,然后該worker去消費(fèi)消息,完成后自行在doing表擦除信息。

啟用一個(gè)定時(shí)任務(wù),每隔一段時(shí)間去掃描doing隊(duì)列,檢查每隔元素的時(shí)間戳,如果超時(shí),則由worker的ThreadPoolExecutor去檢查線程是否存在,如果存在則取消當(dāng)前任務(wù)執(zhí)行,并把事務(wù)rollback。最后把該任務(wù)從doing隊(duì)列中pop出,再重新push進(jìn)pending隊(duì)列。

在worker的某線程中,如果處理業(yè)務(wù)失敗,則主動(dòng)回滾,并把任務(wù)從doing隊(duì)列中移除,重新push進(jìn)pending隊(duì)列。

總結(jié)

Redis作為消息隊(duì)列是有很大局限性的。因?yàn)槠渲饕匦约坝猛緵Q定它只能實(shí)現(xiàn)輕量級(jí)的消息隊(duì)列。寫(xiě)在最后:沒(méi)有絕對(duì)好的技術(shù),只有對(duì)業(yè)務(wù)最友好的技術(shù),謹(jǐn)此獻(xiàn)給所有developer。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/70693.html

相關(guān)文章

  • MQ對(duì)比之RabbitMQ & Redis

    摘要:消息隊(duì)列選擇是一個(gè)由開(kāi)發(fā)的的開(kāi)源實(shí)現(xiàn)的產(chǎn)品,是一個(gè)消息代理,從生產(chǎn)者接收消息并傳遞消息至消費(fèi)者,期間可根據(jù)規(guī)則路由緩存持久化消息。綁定隊(duì)列和交換機(jī)之間的關(guān)系。根據(jù)消息的屬性和的屬性來(lái)轉(zhuǎn)發(fā)消息。 消息隊(duì)列選擇:RabbitMQ & Redis RabbitMQ RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanced Message Queue )的開(kāi)源實(shí)現(xiàn)的產(chǎn)品,Rabbi...

    notebin 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<