
RocketMQ Source Code Notes (3) - Broker (Interaction with Producers)


Abstract: When clean pages run short, writes to the page cache block while the kernel flushes some dirty data, roughly 32 pages per attempt, to free up more clean pages. With asynchronous flushing the broker acknowledges the producer as soon as a message arrives and stores it in parallel, so under abnormal conditions a small amount of data may be lost. Once the flush thread has flushed, it wakes the waiting front-end threads, possibly a whole batch of them.

As in the previous parts, this source-code walkthrough is driven by a list of questions:

How does the Broker receive messages?

How does the Broker keep data reliable under abnormal conditions?

How does the Broker achieve high storage throughput?

How should message accumulation on the Broker be handled?

How does the Broker handle scheduled (delayed) messages?

What happens when the Broker's buffer is full?

Does the Broker reuse connections?

How is the heartbeat between Broker and Name Server implemented?

How does the Broker handle idle (timed-out) connections?

Broker

The broker is the message relay: it stores and forwards messages and is commonly called the server. In the JMS specification this role is the provider, although RocketMQ's broker does not match the JMS 1.1 definition exactly; in JMS, for instance, a P2P message is deleted once it has been consumed.

How does the Broker receive messages?

The entry point for this exploration is BrokerController.initialize, which starts a NettyRemotingServer and registers a number of request processors:

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
...
 this.registerProcessor();

BrokerController.registerProcessor
registers SendMessageProcessor (the handler for messages sent by producers) with the Netty servers:

public void registerProcessor() {
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        // ... registration of the pull, query, client-manage and admin processors omitted
    }

SendMessageProcessor.processRequest
wraps the actual message handling with before/after hooks:

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

SendMessageProcessor.sendMessage
builds the broker-internal MessageExtBrokerInner, then delegates storage to DefaultMessageStore:

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
        final RemotingCommand request, //
        final SendMessageContext sendMessageContext, //
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

        response.setOpaque(request.getOpaque());

        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

        log.debug("receive SendMessage request command, {}", request);

        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }

        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        final byte[] body = request.getBody();



        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (traFlag != null) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                return response;
            }
        }
        // store the message
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

    }

DefaultMessageStore.putMessage
first checks whether the message may be stored at all; if nothing is wrong, it delegates to CommitLog:

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }

        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        // delegate storage of the message to CommitLog
        PutMessageResult result = this.commitLog.putMessage(msg);

        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

CommitLog.putMessage

  public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        // get the memory-mapped file the message will be written into
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            // append the message
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        // handle the disk flush
        handleDiskFlush(result, putMessageResult, msg);
        // handle HA (replication to the slave)
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

MappedFile.appendMessage

  public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }

MappedFile.appendMessagesInner

   public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get();

        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
            // delegate on to CommitLog.doAppend
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos,  this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

CommitLog.doAppend
structures the message, attaches some bookkeeping information, and finally stores it in memory through the mappedByteBuffer:

    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
        final MessageExtBrokerInner msgInner) {
        // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

        // PHY OFFSET
        long wroteOffset = fileFromOffset + byteBuffer.position();

        this.resetByteBuffer(hostHolder, 8);
        String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

        // Record ConsumeQueue information
        keyBuilder.setLength(0);
        keyBuilder.append(msgInner.getTopic());
        keyBuilder.append("-");
        keyBuilder.append(msgInner.getQueueId());
        String key = keyBuilder.toString();
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }

        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the consume queue
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }

        /**
         * Serialize message
         */
        final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
        final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

        if (propertiesLength > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long. length={}", propertiesData.length);
            return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
        }

        final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
        final int topicLength = topicData.length;

        final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

        final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

        // Exceeds the maximum message
        if (msgLen > this.maxMessageSize) {
            CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
            return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
        }

        // Determines whether there is sufficient free space
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }

        // Initialization of storage space
        this.resetByteBuffer(msgStoreItemMemory, msgLen);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(msgLen);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
        // 3 BODYCRC
        this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
        // 4 QUEUEID
        this.msgStoreItemMemory.putInt(msgInner.getQueueId());
        // 5 FLAG
        this.msgStoreItemMemory.putInt(msgInner.getFlag());
        // 6 QUEUEOFFSET
        this.msgStoreItemMemory.putLong(queueOffset);
        // 7 PHYSICALOFFSET
        this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
        // 8 SYSFLAG
        this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
        // 9 BORNTIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
        // 10 BORNHOST
        this.resetByteBuffer(hostHolder, 8);
        this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
        // 11 STORETIMESTAMP
        this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
        // 12 STOREHOSTADDRESS
        this.resetByteBuffer(hostHolder, 8);
        this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
        //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
        // 13 RECONSUMETIMES
        this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
        // 14 Prepared Transaction Offset
        this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
        // 15 BODY
        this.msgStoreItemMemory.putInt(bodyLength);
        if (bodyLength > 0)
            this.msgStoreItemMemory.put(msgInner.getBody());
        // 16 TOPIC
        this.msgStoreItemMemory.put((byte) topicLength);
        this.msgStoreItemMemory.put(topicData);
        // 17 PROPERTIES
        this.msgStoreItemMemory.putShort((short) propertiesLength);
        if (propertiesLength > 0)
            this.msgStoreItemMemory.put(propertiesData);

        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        // here the message finally lands in the byte buffer; the flush logic is covered in the next section
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                break;
            default:
                break;
        }

        return result;
    }

At this point the message has been written into the latest commit log file on the broker, via the memory-mapped buffer.
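For reference, the on-disk record layout that doAppend writes can be summarized as follows. The field sizes are read off the putInt/putLong calls above, so treat this as a reading aid rather than a normative format spec:

    // Commit log record layout, one message per record, as written by doAppend:
    //  1 TOTALSIZE         int  (4 bytes)  total record length, including this field
    //  2 MAGICCODE         int  (4)        MESSAGE_MAGIC_CODE, or BLANK_MAGIC_CODE for end-of-file padding
    //  3 BODYCRC           int  (4)        CRC32 of the body
    //  4 QUEUEID           int  (4)
    //  5 FLAG              int  (4)
    //  6 QUEUEOFFSET       long (8)        logical offset within the consume queue
    //  7 PHYSICALOFFSET    long (8)        physical offset within the commit log
    //  8 SYSFLAG           int  (4)
    //  9 BORNTIMESTAMP     long (8)
    // 10 BORNHOST          8 bytes         producer IP + port
    // 11 STORETIMESTAMP    long (8)
    // 12 STOREHOSTADDRESS  8 bytes         broker IP + port
    // 13 RECONSUMETIMES    int  (4)
    // 14 PREPAREDTRANSACTIONOFFSET  long (8)
    // 15 BODY              int  (4) length prefix + bodyLength bytes
    // 16 TOPIC             byte (1) length prefix + topicLength bytes
    // 17 PROPERTIES        short (2) length prefix + propertiesLength bytes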

How does the Broker keep data reliable under abnormal conditions?

Failure scenarios:

1. Broker shuts down normally
2. Broker crashes unexpectedly
3. OS crashes
4. Power is lost but can be restored immediately
5. The machine cannot boot again (CPU, motherboard, memory, or another critical component is damaged)
6. The disk is damaged

In cases 1-4 the hardware can be brought back immediately, and RocketMQ guarantees either no message loss or only a small amount, depending on whether flushing is synchronous or asynchronous.
Cases 5-6 are unrecoverable single-point failures: every message on that node is lost. With asynchronous replication, RocketMQ can still preserve 99% of messages in these two cases, but a small number may be lost. Synchronous double-write removes the single point entirely; it inevitably costs performance, and suits scenarios with extreme reliability requirements, such as anything involving money.
Flush strategy:
all RocketMQ messages are persistent. They are first written to the OS page cache and then flushed to disk, so both memory and disk hold a copy, and reads are served straight from memory.
Asynchronous flush:

With a RAID card and 15,000 RPM SAS disks, sequential file writes in tests reach roughly 300 MB/s, while production NICs are typically gigabit, so writing to disk is clearly faster than the network ingress. Can the broker then return to the caller as soon as the write reaches memory, and leave flushing to a background thread?

(1) Since disk speed exceeds NIC speed, flushing can certainly keep pace with the rate at which messages are written.

(2) Suppose the system is under heavy load and messages accumulate; besides write IO there is also read IO, and disk reads might fall behind.

Would that make the system run out of memory? No, for the following reasons:

a) When a message is written to the page cache and memory is short, the OS discards clean pages (LRU policy) to make room for the new message.
b) When clean pages run short, the write to the page cache blocks while the kernel flushes some dirty data, roughly 32 pages per attempt (about 128 KB, assuming 4 KB pages), to produce more clean pages. Remember that with asynchronous flushing the broker returns OK to the producer as soon as the message arrives and stores it in parallel, which is why a small amount of data can be lost in abnormal cases.

In short, memory overflow does not occur.

Synchronous flush:

>The only difference between synchronous and asynchronous flushing is that asynchronous flushing returns as soon as the page cache is written, while synchronous flushing waits until the flush completes. The synchronous flow is:

(1) After writing the page cache, the calling thread waits and notifies the flush thread.
(2) Once the flush thread has flushed, it wakes the waiting front-end threads, possibly a whole batch of them.
(3) The front-end threads return success to the caller.

Code walkthrough:

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
            // build a synchronous flush request
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            // notify the flush thread
                service.putRequest(request);
              
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // asynchronous flush: just wake the flush service
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

In CommitLog's constructor, synchronous and asynchronous flushing are implemented by two different classes:

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
            defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
        this.defaultMessageStore = defaultMessageStore;

        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // synchronous flush
            this.flushCommitLogService = new GroupCommitService();
        } else {
            // asynchronous flush
            this.flushCommitLogService = new FlushRealTimeService();
        }

        this.commitLogService = new CommitRealTimeService();

        this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        batchEncoderThreadLocal = new ThreadLocal() {
            @Override protected MessageExtBatchEncoder initialValue() {
                return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            }
        };
        this.putMessageLock =  defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();

    }

Synchronous flushing first: GroupCommitService.
Notifying the flush thread:

  public synchronized void putRequest(final GroupCommitRequest request) {
            // add the request to the pending-write list
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            // notify the flush thread
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

Waiting for the flush:

      public boolean waitForFlush(long timeout) {
            try {
                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                return this.flushOK;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            }
        }

Nothing much happens here: the caller simply waits on a latch. Next, find where countDownLatch.countDown is called.

public void wakeupCustomer(final boolean flushOK) {
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }

And here, finally, is the flush itself, in doCommit:

 private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // flush, with one retry
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }
                        // flush finished, release the latch
                        req.wakeupCustomer(flushOK);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }

That is one more layer of wrapping; keep going inward:

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }

        return result;
    }

At last, the Java NIO operations:

    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                    // Java NIO flush to disk
                        this.fileChannel.force(false);
                    } else {
                    // Java NIO flush to disk
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

Meanwhile, doCommit loops endlessly on its own thread (the run() method below). A guess as to why synchronous flushing uses a latch to hand work to another thread rather than a plain synchronized method: disk IO is slow and needs a timeout, and the timeout support of countDownLatch.await(timeout, TimeUnit.MILLISECONDS) is exactly what the extra thread buys; see the sketch after the run() method.

       public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                // wait to be woken (or time out), then flush
                    this.waitForRunning(10);
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }

            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

Asynchronous flushing:
the wakeup path simply tries to wake the flush service without blocking or waiting. The flush logic itself is similar to the synchronous case and lives in FlushRealTimeService.run(), so we won't walk through it again.

    flushCommitLogService.wakeup();
  public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }
How does the Broker achieve high storage throughput?

The broker's bottleneck is IO. RocketMQ stores messages in plain files and uses Java NIO memory mapping, which avoids the two copies from file to kernel space to user space; according to Kafka's official documentation, this kind of sequential file IO can reach about 600 MB/s. The MappedByteBuffer in the code above is Java's implementation of memory-mapped files.
For background on NIO, see the earlier Java NIO reading notes.
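As a minimal illustration of the primitives involved, the sketch below maps a file, writes through the page cache, and forces it to disk; the file name and size are arbitrary, and real commit log segments are of course far larger and preallocated:

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;

    public class MmapDemo {
        public static void main(String[] args) throws Exception {
            int fileSize = 1024 * 1024; // 1 MB demo segment
            try (RandomAccessFile raf = new RandomAccessFile("demo_commitlog", "rw");
                 FileChannel channel = raf.getChannel()) {
                // map the file into memory: writes land in the OS page cache,
                // with no read/write system call per message
                MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                buffer.put("hello rocketmq".getBytes(StandardCharsets.UTF_8));
                // force() is the flush step: push the dirty pages to the device
                buffer.force();
            }
        }
    }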

How should message accumulation on the Broker be handled?

A message queue's main job is asynchronous decoupling; another important job is absorbing the front-end data flood so that back-end systems stay stable, which requires some capacity to accumulate messages. Accumulation comes in two forms:

Messages accumulate in an in-memory buffer. Once the buffer is exceeded, messages are dropped according to some policy, as described in the CORBA Notification specification. This suits businesses that can tolerate message loss. Here the accumulation capacity is bounded by the buffer size, and performance degrades little under accumulation, because how much data sits in memory has limited impact on serving capacity.

Messages accumulate in persistent storage, such as a DB, a KV store, or files.
When a message misses the in-memory cache, the disk must unavoidably be read, generating heavy read IO; read IO throughput then directly determines how well accumulated messages can be accessed.

Evaluating accumulation capacity comes down to four questions:

a. How many messages, and how many bytes, can accumulate? That is, the accumulation capacity.

   Bounded by disk size.

b. Once messages accumulate, does send throughput suffer?

   Without a slave: some impact.
   With a slave: no impact.

c. Once messages accumulate, are normally consuming Consumers affected?

   Without a slave: some impact.
   With a slave: no impact.

d. Once messages accumulate, what throughput is available when reading the accumulated messages from disk?

   It depends on read concurrency; at worst it drops to around 5,000 msg/s.

When a slave is present and the master notices a Consumer reading accumulated data from disk, it sends that Consumer a redirect instruction telling it to pull from the slave instead, so neither normal sends nor normally consuming Consumers are affected by the accumulation: the accumulated and non-accumulated workloads are split across two different nodes. This raises another question: does the slave's write performance suffer? No. The slave's writes only chase throughput, not latency; it pulls data from the master in batches (for example 1 MB at a time), and this batched sequential writing keeps overall throughput largely intact even under accumulation, at the cost of a longer write RT.

How does the Broker handle scheduled (delayed) messages?

The broker's approach to delayed delivery is to park the message in a dedicated topic (SCHEDULE_TOPIC) and have a background thread put the message back into its original topic once the time is up.
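Note that RocketMQ delays are level-based rather than arbitrary timestamps. Below is a small sketch of the level-table idea; the level string shown is the commonly cited default of MessageStoreConfig.messageDelayLevel, and the queueId = level - 1 mapping matches how delayLevel2QueueId is used below (both are assumptions to verify against your RocketMQ version):

    import java.util.HashMap;
    import java.util.Map;

    public class DelayLevels {
        // assumed default of MessageStoreConfig.messageDelayLevel; operators can override it
        static final String LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

        public static void main(String[] args) {
            Map<Character, Long> unit = new HashMap<>();
            unit.put('s', 1000L);
            unit.put('m', 60_000L);
            unit.put('h', 3_600_000L);

            String[] parts = LEVELS.split(" ");
            for (int level = 1; level <= parts.length; level++) {
                String p = parts[level - 1];
                long millis = Long.parseLong(p.substring(0, p.length() - 1))
                    * unit.get(p.charAt(p.length() - 1));
                // each level gets its own queue under SCHEDULE_TOPIC
                System.out.printf("delayLevel=%d -> queueId=%d, delay=%dms%n", level, level - 1, millis);
            }
        }
    }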

Putting the message into ScheduleMessageService.SCHEDULE_TOPIC
CommitLog.putMessage contains the logic for this step: it remaps the queueId (one queue per delay level), sets the topic to ScheduleMessageService.SCHEDULE_TOPIC, and backs up the original topic and queueId into message properties:

if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }

A background thread drains the scheduled topic ScheduleMessageService.SCHEDULE_TOPIC.
When BrokerController starts, it starts a timer thread that handles delayed messages:

  if (this.messageStore != null) {
            this.messageStore.start();
        }
    
    // the start() invoked above
    public void start() throws Exception {
        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();

        if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
            this.scheduleMessageService.start();
        }

        if (this.getMessageStoreConfig().isDuplicationEnable()) {
            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        } else {
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        }
        this.reputMessageService.start();

        this.haService.start();

        this.createTempFile();
        this.addScheduleTask();
        this.shutdown = false;
    }
        
    public void start() {

        for (Map.Entry entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
            // this timer task is what delivers delayed messages
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Exception e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
 @Override
        public void run() {
            try {
                this.executeOnTimeup();
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

The delayed re-putMessage: the time-computation logic is fairly involved, so we won't dig into it for now.

 public void executeOnTimeup() {
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can"t find ext content.So re compute tags code.
                                    log.error("[BUG] can"t find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) {
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        /* XXX: warn and notify me */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {
                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }
What happens when the Broker's buffer is full?

A broker's buffer usually refers to the bounded in-memory buffer of one queue on the broker. What should happen once such a buffer fills up?

The CORBA Notification specification offers two answers:
(1) RejectNewEvents
Reject incoming messages and return a RejectNewEvents error code to the producer.
(2) Discard existing messages according to a policy:
a) AnyOrder - Any event may be discarded on overflow. This is the default setting for this property.
b) FifoOrder - The first event received will be the first discarded.
c) LifoOrder - The last event received will be the first discarded.
d) PriorityOrder - Events should be discarded in priority order,
such that lower priority events will be discarded before higher priority events.
e) DeadlineOrder - Events should be discarded in the order of shortest expiry deadline first.

RocketMQ has no such in-memory buffer: its queues are all persisted to disk, and data is purged periodically.
Its answer to this problem differs sharply from other MQs. RocketMQ abstracts the buffer as a queue of unbounded length that can absorb however much data arrives. The "unbounded" part has a precondition: the broker periodically deletes expired data. If, say, it keeps only 3 days of messages, then the queue is unbounded in length, but anything older than 3 days is trimmed from the tail.
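A minimal sketch of that time-based trimming follows; this is a simplification for illustration, since the real cleanup lives in DefaultMessageStore's clean-service threads and also considers disk usage and deletion time windows:

    import java.io.File;
    import java.util.concurrent.TimeUnit;

    public class ExpiredSegmentCleaner {
        // analogous to fileReservedTime: keep segments for 72 hours (3 days)
        static final long RESERVED_MILLIS = TimeUnit.HOURS.toMillis(72);

        // delete commit log segments whose last write is older than the retention
        // window -- the "trim from the tail" behaviour described above
        static void deleteExpired(File storeDir) {
            File[] segments = storeDir.listFiles();
            if (segments == null) {
                return;
            }
            long now = System.currentTimeMillis();
            for (File segment : segments) {
                if (now - segment.lastModified() > RESERVED_MILLIS) {
                    System.out.println("deleting expired segment: " + segment.getName());
                    segment.delete();
                }
            }
        }
    }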

How is Broker HA implemented?
After the flush there is logic that replicates data to the slave, via the services in HAService. It is a plain TCP exchange and fairly simple, so we won't analyze it in detail. One thing worth noting: this part does not delegate to Netty at all, but uses raw Java NIO for both requests and handling.

 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest  request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

    }
How does the Broker support transactional messages?
1. The Producer sends a message of type TransactionPreparedType to the Broker. The Broker stores it in the CommitLog and returns the message's queueOffset and MessageId to the Producer. The MessageId contains the commitLogOffset (the message's offset within the CommitLog, from which the message itself can be located directly). Because a message of this type is stored without its commitLogOffset being written into the consume queue, clients cannot look it up through the consume queue; the message cannot be fetched, so it is never consumed.
2. The Producer-side TransactionExecuterImpl runs the local operation and returns the local transaction status, then sends a message of type TransactionCommitType or TransactionRollbackType to the Broker to confirm the commit or rollback. Using the commitLogOffset in the request, the Broker fetches the earlier TransactionPreparedType message (call it A), constructs a new message B with the same content, sets its type to TransactionCommitType or TransactionRollbackType, and stores it. For TransactionCommitType, the commitLogOffset is put into the consume queue; for TransactionRollbackType, the body is set to empty and the commitLogOffset never enters the consume queue.

In step one, the prepared message is handled much like a normal message, except that it never enters the consume queue.
Step two, ending the transaction, is handled by EndTransactionProcessor:

        @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);

        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("check producer[{}] transaction state, but it"s pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("the producer[{}] end transaction in sending message,  and it"s pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }

        final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
        if (msgExt != null) {
            final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (!pgroupRead.equals(requestHeader.getProducerGroup())) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the producer group wrong");
                return response;
            }

            if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the transaction state table offset wrong");
                return response;
            }

            if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the commit log offset wrong");
                return response;
            }
            // build a message identical to the prepared one and set its flag to commit or rollback
            MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));

            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
            // on rollback, the message body is emptied
            if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
                msgInner.setBody(null);
            }

            final MessageStore messageStore = this.brokerController.getMessageStore();
            final PutMessageResult putMessageResult = messageStore.putMessage(msgInner);
            if (putMessageResult != null) {
                switch (putMessageResult.getPutMessageStatus()) {
                    // Success
                    case PUT_OK:
                    case FLUSH_DISK_TIMEOUT:
                    case FLUSH_SLAVE_TIMEOUT:
                    case SLAVE_NOT_AVAILABLE:
                        response.setCode(ResponseCode.SUCCESS);
                        response.setRemark(null);
                        break;
                    // Failed
                    case CREATE_MAPEDFILE_FAILED:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("create mapped file failed.");
                        break;
                    case MESSAGE_ILLEGAL:
                    case PROPERTIES_SIZE_EXCEEDED:
                        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                        response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                        break;
                    case SERVICE_NOT_AVAILABLE:
                        response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                        response.setRemark("service not available now.");
                        break;
                    case OS_PAGECACHE_BUSY:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("OS page cache busy, please try another machine");
                        break;
                    case UNKNOWN_ERROR:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("UNKNOWN_ERROR");
                        break;
                    default:
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("UNKNOWN_ERROR DEFAULT");
                        break;
                }

                return response;
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("store putMessage return null");
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("find prepared transaction message failed");
            return response;
        }

        return response;
    }
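For completeness, here is a hedged sketch of the producer side of the flow described above, assuming the early-4.x client API (TransactionMQProducer with LocalTransactionExecuter; later versions replaced the executer with TransactionListener, so adjust to your client version):

    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;

    public class TxSendSketch {
        public static void main(String[] args) throws Exception {
            TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
            producer.setNamesrvAddr("127.0.0.1:9876"); // illustrative address
            producer.start();

            Message msg = new Message("TopicTest", "hello".getBytes("UTF-8"));
            // the broker first stores a prepared message (invisible to consumers);
            // the executer's return value drives the commit/rollback request that
            // EndTransactionProcessor handles on the broker side
            SendResult result = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
                @Override
                public LocalTransactionState executeLocalTransactionBranch(Message m, Object arg) {
                    // run the local transaction here, then report its outcome
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            }, null);
            System.out.println(result);
            producer.shutdown();
        }
    }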
Does the Broker reuse connections?

Yes. Over a single network connection, multiple client threads can send requests concurrently; responses are matched to their requests by the opaque field in the header.
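A minimal sketch of that correlation idea follows; the class is illustrative (RocketMQ's own remoting layer keeps an opaque-keyed table of pending response futures, of which this shows only the gist):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // One connection, many in-flight requests: every request carries a unique
    // opaque id, and the response carrying the same id completes the right future.
    class MultiplexedClient {
        private final AtomicInteger opaqueGen = new AtomicInteger();
        private final ConcurrentHashMap<Integer, CompletableFuture<byte[]>> inFlight =
            new ConcurrentHashMap<>();

        CompletableFuture<byte[]> send(byte[] payload) {
            int opaque = this.opaqueGen.incrementAndGet();
            CompletableFuture<byte[]> future = new CompletableFuture<>();
            this.inFlight.put(opaque, future);
            // writeToSocket(opaque, payload); // elided: the actual network write
            return future;
        }

        // called by the IO thread when a response frame arrives
        void onResponse(int opaque, byte[] body) {
            CompletableFuture<byte[]> future = this.inFlight.remove(opaque);
            if (future != null) {
                future.complete(body);
            }
        }
    }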

How does the Broker handle idle (timed-out) connections?

If a connection shows no activity (no read or write events) for a configured period, the broker closes it automatically, notifies the upper layers, and cleans up the registration data associated with that connection.
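With Netty this is typically done with an IdleStateHandler plus a handler that reacts to the idle event; a minimal sketch follows (the 120-second window is illustrative, and RocketMQ's remoting layer wires up something along these lines with its configured channel-max-idle time):

    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;

    // pipeline.addLast(new IdleStateHandler(0, 0, 120)); // fire ALL_IDLE after 120s of silence
    class ConnectionManageHandler extends ChannelDuplexHandler {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent
                    && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) {
                // no reads or writes within the window: close the channel and let
                // listeners clean up any registration tied to it
                ctx.channel().close();
            }
            ctx.fireUserEventTriggered(evt);
        }
    }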

How is the heartbeat between Broker and Name Server implemented?

When the broker starts, it schedules a task in its timer pool that re-registers the broker's information with every Name Server every 30 seconds:

  this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
How does the Broker handle Consumer requests? That is the subject of part 5 of this series (Broker: interaction with Consumers).
