飞道的博客

apache beam入门之旁路输入

371人阅读  评论(0)

目录:apache beam 个人使用经验总结目录和入门指导(Java)

如果我们希望给某个PCollection数据集输入1个 计算后 的结果,则就需要用到旁路输入。

例如要计算某个数据集的方差,其公式为

这个计算过程就需要用到旁路输入,来将平均值u传递给数据集中的每个数字进行计算。

我们下面这个数据集为例

PCollection<Integer> numbers = pipeline.apply(Create.of(10,20,40,70,80));

创建旁路输入视窗

数据集的平均值就是旁路输入,注意这个旁路输入的值必须是单个值,即数据集聚合后的1个值。创建方式如下:

// 使用bema自带的平均值sdk计算平局值
Combine.Globally<Double, Double> combineGlobally = Mean.<Double>globally();
// 加上asSingletonView()转为旁路输入视图
PCollectionView<Double> meanView = numbers.apply(combineGlobally.asSingletonView());

向PTransform对象组装旁路输入

接着建立均值差平方的转化类

// 建立均值差值 平方的转化类
ParDo.SingleOutput<Double, Double> meanSquareTrans = ParDo.<Double, Double>of(new DoFn<Double, Double>() {
    @ProcessElement
    public void precessElement(ProcessContext context) {
        // 根据旁路输入的引用,取出旁路输入值
        Double mean = context.sideInput(meanView);
        Double number = context.element();

        Double answer = Math.abs(number - mean) * Math.abs(number - mean);
        context.output(answer);
    }
});

然后在apply组装的时候,用withSideInput将旁路输入组装进去,便可进行计算

// 组装转化类
// 注意要把meanView作为旁路输入组装进去
// 得到 |数值-平均值|^2 的数据集
PCollection<Double> meanSquare = numbers.apply(meanSquareTrans.withSideInputs(meanView));

注意事项: 如果是基于分布式计算引擎例如spark、flink, 需要确认运行在各executor时, 能否加载到meanView这个PCollectionView引用,如果加载不到,则需要作为私有成员放进DoFn子类中去获取

combine聚合操作组装旁路输入

计算方差需要把meanSquare数据集里的元素一一相加再除去元素总数,因此也要用到旁路输入。

// 计算数据集内的元素数量
PCollectionView<Long> numberCount = numbers.apply(Combine.globally(Count.<Double>combineFn()).asSingletonView());

// 如果是自定义Combine类,numberCount需要作为成员传入,否则分布式计算时无法获取
PCollection<Double> variance = meanSquare.apply(
	Combine.globally(
		new CaculateVarianceCombineFn(numberCount)
	).withSideInputs(numberCount)
);

因为之前介绍的combine方法无法获取context,因为需要改用另一种combineFn,可以拿到上下文文本context,并获取旁路输入

static class CaculateVarianceCombineFn extends CombineWithContext.CombineFnWithContext<Double, Double, Double> {
    private PCollectionView<Long> numberCountView;

    public CaculateVarianceCombineFn(PCollectionView<Long> numberCountView) {
        this.numberCountView = numberCountView;
    }

    // 主要要实现defaultValue这个接口
    @Override
    public Double defaultValue() {
        return Double.valueOf(0);
    }

    @Override
    public Double createAccumulator(CombineWithContext.Context c) {
        return Double.valueOf(0);
    }

    @Override
    public Double addInput(Double accumulator, Double input, CombineWithContext.Context c) {
        return accumulator + input;
    }

    @Override
    public Double mergeAccumulators(Iterable<Double> accumulators, CombineWithContext.Context c) {
        Double mergeResult = createAccumulator(c);
        for(Double accmulator : accumulators) {
            mergeResult += accmulator;
        }
        return mergeResult;
    }

    // 计算最终聚合结果
    @Override
    public Double extractOutput(Double accumulator, CombineWithContext.Context c) {
        Long numberCount = c.sideInput(numberCountView);
        System.out.println("end result=" + accumulator / (numberCount - 1));
        return accumulator / (numberCount - 1);
    }
}

最终计算拓扑图

计算拓扑图如下:

numbers
mean
meanSquare
count
variance

转载:https://blog.csdn.net/a799581229/article/details/106557383
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场