一文搞懂Netty发送数据全流程 | 你想知道的细节全在这里

我们都知道 Netty 是一款高性能的异步事件驱动的网络通讯框架,既然是网络通讯框架那么它主要做的事情就是:

  • 接收客户端连接。

  • 读取连接上的网络请求数据。

  • 向连接发送网络响应数据。

前边系列文章在介绍Netty的启动以及接收连接的过程中,我们只看到 OP_ACCEPT 事件以及 OP_READ 事件的注册,并未看到 OP_WRITE 事件的注册。

  • 那么在什么情况下 Netty 才会向 SubReactor 去注册 OP_WRITE 事件呢?

  • Netty 又是怎么对写操作做到异步处理的呢?

本文笔者将会为大家一一揭晓这些谜底。我们还是以之前的 EchoServer 为例进行说明。

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //此处的msg就是Netty在read loop中从NioSocketChannel中读取到的ByteBuffer
        ctx.write(msg);
    }

}

我们将在 《Netty如何高效接收网络数据》 一文中读取到的 ByteBuffer (这里的 Object msg),直接发送回给客户端,用这个简单的例子来揭开 Netty 如何发送数据的序幕~~

在实际开发中,我们首先要通过解码器将读取到的 ByteBuffer 解码转换为我们的业务 Request 类,然后在业务线程中做业务处理,在通过编码器对业务 Response 类编码为 ByteBuffer ,最后利用 ChannelHandlerContext ctx 的引用发送响应数据。

本文我们只聚焦 Netty 写数据的过程,对于 Netty 编解码相关的内容,笔者会在后续的文章中专门介绍。

本文概要.png

1. ChannelHandlerContext

pipeline结构.png

通过前面几篇文章的介绍,我们知道 Netty 会为每个 Channel 分配一个 pipeline ,pipeline 是一个双向链表的结构。Netty 中产生的 IO 异步事件会在这个 pipeline 中传播。

Netty 中的 IO 异步事件大体上分为两类:

  • inbound事件:入站事件,比如前边介绍的 ChannelActive 事件, ChannelRead 事件,它们会从 pipeline 的头结点 HeadContext 开始一直向后传播。

  • outbound事件:出站事件,比如本文中即将要介绍到的 write事件 以及 flush 事件,出站事件会从相反的方向从后往前传播直到 HeadContext 。最终会在 HeadContext 中完成出站事件的处理。

    • 本例中用到的 channelHandlerContext.write()  会使 write 事件从当前 ChannelHandler 也就是这里的 EchoServerHandler 开始沿着 pipeline 向前传播。

    • 而 channelHandlerContext.channel().write() 则会使 write 事件从 pipeline 的尾结点 TailContext 开始向前传播直到 HeadContext 。

客户端channel pipeline结构.png

而 pipeline 这样一个双向链表数据结构中的类型正是 ChannelHandlerContext  ,由 ChannelHandlerContext 包裹我们自定义的 IO 处理逻辑 ChannelHandler。

ChannelHandler 并不需要感知到它所处的 pipeline 中的上下文信息,只需要专心处理好 IO 逻辑即可,关于 pipeline 的上下文信息全部封装在 ChannelHandlerContext中。

ChannelHandler 在 Netty 中的作用只是负责处理 IO 逻辑,比如编码,解码。它并不会感知到它在 pipeline 中的位置,更不会感知和它相邻的两个 ChannelHandler。 事实上 ChannelHandler也并不需要去关心这些,它唯一需要关注的就是处理所关心的异步事件

而 ChannelHandlerContext 中维护了 pipeline 这个双向链表中的 pre 以及 next 指针,这样可以方便的找到与其相邻的 ChannelHandler ,并可以过滤出一些符合执行条件的 ChannelHandler。正如它的命名一样, ChannelHandlerContext  正是起到了维护 ChannelHandler 上下文的一个作用。而 Netty 中的异步事件在 pipeline 中的传播靠的就是这个 ChannelHandlerContext 。

这样设计就使得 ChannelHandlerContext 和 ChannelHandler 的职责单一,各司其职,具有高度的可扩展性。

2. write事件的传播

我们无论是在业务线程或者是在 SubReactor 线程中完成业务处理后,都需要通过 channelHandlerContext 的引用将 write事件在 pipeline 中进行传播。然后在 pipeline 中相应的 ChannelHandler 中监听 write 事件从而可以对 write事件进行自定义编排处理(比如我们常用的编码器),最终传播到 HeadContext 中执行发送数据的逻辑操作。

