Netty中入站处理器(SimpleChannelInboundHandler 和 ChannelInboundHandlerAdapter)_伍华锋的博客-CSDN博客

文章目录

原博文,点击这里

1.类继承

在这里插入图片描述
SimpleChannelInboundHandler继承于ChannelInboundHandlerAdapter

  • ChannelInboundHandlerAdapter中包含channelRead()方法。
  • SimpleChannelInboundHandler重写了channelRead(),并且新增了channelRead0()方法,(在Netty5中channelRead0已被重命名为messageReceived)。

对于SimpleChannelInboundHandler这里只提供了一个模板,作用是把处理逻辑不变的内容写好在 channelRead(ctx,msg) 中,并且在里面调用 channelRead0 ,这样变化的内容通过抽象方法实现传递到子类中去了。

 @Override// 继承了 ChannelInboundHandlerAdapter#channelRead
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);// 模板方法,抽出可变的部分,具体实现在子类中
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);// 释放资源(保存消息的ByteBuf)
            }
        }
    }
     protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;

因此我们继承SimpleChannelInboundHandler后,处理入站的数据我们只需要重新实现channelRead0方法,当channelRead真正被调用的时候我们的逻辑才会被处理。这里使用的是模板模式,让主要的处理逻辑保持不变,让变化的步骤通过接口实现来完成

值得注意的是对于SimpleChannelInboundHandler入站的数据,当被读取之后可能会执行ReferenceCountUtil.release(msg)释放资源。底层是实现ReferenceCounted,当新的对象初始化的时候计数为1,retain()方法实现其他地方的引用计数加1,release()方法实现应用减一,当计数减少到0的时候会被显示清除,再次访问被清除的对象会出现访问冲突。因此,当我们实现自己的Handler的时候如果希望将客户端发送过来的数据发送到客户端,可能在上述finally中已经释放了资源(writeAndFlush是异步处理),所以会出现异常情况。

但是当我们实现的是ChannelInboundHandler类的时候,重写channelRead方法时,需要释放ByteBuf相关的内存,可以使用Netty提供了一个工具方法,ReferenceCountUtil.release()

如果在channelRead中写了ctx.write(接收到的内容),由于write是异步的,可能在channelRead返回之后,仍然没有完成,为此,扩展了ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息,消息在channelReadComplete(),当writeAndFlush方法被调用时释放。

理解

理解:一个处理器Handler继承SimpleChannelInboundHandler中,无论有没有重写channelRead()方法没有,当有数据的时候都会首先触发SimpleChannelInboundHandler中channelRead()方法,然后在channelRead()中会调用channelRead0()方法,因为他是一个接口,所以调用的事处理器Handler中重写的channelRead0()方法

    • *

2.使用区别:

在客户端的业务Handler继承的是SimpleChannelInboundHandler,而在服务器端继承的是ChannelInboundHandlerAdapter

  最主要的区别就是SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release())。而为何服务器端不能用呢,因为我们想让服务器把客户端请求的数据发送回去,而服务器端有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release

2.1原因:

在客户端,当 channelRead0() 方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler负责释放指向保存该消息的ByteBuf的内存引用。

那为什么服务端不需要这样处理呢?

在EchoServerHandler中,你仍然需要将传入消息回送给发送者,而 write() 操作是异步的,直到 channelRead() 方法返回后可能仍然没有完成。为此,EchoServerHandler扩展了 ChannelInboundHandlerAdapter ,其在这个时间点上不会释放消息。

2.2ChannelInboundHandlerAdapter

EchoServerHandler#channelReadComplete,这是一个EchoServer小例子:
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        //将未决消息冲刷到远程节点,并且关闭该 Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

消息在 channelReadComplete() 方法中,当 writeAndFlush() 方法被调用时才被释放,我们点进去源码验证一下:

AbstractChannelHandlerContext#writeAndFlush,如下所示,最后的资源是在 writeAndFlush() 中被释放的。

    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());// 跳转到另外一个重载的方法中
    }
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        if (msg == null) {// msg不能为空
            throw new NullPointerException("msg");
        }
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);// 释放资源(保存消息的ByteBuf)
            // cancelled
            return promise;
        }
        write(msg, true, promise);// 异步写操作
        return promise;
    }

2.3扩展 - ReferenceCountUtil

上面的源码中,最后资源是通过 ReferenceCountUtil 来释放的。也就是说,当我们需要释放ByteBuf相关内存的时候,也可以使用 ReferenceCountUtil#release()。

ReferenceCountUtil 底层实现是 ReferenceCounted ,当新的对象初始化的时候计数为1,retain() 方法被调用时引用计数加1,release()方法被调用时引用计数减1,当计数减少到0的时候会被显示清除,再次访问被清除的对象会出现访问冲突(这里想起了JVM判断对象是否存活的引用计数算法)。

ReferenceCountUtil#release:

    public static boolean release(Object msg) {
        if (msg instanceof ReferenceCounted) {
            return ((ReferenceCounted) msg).release();// Decreases the reference count by 1
        }
        return false;
    }

原网址: 访问
创建于: 2023-09-14 16:41:16
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论