亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

淺析 Netty 實(shí)現(xiàn)心跳機(jī)制與斷線重連

waterc / 3541人閱讀

摘要:基礎(chǔ)何為心跳顧名思義所謂心跳即在長連接中客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包通知對方自己還在線以確保連接的有效性為什么需要心跳因?yàn)榫W(wǎng)絡(luò)的不可靠性有可能在保持長連接的過程中由于某些突發(fā)情況例如網(wǎng)線被拔出突然掉電等會造成服務(wù)器和客戶端的

基礎(chǔ) 何為心跳

顧名思義, 所謂 心跳, 即在 TCP 長連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對方自己還在線, 以確保 TCP 連接的有效性.

為什么需要心跳

因?yàn)榫W(wǎng)絡(luò)的不可靠性, 有可能在 TCP 保持長連接的過程中, 由于某些突發(fā)情況, 例如網(wǎng)線被拔出, 突然掉電等, 會造成服務(wù)器和客戶端的連接中斷. 在這些突發(fā)情況下, 如果恰好服務(wù)器和客戶端之間沒有交互的話, 那么它們是不能在短時(shí)間內(nèi)發(fā)現(xiàn)對方已經(jīng)掉線的. 為了解決這個(gè)問題, 我們就需要引入 心跳 機(jī)制. 心跳機(jī)制的工作原理是: 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會發(fā)送一個(gè)特殊的數(shù)據(jù)包給對方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性.

如何實(shí)現(xiàn)心跳

我們可以通過兩種方式實(shí)現(xiàn)心跳機(jī)制:

使用 TCP 協(xié)議層面的 keepalive 機(jī)制.

在應(yīng)用層上實(shí)現(xiàn)自定義的心跳機(jī)制.

雖然在 TCP 協(xié)議層面上, 提供了 keepalive ?;顧C(jī)制, 但是使用它有幾個(gè)缺點(diǎn):

它不是 TCP 的標(biāo)準(zhǔn)協(xié)議, 并且是默認(rèn)關(guān)閉的.

TCP keepalive 機(jī)制依賴于操作系統(tǒng)的實(shí)現(xiàn), 默認(rèn)的 keepalive 心跳時(shí)間是 兩個(gè)小時(shí), 并且對 keepalive 的修改需要系統(tǒng)調(diào)用(或者修改系統(tǒng)配置), 靈活性不夠.

TCP keepalive 與 TCP 協(xié)議綁定, 因此如果需要更換為 UDP 協(xié)議時(shí), keepalive 機(jī)制就失效了.

雖然使用 TCP 層面的 keepalive 機(jī)制比自定義的應(yīng)用層心跳機(jī)制節(jié)省流量, 但是基于上面的幾點(diǎn)缺點(diǎn), 一般的實(shí)踐中, 人們大多數(shù)都是選擇在應(yīng)用層上實(shí)現(xiàn)自定義的心跳.
既然如此, 那么我們就來大致看看在在 Netty 中是怎么實(shí)現(xiàn)心跳的吧. 在 Netty 中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 它可以對一個(gè) Channel 的 讀/寫設(shè)置定時(shí)器, 當(dāng) Channel 在一定事件間隔內(nèi)沒有數(shù)據(jù)交互時(shí)(即處于 idle 狀態(tài)), 就會觸發(fā)指定的事件.

使用 Netty 實(shí)現(xiàn)心跳

上面我們提到了, 在 Netty 中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是 IdleStateHandler, 那么這個(gè) Handler 如何使用呢? 我們來看看它的構(gòu)造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

實(shí)例化一個(gè) IdleStateHandler 需要提供三個(gè)參數(shù):

readerIdleTimeSeconds, 讀超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有從 Channel 讀取到數(shù)據(jù)時(shí), 會觸發(fā)一個(gè) READER_IDLE 的 IdleStateEvent 事件.

