小言_互联网的博客

Java8 Stream API 之 IntPipeline(三) 源码解析

394人阅读  评论(0)

目录

1、reduce / collect

2、sum / min / max / count /  average / summaryStatistics

3、AbstractTask 

4、ReduceTask

5、AbstractShortCircuitTask

6、FindTask

7、MatchTask

8、Spliterator


       本篇博客继续上一篇《Java8 Stream API 之 IntPipeline(二) 源码解析》 讲解reduce / collect等其他方法的实现。

1、reduce / collect

      这两个方法可用于实现获取元素的最小值,最大值,总和和平均值等类似场景,参考下面一节,这两个方法的实现如下:


  
  1. @Override
  2. public final int reduce(int identity, IntBinaryOperator op) {
  3. //identity标识起始值,如果不传默认使用第一个元素作为起始值
  4. return evaluate(ReduceOps.makeInt(identity, op));
  5. }
  6. @Override
  7. public final OptionalInt reduce(IntBinaryOperator op) {
  8. return evaluate(ReduceOps.makeInt(op));
  9. }
  10. @Override
  11. public final <R> R collect(Supplier<R> supplier,
  12. ObjIntConsumer<R> accumulator,
  13. BiConsumer<R, R> combiner) {
  14. Objects.requireNonNull(combiner);
  15. BinaryOperator<R> operator = (left, right) -> {
  16. combiner.accept(left, right);
  17. return left;
  18. };
  19. return evaluate(ReduceOps.makeInt(supplier, accumulator, operator));
  20. }
  21. public static TerminalOp<Integer, Integer>
  22. makeInt (int identity, IntBinaryOperator operator) {
  23. Objects.requireNonNull(operator);
  24. class ReducingSink
  25. implements AccumulatingSink< Integer, Integer, ReducingSink>, Sink. OfInt {
  26. private int state;
  27. @Override
  28. public void begin(long size) {
  29. //identity是方法的入参
  30. state = identity;
  31. }
  32. @Override
  33. public void accept(int t) {
  34. //把state作为参数传递给operator方法,并更新state为执行结果
  35. state = operator.applyAsInt(state, t);
  36. }
  37. @Override
  38. public Integer get() {
  39. return state;
  40. }
  41. @Override
  42. //并行时调用,将不同线程处理的结果做合并
  43. public void combine(ReducingSink other) {
  44. accept(other.state);
  45. }
  46. }
  47. return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
  48. @Override
  49. public ReducingSink makeSink() {
  50. return new ReducingSink();
  51. }
  52. };
  53. }
  54. private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
  55. implements TerminalOp< T, R> {
  56. private final StreamShape inputShape;
  57. ReduceOp(StreamShape shape) {
  58. inputShape = shape;
  59. }
  60. public abstract S makeSink();
  61. @Override
  62. public StreamShape inputShape() {
  63. return inputShape;
  64. }
  65. @Override
  66. public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
  67. Spliterator<P_IN> spliterator) {
  68. //串行处理时调用
  69. return helper.wrapAndCopyInto(makeSink(), spliterator).get();
  70. }
  71. @Override
  72. public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
  73. Spliterator<P_IN> spliterator) {
  74. //并行处理时调用
  75. return new ReduceTask<>( this, helper, spliterator).invoke().get();
  76. }
  77. }
  78. private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
  79. extends TerminalSink< T, R> {
  80. public void combine(K other);
  81. }
  82. public static TerminalOp<Integer, OptionalInt>
  83. makeInt (IntBinaryOperator operator) {
  84. Objects.requireNonNull(operator);
  85. class ReducingSink
  86. implements AccumulatingSink< Integer, OptionalInt, ReducingSink>, Sink. OfInt {
  87. private boolean empty;
  88. private int state;
  89. public void begin(long size) {
  90. empty = true;
  91. state = 0;
  92. }
  93. @Override
  94. public void accept(int t) {
  95. if (empty) {
  96. //接受的第一个元素
  97. empty = false;
  98. state = t;
  99. }
  100. else {
  101. state = operator.applyAsInt(state, t);
  102. }
  103. }
  104. @Override
  105. public OptionalInt get() {
  106. return empty ? OptionalInt.empty() : OptionalInt.of(state);
  107. }
  108. @Override
  109. public void combine(ReducingSink other) {
  110. if (!other.empty) //other非空则执行合并
  111. accept(other.state);
  112. }
  113. }
  114. return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
  115. @Override
  116. public ReducingSink makeSink() {
  117. return new ReducingSink();
  118. }
  119. };
  120. }
  121. //combiner表示并行处理时合并不同线程处理结果的逻辑
  122. public static <R> TerminalOp<Integer, R>
  123. makeInt (Supplier<R> supplier,
  124. ObjIntConsumer<R> accumulator,
  125. BinaryOperator<R> combiner) {
  126. Objects.requireNonNull(supplier);
  127. Objects.requireNonNull(accumulator);
  128. Objects.requireNonNull(combiner);
  129. class ReducingSink extends Box<R>
  130. implements AccumulatingSink< Integer, R, ReducingSink>, Sink. OfInt {
  131. @Override
  132. public void begin(long size) {
  133. state = supplier.get();
  134. }
  135. @Override
  136. public void accept(int t) {
  137. //注意state并不会更新
  138. accumulator.accept(state, t);
  139. }
  140. @Override
  141. public void combine(ReducingSink other) {
  142. //执行合并的逻辑
  143. state = combiner.apply(state, other.state);
  144. }
  145. }
  146. return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
  147. @Override
  148. public ReducingSink makeSink() {
  149. return new ReducingSink();
  150. }
  151. };
  152. }
  153. private static abstract class Box<U> {
  154. U state;
  155. Box() {} // Avoid creation of special accessor
  156. public U get() {
  157. return state;
  158. }
  159. }

