小言_互联网的博客

Netty学习笔记(四)EventLoopGroup续篇

355人阅读  评论(0)

  
  1. @Override
  2. protected void run() {
  3. for (;;) {
  4. try {
  5. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  6. case SelectStrategy.CONTINUE:
  7. continue;
  8. case SelectStrategy.SELECT:
  9. //select轮询, 设置wakenUp为false并返回之前的wakenUp值
  10. select(wakenUp.getAndSet( false));
  11. if (wakenUp.get()) {
  12. selector.wakeup();
  13. }
  14. default:
  15. // fallthrough
  16. }
  17. //去除了无关紧要的代码
  18. processSelectedKeys();
  19. runAllTasks();
  20. } catch (Throwable t) {
  21. handleLoopException(t);
  22. }
  23. // Always handle shutdown even if the loop processing threw an exception.
  24. ...
  25. }
  26. }

前面讲到Reactor的核心是执行了NioEventLoop的run方法,主要做了上面三件事:

  • 轮询注册到reactor线程上的对应的selector的所有channel的IO事件
  • 根据不同的SelectKeys进行处理  processSelectedKeys();
  • 处理任务队列 runAllTasks();   

接下来再详细看下processSelectedKeys()和runAllTasks();  方法做了什么

processSelectedKeys


  
  1. private void processSelectedKeys() {
  2. if (selectedKeys != null) {
  3. processSelectedKeysOptimized(selectedKeys.flip());
  4. } else {
  5. processSelectedKeysPlain(selector.selectedKeys());
  6. }
  7. }

这里的processSelectedkeys()方法会根据selectedKeys是否为空,判断执行优化后的processSelectedKeysOptimized()还是普通的processSelectedKeysPlain()方法

这里的selectedKeys Netty在调用openSelector时对其进行了优化


  
  1. private SelectedSelectionKeySet selectedKeys;
  2. private Selector openSelector() {
  3. final Selector selector;
  4. selector = provider.openSelector();
  5. final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
  6. final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
  7. Object maybeException = AccessController.doPrivileged( new PrivilegedAction<Object>() {
  8. @Override
  9. public Object run() {
  10. Field selectedKeysField = selectorImplClass.getDeclaredField( "selectedKeys");
  11. Field publicSelectedKeysField = selectorImplClass.getDeclaredField( "publicSelectedKeys");
  12. selectedKeysField.setAccessible( true);
  13. publicSelectedKeysField.setAccessible( true);
  14. selectedKeysField.set(selector, selectedKeySet);
  15. publicSelectedKeysField.set(selector, selectedKeySet);
  16. return null;
  17. }
  18. });
  19. selectedKeys = selectedKeySet;
  20. return selector;
  21. }

先创建一个空的SelectedSelectionKeySet对象,然后通过反射获取jdk 底层的Selector 的class 对象的 selectedKeys和publicSelectedKeys字段,并将Netty的SelectedSelectionKeySet通过反射赋值,这样在底层调用jdk的api存储注册事件时,最后都会把事件保存到Netty的SelectedSelectionKeySet 对象里

可以看下替换前后有什么区别,jdk底层的SelectImpl对象的selectedKeys和publicSelectedKeys字段都是Set<SelectionKey>类型,而Netty里的SelectedSelectionKeySet对象是这样的一个结构:


  
  1. final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
  2. private SelectionKey[] keysA;
  3. private int keysASize;
  4. private SelectionKey[] keysB;
  5. private int keysBSize;
  6. private boolean isA = true;
  7. @Override
  8. public boolean add(SelectionKey o) {
  9. if (o == null) {
  10. return false;
  11. }
  12. //添加元素到数组的最后,如果数组满了,就进行扩容(*2)
  13. if (isA) {
  14. int size = keysASize;
  15. keysA[size ++] = o;
  16. keysASize = size;
  17. if (size == keysA.length) {
  18. doubleCapacityA();
  19. }
  20. } else {
  21. ...
  22. }
  23. return true;
  24. }
  25. //移除对应的SelectionKey数组的最后一个元素
  26. SelectionKey[] flip() {
  27. if (isA) {
  28. isA = false;
  29. keysA[keysASize] = null;
  30. keysBSize = 0;
  31. return keysA;
  32. } else {
  33. ...
  34. }
  35. }
  36. @Override
  37. public boolean remove(Object o) {
  38. return false;
  39. }
  40. @Override
  41. public boolean contains(Object o) {
  42. return false;
  43. }
  44. @Override
  45. public Iterator<SelectionKey> iterator() {
  46. throw new UnsupportedOperationException();
  47. }
  48. }

