分析方式:
- 以下面的这一段经典的netty示例代码出发,熟悉netty的api编程模型
- 第二章从内存中的资源入手,理解资源的结构和使用,整理至第一章节
- 第三章从服务端的线程行为中
解析源码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new DiscardServerHandler());
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
一. 源码结构解析
1.1 Bootstrap
1.1.1 结构图
接口/类 | 说明 |
---|---|
Channel |
Bootstrap如其名为一个”引导”作用的工具类,功能为辅助生成对应和初始化对应的channel.
1.1.2 AbstractBootstrap工具类抽象
AbstractBootstrap为实例化channel的抽象层。主要保存的数据:
- EventLoopGroup - channel关联的IO线程池
- ChannelHandler - channel的处理链
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
//channel的io线程池
volatile EventLoopGroup group;
@SuppressWarnings("deprecation")
//channel工厂类
private volatile ChannelFactory<? extends C> channelFactory;
//略...
private volatile ChannelHandler handler;
final ChannelFuture initAndRegister() {
//生成channel
Channel channel = channelFactory.newChannel();
//抽象方法,初始化channel
init(channel);
//将这个channel注册至io线程池中
ChannelFuture regFuture = config().group().register(channel);
//略...
return regFuture;
}
//设置channel类型的时候自动设置了channel工厂类
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
}
**1.1.3 ServerBootstrap **
对于服务端程序网络程序来说主要有两个工作:
- 监听一个端口,等待客户端建立连接
- 客户端建立连接,处理客户端请求
ServerBootstrap为ServerChannel的引导帮助类,针对上面说的服务端的两个工作,ServerChannel所代表的就是上面的第一个工作。
ServerChannel的主要作用:
- 监听一个端口,等待客户端建立连接,这个便是ServerChannel的映射。
- 客户端建立连接以后,实例化一个channel做后续处理。客户端与服务端建立的连接便是ServerChannel的child。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
//略...
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
//客户端新建连接后使用的IO线程池
private volatile EventLoopGroup childGroup;
//客户端新建连接后使用的处理链
private volatile ChannelHandler childHandler;
@Override
void init(Channel channel) { //初始化监听端口的处理流程
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
//serverchannel的处理链设置
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
//初始化channel的时候,向channel所属的线程池提交一个任务,往这个pipeline中添加一个handler
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}
结合上面的分析, Bootstrap主要保存的数据为:
- 每一类channel对应的IO线程池
- 每一类channel对应的handler处理链
channel初始化和IO线程池绑定的方法主要在:
- AbstractBootstrap.initAndRegister()
- 抽象方法init与各类bootstrap的实现
上面的ChannelFuture regFuture = config().group().register(channel);由下面进行分析、
1.2 EventExecutorGroup
1.2.1 结构图
接口类 | 说明 |
---|---|
Executor | 任务执行器最简单抽象接口,只有一个execute方法,可以执行任务 |
ExecutorService | 更丰富的线程池抽象接口,包含生命周期方法,任务提交方法, 流式操作方法 |
ScheduledExecutorService | 代表定时执行线程池抽象接口,包括定时执行方法。 |
EventExecutorGroup | 代表一个线程池组的抽象接口,管理多个线程池,提供线程池组的迭代方法next(), 优雅停止方法。由上面实现的3个接口可以看出,在外部看来这是一个线程池, 但是内部实现是多个线程池组成。 |
EventLoopGroup | 代表一个特殊的线程池组抽象接口,这个线程池组可以将一个channel 注册至这个线程池组中,复写next()方法,返回一个EventLoop |
AbstractEventExecutorGroup | EventExecutorGroup的骨架抽象实现类,这里不保存任何数据, 在这里调用抽象方法next()获取其中一个线程池,处理提交的任务 |
MultithreadEventExecutorGroup | EventExecutorGroup的抽象实现类,在这里支持多个线程同时处理任务。 这这里保存多个线程池。 抽象方法next()具体实现。提供抽象方法newChild()实现具体的线程池 |
MultithreadEventLoopGroup | EventLoopGroup的抽象实现类,复写next()方法实现,不保存任何数据 |
NioEventLoopGroup | MultithreadEventLoopGroup的具体实现, 不保存任何数据, 最终实现newChild方法,提供适配NIO的线程池 |
上面的抽象方法:newChild的具体实现看如下
1.3 EventExecutor
1.3.1 结构图
EventLoop为前面的EventLoopGroup,线程池组的线程池实现。左边的大部分在上面已经分析过了, 主要看右边的EventExecutor的实现。
接口类 | 说明 |
---|---|
AbstractExecutorService | 线程池接口ExecutorService的骨架抽象实现类, 不保存任务数据,只做骨架实现 |
EventExecutor | 特殊的EventExecutorGroup接口,next()方法指向自己, 同时提供parent()方法指向所属的线程池组 |
AbstractEventExecutor | EventExecutor的骨架抽象实现,保存所属的线程池组, 也就是parent()方法 |
AbstractScheduledEventExecutor | 支持定时调度的EventExecutor的骨架抽象实现, 保存有定时调度队列,提供这个队列的轮询迭代方法等 |
OrderedEventExecutor | 一个标记接口,代表会串行执行所有的任务 |
SingleThreadEventExecutor | OrderedEventExecutor的抽象实现类,会使用一个线程串行执行所有的提交任务。 保存这个线程池的工作线程, 线程池,任务队列等 |
EventLoop | 可注册线程池接口,会在这里面处理所有的IO操作,一旦channel注册后 |
SingleThreadEventLoop | EventLoop抽象实现类, 不保存任何数据 |
NioEventLoop | EventLoop的NIO实现,保存NIO相关的Selector数据。 |
1.4 Promise
接口类 | 说明 |
---|---|
java.util.concurrent.Future | juc自带future接口,包含五个方法, cancel, iscancel,isdone, get |
io.netty.util.concurrent.Future | netty丰富的future接口方法,增加listener机制, await(), sync等方法 |
AbstractFuture | future骨架抽象类,不保存任何数据,实现get方法的骨架逻辑 |
ChannelFuture | channel异步IO操作结果接口,可在其中获取channel |
Promise | 特殊的Future接口, 可以写入 |
ChannelPromise | 特殊的ChannelFuture接口,可以写入 |
DefaultPromise | 默认的future实现,保存异步结果, future所关联的线程池等(用于唤醒iolistener) |
DefaultChannelPromise | 和channel关联在一起的future实现类, 由于Netty中所有的io操作都是异步的,当channel的io线程执行了一个异步 操作的时候,便会产生一个DefaultChannelPromise,用于关联channelhe、 eventloop和channelPromise |
1.5 channel
结构图
接口类 | 说明 |
---|---|
ChannelOutboundInvoker | channel基础IO操作接口, 包含connect, close, write, newPromise等最基础的channel方法 |
AttributeMap | 属性map接口,要求线程安全 |
Channel | socket在netty中的映射接口,包含最基础的IO操作,如:read,write,bind,connect 此接口要求提供以下数据给用户: 1. 连接状态:连接、关闭、打开 2. 连接配置:channelConfig 3. 所有支持的io操作,读写连接等 4. 处理所有的IO时间的channelpipeline 特性:1.所有io操作都是异步的。2. channel是分层的,可以有parent, 例如一个channel是由别的channel创建的时候 |
ServerChannel | 服务端的标记接口,代表可以接受连接请求,创建子连接。 |
ServerSocketChannel | 服务端的TPC/IP连接接口,指代一个服务端的TCP/IP连接, 等待新的tcp连接 |
AbstractChannel | channel骨架实现抽象类, 保存数据有父channel,pipeline, 线程池,unsafe接口 |
AbstractNioChannel | channel的nio骨架实现抽象类,所有的IO操作都是用NIO的 selector机制进行实现 |
AbstractNioMessageChannel | 基于消息的NIO骨架实现抽象类,所有的IO操作基于消息 |
NioServerSocketChannel | ServerSocketChannel实现类, 使用NIO实现接受新的连接 |
1.6 Unsafe
接口/类 | 说明 |
---|---|
Unsafe | 抽象出来的channel中属于IO线程专属的调用方法接口,包含了各种IO操作方法, 比如,读,写,连接,绑定端口,除了以下方法外,其他的方法都必须由IO线程调用 1. localAddress() 2. remoteAddress() 3. closeForcibly() 4. register(EventLoop, ChannelPromise) 5. deregister(ChannelPromise) 6. voidPromise() |
NioUnsafe | Nio的unsafe接口,可以访问到NIO包的SelectableChannel类 |
AbstractUnsafe | Unsafe基础骨架抽象类, 实现每个IO操作的骨架流程,子类实现差异化抽象方法。 保存有写出去的数据的buffer, 连接是否注册标志位等数据 |
AbstractNioUnsafe | AbstractUnsafe流程的NIO骨架抽象类,不保存任务数据,使用NIO方式构造io操作流程 |
NioByteUnsafe | AbstractNioUnsafe具体实现类,不保存任务数据, 执行实际的NIO操作 |
1.7 Pipeline
接口/类 | 说明 |
---|---|
ChannelPipeline | channel所属的处理链管道 |
1.8 ChannelHandlerContext
接口/类 | 说明 |
---|---|
ChannelInboundInvoker | |
ChannelOutboundInvoker | |
ChannelHandlerContext | pipeline处理链的每个handler的的上下文接口, 保存handler和其所有包含的元数据 |
AbstractChannelHandlerContext | handler上下文接口实现的抽象类,保存例如handler 所属的channel,线程池,包含的传播方法的标记mask等的元数据 |
二. 资源解析
2.1 线程池组
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
2.1.1 线程池组初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider()); //1. 提供原生的NIO的selector
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);//2. 设置默认的IO线程数
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);//3. 设置线程池选择器
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
//4. 设置真正执行任务的Executor类型
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//5. 新建线程池组管理的线程池,newChild为抽象方法,由子类实现,我们的例子中创建的为NioEventLoopGroup
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
//略...
} finally {
//略...
}
}
chooser = chooserFactory.newChooser(children);
//略...
}
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
2.1.2 线程池类型
1
2
3
4
5
6
7
8
9
10
11
12
13
//NioEventLoopGroup.java
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
/**
* 由构造函数入参可以看出,NioEventLoop线程池包含一下数据:
* 1. 所属的父线程池组
* 2. 真正执行任务的executor
* 3. NIO selector机制所需要的参数和拒绝handler
**/
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
2.1.3 线程池与channel绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//AbstractBootstrap.java
final ChannelFuture initAndRegister() {
//略...
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
//略...
return regFuture;
}
//MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
总结:
netty io线程池在内存中的关系
netty io线程池在内部实现是一个线程池组。
channel和线程池组中的一个线程池绑定,线程池与channel之间是1对多的关系。
2.1.4 线程池执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// SingleThreadEventExecutor.java
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
addTask(task);//1. 将任务放进队伍队列
if (!inEventLoop) {//2. 当current线程不是此IO线程池线程时
startThread();//3. 执行startThraed()方法,启动此IO线程执行任务队列中的任务
//略...
}
//略...
}
private void startThread() {
if (state == ST_NOT_STARTED) {//4. 当前的线程池还未启动过, 只执行一次
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();//5. 真正的启动地方
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
executor.execute(new Runnable() { // 6. 委托ThreadPerTaskExecutor执行当前
@Override
public void run() {
//略...
SingleThreadEventExecutor.this.run(); //7. 执行抽象的run方法
success = true;
} catch (Throwable t) {
//略...
}
//销毁钩子等操作
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//NioEventLoop.java
@Override
protected void run() {
for (;;) {//这个run方法用源
try {
//略... 阻塞于NIO select的操作
//8. 处理NIO中的selectKey集合
processSelectedKeys();
} finally {
// 12. 处理提交到IO线程池的任务
runAllTasks();
}
//略...
// 13. 当线程池关闭的时候这个run方法才会退出
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//略...
try {
int readyOps = k.readyOps();
//9. 先处理建连接请求
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//略...
unsafe.finishConnect();
}
//10. 处理写请求
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//11. 处理读请求
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
总结:
- 每个内部IO线程池中只有一个IO线程,这个IO线程在第一次向这个线程池提交任务的时候启动,在线程池关闭的时候退出
- IO线程主要执行两个任务:
- 处理跟这个线程池绑定的所有channel的所有的IO事件。
- 处理所有提交到这个线程池的任务。包括对channel的pineline修改,绑定新的channel等
2.2 处理链
2.2.1 NioServerSocketChannel
2.2.1.1 channel实例化
1
2
3
4
5
6
7
8
9
10
11
12
//AbstractBoostrap.java
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();//1. 新建channel实例,这里面包含了pipeline的实例化
init(channel);//2. 初始化连接,添加pipeline的handler
} catch (Throwable t) {
}
ChannelFuture regFuture = config().group().register(channel);//3. 将channel注册至IO线程池中,在channel注册完成后,会直接fire channel的注册完成回调函数。
return regFuture;
}
2.2.1.2 channel初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//ServerBootstrap.java
@Override
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {//2.1 添加一个特殊的handler:ChannelInitializer-会在channel注册到线程池后的回调事件中触发,也就是下面的initChannel的回调方法会在channel注册到线程池以后触发,然后把自己移除出handler队列。所以ChannelInitializer的作用就是一个初始化handler
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);//2.1.设置Bootstrap上的公共handler进连接的pipeline中
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(//2.2.添加服务端专属的Acceptor handelr,用于处理客户端新建连接
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
三. 行为解析
3.1 监听端口
ChannelFuture f = b.bind(PORT).sync();
AbstractBootstrap.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//bind方法实际执行
private ChannelFuture doBind(final SocketAddress localAddress) {
//1. 初始化channel,并将channel注册到对应的IO线程池中,返回一个异步future
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
//略...
if(regFuture.isDone()){
//2. 新建一个异步Promise,用于接收下面的dobind0的异步操作结果
ChannelPromise promise = channel.newPromise();
//3. 确保上面的连接已经初始化和register以后执行dobind0方法,这里为实际的端口监听操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
}else{//略...
}
}
final ChannelFuture initAndRegister() {
//生成channel
Channel channel = channelFactory.newChannel();
//抽象方法,初始化channel
init(channel);
//将这个channel注册至io线程池中
ChannelFuture regFuture = config().group().register(channel);
//略...
return regFuture;
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
AbstractChannel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//略...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {//1. 判断当前线程是否是当前channel所属的线程池的线程
register0(promise);//2. 是的话使用此IO线程处理接下来的register操作
} else {
try {
eventLoop.execute(new Runnable() {//3. 不是当前channel所属的IO线程池的时候提交一个任务,将register的动作委托为此channel的IO线程去完成
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
private void register0(ChannelPromise promise) {
try {
doRegister();
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
3.2 空闲检测
netty中的空闲检测通过在channel的pipeline中添加IdlestatHandler进行实现。
1
2
3
4
5
6
7
8
9
10
11
12
//空闲检测实例代码
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new IdlestateHandler());
}
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
//IdlestateHandler.java
//初始化
private void initialize(ChannelHandlerContext ctx) {
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
}
//AbstractScheduledEventExecutor.java
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
//略...
return schedule(new ScheduledFutureTask<Void>(
this, command, deadlineNanos(unit.toNanos(delay))));
}
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
//略...
scheduledTaskQueue().add(task.setId(nextTaskId++));
return task;
}
//SingleThreadEventExecutor.java
//IO线程循环执行的时候会执行所有的task
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
//略...
}
//SingleThreadEventExecutor.java
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
//AbstractScheduledEventExecutor.java
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {//队列里面没有任务,或者队列里面的任务还没有到事件则不返回这个定时任务给IO线程执行
return null;
}
scheduledTaskQueue.remove();
return scheduledTask;
}
总结:
- idlestatehandler在初始化的时候会往定时任务队列里存放三个任务,read、write和all的空闲检测任务
- IO线程每一次循环都会将到时间的定时任务添加到自己的可执行任务队列里面I
四. coding技巧
4.1 handler链构造
netty的pipeline是编程模式,是直接pipeline.addLast(), 在pipeline中形成一个固定顺序的handler链条。
1
2
3
4
5
6
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
在运行时,netty会根据这个传播事件动态拼接能执行这个传播事件的handler链,这个拼接的精度不是接口级,而是传播方法级别的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class DefaultChannelPipeline implements ChannelPipeline {
//上面的addLast添加的handler在添加进pipeline以后会封装成一个ChannelHandlerContext,并且形成一个链条
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
}
//编程技巧:为每一个handler封装一个上下文,记录一些这个handler的元数据。
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
//每个context保存一个mask标记位,记录这个handler支持的传播事件有哪些
private final int executionMask;
}
final class ChannelHandlerMask {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelHandlerMask.class);
// 标记这个channelHandler那些方法必须被调用,一个int数据4字节,只用其中的一位表示一种状态。
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | ` MASK_DISCONNECT | MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
//设置标记的逻辑:1. 假设这个handler满足所有的标记。2. 一个方法一个方法地过滤这个标记位。当所有不满足的方法被过滤后,剩下的便是符合要求的handler。
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;//假设这个mask包含所有的方法
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {//如果没有
mask &= ~MASK_CHANNEL_REGISTERED;//
}
//略...
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
//略...
}
return mask;
}
}