亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

moquette改造筆記(五):設(shè)備連接頻繁上下線或者相互頂替出現(xiàn)的設(shè)備上下線狀態(tài)錯亂問題

betacat / 3253人閱讀

摘要:發(fā)現(xiàn)問題在使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個設(shè)備一樣相互頂替連接的情況下,的和的方法調(diào)用沒有先后順序,如果在這兩個方法里面來記錄設(shè)備上下線狀態(tài),會造成狀態(tài)不對。因?yàn)橄嗷ロ斕娴那闆r并不多見,因此兩個也可以接受,在性能上并不會造成多大影響。

發(fā)現(xiàn)問題

在moquette使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個設(shè)備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConnect和onConnectionLost的方法調(diào)用沒有先后順序,如果在這兩個方法里面來記錄設(shè)備上下線狀態(tài),會造成狀態(tài)不對。

io.moquette.spi.impl.ProtocolProcessor中的processConnect(Channel channel, MqttConnectMessage msg)部分代碼如下

ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        initializeKeepAliveTimeout(channel, msg, clientId);
        storeWillMessage(msg, clientId);
        if (!sendAck(descriptor, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        m_interceptor.notifyClientConnected(msg);

可以看到existing.abort();后會m_interceptor.notifyClientConnected(msg); 先斷開原來的連接,然后接著通知上線。由于Netty本身就是異步的,再加上InterceptHandler相關(guān)方法的調(diào)用都是在線程池中進(jìn)行的,因此nterceptHandler的onConnect和onConnectionLost的方法調(diào)用先后順序是無法保證的。

解決方法

在ChannelHandler鏈中添加一個handler,專門處理設(shè)備上線事件,對于相同ClientId的連接已經(jīng)存在時,連接斷開和連接事件強(qiáng)制加上時序。

@Sharable
public class AbrotExistConnectionMqttHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(AbrotExistConnectionMqttHandler.class);
    private final ProtocolProcessor m_processor;

    private static final ReentrantLock[] locks = new ReentrantLock[8];

    static {
        for (int i = 0; i < locks.length; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    public AbrotExistConnectionMqttHandler(ProtocolProcessor m_processor) {
        this.m_processor = m_processor;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
        MqttMessage msg = (MqttMessage) message;
        MqttMessageType messageType = msg.fixedHeader().messageType();
        LOG.debug("Processing MQTT message, type: {}", messageType);

        if (messageType != MqttMessageType.CONNECT) {
            super.channelRead(ctx, message);
            return;
        }

        MqttConnectMessage connectMessage = (MqttConnectMessage) msg;
        String clientId = connectMessage.payload().clientIdentifier();

        /**
         * 通過鎖和sleep來解決設(shè)備互頂出現(xiàn)的設(shè)備上線和下線回調(diào)時序錯亂的問題
         * 目前解決的方式通過sleep不是太好
         * 解決了多個連接互相頂替出現(xiàn)的問題(有一個連接先連接的情況)
         * */
        ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];
        lock.lock();
        try {
            if (!m_processor.isConnected(clientId)) {
                super.channelRead(ctx, message);
                return;
            }
            m_processor.abortIfExist(clientId);
            Thread.sleep(50);
            super.channelRead(ctx, message);
            Thread.sleep(30);
        } catch (Exception ex) {
            ex.printStackTrace();
            super.channelRead(ctx, message);
        } finally {
            lock.unlock();
        }

    }
}

解釋:
1.通過ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];來保證相同的ClientId的連接都會獲得同一個鎖
2.通過兩次Thread.sleep(50);將斷開連接和處理設(shè)備上線變成先后順序關(guān)系。
3.因?yàn)橄嗷ロ斕娴那闆r并不多見,因此兩個Thread.sleep()也可以接受,在性能上并不會造成多大影響。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/77220.html

相關(guān)文章

  • moquette改造筆記(一):整合到SpringBoot

    摘要:整合到本文更加注重代碼實(shí)踐,對于配置相關(guān)的知識會一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導(dǎo)入。接口是預(yù)留給開發(fā)者根據(jù)不同事件處理業(yè)務(wù)邏輯的接口。改造筆記二優(yōu)化邏輯 Moquette簡介 Mqtt作為物聯(lián)網(wǎng)比較流行的協(xié)議現(xiàn)在已經(jīng)被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實(shí)現(xiàn)的輕量級的MQTT BROKEN. Moquet...

    young.li 評論0 收藏0
  • 程序員筆記|詳解Eureka 緩存機(jī)制

    摘要:和二級緩存影響狀態(tài)更新,縮短這兩個定時任務(wù)周期可減少滯后時間,例如配置更新周期更新周期服務(wù)提供者保證服務(wù)正常下線。服務(wù)提供者延遲下線。 引言 Eureka是Netflix開源的、用于實(shí)現(xiàn)服務(wù)注冊和發(fā)現(xiàn)的服務(wù)。Spring Cloud Eureka基于Eureka進(jìn)行二次封裝,增加了更人性化的UI,使用更為方便。但是由于Eureka本身存在較多緩存,服務(wù)狀態(tài)更新滯后,最常見的狀況是:服務(wù)...

    mgckid 評論0 收藏0
  • 網(wǎng)關(guān)實(shí)現(xiàn)灰度發(fā)布

    摘要:就是一種灰度發(fā)布方式,讓一部分用戶繼續(xù)用,一部分用戶開始用,如果用戶對沒有什么反對意見,那么逐步擴(kuò)大范圍,把所有用戶都遷移到上面來?;叶劝l(fā)布可以保證整體系統(tǒng)的穩(wěn)定,在初始灰度的時候就可以發(fā)現(xiàn)調(diào)整問題,以保證其影響度。 一、背景互聯(lián)網(wǎng)產(chǎn)品開發(fā)有個非常特別的地方,就是不停的升級,升級,再升級。采用敏捷開發(fā)的方式,基本上保持每周或者每兩周一次的發(fā)布頻率,系統(tǒng)升級總是伴隨著各種風(fēng)險,新舊版本兼...

    stormjun 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<