目录:apache beam 个人使用经验总结目录和入门指导(Java)
如果我们希望给某个PCollection数据集输入1个 计算后 的结果,则就需要用到旁路输入。
例如要计算某个数据集的方差,其公式为
这个计算过程就需要用到旁路输入,来将平均值u传递给数据集中的每个数字进行计算。
我们下面这个数据集为例
PCollection<Integer> numbers = pipeline.apply(Create.of(10,20,40,70,80));
创建旁路输入视窗
数据集的平均值就是旁路输入,注意这个旁路输入的值必须是单个值,即数据集聚合后的1个值。创建方式如下:
// 使用bema自带的平均值sdk计算平局值
Combine.Globally<Double, Double> combineGlobally = Mean.<Double>globally();
// 加上asSingletonView()转为旁路输入视图
PCollectionView<Double> meanView = numbers.apply(combineGlobally.asSingletonView());
向PTransform对象组装旁路输入
接着建立均值差平方的转化类
// 建立均值差值 平方的转化类
ParDo.SingleOutput<Double, Double> meanSquareTrans = ParDo.<Double, Double>of(new DoFn<Double, Double>() {
@ProcessElement
public void precessElement(ProcessContext context) {
// 根据旁路输入的引用,取出旁路输入值
Double mean = context.sideInput(meanView);
Double number = context.element();
Double answer = Math.abs(number - mean) * Math.abs(number - mean);
context.output(answer);
}
});
然后在apply组装的时候,用withSideInput将旁路输入组装进去,便可进行计算
// 组装转化类
// 注意要把meanView作为旁路输入组装进去
// 得到 |数值-平均值|^2 的数据集
PCollection<Double> meanSquare = numbers.apply(meanSquareTrans.withSideInputs(meanView));
注意事项: 如果是基于分布式计算引擎例如spark、flink, 需要确认运行在各executor时, 能否加载到meanView这个PCollectionView引用,如果加载不到,则需要作为私有成员放进DoFn子类中去获取
combine聚合操作组装旁路输入
计算方差需要把meanSquare数据集里的元素一一相加再除去元素总数,因此也要用到旁路输入。
// 计算数据集内的元素数量
PCollectionView<Long> numberCount = numbers.apply(Combine.globally(Count.<Double>combineFn()).asSingletonView());
// 如果是自定义Combine类,numberCount需要作为成员传入,否则分布式计算时无法获取
PCollection<Double> variance = meanSquare.apply(
Combine.globally(
new CaculateVarianceCombineFn(numberCount)
).withSideInputs(numberCount)
);
因为之前介绍的combine方法无法获取context,因为需要改用另一种combineFn,可以拿到上下文文本context,并获取旁路输入
static class CaculateVarianceCombineFn extends CombineWithContext.CombineFnWithContext<Double, Double, Double> {
private PCollectionView<Long> numberCountView;
public CaculateVarianceCombineFn(PCollectionView<Long> numberCountView) {
this.numberCountView = numberCountView;
}
// 主要要实现defaultValue这个接口
@Override
public Double defaultValue() {
return Double.valueOf(0);
}
@Override
public Double createAccumulator(CombineWithContext.Context c) {
return Double.valueOf(0);
}
@Override
public Double addInput(Double accumulator, Double input, CombineWithContext.Context c) {
return accumulator + input;
}
@Override
public Double mergeAccumulators(Iterable<Double> accumulators, CombineWithContext.Context c) {
Double mergeResult = createAccumulator(c);
for(Double accmulator : accumulators) {
mergeResult += accmulator;
}
return mergeResult;
}
// 计算最终聚合结果
@Override
public Double extractOutput(Double accumulator, CombineWithContext.Context c) {
Long numberCount = c.sideInput(numberCountView);
System.out.println("end result=" + accumulator / (numberCount - 1));
return accumulator / (numberCount - 1);
}
}
最终计算拓扑图
计算拓扑图如下:
转载:https://blog.csdn.net/a799581229/article/details/106557383
查看评论