writerIdleTimeSeconds, 寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有數(shù)據(jù)寫入到 Channel 時(shí), 會觸發(fā)一個(gè) WRITER_IDLE 的 IdleStateEvent 事件.

allIdleTimeSeconds, 讀/寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有讀或?qū)懖僮鲿r(shí), 會觸發(fā)一個(gè) ALL_IDLE 的 IdleStateEvent 事件.

為了展示具體的 IdleStateHandler 實(shí)現(xiàn)的心跳機(jī)制, 下面我們來構(gòu)造一個(gè)具體的EchoServer 的例子, 這個(gè)例子的行為如下:

在這個(gè)例子中, 客戶端和服務(wù)器通過 TCP 長連接進(jìn)行通信.

TCP 通信的報(bào)文格式是:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+

客戶端每隔一個(gè)隨機(jī)的時(shí)間后, 向服務(wù)器發(fā)送消息, 服務(wù)器收到消息后, 立即將收到的消息原封不動地回復(fù)給客戶端.

若客戶端在指定的時(shí)間間隔內(nèi)沒有讀/寫操作, 則客戶端會自動向服務(wù)器發(fā)送一個(gè) PING 心跳, 服務(wù)器收到 PING 心跳消息時(shí), 需要回復(fù)一個(gè) PONG 消息.

下面所使用的代碼例子可以在我的 Github github.com/yongshun/some_java_code 上找到.

通用部分

根據(jù)上面定義的行為, 我們接下來實(shí)現(xiàn)心跳的通用部分 CustomHeartbeatHandler:

/**
 * @author xiongyongshun
 * @version 1.0
 * @email yongshun1228@gmail.com
 * @created 16/9/18 13:02
 */
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler {
    public static final byte PING_MSG = 1;
    public static final byte PONG_MSG = 2;
    public static final byte CUSTOM_MSG = 3;
    protected String name;
    private int heartbeatCount = 0;

    public CustomHeartbeatHandler(String name) {
        this.name = name;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        if (byteBuf.getByte(4) == PING_MSG) {
            sendPongMsg(context);
        } else if (byteBuf.getByte(4) == PONG_MSG){
            System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
        } else {
            handleData(context, byteBuf);
        }
    }

    protected void sendPingMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PING_MSG);
        context.writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    private void sendPongMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PONG_MSG);
        context.channel().writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // IdleStateHandler 所產(chǎn)生的 IdleStateEvent 的處理邏輯.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
    }

    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        System.err.println("---READER_IDLE---");
    }

    protected void handleWriterIdle(ChannelHandlerContext ctx) {
        System.err.println("---WRITER_IDLE---");
    }

    protected void handleAllIdle(ChannelHandlerContext ctx) {
        System.err.println("---ALL_IDLE---");
    }
}

類 CustomHeartbeatHandler 負(fù)責(zé)心跳的發(fā)送和接收, 我們接下來詳細(xì)地分析一下它的作用. 我們在前面提到, IdleStateHandler 是實(shí)現(xiàn)心跳的關(guān)鍵, 它會根據(jù)不同的 IO idle 類型來產(chǎn)生不同的 IdleStateEvent 事件, 而這個(gè)事件的捕獲, 其實(shí)就是在 userEventTriggered 方法中實(shí)現(xiàn)的.
我們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實(shí)現(xiàn):

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        switch (e.state()) {
            case READER_IDLE:
                handleReaderIdle(ctx);
                break;
            case WRITER_IDLE:
                handleWriterIdle(ctx);
                break;
            case ALL_IDLE:
                handleAllIdle(ctx);
                break;
            default:
                break;
        }
    }
}

在 userEventTriggered 中, 根據(jù) IdleStateEvent 的 state() 的不同, 而進(jìn)行不同的處理. 例如如果是讀取數(shù)據(jù) idle, 則 e.state() == READER_IDLE, 因此就調(diào)用 handleReaderIdle 來處理它. CustomHeartbeatHandler 提供了三個(gè) idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle, 這三個(gè)方法目前只有默認(rèn)的實(shí)現(xiàn), 它需要在子類中進(jìn)行重寫, 現(xiàn)在我們暫時(shí)略過它們, 在具體的客戶端和服務(wù)器的實(shí)現(xiàn)部分時(shí)再來看它們.