SelectedSelectionKeySet是AbstractSet的一个子类,底层通过SelectionKey[]数组方法实现,并且将一些不需要的方法remove,contains方法进行重写,Netty里轮询事件的时候对操作进行了简化,不需要通过集合的Iterator进行移除,而直接通过flip方法去掉集合的最后一个SelectionKey就可以了(这样的操作的时间复杂度更低,可以直接定位到具体的下标),而我们在使用NIO的API的时候都需要进行remove操作
4.1.6.Final中的源码,这里的SelectionKey是两个数组交替遍历的,在4.1.9.Final 版本中,netty已经将SelectedSelectionKeySet底层使用一个数组了:SelectedSelectionKeySet

接着来看下 processSelectedKeysOptimized(selectedKeys.flip());方法


  
  1. private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
  2. for ( int i = 0;; i ++) {
  3. final SelectionKey k = selectedKeys[i];
  4. if (k == null) {
  5. break;
  6. }
  7. selectedKeys[i] = null;
  8. //拿到SelectionKey的attachment,并根据其类型做不同处理
  9. final Object a = k.attachment();
  10. if (a instanceof AbstractNioChannel) {
  11. processSelectedKey(k, (AbstractNioChannel) a);
  12. } else {
  13. @SuppressWarnings( "unchecked")
  14. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  15. processSelectedKey(k, task);
  16. }
  17. if (needsToSelectAgain) {
  18. //如果需要重新select,就将selectedKeys的元素都置为null恢复初始的状态
  19. for (;;) {
  20. i++;
  21. if (selectedKeys[i] == null) {
  22. break;
  23. }
  24. selectedKeys[i] = null;
  25. }
  26. selectAgain();
  27. // Need to flip the optimized selectedKeys to get the right reference to the array
  28. // and reset the index to -1 which will then set to 0 on the for loop
  29. // to start over again.
  30. selectedKeys = this.selectedKeys.flip();
  31. i = - 1;
  32. }
  33. }
  34. }

上述过程可以分为三步:

  • 取出SelectionKey(包含channel,attachment等信息)
  • 这里看到SelectionKey的attachment类型可能是AbstractNioChannel,猜测是不是在注册事件的时间添加的,根据ServerBootstrap的启动流程,最后会调用AbstractNioChannel的如下方法:

    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
    

    这里的最后一个参数也就是attachment,当前对象不就是AbstractNioChannel的子类

  • 处理SelectionKey
  • 
        
    1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    2. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    3. if (!k.isValid()) {
    4. final EventLoop eventLoop;
    5. eventLoop = ch.eventLoop();
    6. if (eventLoop != this || eventLoop == null) {
    7. return;
    8. }
    9. unsafe.close(unsafe.voidPromise());
    10. return;
    11. }
    12. int readyOps = k.readyOps();
    13. //连接建立事件
    14. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    15. // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    16. int ops = k.interestOps();
    17. ops &= ~SelectionKey.OP_CONNECT;
    18. //1.将连接事件从interestOps中移除
    19. k.interestOps(ops);
    20. //2.调用pipeline().fireChannelActive()将连接建立完成通知给pipeline中的各个handler
    21. unsafe.finishConnect();
    22. }
    23. //可写事件
    24. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    25. ch.unsafe().forceFlush();
    26. }
    27. //可读事件
    28. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    29. unsafe.read();
    30. if (!ch.isOpen()) {
    31. return;
    32. }
    33. }
    34. }

可以看出这里就是一系列NIO的操作,分别对OP_READ, 可读事件, OP_WRITE, 可写事件, OP_CONNECT, 连接事件进行处理