2、sum / min / max / count /  average / summaryStatistics


  
  1. @Override
  2. public final int sum() {
  3. return reduce( 0, Integer::sum);
  4. }
  5. @Override
  6. public final OptionalInt min() {
  7. return reduce(Math::min);
  8. }
  9. @Override
  10. public final OptionalInt max() {
  11. return reduce(Math::max);
  12. }
  13. @Override
  14. public final long count() {
  15. return mapToLong(e -> 1L).sum();
  16. }
  17. @Override
  18. public final OptionalDouble average() {
  19. long[] avg = collect(() -> new long[ 2], //初始的state是一个长度为2的long数组
  20. (ll, i) -> { //state就是ll,i表示流中的待处理元素
  21. ll[ 0]++; //统计元素个数
  22. ll[ 1] += i; //统计总的值
  23. },
  24. (ll, rr) -> { //并行处理时不同线程的处理结果累加
  25. ll[ 0] += rr[ 0];
  26. ll[ 1] += rr[ 1];
  27. });
  28. //collect方法是返回state值,即构造的长度为2的数组,索引为0的元素大于0说明流中元素不为空
  29. return avg[ 0] > 0
  30. ? OptionalDouble.of(( double) avg[ 1] / avg[ 0])
  31. : OptionalDouble.empty();
  32. }
  33. @Override
  34. public final IntSummaryStatistics summaryStatistics() {
  35. //state是一个IntSummaryStatistics实例,每次遍历流中元素时都调用其accept方法
  36. return collect(IntSummaryStatistics:: new, IntSummaryStatistics::accept,
  37. IntSummaryStatistics::combine);
  38. }
  39. public class IntSummaryStatistics implements IntConsumer {
  40. private long count;
  41. private long sum;
  42. private int min = Integer.MAX_VALUE;
  43. private int max = Integer.MIN_VALUE;
  44. public IntSummaryStatistics() { }
  45. @Override
  46. public void accept(int value) {
  47. ++count; //元素个数
  48. sum += value; //总和
  49. min = Math.min(min, value); //最小值
  50. max = Math.max(max, value); //最大值
  51. }
  52. public void combine(IntSummaryStatistics other) {
  53. //同其他线程的处理结果累加
  54. count += other.count;
  55. sum += other.sum;
  56. min = Math.min(min, other.min);
  57. max = Math.max(max, other.max);
  58. }
  59. public final long getCount() {
  60. return count;
  61. }
  62. public final long getSum() {
  63. return sum;
  64. }
  65. public final int getMin() {
  66. return min;
  67. }
  68. public final int getMax() {
  69. return max;
  70. }
  71. public final double getAverage() {
  72. return getCount() > 0 ? ( double) getSum() / getCount() : 0.0d;
  73. }
  74. @Override
  75. public String toString() {
  76. return String.format(
  77. "%s{count=%d, sum=%d, min=%d, average=%f, max=%d}",
  78. this.getClass().getSimpleName(),
  79. getCount(),
  80. getSum(),
  81. getMin(),
  82. getAverage(),
  83. getMax());
  84. }
  85. }