知道了這一點(diǎn)后, 我們接下來看看數(shù)據(jù)處理部分:

@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
    if (byteBuf.getByte(4) == PING_MSG) {
        sendPongMsg(context);
    } else if (byteBuf.getByte(4) == PONG_MSG){
        System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
    } else {
        handleData(context, byteBuf);
    }
}

在 CustomHeartbeatHandler.channelRead0 中, 我們首先根據(jù)報(bào)文協(xié)議:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+

來判斷當(dāng)前的報(bào)文類型, 如果是 PING_MSG 則表示是服務(wù)器收到客戶端的 PING 消息, 此時(shí)服務(wù)器需要回復(fù)一個(gè) PONG 消息, 其消息類型是 PONG_MSG.
扔報(bào)文類型是 PONG_MSG, 則表示是客戶端收到服務(wù)器發(fā)送的 PONG 消息, 此時(shí)打印一個(gè) log 即可.

客戶端部分 客戶端初始化
public class Client {
    public static void main(String[] args) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        Random random = new Random(System.currentTimeMillis());
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler());
                        }
                    });

            Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
            for (int i = 0; i < 10; i++) {
                String content = "client msg " + i;
                ByteBuf buf = ch.alloc().buffer();
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                ch.writeAndFlush(buf);

                Thread.sleep(random.nextInt(20000));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}

上面的代碼是 Netty 的客戶端端的初始化代碼, 使用過 Netty 的朋友對這個(gè)代碼應(yīng)該不會陌生. 別的部分我們就不再贅述, 我們來看看 ChannelInitializer.initChannel 部分即可:

.handler(new ChannelInitializer() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new IdleStateHandler(0, 0, 5));
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
        p.addLast(new ClientHandler());
    }
});

我們給 pipeline 添加了三個(gè) Handler, IdleStateHandler 這個(gè) handler 是心跳機(jī)制的核心, 我們?yōu)榭蛻舳硕嗽O(shè)置了讀寫 idle 超時(shí), 時(shí)間間隔是5s, 即如果客戶端在間隔 5s 后都沒有收到服務(wù)器的消息或向服務(wù)器發(fā)送消息, 則產(chǎn)生 ALL_IDLE 事件.
接下來我們添加了 LengthFieldBasedFrameDecoder, 它是負(fù)責(zé)解析我們的 TCP 報(bào)文, 因?yàn)楹捅疚牡哪康臒o關(guān), 因此這里不詳細(xì)展開.
最后一個(gè) Handler 是 ClientHandler, 它繼承于 CustomHeartbeatHandler, 是我們處理業(yè)務(wù)邏輯部分.

客戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler {
    public ClientHandler() {
        super("client");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }
}

ClientHandler 繼承于 CustomHeartbeatHandler, 它重寫了兩個(gè)方法, 一個(gè)是 handleData, 在這里面實(shí)現(xiàn) 僅僅打印收到的消息.
第二個(gè)重寫的方法是 handleAllIdle. 我們在前面提到, 客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 當(dāng)客戶端產(chǎn)生一個(gè) ALL_IDLE 事件后, 會導(dǎo)致父類的 CustomHeartbeatHandler.userEventTriggered 調(diào)用, 而 userEventTriggered 中會根據(jù) e.state() 來調(diào)用不同的方法, 因此最后調(diào)用的是 ClientHandler.handleAllIdle, 在這個(gè)方法中, 客戶端調(diào)用 sendPingMsg 向服務(wù)器發(fā)送一個(gè) PING 消息.

