Netty新连接接入与NioSocketChannel分析

 2023-09-15 阅读 27 评论 0

摘要:原文链接:https://wangwei.one/posts/net... 前面的一些章节,我们分析了Netty的三大组件 —— Channel 、EventLoop、Pipeline ,对Netty的工作原理有了深入的了解。在此基础上,我们来分析一下当Netty服务端启动后,Netty是如何处理新连接接
原文链接:https://wangwei.one/posts/net...

前面的一些章节,我们分析了Netty的三大组件 —— Channel 、EventLoop、Pipeline ,对Netty的工作原理有了深入的了解。在此基础上,我们来分析一下当Netty服务端启动后,Netty是如何处理新连接接入的。

本文内容主要分为以下四部分:

  • 新连接检测
  • NioSocketChannel创建
  • NioSocketChannel初始化与注册
  • NioSocketChannel注册READ兴趣集

新连接检测

前面,我们在讲 EventLoop的启动过程源码分析 时,解读过下面这段代码:

public final class NioEventLoop extends SingleThreadEventLoop {...private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {...try {...if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {// 读取read事件unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}...}...}    

无法连接到这个网络。我们还是以服务端 NioServerSocketChannel 为例,它绑定的unsafe实例为 NioMessageUnsafe 。上面的 unsafe.read() 接口,会向下调用到 NioMessageUnsafe.read() 接口,如下:

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {...private final class NioMessageUnsafe extends AbstractNioUnsafe {// 用于保存新建立的 NioSocketChannel 的集合private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {// 确保在当前线程与EventLoop中的一致assert eventLoop().inEventLoop();// 获取 NioServerSocketChannel config配置final ChannelConfig config = config();// 获取 NioServerSocketChannel 绑定的 pipelinefinal ChannelPipeline pipeline = pipeline();// 获取RecvByteBuf 分配器 Handle// 当channel在接收数据时,allocHandle 会用于分配ByteBuf来保存数据// 关于allocHandle后面再去做详细介绍final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();// 重置已累积的所有计数器,并为下一个读取循环读取多少消息/字节数据提供建议allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {// 调用后面的 doReadMessages 接口,读取到message则返回1int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}// 对当前read循环所读取到的message数量计数+1allocHandle.incMessagesRead(localRead);// 判断是否继续读取message} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 调用pipeline传播ChannelRead事件pipeline.fireChannelRead(readBuf.get(i));}// 清空readBufreadBuf.clear();allocHandle.readComplete();// 调用pipeline传播 ChannelReadComplete 事件pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}...}    

对于 doReadMessages(...) 的分析:

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {...// 读取消息@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {// 获取 SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// 使用SocketChannel创建NioSocketChannel,将其存入buf list中// 关于NioSocketChannel的创建请看后面的分析buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}...}

对于 continueReading() 接口的分析,至于结果为什么返回false,后面会单独分析:

public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {private volatile int maxMessagesPerRead;private volatile boolean respectMaybeMoreData = true;...public abstract class MaxMessageHandle implements ExtendedHandle {private ChannelConfig config;// 每次读取最大的消息数private int maxMessagePerRead;private int totalMessages;private int totalBytesRead;private int attemptedBytesRead;private int lastBytesRead;private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {@Overridepublic boolean get() {return attemptedBytesRead == lastBytesRead;}};...// 判断是否继续读取message    @Overridepublic boolean continueReading() {return continueReading(defaultMaybeMoreSupplier);}// 判断是否继续读取message@Overridepublic boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {// 默认情况下 config.isAutoRead() 为true// respectMaybeMoreData 默认为 true// maybeMoreDataSupplier.get() 为false// totalMessages第一次循环则为1// maxMessagePerRead为16// 结果返回falsereturn config.isAutoRead() &&(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&totalMessages < maxMessagePerRead &&totalBytesRead > 0;}...}...}

NioSocketChannel创建

上面分析新连接接入,提到了 NioSocketChannel 的创建,我们这里来详细分析一下,NioSocketChannel的创建过程与此前我们分析 NioServerSocketChannel创建 大体类似。

构造器

手机无法连接wifi,先来看看 NioSocketChannel 的构造函数:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {...public NioSocketChannel(Channel parent, SocketChannel socket) {// 调用父类构造器super(parent, socket);// 创建NioSocketChannelConfigconfig = new NioSocketChannelConfig(this, socket.socket());}...}

父类 AbstractNioByteChannel 构造器:

public abstract class AbstractNioByteChannel extends AbstractNioChannel {...protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {// 调用父类构造器,并设置兴趣集为SelectionKey.OP_READ,对read事件感兴趣super(parent, ch, SelectionKey.OP_READ);}...}

父类 AbstractNioChannel 构造器:

public abstract class AbstractNioChannel extends AbstractChannel {...protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {// 调用父类构造器super(parent);// 设置channelthis.ch = ch;// 设置兴趣集this.readInterestOp = readInterestOp;try {// 设置为非阻塞ch.configureBlocking(false);} catch (IOException e) {...}}}

父类 AbstractChannel 构造器:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {...protected AbstractChannel(Channel parent) {// 设置parentthis.parent = parent;// 创建channelIdid = newId();// 创建unsafeunsafe = newUnsafe();// 创建pipelinepipeline = newChannelPipeline();}...
}

ChannelConfig创建

怎么连接热点,接着我们看看 NioSocketChannelConfig 的创建逻辑:

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {...private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {// 调用父类构造器super(channel, javaSocket);calculateMaxBytesPerGatheringWrite();}...}

父类 DefaultSocketChannelConfig 构造器:

public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {...public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {// 调用父类构造器,绑定socketchannel super(channel);if (javaSocket == null) {throw new NullPointerException("javaSocket");}// 绑定java socketthis.javaSocket = javaSocket;// Enable TCP_NODELAY by default if possible.// netty一般运行在服务器上,不在Android上,canEnableTcpNoDelayByDefault返回trueif (PlatformDependent.canEnableTcpNoDelayByDefault()) {try {// 开启 TCP_NODELAY ,开启TCP的nagle算法// 尽量不要等待,只要发送缓冲区中有数据,并且发送窗口是打开的,就尽量把数据发送到网络上去。setTcpNoDelay(true);} catch (Exception e) {// Ignore.}}}                                  ... }                                        

NioSocketChannel初始化与注册

上面小节分析了NioSocketChannel的创建逻辑,创建完成之后,我们来分析一下NioSocketChannel是如何注册到NioEventLoop上去的。

在前面小节分析新连接检测的有如下小段代码:

private final class NioMessageUnsafe extends AbstractNioUnsafe {...int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;// 调用pipeline传播ChannelRead事件pipeline.fireChannelRead(readBuf.get(i));}...}    

netty客户端。调用pipeline传播ChannelRead事件,这里的Pipeline是服务端Channel,也就是NioServerSocketChannel所绑定的Pipeline,此时的Pipeline的内部结构是怎么样子的呢?

Pipeline-ServerBootstrapAcceptor

那这个 ServerBootstrapAcceptor 是从哪里来的呢?

在此前,我们分析 NioServerSocketChannel初始化 时,有过下面这段代码:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...// NioServerSocketChannel初始化    void init(Channel channel) throws Exception {// 获取启动器 启动时配置的option参数,主要是TCP的一些属性final Map<ChannelOption<?>, Object> options = options0();// 将获得到 options 配置到 ChannelConfig 中去synchronized (options) {setChannelOptions(channel, options, logger);}// 获取 ServerBootstrap 启动时配置的 attr 参数final Map<AttributeKey<?>, Object> attrs = attrs0();// 配置 Channel attr,主要是设置用户自定义的一些参数synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipelineChannelPipeline p = channel.pipeline();// 将启动器中配置的 childGroup 保存到局部变量 currentChildGroupfinal EventLoopGroup currentChildGroup = childGroup;// 将启动器中配置的 childHandler 保存到局部变量 currentChildHandlerfinal ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;// 保存用户设置的 childOptions 到局部变量 currentChildOptionssynchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));}// 保存用户设置的 childAttrs 到局部变量 currentChildAttrssynchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));}p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 获取启动器上配置的handlerChannelHandler handler = config.handler();if (handler != null) {// 添加 handler 到 pipeline 中pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor// 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去// 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}...    }

ServerBootstrapAcceptor

netty-socketio?NioServerSocketChannel初始化时,向NioServerSocketChannel所绑定的Pipeline添加了一个InboundHandler节点 —— ServerBootstrapAcceptor ,其代码如下:

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {...private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {// 子EventLoopGroup,即为workGroupprivate final EventLoopGroup childGroup;// ServerBootstrap启动时配置的 childHandlerprivate final ChannelHandler childHandler;// ServerBootstrap启动时配置的 childOptionsprivate final Entry<ChannelOption<?>, Object>[] childOptions;// ServerBootstrap启动时配置的 childAttrsprivate final Entry<AttributeKey<?>, Object>[] childAttrs;private final Runnable enableAutoReadTask;// 构造函数ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;// Task which is scheduled to re-enable auto-read.// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may// not be able to load the class because of the file limit it already reached.//// See https://github.com/netty/netty/issues/1328enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}// 处理Pipeline所传播的channelRead事件// 也就是前面新连接检测时看到的那段代码// pipeline.fireChannelRead(readBuf.get(i));// ServerBootstrapAcceptor的channelRead接口将会被调用,用于处理channelRead事件@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {// 获取传播事件的对象数据,即为前面的readBuf.get(i)// readBuf.get(i)取出的对象为 NioSocketChannelfinal Channel child = (Channel) msg;// 向 NioSocketChannel 添加childHandler,也就是我们常看到的// ServerBootstrap在启动时配置的代码:// ServerBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {...} )// 最终的结果就是向NioSocketChannel的Pipeline添加用户自定义的ChannelHandler// 用于处理客户端的channel连接child.pipeline().addLast(childHandler);// 配置 NioSocketChannel的TCP属性setChannelOptions(child, childOptions, logger);// 配置 NioSocketChannel 一些用户自定义数据for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}// 将NioSocketChannel注册到childGroup,也就是Netty的WorkerGroup当中去try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}...}...}
关于 ChannelInitializer 的讲解,可以看此前 Pipeline源码分析 文章。

后面的register逻辑,就与我们前面讲解 NioServerSocketChannel注册 大体类似了,这里简单介绍一下。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {...// 注册NioSocketChannel// eventLoop为childGroup    @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {...// 绑定eventLoop到NioSocketChannel上AbstractChannel.this.eventLoop = eventLoop;// 现在分析的逻辑是在服务端的线程上,eventLoop与主线程不同,返回falseif (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {// 这里来调用register0方法register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}// 注册private void register0(ChannelPromise promise) {try {...boolean firstRegistration = neverRegistered;// 调用 doRegister()doRegister();neverRegistered = false;registered = true;pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// 服务端的NioServerSocketChannel已经与客户端的NioSocketChannel建立了连接// 所以,NioSocketChannel是处于激活状态,isActive()返回tureif (isActive()) {// 对于新连接,是第一次注册if (firstRegistration) {// 传播ChannelActive事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}...} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}...}        

调用到NioSocketChannel中的doRegister()方法:

public abstract class AbstractNioChannel extends AbstractChannel {...@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 将selector注册到底层JDK channel上,并附加了NioSocketChannel对象// 兴趣集设置为0,表示不关心任何事件selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {...}}}...}    

NioSocketChannel 注册OP_READ兴趣集

紧接着上面的分析,传播ChannelActive事件之后的逻辑,主要就是向客户端的NioSocketChannel注册一个Read兴趣集

if (isActive()) {// 对于新连接,是第一次注册if (firstRegistration) {// 传播ChannelActive事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}
}

netty入门,通过 Pipeline的传播机制 ,最终会调用到doBeginRead()接口,如下:

public abstract class AbstractNioChannel extends AbstractChannel {...protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {...@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called// 保存selectionKey到局部变量final SelectionKey selectionKey = this.selectionKey;// 判断有效性if (!selectionKey.isValid()) {return;}readPending = true;// 获取selectionKey的兴趣集// 前面小结分析doRegister()接口提到,selectionKey的兴趣集设置为0final int interestOps = selectionKey.interestOps();// 这里的 readInterestOp 是前面讲NioSocketChannel创建时设置的值// 为 SelectionKey.OP_READ,也就是1if ((interestOps & readInterestOp) == 0) {// 这样,selectionKey最终设置的兴趣集为SelectionKey.OP_READ// 表示对读事件感兴趣selectionKey.interestOps(interestOps | readInterestOp);}}    ...    }    ...}        

小结

  • Netty是在哪里检测有新连接接入的?
  • 新连接是怎样注册到NioEventLoop线程上的?

参考资料

  • Java读源码之Netty深入剖析

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/4/63416.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息