以OP_READ事件为例


  
  1. public final void read() {
  2. final ChannelConfig config = config();
  3. final ChannelPipeline pipeline = pipeline();
  4. final ByteBufAllocator allocator = config.getAllocator();
  5. final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  6. allocHandle.reset(config);
  7. ByteBuf byteBuf = null;
  8. boolean close = false;
  9. do {
  10. //1.分配ByteBuf
  11. byteBuf = allocHandle.allocate(allocator);
  12. //2.从Channel读取数据
  13. allocHandle.lastBytesRead(doReadBytes(byteBuf));
  14. if (allocHandle.lastBytesRead() <= 0) {
  15. // nothing was read. release the buffer.
  16. byteBuf.release();
  17. byteBuf = null;
  18. close = allocHandle.lastBytesRead() < 0;
  19. break;
  20. }
  21. allocHandle.incMessagesRead( 1);
  22. readPending = false;
  23. //3.通过pipeline.fireChannelRead事件通知给pipeline里的各个handler
  24. pipeline.fireChannelRead(byteBuf);
  25. byteBuf = null;
  26. } while (allocHandle.continueReading());
  27. allocHandle.readComplete();
  28. pipeline.fireChannelReadComplete();
  29. if (close) {
  30. closeOnRead(pipeline);
  31. }
  32. }
  33. }
  • 判断是否需要重新Select并重置

  
  1. void cancel(SelectionKey key) {
  2. key.cancel();
  3. cancelledKeys ++;
  4. if (cancelledKeys >= CLEANUP_INTERVAL) {
  5. cancelledKeys = 0;
  6. needsToSelectAgain = true;
  7. }
  8. }

这里的cancelledKeys会在调用cancel(SelectionKey)删除注册事件的时候计数,当他大于CLEANUP_INTERVAL(256)的时候,就会将needsToSelectAgain设置为true,进入对应的分支判断,先将原来的selectedKeys都置为Null,然后重新调用selectNow(),重新填充selectedKeys

总结:
netty的NioEventLoop线程第二步做的事情就是处理SelectionKey,netty使用数组替换掉jdk原生的HashSet来优化查询和更新SelectionKey的效率,每个SelectionKey上绑定了netty类AbstractNioChanne的具体实现子类对象作为attachment,在处理每个SelectionKey的时候,就可以找到对应的AbstractNioChannel,最后通过pipeline来处理通知给其他Handler

任务执行runAllTasks

任务添加

添加普通任务

前面的分析说过NioEventLoop 是Netty的核心线程,其添加任务是通过执行父类SingleThreadEventExecutor的execute方法,
通过addTask方法,将Runnable(即task)添加到对应的任务队列 Queue<Runnable> taskQueue;里


  
  1. public void execute(Runnable task) {
  2. boolean inEventLoop = inEventLoop();
  3. if (inEventLoop) {
  4. addTask(task);
  5. } else {
  6. startThread();
  7. addTask(task);
  8. }
  9. }

Netty的源码里的bind()流程中有通过如下方法添加对应的task到SingleThreadEventExecutor的任务队列里,如下:


  
  1. private static void doBind0(
  2. final ChannelFuture regFuture, final Channel channel,
  3. final SocketAddress localAddress, final ChannelPromise promise) {
  4. channel.eventLoop().execute( new Runnable() {
  5. @Override
  6. public void run() {
  7. if (regFuture.isSuccess()) {
  8. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  9. } else {
  10. promise.setFailure(regFuture.cause());
  11. }
  12. }
  13. });
  14. }

用户也可以通过如下方式自己添加task到TaskQueue


  
  1. EventLoop eventLoop = channel.eventLoop();
  2. eventLoop.execute( new Runnable() {
  3. @Override
  4. public void run() {
  5. //TODO
  6. }
  7. });

添加定时任务

除了上述方式,我们还可以通过如下方法添加定时任务到对应的任务队列


  
  1. EventLoop eventLoop = channel.eventLoop();
  2. eventLoop.schedule( new Runnable() {
  3. @Override
  4. public void run() {
  5. //TODO
  6. }
  7. }, 30, TimeUnit.SECONDS);

具体的实现是在父类AbstractScheduledEventExecutor里,看下对应的源码


  
  1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
  2. ...
  3. if (delay < 0) {
  4. throw new IllegalArgumentException(
  5. String.format( "delay: %d (expected: >= 0)", delay));
  6. }
  7. return schedule( new ScheduledFutureTask<Void>(
  8. this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
  9. }
  10. <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
  11. if (inEventLoop()) {
  12. scheduledTaskQueue().add(task);
  13. } else {
  14. execute( new Runnable() {
  15. @Override
  16. public void run() {
  17. scheduledTaskQueue().add(task);
  18. }
  19. });
  20. }
  21. return task;
  22. }

会将对应的Runnable和延迟时间封装成一个新的ScheduledFutureTask,然后调用重载的schedule方法,将对应的task添加到PriorityQueue<ScheduledFutureTask<?>>的优先队列里

这里对添加定时任务的Thread进行了判断,如果调用的发起方是reactor线程,那么就直接将Task添加到优先队列中;如果是外部线程调用的schedule,会将"添加定时任务到优先队列"封装成一个Runnable也就是新的task,然后调用上面的execute方法去添加任务,这样会访问PriorityQueue的就只有reactor线程了,变成了单线程