服務(wù)器部分 服務(wù)器初始化
public class Server {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(10, 0, 0));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ServerHandler());
                        }
                    });

            Channel ch = bootstrap.bind(12345).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服務(wù)器的初始化部分也沒有什么好說的, 它也和客戶端的初始化一樣, 為 pipeline 添加了三個(gè) Handler.

服務(wù)器 Handler
public class ServerHandler extends CustomHeartbeatHandler {
    public ServerHandler() {
        super("server");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
        byte[] data = new byte[buf.readableBytes() - 5];
        ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
        buf.skipBytes(5);
        buf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
        channelHandlerContext.write(responseBuf);
    }

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        super.handleReaderIdle(ctx);
        System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
        ctx.close();
    }
}

ServerHandler 繼承于 CustomHeartbeatHandler, 它重寫了兩個(gè)方法, 一個(gè)是 handleData, 在這里面實(shí)現(xiàn) EchoServer 的功能: 即收到客戶端的消息后, 立即原封不動地將消息回復(fù)給客戶端.
第二個(gè)重寫的方法是 handleReaderIdle, 因?yàn)榉?wù)器僅僅對客戶端的讀 idle 感興趣, 因此只重新了這個(gè)方法. 若服務(wù)器在指定時(shí)間后沒有收到客戶端的消息, 則會觸發(fā) READER_IDLE 消息, 進(jìn)而會調(diào)用 handleReaderIdle 這個(gè)方法. 我們在前面提到, 客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 并且服務(wù)器的 READER_IDLE 的超時(shí)時(shí)間是客戶端發(fā)送 PING 消息的間隔的兩倍, 因此當(dāng)服務(wù)器 READER_IDLE 觸發(fā)時(shí), 就可以確定是客戶端已經(jīng)掉線了, 因此服務(wù)器直接關(guān)閉客戶端連接即可.

總結(jié)

使用 Netty 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵就是利用 IdleStateHandler 來產(chǎn)生對應(yīng)的 idle 事件.

一般是客戶端負(fù)責(zé)發(fā)送心跳的 PING 消息, 因此客戶端注意關(guān)注 ALL_IDLE 事件, 在這個(gè)事件觸發(fā)后, 客戶端需要向服務(wù)器發(fā)送 PING 消息, 告訴服務(wù)器"我還存活著".

服務(wù)器是接收客戶端的 PING 消息的, 因此服務(wù)器關(guān)注的是 READER_IDLE 事件, 并且服務(wù)器的 READER_IDLE 間隔需要比客戶端的 ALL_IDLE 事件間隔大(例如客戶端ALL_IDLE 是5s 沒有讀寫時(shí)觸發(fā), 因此服務(wù)器的 READER_IDLE 可以設(shè)置為10s)

當(dāng)服務(wù)器收到客戶端的 PING 消息時(shí), 會發(fā)送一個(gè) PONG 消息作為回復(fù). 一個(gè) PING-PONG 消息對就是一個(gè)心跳交互.

實(shí)現(xiàn)客戶端的斷線重連
public class Client {
    private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
    private Channel channel;
    private Bootstrap bootstrap;

    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.start();
        client.sendData();
    }

    public void sendData() throws Exception {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < 10000; i++) {
            if (channel != null && channel.isActive()) {
                String content = "client msg " + i;
                ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                channel.writeAndFlush(buf);
            }

            Thread.sleep(random.nextInt(20000));
        }
    }

    public void start() {
        try {
            bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler(Client.this));
                        }
                    });
            doConnect();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void doConnect() {
        if (channel != null && channel.isActive()) {
            return;
        }

        ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);

        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (futureListener.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("Connect to server successfully!");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");

                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConnect();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });
    }

}

上面的代碼中, 我們抽象出 doConnect 方法, 它負(fù)責(zé)客戶端和服務(wù)器的 TCP 連接的建立, 并且當(dāng) TCP 連接失敗時(shí), doConnect 會 通過 "channel().eventLoop().schedule" 來延時(shí)10s 后嘗試重新連接.