3、AbstractTask 

      AbstractTask是并行流处理的基类,其类继承关系如下:

我们以reduce / collect方法对应的并行流处理实现类 ReduceTask为例来说明其实现。AbstractTask定义的属性如下:


  
  1. //关联的流
  2. protected final PipelineHelper<P_OUT> helper;
  3. /**
  4. * 关联流的Spliterator实现
  5. */
  6. protected Spliterator<P_IN> spliterator;
  7. /**
  8. * 一个子任务的处理的流元素的个数,按照总元素个数除以4倍的核数来估算
  9. */
  10. protected long targetSize; // may be laziliy initialized
  11. /**
  12. * 左子任务,如果非空,则右子任务也非空
  13. */
  14. protected K leftChild;
  15. /**
  16. * 右子任务,如果非空,则右子任务也非空
  17. */
  18. protected K rightChild;
  19. /**
  20. * 任务执行的结果
  21. */
  22. private R localResult;

 其构造方法如下:


  
  1. //用来创建根节点
  2. protected AbstractTask(PipelineHelper<P_OUT> helper,
  3. Spliterator<P_IN> spliterator) {
  4. super( null);
  5. this.helper = helper;
  6. this.spliterator = spliterator;
  7. this.targetSize = 0L; //默认为0,惰性初始化,在实际的并行处理开始时才会计算
  8. }
  9. //用来创建子任务节点
  10. protected AbstractTask(K parent,
  11. Spliterator<P_IN> spliterator) {
  12. super(parent);
  13. this.spliterator = spliterator;
  14. this.helper = parent.helper;
  15. this.targetSize = parent.targetSize;
  16. }

  AbstractTask只有两个抽象方法需要子类实现,如下:

makeChild用来创建子任务,doLeaf用来计算子节点的执行结果,其核心方法是完成实际并行处理的compute方法,如下:


  
  1. public void compute() {
  2. Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
  3. long sizeEstimate = rs.estimateSize();
  4. long sizeThreshold = getTargetSize(sizeEstimate);
  5. boolean forkRight = false;
  6. @SuppressWarnings( "unchecked") K task = (K) this;
  7. //如果当前元素的总个数大于sizeThreshold,说明可以进一步切分子任务
  8. //trySplit不等于null说明切分成功,trySplit返回一半子任务,rs对应另一半待执行的子任务
  9. while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
  10. K leftChild, rightChild, taskToFork;
  11. //创建左右子任务
  12. task.leftChild = leftChild = task.makeChild(ls);
  13. task.rightChild = rightChild = task.makeChild(rs);
  14. //当前线程会处理其中一个子任务,只有另外一个子任务会提交到线程池处理,所以pendingCount是1
  15. task.setPendingCount( 1);
  16. if (forkRight) {
  17. forkRight = false;
  18. //由当前线程继续切分左子任务
  19. rs = ls;
  20. task = leftChild;
  21. //将右子任务提交到线程池
  22. taskToFork = rightChild;
  23. }
  24. else {
  25. forkRight = true;
  26. //由当前线程继续切分右子任务
  27. task = rightChild;
  28. //将左子任务提交到线程池
  29. taskToFork = leftChild;
  30. }
  31. taskToFork.fork();
  32. sizeEstimate = rs.estimateSize();
  33. }
  34. //无法继续切分了,则执行子任务并设置子任务的执行结果
  35. task.setLocalResult(task.doLeaf());
  36. //将当前子任务标记为已完成,当某个父任务下的子任务都执行完成会回调onCompletion方法将左右子任务都置为null
  37. task.tryComplete();
  38. }
  39. protected final long getTargetSize(long sizeEstimate) {
  40. long s;
  41. return ((s = targetSize) != 0 ? s : //如果targetSize不为0则直接返回,否则通过suggestTargetSize初始化
  42. (targetSize = suggestTargetSize(sizeEstimate)));
  43. }
  44. public static long suggestTargetSize(long sizeEstimate) {
  45. long est = sizeEstimate / LEAF_TARGET;
  46. return est > 0L ? est : 1L;
  47. }
  48. static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
  49. protected void setLocalResult(R localResult) {
  50. this.localResult = localResult;
  51. }
  52. @Override
  53. public void onCompletion(CountedCompleter<?> caller) {
  54. spliterator = null;
  55. leftChild = rightChild = null;
  56. }

4、ReduceTask

      ReduceTask继承自AbstractTask,用于实现并行的reduce或者collect方法,其调用如下:

其实现如下:


  
  1. @SuppressWarnings( "serial")
  2. private static final class ReduceTask<P_IN, P_OUT, R,
  3. S extends AccumulatingSink< P_OUT, R, S>>
  4. extends AbstractTask< P_IN, P_OUT, S, ReduceTask< P_IN, P_OUT, R, S>> {
  5. private final ReduceOp<P_OUT, R, S> op;
  6. //helper和spliterator都是调用reduce或者collect方法的流及其Spliterator实现
  7. //由evaluateParallel方法使用
  8. ReduceTask(ReduceOp<P_OUT, R, S> op,
  9. PipelineHelper<P_OUT> helper,
  10. Spliterator<P_IN> spliterator) {
  11. super(helper, spliterator);
  12. this.op = op;
  13. }
  14. //创建子任务makeChild方法使用
  15. ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
  16. Spliterator<P_IN> spliterator) {
  17. super(parent, spliterator);
  18. this.op = parent.op;
  19. }
  20. @Override
  21. protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
  22. //创建一个新的子任务
  23. return new ReduceTask<>( this, spliterator);
  24. }
  25. @Override
  26. protected S doLeaf() {
  27. //任务无法进一步切分了,则需要执行该任务,wrapAndCopyInto方法会遍历spliterator中包含的函数,传递给Sink
  28. return helper.wrapAndCopyInto(op.makeSink(), spliterator);
  29. }
  30. @Override
  31. //左右子任务节点都执行完成后会回调此方法
  32. public void onCompletion(CountedCompleter<?> caller) {
  33. if (!isLeaf()) {
  34. //获取左子任务节点的执行结果
  35. S leftResult = leftChild.getLocalResult();
  36. //同右子任务节点的执行结果做合并
  37. leftResult.combine(rightChild.getLocalResult());
  38. //设置当前父任务的执行结果
  39. setLocalResult(leftResult);
  40. }
  41. //调用父类方法,将spliterator, left and right child置为null
  42. super.onCompletion(caller);
  43. }
  44. }
  45. //是否叶子节点,左右子任务节点都为null时为叶子节点
  46. protected boolean isLeaf() {
  47. return leftChild == null;
  48. }