前边也提到 Netty 中有两个触发 write 事件传播的方法,它们的传播处理逻辑都是一样的,只不过它们在 pipeline 中的 传播起点 是不同的。

  • channelHandlerContext.write() 方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。

  • channelHandlerContext.channel().write() 方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。

客户端channel pipeline结构.png

在我们清楚了 write 事件的总体传播流程后,接下来就来看看在 write 事件传播的过程中Netty为我们作了些什么?这里我们以 channelHandlerContext.write() 方法为例说明。

3. write方法发送数据

write事件传播流程.png

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        write(msg, false, promise);
        return promise;
    }

}

这里我们看到 Netty 的写操作是一个异步操作,当我们在业务线程中调用 channelHandlerContext.write() 后,Netty 会给我们返回一个 ChannelFuture,我们可以在这个 ChannelFutrue 中添加 ChannelFutureListener ,这样当 Netty 将我们要发送的数据发送到底层 Socket 中时,Netty 会通过 ChannelFutureListener 通知我们写入结果。

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        //此处的msg就是Netty在read loop中从NioSocketChannel中读取到的ByteBuffer
        ChannelFuture future = ctx.write(msg);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                     处理异常情况
                } else {                    
                     写入Socket成功后,Netty会通知到这里
                }
            }
        });
}

当异步事件在 pipeline 传播的过程中发生异常时,异步事件就会停止在 pipeline 中传播。所以我们在日常开发中,需要对写操作异常情况进行处理。

  • 其中 inbound 类异步事件发生异常时, 会触发exceptionCaught事件传播 。exceptionCaught 事件本身也是一种 inbound 事件,传播方向会从当前发生异常的 ChannelHandler 开始一直向后传播直到 TailContext。

  • 而 outbound 类异步事件发生异常时, 则不会触发exceptionCaught事件传播 。一般只是通知相关 ChannelFuture。但如果是 flush 事件在传播过程中发生异常,则会触发当前发生异常的 ChannelHandler 中 exceptionCaught 事件回调。

我们继续回归到写操作的主线上来~~~

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");

        ................省略检查promise的有效性...............

        //flush = true 表示channelHandler中调用的是writeAndFlush方法,这里需要找到pipeline中覆盖write或者flush方法的channelHandler
        //flush = false 表示调用的是write方法,只需要找到pipeline中覆盖write方法的channelHandler
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        //用于检查内存泄露
        final Object m = pipeline.touch(msg, next);
        //获取pipeline中下一个要被执行的channelHandler的executor
        EventExecutor executor = next.executor();
        //确保OutBound事件由ChannelHandler指定的executor执行
        if (executor.inEventLoop()) {
            //如果当前线程正是channelHandler指定的executor则直接执行
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            //如果当前线程不是ChannelHandler指定的executor,则封装成异步任务提交给指定executor执行,注意这里的executor不一定是reactor线程。
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }

write 事件要向前在 pipeline 中传播,就需要在 pipeline 上找到下一个具有执行资格的 ChannelHandler,因为位于当前 ChannelHandler 前边的可能是 ChannelInboundHandler 类型的也可能是 ChannelOutboundHandler 类型的 ChannelHandler ,或者有可能压根就不关心 write 事件的 ChannelHandler(没有实现write回调方法)。

write事件的传播.png

这里我们就需要通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。

3.1 findContextOutbound

  private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        //获取当前ChannelHandler的executor
        EventExecutor currentExecutor = executor();
        do {
            //获取前一个ChannelHandler
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }
    //判断前一个ChannelHandler是否具有响应Write事件的资格
    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {

        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }

findContextOutbound 方法接收的参数是一个掩码,这个掩码表示要向前查找具有什么样执行资格的 ChannelHandler。因为我们这里调用的是 ChannelHandlerContext 的 write 方法所以 flush = false,传递进来的掩码为 MASK_WRITE,表示我们要向前查找覆盖实现了 write 回调方法的 ChannelOutboundHandler。

3.1.1 掩码的巧妙应用

Netty 中将 ChannelHandler 覆盖实现的一些异步事件回调方法用 int 型的掩码来表示,这样我们就可以通过这个掩码来判断当前 ChannelHandler 具有什么样的执行资格。

