-
@Override
-
protected void run() {
-
for (;;) {
-
try {
-
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
-
case SelectStrategy.CONTINUE:
-
continue;
-
case SelectStrategy.SELECT:
-
//select轮询, 设置wakenUp为false并返回之前的wakenUp值
-
select(wakenUp.getAndSet(
false));
-
if (wakenUp.get()) {
-
selector.wakeup();
-
}
-
default:
-
// fallthrough
-
}
-
//去除了无关紧要的代码
-
processSelectedKeys();
-
runAllTasks();
-
}
catch (Throwable t) {
-
handleLoopException(t);
-
}
-
// Always handle shutdown even if the loop processing threw an exception.
-
...
-
}
-
}
前面讲到Reactor的核心是执行了NioEventLoop的run方法,主要做了上面三件事:
- 轮询注册到reactor线程上的对应的selector的所有channel的IO事件
- 根据不同的SelectKeys进行处理 processSelectedKeys();
- 处理任务队列 runAllTasks();
接下来再详细看下processSelectedKeys()和runAllTasks(); 方法做了什么
processSelectedKeys
-
private void processSelectedKeys() {
-
if (selectedKeys !=
null) {
-
processSelectedKeysOptimized(selectedKeys.flip());
-
}
else {
-
processSelectedKeysPlain(selector.selectedKeys());
-
}
-
}
这里的processSelectedkeys()方法会根据selectedKeys是否为空,判断执行优化后的processSelectedKeysOptimized()还是普通的processSelectedKeysPlain()方法
这里的selectedKeys Netty在调用openSelector时对其进行了优化
-
private SelectedSelectionKeySet selectedKeys;
-
private Selector openSelector() {
-
final Selector selector;
-
selector = provider.openSelector();
-
-
final SelectedSelectionKeySet selectedKeySet =
new SelectedSelectionKeySet();
-
-
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
-
Object maybeException = AccessController.doPrivileged(
new PrivilegedAction<Object>() {
-
@Override
-
public Object run() {
-
Field selectedKeysField = selectorImplClass.getDeclaredField(
"selectedKeys");
-
Field publicSelectedKeysField = selectorImplClass.getDeclaredField(
"publicSelectedKeys");
-
selectedKeysField.setAccessible(
true);
-
publicSelectedKeysField.setAccessible(
true);
-
selectedKeysField.set(selector, selectedKeySet);
-
publicSelectedKeysField.set(selector, selectedKeySet);
-
return
null;
-
}
-
});
-
selectedKeys = selectedKeySet;
-
return selector;
-
}
先创建一个空的SelectedSelectionKeySet对象,然后通过反射获取jdk 底层的Selector 的class 对象的 selectedKeys和publicSelectedKeys字段,并将Netty的SelectedSelectionKeySet通过反射赋值,这样在底层调用jdk的api存储注册事件时,最后都会把事件保存到Netty的SelectedSelectionKeySet 对象里
可以看下替换前后有什么区别,jdk底层的SelectImpl对象的selectedKeys和publicSelectedKeys字段都是Set<SelectionKey>类型,而Netty里的SelectedSelectionKeySet对象是这样的一个结构:
-
final
class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
-
-
private SelectionKey[] keysA;
-
private
int keysASize;
-
private SelectionKey[] keysB;
-
private
int keysBSize;
-
private
boolean isA =
true;
-
-
@Override
-
public boolean add(SelectionKey o) {
-
if (o ==
null) {
-
return
false;
-
}
-
//添加元素到数组的最后,如果数组满了,就进行扩容(*2)
-
if (isA) {
-
int size = keysASize;
-
keysA[size ++] = o;
-
keysASize = size;
-
if (size == keysA.length) {
-
doubleCapacityA();
-
}
-
}
else {
-
...
-
}
-
-
return
true;
-
}
-
-
//移除对应的SelectionKey数组的最后一个元素
-
SelectionKey[] flip() {
-
if (isA) {
-
isA =
false;
-
keysA[keysASize] =
null;
-
keysBSize =
0;
-
return keysA;
-
}
else {
-
...
-
}
-
}
-
-
@Override
-
public boolean remove(Object o) {
-
return
false;
-
}
-
-
@Override
-
public boolean contains(Object o) {
-
return
false;
-
}
-
-
@Override
-
public Iterator<SelectionKey> iterator() {
-
throw
new UnsupportedOperationException();
-
}
-
}
SelectedSelectionKeySet是AbstractSet的一个子类,底层通过SelectionKey[]数组方法实现,并且将一些不需要的方法remove,contains方法进行重写,Netty里轮询事件的时候对操作进行了简化,不需要通过集合的Iterator进行移除,而直接通过flip方法去掉集合的最后一个SelectionKey就可以了(这样的操作的时间复杂度更低,可以直接定位到具体的下标),而我们在使用NIO的API的时候都需要进行remove操作
4.1.6.Final中的源码,这里的SelectionKey是两个数组交替遍历的,在4.1.9.Final 版本中,netty已经将SelectedSelectionKeySet底层使用一个数组了:SelectedSelectionKeySet
接着来看下 processSelectedKeysOptimized(selectedKeys.flip());方法
-
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
-
for (
int i =
0;; i ++) {
-
final SelectionKey k = selectedKeys[i];
-
if (k ==
null) {
-
break;
-
}
-
selectedKeys[i] =
null;
-
//拿到SelectionKey的attachment,并根据其类型做不同处理
-
final Object a = k.attachment();
-
if (a
instanceof AbstractNioChannel) {
-
processSelectedKey(k, (AbstractNioChannel) a);
-
}
else {
-
@SuppressWarnings(
"unchecked")
-
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
-
processSelectedKey(k, task);
-
}
-
-
if (needsToSelectAgain) {
-
//如果需要重新select,就将selectedKeys的元素都置为null恢复初始的状态
-
for (;;) {
-
i++;
-
if (selectedKeys[i] ==
null) {
-
break;
-
}
-
selectedKeys[i] =
null;
-
}
-
selectAgain();
-
// Need to flip the optimized selectedKeys to get the right reference to the array
-
// and reset the index to -1 which will then set to 0 on the for loop
-
// to start over again.
-
selectedKeys =
this.selectedKeys.flip();
-
i = -
1;
-
}
-
}
-
}
上述过程可以分为三步:
- 取出SelectionKey(包含channel,attachment等信息)
-
这里看到SelectionKey的attachment类型可能是AbstractNioChannel,猜测是不是在注册事件的时间添加的,根据ServerBootstrap的启动流程,最后会调用AbstractNioChannel的如下方法:
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
这里的最后一个参数也就是attachment,当前对象不就是AbstractNioChannel的子类
- 处理SelectionKey
-
-
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
-
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
-
if (!k.isValid()) {
-
final EventLoop eventLoop;
-
-
eventLoop = ch.eventLoop();
-
-
if (eventLoop != this || eventLoop == null) {
-
return;
-
}
-
unsafe.close(unsafe.voidPromise());
-
return;
-
}
-
int readyOps = k.readyOps();
-
//连接建立事件
-
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
-
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
-
int ops = k.interestOps();
-
ops &= ~SelectionKey.OP_CONNECT;
-
//1.将连接事件从interestOps中移除
-
k.interestOps(ops);
-
//2.调用pipeline().fireChannelActive()将连接建立完成通知给pipeline中的各个handler
-
unsafe.finishConnect();
-
}
-
-
//可写事件
-
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
-
ch.unsafe().forceFlush();
-
}
-
//可读事件
-
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
-
unsafe.read();
-
if (!ch.isOpen()) {
-
return;
-
}
-
}
-
-
}
-
可以看出这里就是一系列NIO的操作,分别对OP_READ, 可读事件, OP_WRITE, 可写事件, OP_CONNECT, 连接事件进行处理
以OP_READ事件为例
-
public final void read() {
-
final ChannelConfig config = config();
-
final ChannelPipeline pipeline = pipeline();
-
final ByteBufAllocator allocator = config.getAllocator();
-
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
-
allocHandle.reset(config);
-
-
ByteBuf byteBuf =
null;
-
boolean close =
false;
-
do {
-
//1.分配ByteBuf
-
byteBuf = allocHandle.allocate(allocator);
-
//2.从Channel读取数据
-
allocHandle.lastBytesRead(doReadBytes(byteBuf));
-
if (allocHandle.lastBytesRead() <=
0) {
-
// nothing was read. release the buffer.
-
byteBuf.release();
-
byteBuf =
null;
-
close = allocHandle.lastBytesRead() <
0;
-
break;
-
}
-
-
allocHandle.incMessagesRead(
1);
-
readPending =
false;
-
//3.通过pipeline.fireChannelRead事件通知给pipeline里的各个handler
-
pipeline.fireChannelRead(byteBuf);
-
byteBuf =
null;
-
}
while (allocHandle.continueReading());
-
-
allocHandle.readComplete();
-
pipeline.fireChannelReadComplete();
-
-
if (close) {
-
closeOnRead(pipeline);
-
}
-
-
}
-
}
- 判断是否需要重新Select并重置
-
void cancel(SelectionKey key) {
-
key.cancel();
-
cancelledKeys ++;
-
if (cancelledKeys >= CLEANUP_INTERVAL) {
-
cancelledKeys =
0;
-
needsToSelectAgain =
true;
-
}
-
}
这里的cancelledKeys会在调用cancel(SelectionKey)删除注册事件的时候计数,当他大于CLEANUP_INTERVAL(256)的时候,就会将needsToSelectAgain设置为true,进入对应的分支判断,先将原来的selectedKeys都置为Null,然后重新调用selectNow(),重新填充selectedKeys
总结:
netty的NioEventLoop线程第二步做的事情就是处理SelectionKey,netty使用数组替换掉jdk原生的HashSet来优化查询和更新SelectionKey的效率,每个SelectionKey上绑定了netty类AbstractNioChanne的具体实现子类对象作为attachment,在处理每个SelectionKey的时候,就可以找到对应的AbstractNioChannel,最后通过pipeline来处理通知给其他Handler
任务执行runAllTasks
任务添加
添加普通任务
前面的分析说过NioEventLoop 是Netty的核心线程,其添加任务是通过执行父类SingleThreadEventExecutor的execute方法,
通过addTask方法,将Runnable(即task)添加到对应的任务队列 Queue<Runnable> taskQueue;里
-
public void execute(Runnable task) {
-
boolean inEventLoop = inEventLoop();
-
if (inEventLoop) {
-
addTask(task);
-
}
else {
-
startThread();
-
addTask(task);
-
}
-
}
Netty的源码里的bind()流程中有通过如下方法添加对应的task到SingleThreadEventExecutor的任务队列里,如下:
-
private static void doBind0(
-
final ChannelFuture regFuture,
final Channel channel,
-
final SocketAddress localAddress,
final ChannelPromise promise) {
-
channel.eventLoop().execute(
new Runnable() {
-
@Override
-
public void run() {
-
if (regFuture.isSuccess()) {
-
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-
}
else {
-
promise.setFailure(regFuture.cause());
-
}
-
}
-
});
-
}
用户也可以通过如下方式自己添加task到TaskQueue
-
EventLoop eventLoop = channel.eventLoop();
-
eventLoop.execute(
new Runnable() {
-
@Override
-
public void run() {
-
//TODO
-
}
-
});
添加定时任务
除了上述方式,我们还可以通过如下方法添加定时任务到对应的任务队列
-
EventLoop eventLoop = channel.eventLoop();
-
eventLoop.schedule(
new Runnable() {
-
@Override
-
public void run() {
-
//TODO
-
}
-
},
30, TimeUnit.SECONDS);
具体的实现是在父类AbstractScheduledEventExecutor里,看下对应的源码
-
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit) {
-
...
-
if (delay <
0) {
-
throw
new IllegalArgumentException(
-
String.format(
"delay: %d (expected: >= 0)", delay));
-
}
-
return schedule(
new ScheduledFutureTask<Void>(
-
this, command,
null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
-
}
-
-
<V>
ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
-
if (inEventLoop()) {
-
scheduledTaskQueue().add(task);
-
}
else {
-
execute(
new Runnable() {
-
@Override
-
public void run() {
-
scheduledTaskQueue().add(task);
-
}
-
});
-
}
-
-
return task;
-
}
会将对应的Runnable和延迟时间封装成一个新的ScheduledFutureTask,然后调用重载的schedule方法,将对应的task添加到PriorityQueue<ScheduledFutureTask<?>>的优先队列里
这里对添加定时任务的Thread进行了判断,如果调用的发起方是reactor线程,那么就直接将Task添加到优先队列中;如果是外部线程调用的schedule,会将"添加定时任务到优先队列"封装成一个Runnable也就是新的task,然后调用上面的execute方法去添加任务,这样会访问PriorityQueue的就只有reactor线程了,变成了单线程
接下来我们来详细看下这个特殊的优先队列PriorityQueue<ScheduledFutureTask<?>>,所谓的优先队列与普通队列的区别在于每个元素都被赋予了优先级。当访问元素时,会将具有最高优先级的元素最先弹出。即优先队列具有最高级先出的特征。
看下这个优先队列里的元素ScheduledFutureTask,它实现了Comparable接口,定义了自己的compareTo方法,先比较deadlineNanos(也就是截止时间)的大小,如果一样则比较id,如果也相同就抛出异常
-
final
class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
-
private
static
final AtomicLong nextTaskId =
new AtomicLong();
-
private
final
long id = nextTaskId.getAndIncrement();
-
-
@Override
-
public int compareTo(Delayed o) {
-
if (
this == o) {
-
return
0;
-
}
-
-
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
-
long d = deadlineNanos() - that.deadlineNanos();
-
if (d <
0) {
-
return -
1;
-
}
else
if (d >
0) {
-
return
1;
-
}
else
if (id < that.id) {
-
return -
1;
-
}
else
if (id == that.id) {
-
throw
new Error();
-
}
else {
-
return
1;
-
}
-
}
-
}
既然ScheduledFutureTask本质也是一个Runnable,那么就看下它的run方法吧
这里对于不同的类型任务进行了不同的处理,periodNanos=0表示是只执行一次的任务,>0 表示是按照指定频率定期执行的任务,<0表示是每次执行完成后,延迟一段时间再次执行的任务(二者的区别在于一个是根据上次任务开始执行的时间计算间隔,一个是按照上次任务执行结束的时间计算间隔)
-
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
-
private
final
long periodNanos;
-
-
@Override
-
public void run() {
-
if (periodNanos ==
0) {
-
if (setUncancellableInternal()) {
-
V result = task.call();
-
setSuccessInternal(result);
-
}
-
}
else {
-
if (!isCancelled()) {
-
task.call();
-
if (!executor().isShutdown()) {
-
long p = periodNanos;
-
if (p >
0) {
-
//设置该任务的下一次截止时间为本次的截止时间加上间隔时间periodNanos
-
deadlineNanos += p;
-
}
else {
-
//设置下一次截止时间为当前时间加上延迟(因为p<0,所以要减去) 此时的当前时间就是本次任务之间结束的时间 task.call()是一个阻塞的方法
-
deadlineNanos = nanoTime() - p;
-
}
-
if (!isCancelled()) {
-
//将新的ScheduledFutureTask添加到任务队列等待下次执行
-
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
-
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
-
assert scheduledTaskQueue !=
null;
-
scheduledTaskQueue.add(
this);
-
}
-
}
-
}
-
}
-
-
}
Task任务的执行
有两个重载的runAllTasks方法,一个无参,一个带有long timeoutNanos参数,先来看下无参的方法
-
protected boolean runAllTasks() {
-
assert inEventLoop();
-
boolean fetchedAll;
-
boolean ranAtLeastOne =
false;
-
-
do {
-
fetchedAll = fetchFromScheduledTaskQueue();
-
if (runAllTasksFrom(taskQueue)) {
-
ranAtLeastOne =
true;
-
}
-
}
while (!fetchedAll);
// keep on processing until we fetched all scheduled tasks.
-
-
if (ranAtLeastOne) {
-
lastExecutionTime = ScheduledFutureTask.nanoTime();
-
}
-
afterRunningAllTasks();
-
return ranAtLeastOne;
-
}
主要做下面三件事情:
1.将优先队列里的ScheduledFutureTask取出放到taskQueue里
2.从taskQueue里取出task并执行
3.task任务执行完毕后执行后置处理逻辑
将任务从优先队列移动到taskQueue
-
private boolean fetchFromScheduledTaskQueue() {
-
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
-
Runnable scheduledTask = pollScheduledTask(nanoTime);
-
while (scheduledTask !=
null) {
-
if (!taskQueue.offer(scheduledTask)) {
-
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
-
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
-
return
false;
-
}
-
scheduledTask = pollScheduledTask(nanoTime);
-
}
-
return
true;
-
}
-
-
protected final Runnable pollScheduledTask(long nanoTime) {
-
assert inEventLoop();
-
-
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
this.scheduledTaskQueue;
-
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue ==
null ?
null : scheduledTaskQueue.peek();
-
if (scheduledTask ==
null) {
-
return
null;
-
}
-
-
if (scheduledTask.deadlineNanos() <= nanoTime) {
-
scheduledTaskQueue.remove();
-
return scheduledTask;
-
}
-
return
null;
-
}
先从scheduledTaskQueue优先队列里拿到对应优先级最高的task(截止时间最近的Task),判断当前是否已到达其截止时间,是的话就将其从优先队列中取出并删除元素,然后将其加入到taskQueue中,如果加入失败就重新加入到scheduledTaskQueue中,一直到所有的优先队列里的task都迁移成功
简单来说就是把已经到期的定时任务从PriorityQueue转移到taskQueue
执行task
-
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
-
Runnable task = pollTaskFrom(taskQueue);
-
if (task ==
null) {
-
return
false;
-
}
-
for (;;) {
-
safeExecute(task);
-
task = pollTaskFrom(taskQueue);
-
if (task ==
null) {
-
return
true;
-
}
-
}
-
}
-
-
-
protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
-
for (;;) {
-
Runnable task = taskQueue.poll();
-
if (task == WAKEUP_TASK) {
-
continue;
-
}
-
return task;
-
}
-
}
-
-
protected static void safeExecute(Runnable task) {
-
try {
-
task.run();
-
}
catch (Throwable t) {
-
logger.warn(
"A task raised an exception. Task: {}", task, t);
-
}
-
}
从taskQueue中取出非WAKEUP_TASK的任务,然后调用safeExecute() --内部之间调用task.run()来安全执行所有的task,一直到所有的task都执行完毕
后置处理
-
@Override
-
protected void afterRunningAllTasks() {
-
runAllTasksFrom(tailTasks);
-
}
当所有的task执行完毕之后,我们还可以执行一些自己的task,通过afterRunningAllTasks方法来执行在tailTasks队列里的所有任务,我们可以通过SingleThreadEventLoop的executeAfterEventLoopIteration向tailTasks里添加自己想要执行的业务逻辑
task的执行还有一个带有超时时间的重载方法,如下:
-
protected boolean runAllTasks(long timeoutNanos) {
-
fetchFromScheduledTaskQueue();
-
//从taskQueue poll获取任务
-
Runnable task = pollTask();
-
if (task ==
null) {
-
afterRunningAllTasks();
-
return
false;
-
}
-
//计算当前方法超时的截止时间
-
final
long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
-
long runTasks =
0;
-
long lastExecutionTime;
-
for (;;) {
-
safeExecute(task);
-
runTasks ++;
-
//位运算,说明runTasks是64的倍数 0x3F=0011 1111 (63)
-
if ((runTasks &
0x3F) ==
0) {
-
lastExecutionTime = ScheduledFutureTask.nanoTime();
-
if (lastExecutionTime >= deadline) {
-
break;
-
}
-
}
-
-
task = pollTask();
-
if (task ==
null) {
-
lastExecutionTime = ScheduledFutureTask.nanoTime();
-
break;
-
}
-
}
-
-
afterRunningAllTasks();
-
this.lastExecutionTime = lastExecutionTime;
-
return
true;
-
}
基本思路和不带参数的runAllTasks一样,区别在于会考虑所有任务执行的超时时间,为了提高执行效率,每执行64个任务都会比较下当前时间是否大于runAllTasks的截止时间,是的话就退出
从上面可以看出,我们的EventLoopGroup 既需要执行 IO 操作, 又需要执行 很多的task, 因此在调用对应execute 方法添加任务的时候, 不要提交耗时任务, 更不能提交一些会造成阻塞的任务, 不然会导致我们的 IO 线程得不到调度, 影响整个程序的并发量
总结一下:
- netty内的任务可分为普通任务和定时任务,分别保存在LinkedBlockingQueue和PriorityQueue
- netty执行任务之前,会将已经到期的定时任务从PriorityQueue转移到LinkedBlockingQueue
- 如果执行任务有超时时间,那么会每执行64个任务校验下是否达到截止时间
参考:
netty源码分析之揭开reactor线程的面纱(二)
netty源码分析之揭开reactor线程的面纱(三)
Netty 源码分析-EventLoop
转载:https://blog.csdn.net/qq_35448165/article/details/105164861