5、AbstractShortCircuitTask

     AbstractShortCircuitTask继承自AbstractTask,表示一个在满足特定条件后会终止流元素遍历的并行任务,如findAny方法,找到一个满足条件的元素就终止遍历返回true,其实现如下:


  
  1. @SuppressWarnings( "serial")
  2. abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
  3. K extends AbstractShortCircuitTask< P_IN, P_OUT, R, K>>
  4. extends AbstractTask< P_IN, P_OUT, R, K> {
  5. /**
  6. * 执行的结果
  7. */
  8. protected final AtomicReference<R> sharedResult;
  9. /**
  10. * 遍历任务是否被取消了
  11. */
  12. protected volatile boolean canceled;
  13. /**
  14. * 创建根节点任务
  15. */
  16. protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
  17. Spliterator<P_IN> spliterator) {
  18. super(helper, spliterator);
  19. sharedResult = new AtomicReference<>( null);
  20. }
  21. /**
  22. * 创建子节点任务
  23. */
  24. protected AbstractShortCircuitTask(K parent,
  25. Spliterator<P_IN> spliterator) {
  26. super(parent, spliterator);
  27. sharedResult = parent.sharedResult;
  28. }
  29. /**
  30. * 返回默认值
  31. */
  32. protected abstract R getEmptyResult();
  33. /**
  34. * 执行并行任务的核心逻辑,重写了父类的逻辑
  35. */
  36. @Override
  37. public void compute() {
  38. Spliterator<P_IN> rs = spliterator, ls;
  39. long sizeEstimate = rs.estimateSize();
  40. //计算单个子任务的元素个数
  41. long sizeThreshold = getTargetSize(sizeEstimate);
  42. boolean forkRight = false;
  43. @SuppressWarnings( "unchecked") K task = (K) this;
  44. AtomicReference<R> sr = sharedResult;
  45. R result;
  46. while ((result = sr.get()) == null) {
  47. //如果未获取满足条件的结果
  48. if (task.taskCanceled()) { //如果任务已取消,则返回默认值
  49. result = task.getEmptyResult();
  50. break;
  51. }
  52. //如果当前任务的元素个数较少或者无法继续切分了,则调用doLeaf执行当前任务并终止循环
  53. if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
  54. result = task.doLeaf();
  55. break;
  56. }
  57. K leftChild, rightChild, taskToFork;
  58. //创建左右子任务节点
  59. task.leftChild = leftChild = task.makeChild(ls);
  60. task.rightChild = rightChild = task.makeChild(rs);
  61. task.setPendingCount( 1);
  62. if (forkRight) {
  63. forkRight = false;
  64. //继续切分左子任务,将右子任务提交到线程池处理
  65. rs = ls;
  66. task = leftChild;
  67. taskToFork = rightChild;
  68. }
  69. else {
  70. //继续切分右子任务,将左子任务提交到线程池处理
  71. forkRight = true;
  72. task = rightChild;
  73. taskToFork = leftChild;
  74. }
  75. taskToFork.fork();
  76. sizeEstimate = rs.estimateSize();
  77. }
  78. //保存任务的执行结果
  79. task.setLocalResult(result);
  80. //将当前任务标记为已执行完成
  81. task.tryComplete();
  82. }
  83. /**
  84. * 设置执行结果
  85. */
  86. protected void shortCircuit(R result) {
  87. if (result != null)
  88. sharedResult.compareAndSet( null, result);
  89. }
  90. /**
  91. * 保存执行结果
  92. */
  93. @Override
  94. protected void setLocalResult(R localResult) {
  95. if (isRoot()) {
  96. //如果是根节点,则设置sharedResult
  97. if (localResult != null)
  98. sharedResult.compareAndSet( null, localResult);
  99. }
  100. else
  101. //非根节点,调用父类的setLocalResult
  102. super.setLocalResult(localResult);
  103. }
  104. /**
  105. * 获取执行结果
  106. */
  107. @Override
  108. public R getRawResult() {
  109. return getLocalResult();
  110. }
  111. @Override
  112. public R getLocalResult() {
  113. if (isRoot()) {
  114. //根节点时,获取sharedResult中的执行结果
  115. R answer = sharedResult.get();
  116. return (answer == null) ? getEmptyResult() : answer;
  117. }
  118. else
  119. return super.getLocalResult();
  120. }
  121. /**
  122. * 将当前任务标记为已取消
  123. */
  124. protected void cancel() {
  125. canceled = true;
  126. }
  127. /**
  128. * 判断当前任务是否已取消,如果没有则向上遍历判断父节点任务是否被取消了
  129. */
  130. protected boolean taskCanceled() {
  131. boolean cancel = canceled;
  132. if (!cancel) {
  133. //cancel为false时则遍历父节点,看有没有父节点的canceled是否为true
  134. for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())
  135. cancel = parent.canceled;
  136. }
  137. return cancel;
  138. }
  139. protected void cancelLaterNodes() {
  140. // Go up the tree, cancel right siblings of this node and all parents
  141. for ( @SuppressWarnings( "unchecked") K parent = getParent(), node = (K) this;
  142. parent != null;
  143. node = parent, parent = parent.getParent()) {
  144. //向上遍历直到根节点,将所有的右子节点设置为已取消
  145. if (parent.leftChild == node) {
  146. K rightSibling = parent.rightChild;
  147. if (!rightSibling.canceled)
  148. rightSibling.cancel(); //如果未取消,则取消掉
  149. }
  150. }
  151. }
  152. }
  153. //父节点为null,则是根节点
  154. protected boolean isRoot() {
  155. return getParent() == null;
  156. }
  157. protected K getParent() {
  158. return (K) getCompleter();
  159. }