final class ChannelHandlerMask {
    ....................省略......................

    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
    static final int MASK_WRITE = 1 << 15;
    static final int MASK_FLUSH = 1 << 16;

   //outbound事件掩码集合
   static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
    ....................省略......................
}

在 ChannelHandler 被添加进 pipeline 的时候,Netty 会根据当前 ChannelHandler 的类型以及其覆盖实现的异步事件回调方法,通过 | 运算 向 ChannelHandlerContext#executionMask 字段添加该 ChannelHandler 的执行资格。

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    //ChannelHandler执行资格掩码
    private final int executionMask;

    ....................省略......................
}

类似的掩码用法其实我们在前边的文章 ? 《一文聊透Netty核心引擎Reactor的运转架构》 中也提到过,在 Channel 向对应的 Reactor 注册自己感兴趣的 IO 事件时,也是用到了一个 int 型的掩码 interestOps 来表示 Channel 感兴趣的 IO 事件集合。

    @Override
    protected void doBeginRead() throws Exception {

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
         * 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            //注册监听OP_ACCEPT或者OP_READ事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
  • 用 & 操作判断,某个事件是否在事件集合中: (readyOps & SelectionKey.OP_CONNECT) != 0

  • 用 | 操作向事件集合中添加事件: interestOps | readInterestOp

  • 从事件集合中删除某个事件,是通过先将要删除事件取反 ~ ,然后在和事件集合做 & 操作: ops &= ~SelectionKey.OP_CONNECT

这部分内容笔者会在下篇文章全面介绍 pipeline 的时候详细讲解,大家这里只需要知道这里的掩码就是表示一个执行资格的集合。当前 ChannelHandler 的执行资格存放在它的 ChannelHandlerContext 中的 executionMask 字段中。

3.1.2 向前查找具有执行资格的ChannelOutboundHandler

  private AbstractChannelHandlerContext findContextOutbound(int mask) {
        //当前ChannelHandler
        AbstractChannelHandlerContext ctx = this;
        //获取当前ChannelHandler的executor
        EventExecutor currentExecutor = executor();
        do {
            //获取前一个ChannelHandler
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }

    //判断前一个ChannelHandler是否具有响应Write事件的资格
    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {

        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }

前边我们提到 ChannelHandlerContext 不仅封装了 ChannelHandler 的执行资格掩码还可以感知到当前 ChannelHandler 在 pipeline 中的位置,因为 ChannelHandlerContext 中维护了前驱指针 prev 以及后驱指针 next。

这里我们需要在 pipeline 中传播 write 事件,它是一种 outbound 事件,所以需要向前传播,这里通过 ChannelHandlerContext 的前驱指针 prev 拿到当前 ChannelHandler 在 pipeline 中的前一个节点。

ctx = ctx.prev;

通过 skipContext 方法判断前驱节点是否具有执行的资格。如果没有执行资格则跳过继续向前查找。如果具有执行资格则返回并响应 write 事件。

在 write 事件传播场景中,执行资格指的是前驱 ChannelHandler 是否是ChannelOutboundHandler 类型的,并且它是否覆盖实现了 write 事件回调方法。

public class EchoChannelHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
    }
}

3.1.3 skipContext

该方法主要用来判断当前 ChannelHandler 的前驱节点是否具有 mask 掩码中包含的事件响应资格。

方法参数中有两个比较重要的掩码:

  • int onlyMask :用来指定当前 ChannelHandler 需要符合的类型。其中MASK_ONLY_OUTBOUND 为 ChannelOutboundHandler 类型的掩码, MASK_ONLY_INBOUND 为 ChannelInboundHandler 类型的掩码。

final class ChannelHandlerMask {

    //outbound事件的掩码集合
    static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
            MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

    //inbound事件的掩码集合
    static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
}

比如本小节中我们是在介绍 write 事件的传播,那么就需要在当前ChannelHandler 前边首先是找到一个 ChannelOutboundHandler 类型的ChannelHandler。

ctx.executionMask & (onlyMask | mask)) == 0 用于判断前一个 ChannelHandler 是否为我们指定的 ChannelHandler 类型,在本小节中我们指定的是 onluMask = MASK_ONLY_OUTBOUND 即 ChannelOutboundHandler 类型。如果不是,这里就会直接跳过,继续在 pipeline 中向前查找。

  • int mask :用于指定前一个 ChannelHandler 需要实现的相关异步事件处理回调。在本小节中这里指定的是 MASK_WRITE ,即需要实现 write 回调方法。通过 (ctx.executionMask & mask) == 0 条件来判断前一个ChannelHandler 是否实现了 write 回调,如果没有实现这里就跳过,继续在 pipeline 中向前查找。