接下来我们来详细看下这个特殊的优先队列PriorityQueue<ScheduledFutureTask<?>>,所谓的优先队列与普通队列的区别在于每个元素都被赋予了优先级。当访问元素时,会将具有最高优先级的元素最先弹出。即优先队列具有最高级先出的特征

看下这个优先队列里的元素ScheduledFutureTask,它实现了Comparable接口,定义了自己的compareTo方法,先比较deadlineNanos(也就是截止时间)的大小,如果一样则比较id,如果也相同就抛出异常


  
  1. final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
  2. private static final AtomicLong nextTaskId = new AtomicLong();
  3. private final long id = nextTaskId.getAndIncrement();
  4. @Override
  5. public int compareTo(Delayed o) {
  6. if ( this == o) {
  7. return 0;
  8. }
  9. ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
  10. long d = deadlineNanos() - that.deadlineNanos();
  11. if (d < 0) {
  12. return - 1;
  13. } else if (d > 0) {
  14. return 1;
  15. } else if (id < that.id) {
  16. return - 1;
  17. } else if (id == that.id) {
  18. throw new Error();
  19. } else {
  20. return 1;
  21. }
  22. }
  23. }

既然ScheduledFutureTask本质也是一个Runnable,那么就看下它的run方法吧
这里对于不同的类型任务进行了不同的处理,periodNanos=0表示是只执行一次的任务,>0 表示是按照指定频率定期执行的任务,<0表示是每次执行完成后,延迟一段时间再次执行的任务(二者的区别在于一个是根据上次任务开始执行的时间计算间隔,一个是按照上次任务执行结束的时间计算间隔)


  
  1. /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
  2. private final long periodNanos;
  3. @Override
  4. public void run() {
  5. if (periodNanos == 0) {
  6. if (setUncancellableInternal()) {
  7. V result = task.call();
  8. setSuccessInternal(result);
  9. }
  10. } else {
  11. if (!isCancelled()) {
  12. task.call();
  13. if (!executor().isShutdown()) {
  14. long p = periodNanos;
  15. if (p > 0) {
  16. //设置该任务的下一次截止时间为本次的截止时间加上间隔时间periodNanos
  17. deadlineNanos += p;
  18. } else {
  19. //设置下一次截止时间为当前时间加上延迟(因为p<0,所以要减去) 此时的当前时间就是本次任务之间结束的时间 task.call()是一个阻塞的方法
  20. deadlineNanos = nanoTime() - p;
  21. }
  22. if (!isCancelled()) {
  23. //将新的ScheduledFutureTask添加到任务队列等待下次执行
  24. Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
  25. ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
  26. assert scheduledTaskQueue != null;
  27. scheduledTaskQueue.add( this);
  28. }
  29. }
  30. }
  31. }
  32. }

Task任务的执行

有两个重载的runAllTasks方法,一个无参,一个带有long timeoutNanos参数,先来看下无参的方法


  
  1. protected boolean runAllTasks() {
  2. assert inEventLoop();
  3. boolean fetchedAll;
  4. boolean ranAtLeastOne = false;
  5. do {
  6. fetchedAll = fetchFromScheduledTaskQueue();
  7. if (runAllTasksFrom(taskQueue)) {
  8. ranAtLeastOne = true;
  9. }
  10. } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
  11. if (ranAtLeastOne) {
  12. lastExecutionTime = ScheduledFutureTask.nanoTime();
  13. }
  14. afterRunningAllTasks();
  15. return ranAtLeastOne;
  16. }

主要做下面三件事情:

1.将优先队列里的ScheduledFutureTask取出放到taskQueue里
2.从taskQueue里取出task并执行
3.task任务执行完毕后执行后置处理逻辑

将任务从优先队列移动到taskQueue


  
  1. private boolean fetchFromScheduledTaskQueue() {
  2. long nanoTime = AbstractScheduledEventExecutor.nanoTime();
  3. Runnable scheduledTask = pollScheduledTask(nanoTime);
  4. while (scheduledTask != null) {
  5. if (!taskQueue.offer(scheduledTask)) {
  6. // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
  7. scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
  8. return false;
  9. }
  10. scheduledTask = pollScheduledTask(nanoTime);
  11. }
  12. return true;
  13. }
  14. protected final Runnable pollScheduledTask(long nanoTime) {
  15. assert inEventLoop();
  16. Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
  17. ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
  18. if (scheduledTask == null) {
  19. return null;
  20. }
  21. if (scheduledTask.deadlineNanos() <= nanoTime) {
  22. scheduledTaskQueue.remove();
  23. return scheduledTask;
  24. }
  25. return null;
  26. }