6、FindTask

      FindTask用来实现findFirst / findAny的并行处理,其调用如下:

 其实现如下:


  
  1. @SuppressWarnings( "serial")
  2. private static final class FindTask<P_IN, P_OUT, O>
  3. extends AbstractShortCircuitTask< P_IN, P_OUT, O, FindTask< P_IN, P_OUT, O>> {
  4. private final FindOp<P_OUT, O> op;
  5. //evaluateParallel方法使用
  6. FindTask(FindOp<P_OUT, O> op,
  7. PipelineHelper<P_OUT> helper,
  8. Spliterator<P_IN> spliterator) {
  9. super(helper, spliterator);
  10. this.op = op;
  11. }
  12. //下面的创建子任务makeChild方法使用
  13. FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
  14. super(parent, spliterator);
  15. this.op = parent.op;
  16. }
  17. @Override
  18. protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
  19. return new FindTask<>( this, spliterator);
  20. }
  21. @Override
  22. protected O getEmptyResult() { //返回默认值
  23. return op.emptyValue;
  24. }
  25. //只有mustFindFirst为true时调用此方法
  26. private void foundResult(O answer) {
  27. if (isLeftmostNode())
  28. //如果是最左边的子任务节点,则设置sharedResult
  29. shortCircuit(answer);
  30. else
  31. //当前节点不是最左边的子任务节点,则取消后续的子任务节点
  32. cancelLaterNodes();
  33. }
  34. @Override
  35. protected O doLeaf() {
  36. //执行子任务并获取结果
  37. O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
  38. if (!op.mustFindFirst) { //如果mustFindFirst为false,即为findAny方法
  39. if (result != null)
  40. shortCircuit(result); //尝试设置sharedResult,如果成功则会终止子任务的切割
  41. return null;
  42. }
  43. else {
  44. //如果mustFindFirst为true,即为findFirst方法
  45. if (result != null) {
  46. foundResult(result);
  47. return result;
  48. }
  49. else
  50. return null;
  51. }
  52. }
  53. @Override
  54. //子任务都执行完成时,回调此方法
  55. public void onCompletion(CountedCompleter<?> caller) {
  56. if (op.mustFindFirst) {
  57. for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
  58. p = child, child = rightChild) {
  59. //child先是左子任务节点,然后是右子任务节点,然后因为p等于child终止遍历
  60. O result = child.getLocalResult();
  61. if (result != null && op.presentPredicate.test(result)) {
  62. //如果子任务节点的执行结果满足要求,则保存执行结果并终止遍历
  63. setLocalResult(result);
  64. foundResult(result);
  65. break;
  66. }
  67. }
  68. }
  69. //调用父类方法,将leftChild,rightChild等置为null
  70. super.onCompletion(caller);
  71. }
  72. }
  73. //判断是否最左边的节点
  74. protected boolean isLeftmostNode() {
  75. @SuppressWarnings( "unchecked")
  76. K node = (K) this;
  77. while (node != null) {
  78. K parent = node.getParent();
  79. if (parent != null && parent.leftChild != node)
  80. return false; //如果node不是parent的左节点则返回false
  81. node = parent;
  82. }
  83. return true;
  84. }

