本文主要想了解两个地方:如何窃取任务、task如何等待(join)
代码基于 OpenJDK 12
窃取算法(work-stealing)
从ForkJoin-Paper-DougLea(paper见文末)中可以看出:
到了这里就可以知道,窃取任务从其他线程队列的尾部窃取的了。
窃取算法优缺点
工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Task 等待(join)
Join方法的主要作用是阻塞当前线程并等待获取结果。具体代码如下:
1 2 3 4 5 6
|
public final V join() { int s; if (((s = doJoin()) & ABNORMAL) != 0) reportException(s); return getRawResult(); }
|
首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。
让我们再来分析一下doJoin()方法的实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
/** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and * unfork+exec. Others are relayed to ForkJoinPool.awaitJoin. * * @return status upon completion */ private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return //已完成,返回status (s = status) < 0 ? s : //未完成,如果当前线程是ForkJoinWorkerThread,从该线程中取出workQueue,并尝试将 //当前task出队然后执行,执行的结果是完成则返回状态,否则使用当线程池所在的ForkJoinPool的awaitJoin方法等待 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : //当前线程不是ForkJoinWorkerThread,调用externalAwaitDone方法 //externalAwaitDone: Blocks a non-worker-thread until completion. externalAwaitDone(); } /** * Pops the given task only if it is at the current top. */ final boolean tryUnpush(ForkJoinTask<?> task) { boolean popped = false; int s, cap; ForkJoinTask<?>[] a; if ((a = array) != null && (cap = a.length) > 0 && (s = top) != base && (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null))) TOP.setOpaque(this, s); return popped; } /** * Primary execution method for stolen tasks. Unless done, calls * exec and records status if completed, but doesn't wait for * completion otherwise. * * @return status on exit from this method */ final int doExec() { int s; boolean completed; // 仅未完成的任务会运行,其他情况会忽略. if ((s = status) >= 0) { try { //exec是abstract方法 //调用ForkJoinTask子类中exec completed = exec(); } catch (Throwable rex) { completed = false; s = setExceptionalCompletion(rex); } if (completed) s = setDone(); } return s; }
|
在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;如果没有执行完,则从任务队列中取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。
ForkJoin-Paper-DougLea paper:
https://www.jiankunking.com/attachments/ForkJoin-Paper-DougLea.pdf
< END >
喜欢就点个在看 or 转发个朋友圈呗
衣舞晨风