先从scheduledTaskQueue优先队列里拿到对应优先级最高的task(截止时间最近的Task),判断当前是否已到达其截止时间,是的话就将其从优先队列中取出并删除元素,然后将其加入到taskQueue中,如果加入失败就重新加入到scheduledTaskQueue中,一直到所有的优先队列里的task都迁移成功

简单来说就是把已经到期的定时任务从PriorityQueue转移到taskQueue

执行task


  
  1. protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
  2. Runnable task = pollTaskFrom(taskQueue);
  3. if (task == null) {
  4. return false;
  5. }
  6. for (;;) {
  7. safeExecute(task);
  8. task = pollTaskFrom(taskQueue);
  9. if (task == null) {
  10. return true;
  11. }
  12. }
  13. }
  14. protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
  15. for (;;) {
  16. Runnable task = taskQueue.poll();
  17. if (task == WAKEUP_TASK) {
  18. continue;
  19. }
  20. return task;
  21. }
  22. }
  23. protected static void safeExecute(Runnable task) {
  24. try {
  25. task.run();
  26. } catch (Throwable t) {
  27. logger.warn( "A task raised an exception. Task: {}", task, t);
  28. }
  29. }

从taskQueue中取出非WAKEUP_TASK的任务,然后调用safeExecute() --内部之间调用task.run()来安全执行所有的task,一直到所有的task都执行完毕

后置处理


  
  1. @Override
  2. protected void afterRunningAllTasks() {
  3. runAllTasksFrom(tailTasks);
  4. }

当所有的task执行完毕之后,我们还可以执行一些自己的task,通过afterRunningAllTasks方法来执行在tailTasks队列里的所有任务,我们可以通过SingleThreadEventLoop的executeAfterEventLoopIteration向tailTasks里添加自己想要执行的业务逻辑

task的执行还有一个带有超时时间的重载方法,如下:


  
  1. protected boolean runAllTasks(long timeoutNanos) {
  2. fetchFromScheduledTaskQueue();
  3. //从taskQueue poll获取任务
  4. Runnable task = pollTask();
  5. if (task == null) {
  6. afterRunningAllTasks();
  7. return false;
  8. }
  9. //计算当前方法超时的截止时间
  10. final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
  11. long runTasks = 0;
  12. long lastExecutionTime;
  13. for (;;) {
  14. safeExecute(task);
  15. runTasks ++;
  16. //位运算,说明runTasks是64的倍数 0x3F=0011 1111 (63)
  17. if ((runTasks & 0x3F) == 0) {
  18. lastExecutionTime = ScheduledFutureTask.nanoTime();
  19. if (lastExecutionTime >= deadline) {
  20. break;
  21. }
  22. }
  23. task = pollTask();
  24. if (task == null) {
  25. lastExecutionTime = ScheduledFutureTask.nanoTime();
  26. break;
  27. }
  28. }
  29. afterRunningAllTasks();
  30. this.lastExecutionTime = lastExecutionTime;
  31. return true;
  32. }

基本思路和不带参数的runAllTasks一样,区别在于会考虑所有任务执行的超时时间,为了提高执行效率,每执行64个任务都会比较下当前时间是否大于runAllTasks的截止时间,是的话就退出

从上面可以看出,我们的EventLoopGroup 既需要执行 IO 操作, 又需要执行 很多的task, 因此在调用对应execute 方法添加任务的时候, 不要提交耗时任务, 更不能提交一些会造成阻塞的任务, 不然会导致我们的 IO 线程得不到调度, 影响整个程序的并发量

总结一下:

  • netty内的任务可分为普通任务和定时任务,分别保存在LinkedBlockingQueue和PriorityQueue
  • netty执行任务之前,会将已经到期的定时任务从PriorityQueue转移到LinkedBlockingQueue
  • 如果执行任务有超时时间,那么会每执行64个任务校验下是否达到截止时间

参考:
netty源码分析之揭开reactor线程的面纱(二)
netty源码分析之揭开reactor线程的面纱(三)
Netty 源码分析-EventLoop​​​​​​​


转载:https://blog.csdn.net/qq_35448165/article/details/105164861
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场