7、MatchTask

      MatchTask用来实现anyMatch / allMatch / noneMatch的并行处理,其实现如下:


  
  1. private static final class MatchTask<P_IN, P_OUT>
  2. extends AbstractShortCircuitTask< P_IN, P_OUT, Boolean, MatchTask< P_IN, P_OUT>> {
  3. private final MatchOp<P_OUT> op;
  4. /**
  5. * evaluateParallel方法使用,创建根节点
  6. */
  7. MatchTask(MatchOp<P_OUT> op, PipelineHelper<P_OUT> helper,
  8. Spliterator<P_IN> spliterator) {
  9. super(helper, spliterator);
  10. this.op = op;
  11. }
  12. /**
  13. * makeChild方法使用,创建子任务节点
  14. */
  15. MatchTask(MatchTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
  16. super(parent, spliterator);
  17. this.op = parent.op;
  18. }
  19. @Override
  20. protected MatchTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
  21. return new MatchTask<>( this, spliterator);
  22. }
  23. @Override
  24. protected Boolean doLeaf() {
  25. //执行子任务
  26. boolean b = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).getAndClearState();
  27. if (b == op.matchKind.shortCircuitResult)
  28. //如果执行结果满足条件,则设置sharedResult,终止子任务切割
  29. shortCircuit(b);
  30. return null;
  31. }
  32. @Override
  33. protected Boolean getEmptyResult() {
  34. return !op.matchKind.shortCircuitResult;
  35. }
  36. }

8、Spliterator

     Spliterator是Java8引入的接口,其定义的方法的调用场景和用途在之前的源码分析中已经陆续的说明了,此处做一个总结,并以ArrayList中Spliterator接口的实现类ArrayListSpliterator进一步说明实现该接口的相关细节。所有必须实现的方法说明如下:

  • tryAdvance:用来处理单个元素,会通过一个外层循环来调用tryAdvance实现所有流元素的处理
  • trySplit:用来切分当前任务的,只有并行处理时调用,该方法返回一个新的切分子任务,剩余的一半待处理任务还由原来的Spliterator实例处理
  • estimateSize:获取待处理的流元素的个数,如果数量未知则返回-1
  • characteristics:获取当前流元素的特点,比如是否排序,是否个数有限等,每个特点对应一个特定的位。

可使用的表示流元素的特点的常量如下:


  
  1. //流元素遍历时的顺序是固定的,比如List中的元素
  2. public static final int ORDERED = 0x00000010;
  3. //流元素经过去重的
  4. public static final int DISTINCT = 0x00000001;
  5. //流元素已经经过排序了,比如SortedSet中的元素
  6. public static final int SORTED = 0x00000004;
  7. //流元素的个数是有限的
  8. public static final int SIZED = 0x00000040;
  9. //流元素都是非空的
  10. public static final int NONNULL = 0x00000100;
  11. //流元素在遍历的过程中不能被修改
  12. public static final int IMMUTABLE = 0x00000400;
  13. //流元素在遍历的过程中可以被并发的线程安全的修改
  14. public static final int CONCURRENT = 0x00001000;
  15. //表明当前流中的元素是一个切分出来的子流
  16. public static final int SUBSIZED = 0x00004000;