关于 skipContext 方法的详细介绍,笔者还会在下篇文章全面介绍 pipeline的时候再次进行介绍,这里大家只需要明白该方法的核心逻辑即可。

3.1.4 向前传播write事件

通过 findContextOutbound 方法我们在 pipeline 中找到了下一个具有执行资格的 ChannelHandler,这里指的是下一个 ChannelOutboundHandler 类型并且覆盖实现了 write 方法的 ChannelHandler。

Netty 紧接着会调用这个 nextChannelHandler 的 write 方法实现 write 事件在 pipeline 中的传播。

        //获取下一个要被执行的channelHandler指定的executor
        EventExecutor executor = next.executor();
        //确保outbound事件的执行 是由 channelHandler指定的executor执行的
        if (executor.inEventLoop()) {
            //如果当前线程是指定的executor 则直接操作
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            //如果当前线程不是channelHandler指定的executor,则封装程异步任务 提交给指定的executor执行
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }

在我们向 pipeline 添加 ChannelHandler 的时候可以通过 ChannelPipeline#addLast(EventExecutorGroup,ChannelHandler......) 方法指定执行该 ChannelHandler 的executor。如果不特殊指定,那么执行该 ChannelHandler 的executor默认为该 Channel 绑定的 Reactor 线程。

执行 ChannelHandler 中异步事件回调方法的线程必须是 ChannelHandler 指定的executor。

所以这里首先我们需要获取在 findContextOutbound 方法查找出来的下一个符合执行条件的 ChannelHandler 指定的executor。

EventExecutor executor = next.executor()

并通过 executor.inEventLoop() 方法判断当前线程是否是该 ChannelHandler 指定的 executor。

如果是,那么我们直接在当前线程中执行 ChannelHandler 中的 write 方法。

如果不是,我们就需要将 ChannelHandler 对 write 事件的回调操作封装成异步任务 WriteTask 并提交给 ChannelHandler 指定的 executor 中,由 executor 负责执行。

这里需要注意的是这个 executor 并不一定是 channel 绑定的 reactor 线程。它可以是我们自定义的线程池,不过需要我们通过 ChannelPipeline#addLast 方法进行指定,如果我们不指定,默认情况下执行 ChannelHandler 的 executor 才是 channel 绑定的 reactor 线程。

这里Netty需要确保 outbound 事件是由 channelHandler 指定的 executor 执行的。

这里有些同学可能会有疑问,如果我们向pipieline添加ChannelHandler的时候,为每个ChannelHandler指定不同的executor时,Netty如果确保线程安全呢??

大家还记得pipeline中的结构吗?

客户端channel pipeline结构.png

outbound 事件在 pipeline 中的传播最终会传播到 HeadContext 中,之前的系列文章我们提到过,HeadContext 中封装了 Channel 的 Unsafe 类负责 Channel 底层的 IO 操作。而 HeadContext 指定的 executor 正是对应 channel 绑定的 reactor 线程。

image.png

所以最终在 netty 内核中执行写操作的线程一定是 reactor 线程从而保证了线程安全性。

忘记这段内容的同学可以在回顾下 ? 《Reactor在Netty中的实现(创建篇)》 ,类似的套路我们在介绍 NioServerSocketChannel 进行 bind 绑定以及 register 注册的时候都介绍过,只不过这里将 executor 扩展到了自定义线程池的范围。

3.1.5 触发nextChannelHandler的write方法回调

write事件的传播1.png

            //如果当前线程是指定的executor 则直接操作
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }

由于我们在示例 ChannelHandler 中调用的是 ChannelHandlerContext#write 方法,所以这里的 flush = false 。触发调用 nextChannelHandler 的 write 方法。

    void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            // 当前channelHandler虽然添加到pipeline中,但是并没有调用handlerAdded
            // 所以不能调用当前channelHandler中的回调方法,只能继续向前传递write事件
            write(msg, promise);
        }
    }

这里首先需要通过 invokeHandler() 方法判断这个 nextChannelHandler 中的 han

  • 1
    点赞
  • 14
    收藏
    觉得还不错? 一键收藏
  • 2
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值