客戶端 Handler
public class ClientHandler extends CustomHeartbeatHandler {
    private Client client;
    public ClientHandler(Client client) {
        super("client");
        this.client = client;
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        client.doConnect();
    }
}

斷線重連的關(guān)鍵一點(diǎn)是檢測連接是否已經(jīng)斷開. 因此我們改寫了 ClientHandler, 重寫了 channelInactive 方法. 當(dāng) TCP 連接斷開時(shí), 會回調(diào) channelInactive 方法, 因此我們在這個(gè)方法中調(diào)用 client.doConnect() 來進(jìn)行重連.

完整代碼可以在我的 Github github.com/yongshun/some_java_code 上找到.

本文由 yongshun 發(fā)表于個(gè)人博客, 采用署名-非商業(yè)性使用-相同方式共享 3.0 中國大陸許可協(xié)議.
非商業(yè)轉(zhuǎn)載請注明作者及出處. 商業(yè)轉(zhuǎn)載請聯(lián)系作者本人
Email: yongshun1228@gmail.com
本文標(biāo)題為: 淺析 Netty 實(shí)現(xiàn)心跳機(jī)制與斷線重連
本文鏈接為: https://segmentfault.com/a/1190000006931568

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/65168.html

相關(guān)文章

  • Netty實(shí)現(xiàn)心跳檢測斷線重連

    摘要:使用實(shí)現(xiàn)心跳機(jī)制代碼環(huán)境和具體思路如下使用提供的來檢測讀寫操作的空閑時(shí)間使用序列化客戶端空閑后向服務(wù)端發(fā)送一個(gè)心跳包服務(wù)端空閑后心跳丟失計(jì)數(shù)器丟失的心跳包數(shù)量當(dāng)丟失的心跳包數(shù)量超過個(gè)時(shí),主動斷開該客戶端的斷開連接后,客戶端之后重新連接代碼已 使用Netty實(shí)現(xiàn)心跳機(jī)制 代碼環(huán)境:JDK1.8和Netty4.x 具體思路如下: 使用Netty提供的IdleStateHandler來檢測...

    RobinQu 評論0 收藏0
  • 使用Netty,我們到底在開發(fā)些什么?

    摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無疑是開發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...

    DesGemini 評論0 收藏0
  • 使用Netty,我們到底在開發(fā)些什么?

    摘要:比如面向連接的功能包發(fā)送接收數(shù)量包發(fā)送接收速率錯(cuò)誤計(jì)數(shù)連接重連次數(shù)調(diào)用延遲連接狀態(tài)等。你要處理的,就是心跳超時(shí)的邏輯,比如延遲重連。發(fā)生異常后,可以根據(jù)不同的類型選擇斷線重連比如一些二進(jìn)制協(xié)議的編解碼紊亂問題,或者調(diào)度到其他節(jié)點(diǎn)。 在java界,netty無疑是開發(fā)網(wǎng)絡(luò)應(yīng)用的拿手菜。你不需要太多關(guān)注復(fù)雜的nio模型和底層網(wǎng)絡(luò)的細(xì)節(jié),使用其豐富的接口,可以很容易的實(shí)現(xiàn)復(fù)雜的通訊功能。 和...

    MSchumi 評論0 收藏0
  • 長連接的心跳重連設(shè)計(jì)

    摘要:超過后則認(rèn)為服務(wù)端出現(xiàn)故障,需要重連。同時(shí)在每次心跳時(shí)候都用當(dāng)前時(shí)間和之前服務(wù)端響應(yīng)綁定到上的時(shí)間相減判斷是否需要重連即可。客戶端檢測到某個(gè)服務(wù)端遲遲沒有響應(yīng)心跳也能重連獲取一個(gè)新的連接。 showImg(https://segmentfault.com/img/remote/1460000017987884?w=800&h=536); 前言 說道心跳這個(gè)詞大家都不陌生,當(dāng)然不是指男女...

    dreamGong 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<