diff --git a/whatsmars-rpc/README.md b/whatsmars-rpc/README.md index 8f1432e69263966e7606710636cb2c8d86121cf4..5b80b2efc823e843dada421035fddacf6db5b9ed 100644 --- a/whatsmars-rpc/README.md +++ b/whatsmars-rpc/README.md @@ -408,9 +408,7 @@ header的decode细节在`RocketMQSerializable`里 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() - // DefaultEventExecutorGroup - // defaultEventExecutorGroup用于执行pipeline里的channelHandlers - // 如果没有设置defaultEventExecutorGroup,则用workerGroup执行 + // DefaultEventExecutorGroup: optionalGroup (相当于netty提供的业务线程池) .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, diff --git a/whatsmars-rpc/whatsmars-netty/README.md b/whatsmars-rpc/whatsmars-netty/README.md index 6af19f8e2b7c5840192b6c99e522d535400d8fe6..1dfde0b63e6bc7d1e1a7ea6fcc67f0a55078239d 100644 --- a/whatsmars-rpc/whatsmars-netty/README.md +++ b/whatsmars-rpc/whatsmars-netty/README.md @@ -1,55 +1,76 @@ -# Netty Examples - -### EventLoop & EventExecutor -Netty的线程模型中有4个基础接口,它们分别是EventLoopGroup、EventLoop、EventExecutorGroup、EventExecutor。 -其中EventExecutor扩展自java.util.concurrent.ScheduledExecutorService接口,类似于线程池(执行)的职责, -而EventLoop首先继承自EventExecutor,并主要扩展了register方法,就是将通道Channel注册到Selector的方法。 -NioEventLoop,就是基于Nio的实现,不同OS实现不同,Linux是EpollEventLoop, Mac OS 是KQueueEventLoop。 -在这个类中有一个亮点,就是规避了JDK nio的一个bug,Selector select方法的空轮询,核心思想是,如果连续多少 -次(默认为512)在没有超时的情况就返回,并且已经准备就绪的键的数量为0,则认为发生了空轮询,如果发生了空轮询, -就新建一个新的Selector,并重新将通道,关心的事件注册到新的Selector, 并关闭旧的Selector。 +### Netty线程模型 +Netty的代码很抽象,适合debug和UML跟 +RocketMQ `NettyRemotingServer` ```java -public interface EventExecutorGroup extends ScheduledExecutorService, Iterable - -public interface EventExecutor extends EventExecutorGroup - -public interface OrderedEventExecutor extends EventExecutor - -public interface EventLoopGroup extends EventExecutorGroup - -public interface EventLoop extends OrderedEventExecutor, EventLoopGroup + ServerBootstrap childHandler = + // EventLoopGroup: bossGroup, workerGroup + this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) + .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, false) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) + .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) + .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + // DefaultEventExecutorGroup: optionalGroup (相当于netty提供的业务线程池) + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, + new HandshakeHandler(TlsSystemConfig.tlsMode)) + .addLast(defaultEventExecutorGroup, + new NettyEncoder(), + new NettyDecoder(), + new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), + new NettyConnectManageHandler(), + // NettyServerHandler里有业务处理线程池 + new NettyServerHandler() + ); + } + }); ``` -### 4.x和3.x的线程模型最大区别 -4.x中ChannelPipeline中的Handler链统一由I/O线程串行调度,无论是读还是写操作,3.x中的write操作时 -由业务线程处理Handler链。4.x中可以降低线程之间的上下文切换带来的时间消耗,但是3.x中业务线程可以并 -发执行Handler链。如果有一些耗时的Handler操作会导致4.x的效率低下,但是可以考虑将这些耗时操作放在业 -务线程最先执行,不放在Handler里处理。由于业务线程可以并发执行,同样也可以提高效率。 - -4.x的串行化线程模型设计 - -![img](https://github.com/javahongxi/static/blob/master/netty_01.png) - -一个NioEventLoop聚合了一个多路复用器Selector,因此可以处理成百上千的客户端连接,Netty的处理策略 -是每当有一个新的客户端接入,则从NioEventLoop线程组中顺序获取一个可用的NioEventLoop,当到达数组上 -限之后,重新返回到0,通过这种方式,可以基本保证各个NioEventLoop的负载均衡。一个客户端连接只注册到 -一个NioEventLoop上,这样就避免了多个I/O线程去并发操作它。 - -Netty通过串行化设计理念降低了用户的开发难度,提升了处理性能。利用线程组实现了多个串行化线程水平并 -行执行,线程之间并没有交集,这样既可以充分利用多核提升并行处理能力,同时避免了线程上下文的切换和并 -发保护带来的额外性能损耗。 +bossGroup,workerGroup,optionalGroup实质都用的是 `MultithreadEventExecutorGroup`, 其维护的nThreads个executor都是`ThreadPerTaskExecutor` +```java +protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { + if (nThreads <= 0) { + throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); + } + + // 使用者没传threadFactory时,executor = null + // 传了时,executor = new ThreadPerTaskExecutor(threadFactory) + if (executor == null) { + executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); + } + + children = new EventExecutor[nThreads]; + for (int i = 0; i < nThreads; i ++) { + boolean success = false; + try { + children[i] = newChild(executor, args); + success = true; + } catch (Exception e) { + // TODO: Think about if this is a good exception type + throw new IllegalStateException("failed to create a child event loop", e); + } finally { + // ... + } + } +} +``` -了解完了Netty 4的串行化设计理念之后,我们继续看Netty 3线程模型存在的问题,总结起来,它的主要问题如下: +![img](https://github.com/javahongxi/static/blob/master/netty_03.png) -- Inbound和Outbound实质都是I/O相关的操作,它们的线程模型竟然不统一,这给用户带来了更多的学习和使用成本; -- Outbound操作由业务线程执行,通常业务会使用线程池并行处理业务消息,这就意味着在某一个时刻会有多个业务线程同时操作ChannelHandler,我们需要对ChannelHandler进行并发保护,通常需要加锁。如果同步块的范围不当,可能会导致严重的性能瓶颈,这对开发者的技能要求非常高,降低了开发效率; -- Outbound操作过程中,例如消息编码异常,会产生Exception,它会被转换成Inbound的Exception并通知到ChannelPipeline,这就意味着业务线程发起了Inbound操作,即需要将该异常交由I/O线程去处理,从而产生了额外的线程的上下文切换!它打破了Inbound操作由I/O线程操作的模型,如果开发者按照Inbound操作只会由一个I/O线程执行的约束进行设计,则会发生线程并发访问安全问题。由于该场景只在特定异常时发生,因此错误非常隐蔽!一旦在生产环境中发生此类线程并发问题,定位难度和成本都非常大。 +![img](https://github.com/javahongxi/static/blob/master/netty_02.png) -### ServerBootstrap bind +`AbstractBootstrap` (main线程) ```java + // serverBootstrap.bind() private ChannelFuture doBind(final SocketAddress localAddress) { - final ChannelFuture regFuture = initAndRegister(); + final ChannelFuture regFuture = initAndRegister(); // 接下来看这里 final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; @@ -61,51 +82,20 @@ Netty通过串行化设计理念降低了用户的开发难度,提升了处理 doBind0(regFuture, channel, localAddress, promise); return promise; } else { - // Registration future is almost always fulfilled already, but just in case it's not. - final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); - regFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Throwable cause = future.cause(); - if (cause != null) { - // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an - // IllegalStateException once we try to access the EventLoop of the Channel. - promise.setFailure(cause); - } else { - // Registration was successful, so set the correct executor to use. - // See https://github.com/netty/netty/issues/2586 - promise.registered(); - - doBind0(regFuture, channel, localAddress, promise); - } - } - }); - return promise; + // ... } } -``` - -- 初始化一个通道,并注册,如果注册失败,直接返回。 -- 如果初始化并立即注册成功,执行doBind0方法,进行绑定 -- 如果未立即注册成功,则添加监听事件,等初始化并注册成功后,执行doBind0方法,这里是Netty对jdk -Future模式进行的扩展,引入事件机制 -### group().register(channel) -```java final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); - init(channel); // NioServerSocketChannel + init(channel); // 接下来看这里 } catch (Throwable t) { - if (channel != null) { - channel.unsafe().closeForcibly(); - return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); - } - return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); + // ... } - ChannelFuture regFuture = config().group().register(channel); + ChannelFuture regFuture = config().group().register(channel); // 看完init看这里 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); @@ -118,42 +108,460 @@ Future模式进行的扩展,引入事件机制 } ``` -根据Netty线程模型,group()返回的是EventLoopGroup,然后Netty会从该EventLoopGroup中获取下一个 -EventLoop来执行。由于程序入口使用的是NioServerSocketChannel,故本例最终会使用NioEventLoop -来作为事件处理器来服务,这里的register方法,其实现在NioEventLoop的父类SingleThreadEventLoop中。 +`ServerBootstrap` (main线程) +```java + @Override + void init(Channel channel) throws Exception { + // ... + + ChannelPipeline p = channel.pipeline(); + final EventLoopGroup currentChildGroup = childGroup; + final ChannelHandler currentChildHandler = childHandler; + final Entry, Object>[] currentChildOptions; + final Entry, Object>[] currentChildAttrs; + synchronized (childOptions) { + currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); + } + synchronized (childAttrs) { + currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); + } + // ChannelInitializer的特点是执行initChannel后会随即从pipeline删除 + p.addLast(new ChannelInitializer() { + @Override + public void initChannel(final Channel ch) throws Exception { + final ChannelPipeline pipeline = ch.pipeline(); + ChannelHandler handler = config.handler(); + if (handler != null) { + pipeline.addLast(handler); + } + // 服务端channel的eventLoop去执行如下task,接下来我们关注eventLoop.execute(Runnable task)是怎么执行的 + ch.eventLoop().execute(new Runnable() { + @Override + public void run() { + pipeline.addLast(new ServerBootstrapAcceptor( + ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); + } + }); + } + }); + } +``` + +`SingleThreadEventExecutor` ```java + private final Queue taskQueue; + + private volatile Thread thread; + + private final Executor executor; // ThreadPerTaskExecutor + @Override - public ChannelFuture register(Channel channel) { - return register(new DefaultChannelPromise(channel, this)); + public void execute(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + + // this.thread == Thread.currentThread() + // 第一次为false,因为刚开始thread为null + boolean inEventLoop = inEventLoop(); + addTask(task); // taskQueue + if (!inEventLoop) { + startThread(); // 接下来看这个 + if (isShutdown() && removeTask(task)) { + reject(); + } + } + + if (!addTaskWakesUp && wakesUpForTask(task)) { + wakeup(inEventLoop); + } } +``` + +```java + private void doStartThread() { + assert thread == null; + // ThreadPerTaskExecutor + executor.execute(new Runnable() { + @Override + public void run() { + // Thread.currentThread() == ThreadPerTaskExecutor创建的线程 + // 从这里我们知道,最初创建的NioEventLoop都是没有绑定线程的 + // 是在执行任务时由ThreadPerTaskExecutor创建线程赋给thread + // thread有值后,下次判断inEventLoop()就可能为true了(视具体线程环境) + thread = Thread.currentThread(); + if (interrupted) { + thread.interrupt(); + } + boolean success = false; + updateLastExecutionTime(); + try { + SingleThreadEventExecutor.this.run(); // 接下来看这个 + success = true; + } catch (Throwable t) { + logger.warn("Unexpected exception from an event executor: ", t); + } finally { + // ... + } + } + }); + } +``` + +`NioEventLoop` +```java @Override - public ChannelFuture register(final ChannelPromise promise) { - ObjectUtil.checkNotNull(promise, "promise"); - promise.channel().unsafe().register(this, promise); - return promise; + protected void run() { + for (;;) { + try { + // 我们以为是select(),其实可能是selectNow(),前者阻塞,后者非阻塞 + switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { + case SelectStrategy.CONTINUE: // -2 + continue; + + case SelectStrategy.BUSY_WAIT: // -3 + // fall-through to SELECT since the busy-wait is not supported with NIO + + case SelectStrategy.SELECT: // -1 + select(wakenUp.getAndSet(false)); + + if (wakenUp.get()) { + selector.wakeup(); + } + default: + } + // 当hasTasks()为true时,以上case都不满足,直接走到这里 + cancelledKeys = 0; + needsToSelectAgain = false; + final int ioRatio = this.ioRatio; + if (ioRatio == 100) { + try { + processSelectedKeys(); + } finally { + // Ensure we always run tasks. + runAllTasks(); + } + } else { // 默认 + final long ioStartTime = System.nanoTime(); + try { + processSelectedKeys(); // 接下来看这里 + } finally { + // Ensure we always run tasks. + final long ioTime = System.nanoTime() - ioStartTime; + runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 接下来看这里 + } + } + } catch (Throwable t) { + handleLoopException(t); + } + // ... + } } ``` -有关Channel的IO类操作,最终都会转发给Unsafe类去执行,直接进入到AbstractUnsafe中 +`DefaultSelectStrategy` +```java + @Override + public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { + return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; // 显然有task时返回值 >= 0 + } +``` +`NioEventLoop` ```java + private final IntSupplier selectNowSupplier = new IntSupplier() { @Override - public final void register(EventLoop eventLoop, final ChannelPromise promise) { - if (eventLoop == null) { - throw new NullPointerException("eventLoop"); + public int get() throws Exception { + return selectNow(); // >= 0 + } + }; + + private void processSelectedKeysOptimized() { + for (int i = 0; i < selectedKeys.size; ++i) { + final SelectionKey k = selectedKeys.keys[i]; + // null out entry in the array to allow to have it GC'ed once the Channel close + // See https://github.com/netty/netty/issues/2363 + selectedKeys.keys[i] = null; + // channel注册selector时传了attachment + final Object a = k.attachment(); + + if (a instanceof AbstractNioChannel) { + processSelectedKey(k, (AbstractNioChannel) a); // 接下来看这里 + } else { + @SuppressWarnings("unchecked") + NioTask task = (NioTask) a; + processSelectedKey(k, task); // 接下来看这里 + } + + if (needsToSelectAgain) { + // null out entries in the array to allow to have it GC'ed once the Channel close + // See https://github.com/netty/netty/issues/2363 + selectedKeys.reset(i + 1); + + selectAgain(); + i = -1; } - if (isRegistered()) { - promise.setFailure(new IllegalStateException("registered to an event loop already")); + } + } + + private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { + final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); + if (!k.isValid()) { + final EventLoop eventLoop; + try { + eventLoop = ch.eventLoop(); + } catch (Throwable ignored) { return; } - if (!isCompatible(eventLoop)) { - promise.setFailure( - new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); + if (eventLoop != this || eventLoop == null) { return; } + unsafe.close(unsafe.voidPromise()); + return; + } + + try { + int readyOps = k.readyOps(); + // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise + // the NIO JDK channel implementation may throw a NotYetConnectedException. + if ((readyOps & SelectionKey.OP_CONNECT) != 0) { + // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking + // See https://github.com/netty/netty/issues/924 + int ops = k.interestOps(); + ops &= ~SelectionKey.OP_CONNECT; + k.interestOps(ops); + + unsafe.finishConnect(); + } + + // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write + ch.unsafe().forceFlush(); + } + + // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead + // to a spin loop + if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { + unsafe.read(); // 接下来看这里 + } + } catch (CancelledKeyException ignored) { + unsafe.close(unsafe.voidPromise()); + } + } + + private static void processSelectedKey(SelectionKey k, NioTask task) { + int state = 0; + try { + task.channelReady(k.channel(), k); // 接下来看这里 + state = 1; + } catch (Exception e) { + k.cancel(); + invokeChannelUnregistered(task, k, e); + state = 2; + } finally { + switch (state) { + case 0: + k.cancel(); + invokeChannelUnregistered(task, k, null); + break; + case 1: + if (!k.isValid()) { // Cancelled by channelReady() + invokeChannelUnregistered(task, k, null); + } + break; + } + } + } +``` + +`SingleThreadEventExecutor` +```java + /** + * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running + * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. + */ + protected boolean runAllTasks(long timeoutNanos) { + fetchFromScheduledTaskQueue(); // scheduledTask会转移到taskQueue + Runnable task = pollTask(); + if (task == null) { + afterRunningAllTasks(); + return false; + } + + final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; + long runTasks = 0; + long lastExecutionTime; + for (;;) { + safeExecute(task); // Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}. + + runTasks ++; + + // Check timeout every 64 tasks because nanoTime() is relatively expensive. + // XXX: Hard-coded value - will make it configurable if it is really a problem. + if ((runTasks & 0x3F) == 0) { + lastExecutionTime = ScheduledFutureTask.nanoTime(); + if (lastExecutionTime >= deadline) { + break; + } + } + + task = pollTask(); + if (task == null) { + lastExecutionTime = ScheduledFutureTask.nanoTime(); + break; + } + } + + afterRunningAllTasks(); + this.lastExecutionTime = lastExecutionTime; + return true; + } +``` + +`AbstractNioMessageChannel` +```java + private final class NioMessageUnsafe extends AbstractNioUnsafe { + + private final List readBuf = new ArrayList(); + + @Override + public void read() { + assert eventLoop().inEventLoop(); + final ChannelConfig config = config(); + final ChannelPipeline pipeline = pipeline(); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.reset(config); + + boolean closed = false; + Throwable exception = null; + try { + try { + do { + int localRead = doReadMessages(readBuf); // 接下来看这里 + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } + + allocHandle.incMessagesRead(localRead); + } while (allocHandle.continueReading()); // 控制连接接入速率 + } catch (Throwable t) { + exception = t; + } + + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + readPending = false; + // 此处的pipeline是服务端channel的 + // 传播channelRead事件触发acceptorHandler的channelRead方法 + pipeline.fireChannelRead(readBuf.get(i)); + } + readBuf.clear(); + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + + if (exception != null) { + closed = closeOnReadError(exception); + + pipeline.fireExceptionCaught(exception); + } + + if (closed) { + inputShutdown = true; + if (isOpen()) { + close(voidPromise()); + } + } + } finally { + // Check if there is a readPending which was not processed yet. + // This could be for two reasons: + // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method + // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method + // + // See https://github.com/netty/netty/issues/2254 + if (!readPending && !config.isAutoRead()) { + removeReadOp(); + } + } + } + } +``` + +`NioServerSocketChannel` +```java + @Override + protected int doReadMessages(List buf) throws Exception { + SocketChannel ch = SocketUtils.accept(javaChannel()); // 终于找到你 + + try { + if (ch != null) { + buf.add(new NioSocketChannel(this, ch)); // 构造NioSocketChannel + return 1; + } + } catch (Throwable t) { + // ... + } + + return 0; + } +``` + +`ServerBootstrapAcceptor` +```java + @Override + @SuppressWarnings("unchecked") + public void channelRead(ChannelHandlerContext ctx, Object msg) { + final Channel child = (Channel) msg; // 客户端channel + + child.pipeline().addLast(childHandler); // ChannelInitializer + + setChannelOptions(child, childOptions, logger); // TCP参数 + + for (Entry, Object> e: childAttrs) { // 自定义属性 + child.attr((AttributeKey) e.getKey()).set(e.getValue()); + } + + try { + // 将客户端channel注册到workerGroup上 + childGroup.register(child).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + forceClose(child, future.cause()); + } + } + }); + } catch (Throwable t) { + forceClose(child, t); + } + } +``` + +`SingleThreadEventLoop` +```java + @Override + public ChannelFuture register(Channel channel) { + return register(new DefaultChannelPromise(channel, this)); + } + + @Override + public ChannelFuture register(final ChannelPromise promise) { + ObjectUtil.checkNotNull(promise, "promise"); + promise.channel().unsafe().register(this, promise); // 接下来看这里 + return promise; + } +``` +`AbstractChannel` +```java + @Override + public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { @@ -163,16 +571,11 @@ EventLoop来执行。由于程序入口使用的是NioServerSocketChannel,故本 eventLoop.execute(new Runnable() { @Override public void run() { - register0(promise); + register0(promise); // 接下来看这里 } }); } catch (Throwable t) { - logger.warn( - "Force-closing a channel whose registration task was not accepted by an event loop: {}", - AbstractChannel.this, t); - closeForcibly(); - closeFuture.setClosed(); - safeSetFailure(promise, t); + // ... } } } @@ -185,7 +588,7 @@ EventLoop来执行。由于程序入口使用的是NioServerSocketChannel,故本 return; } boolean firstRegistration = neverRegistered; - doRegister(); // @@@@@ 1 + doRegister(); // 接下来看这里 neverRegistered = false; registered = true; @@ -194,12 +597,12 @@ EventLoop来执行。由于程序入口使用的是NioServerSocketChannel,故本 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); - pipeline.fireChannelRegistered(); // @@@@@ 2 + pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { - pipeline.fireChannelActive(); + pipeline.fireChannelActive(); // 接下来看这里 } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. @@ -217,39 +620,129 @@ EventLoop来执行。由于程序入口使用的是NioServerSocketChannel,故本 } ``` -完成Channel的注册外,需要调用管道的pipline.fireChannelRegistered,跟踪进去,最终将执行 -DefaultChannelHandlerInvoker的invokeChannelRegistered方法,该方法会执行ChannelInitializer的init方法。 - -### ChannelPipeline -设计理念:ChannelPipeline管道,提供相应的API,增加ChannelHander形成处理链条,在DefaultChannelPipeline -中并不是用一个LikedList 来实现链表,而是在其自身就是一个链表结构,链表的节点是 -AbstractChannelHandlerContext,里面有next,与perv分别指向下一个或上一个节点。 -在DefaultChannelHanderPipeline中持有tail与head引用。 - +`AbstractNioChannel` ```java - public ChannelPipeline fireChannelRegistered() { - head.fireChannelRegistered(); - return this; + @Override + protected void doRegister() throws Exception { + boolean selected = false; + for (;;) { + try { + selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 终于见到你 + return; + } catch (CancelledKeyException e) { + if (!selected) { + // Force the Selector to select now as the "canceled" SelectionKey may still be + // cached and not removed because no Select.select(..) operation was called yet. + eventLoop().selectNow(); + selected = true; + } else { + // We forced a select operation on the selector before but the SelectionKey is still cached + // for whatever reason. JDK bug ? + throw e; + } + } + } } +``` + +`DefaultChannelPipeline` +```java @Override - public ChannelPipeline read() { - tail.read(); + public final ChannelPipeline fireChannelActive() { + AbstractChannelHandlerContext.invokeChannelActive(head); return this; } - @Override - public ChannelFuture write(Object msg) { - return tail.write(msg); +``` + +`AbstractChannelHandlerContext` +```java + static void invokeChannelActive(final AbstractChannelHandlerContext next) { + EventExecutor executor = next.executor(); + if (executor.inEventLoop()) { + next.invokeChannelActive(); // 接下来开这里 + } else { + executor.execute(new Runnable() { + @Override + public void run() { + next.invokeChannelActive(); + } + }); + } } + + private void invokeChannelActive() { + if (invokeHandler()) { + try { + ((ChannelInboundHandler) handler()).channelActive(this); // HeadContext#channelActive + } catch (Throwable t) { + notifyHandlerException(t); + } + } else { + fireChannelActive(); + } + } +``` + +`HeadContext` +```java + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelActive(); + + readIfIsAutoRead(); // 看这里 + } + + private void readIfIsAutoRead() { + if (channel.config().isAutoRead()) { + channel.read(); // 看这里 + } + } + + @Override + public void read(ChannelHandlerContext ctx) { + unsafe.beginRead(); // 看这里 + } +``` + +`AbstractUnsafe` +```java + @Override + public final void beginRead() { + assertEventLoop(); + + if (!isActive()) { + return; + } + + try { + doBeginRead(); // 看这里 + } catch (final Exception e) { + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireExceptionCaught(e); + } + }); + close(voidPromise()); + } + } ``` -从上述方法,不难看出,ChannelPipeline的实现源码,就是沿着调用链向上或向下传播事件并执行之。 -其实ChannelPipeline并没有什么高深的地方,其设计哲学就是职责链模式。 +`AbstractNioChannel` +```java + @Override + protected void doBeginRead() throws Exception { + // Channel.read() or ChannelHandlerContext.read() was called + final SelectionKey selectionKey = this.selectionKey; + if (!selectionKey.isValid()) { + return; + } -### ChannelHandlers的执行顺序 -pipeline里的handlers分为两类,分别实现了ChannelInboundHandler和ChannelOutboundHandler接口。 -ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等; -ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。 -ChannelInboundHandler按照注册的先后顺序执行,ChannelOutboundHandler按照注册的先后顺序逆序执行。 + readPending = true; -### 线程安全 -ChannelPipeline与SocketChannel绑定,是线程安全的。标注@Sharable的ChannelHandler必须是线程安全的,如ChannelInitializer。 + final int interestOps = selectionKey.interestOps(); + if ((interestOps & readInterestOp) == 0) { + selectionKey.interestOps(interestOps | readInterestOp); // 找到你了! + } + } +``` \ No newline at end of file