ArrayListSpliterator的实现如下:


  
  1. @Override
  2. public Spliterator<E> spliterator() {
  3. //expectedModCount传递的是0,在具体遍历时会初始化成一个大于0的值
  4. return new ArrayListSpliterator<>( this, 0, - 1, 0);
  5. }
  6. /** Index-based split-by-two, lazily initialized Spliterator */
  7. static final class ArrayListSpliterator<E> implements Spliterator<E> {
  8. private final ArrayList<E> list; //关联的list
  9. private int index; //下一个遍历的元素的索引
  10. private int fence; //允许遍历的最大索引,-1表示无限制,即遍历所有的数组元素
  11. private int expectedModCount; //保存初始化时的ModCount,如果遍历时此属性与list的modCount不一致则抛出异常
  12. /** Create new spliterator covering the given range */
  13. ArrayListSpliterator(ArrayList<E> list, int origin, int fence,
  14. int expectedModCount) {
  15. this.list = list; // OK if null unless traversed
  16. this.index = origin;
  17. this.fence = fence;
  18. this.expectedModCount = expectedModCount;
  19. }
  20. private int getFence() { // initialize fence to size on first use
  21. int hi; // (a specialized variant appears in method forEach)
  22. ArrayList<E> lst;
  23. if ((hi = fence) < 0) { //fence为-1时
  24. if ((lst = list) == null)
  25. hi = fence = 0;
  26. else {
  27. //不为null时初始化expectedModCount和fence
  28. expectedModCount = lst.modCount;
  29. hi = fence = lst.size;
  30. }
  31. }
  32. return hi;
  33. }
  34. public ArrayListSpliterator<E> trySplit() {
  35. //获取中间值
  36. int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
  37. return (lo >= mid) ? null : // divide range in half unless too small
  38. //返回一个新的实例,遍历的元素范围是从index到mid之间,当前实例的index被修改成mid,元素范围就是mid到hi
  39. new ArrayListSpliterator<E>(list, lo, index = mid,
  40. expectedModCount);
  41. }
  42. public boolean tryAdvance(Consumer<? super E> action) {
  43. if (action == null)
  44. throw new NullPointerException();
  45. int hi = getFence(), i = index;
  46. if (i < hi) {
  47. //修改index
  48. index = i + 1;
  49. //获取原index对应的元素并调用action
  50. @SuppressWarnings( "unchecked") E e = (E)list.elementData[i];
  51. action.accept(e);
  52. if (list.modCount != expectedModCount) //list发生修改了抛出异常
  53. throw new ConcurrentModificationException();
  54. return true;
  55. }
  56. return false;
  57. }
  58. //改写了默认的forEachRemaining实现
  59. public void forEachRemaining(Consumer<? super E> action) {
  60. int i, hi, mc; // hoist accesses and checks from loop
  61. ArrayList<E> lst; Object[] a;
  62. if (action == null)
  63. throw new NullPointerException();
  64. if ((lst = list) != null && (a = lst.elementData) != null) {
  65. if ((hi = fence) < 0) { //fence为-1时
  66. mc = lst.modCount;
  67. hi = lst.size;
  68. }
  69. else
  70. //fence大于0时
  71. mc = expectedModCount;
  72. if ((i = index) >= 0 && (index = hi) <= a.length) {
  73. //遍历index到fence之间的元素
  74. for (; i < hi; ++i) {
  75. @SuppressWarnings( "unchecked") E e = (E) a[i];
  76. action.accept(e);
  77. }
  78. //如果没有发生修改则返回,否则抛出异常
  79. if (lst.modCount == mc)
  80. return;
  81. }
  82. }
  83. throw new ConcurrentModificationException();
  84. }
  85. public long estimateSize() {
  86. return ( long) (getFence() - index);
  87. }
  88. public int characteristics() {
  89. return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
  90. }
  91. }

 


转载:https://blog.csdn.net/qq_31865983/article/details/106795114
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场