飞道的博客

2021年大数据Flink(十二):流批一体API Transformation

376人阅读  评论(0)

目录

Transformation

官网API列表

基本操作-略

map

flatMap

​​​​​​​keyBy

​​​​​​​filter

​​​​​​​sum

​​​​​​​reduce

​​​​​​​代码演示

​​​​​​​合并-拆分

​​​​​​​union和connect

​​​​​​​split、select和Side Outputs

分区

rebalance重平衡分区

​​​​​​​其他分区


Transformation

官网API列表

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/

 

 

整体来说,流式数据上的操作可以分为四类。

l第一类是对于单条记录的操作,比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)

l第二类是对多条记录的操作。比如说统计一个小时内的订单总成交量,就需要将一个小时内的所有订单记录的成交量加到一起。为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理

l第三类是对多个流进行操作并转换为单个流。例如,多个流可以通过 Union、Join 或 Connect 等操作合到一起。这些操作合并的逻辑不同,但是它们最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作。

l最后, DataStream 还支持与合并对称的拆分操作,即把一个流按一定规则拆分为多个流(Split 操作),每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理。

 

基本操作-略

map

  • API

map:将函数作用在集合中的每一个元素上,并返回作用后的结果

 

 

​​​​​​​flatMap

  • API

flatMap:将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果

 

 

​​​​​​​keyBy

按照指定的key来对流中的数据进行分组,前面入门案例中已经演示过

注意:

流处理中没有groupBy,而是keyBy

 

​​​​​​​filter

  • API

filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

 

​​​​​​​sum

  • API

sum:按照指定的字段对集合中的元素进行求和

​​​​​​​reduce

  • API

reduce:对集合中的元素进行聚合

 

​​​​​​​代码演示

  • 需求:

对流数据中的单词进行统计,排除敏感词TMD

  • 代码演示

  
  1. package cn.itcast.transformation;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FilterFunction;
  4. import org.apache.flink.api.common.functions.FlatMapFunction;
  5. import org.apache.flink.api.common.functions.MapFunction;
  6. import org.apache.flink.api.common.functions.ReduceFunction;
  7. import org.apache.flink.api.java.tuple.Tuple2;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. import org.apache.flink.streaming.api.datastream.KeyedStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.util.Collector;
  12. /**
  13.  * Author itcast
  14.  * Desc
  15.  */
  16. public class TransformationDemo01 {
  17.      public static void main( String[] args) throws Exception {
  18.          //1.env
  19.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  21.          //2.source
  22.         DataStream< String> linesDS = env.socketTextStream( "node1", 9999);
  23.          //3.处理数据-transformation
  24.         DataStream< String> wordsDS = linesDS.flatMap( new FlatMapFunction< String, String>() {
  25.             @Override
  26.              public void flatMap( String value, Collector< String> out) throws Exception {
  27.                  //value就是一行行的数据
  28.                  String[] words = value.split( " ");
  29.                  for ( String word : words) {
  30.                     out.collect(word); //将切割处理的一个个的单词收集起来并返回
  31.                 }
  32.             }
  33.         });
  34.         DataStream< String> filtedDS = wordsDS.filter( new FilterFunction< String>() {
  35.             @Override
  36.              public boolean filter( String value) throws Exception {
  37.                  return !value.equals( "heihei");
  38.             }
  39.         });
  40.         DataStream<Tuple2< String, Integer>> wordAndOnesDS = filtedDS.map( new MapFunction< String, Tuple2< String, Integer>>() {
  41.             @Override
  42.              public Tuple2< String, Integer> map( String value) throws Exception {
  43.                  //value就是进来一个个的单词
  44.                  return Tuple2.of(value, 1);
  45.             }
  46.         });
  47.          //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
  48.         KeyedStream<Tuple2< String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
  49.         DataStream<Tuple2< String, Integer>> result1 = groupedDS.sum( 1);
  50.         DataStream<Tuple2< String, Integer>> result2 = groupedDS.reduce( new ReduceFunction<Tuple2< String, Integer>>() {
  51.             @Override
  52.              public Tuple2< String, Integer> reduce(Tuple2< String, Integer> value1, Tuple2< String, Integer> value2) throws Exception {
  53.                  return Tuple2.of(value1.f0, value1.f1 + value1.f1);
  54.             }
  55.         });
  56.          //4.输出结果-sink
  57.         result1. print( "result1");
  58.         result2. print( "result2");
  59.          //5.触发执行-execute
  60.         env.execute();
  61.     }
  62. }

 

​​​​​​​合并-拆分

​​​​​​​union和connect

  • API

union:

union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。

 

connect:

connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

connect只能连接两个数据流,union可以连接多个数据流。

connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

 

  • 需求

将两个String类型的流进行union

将一个String类型和一个Long类型的流进行connect

 

  • 代码实现

  
  1. package cn.itcast.transformation;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.functions.co.CoMapFunction;
  7. /**
  8.  * Author itcast
  9.  * Desc
  10.  */
  11. public class TransformationDemo02 {
  12.      public static void main( String[] args) throws Exception {
  13.          //1.env
  14.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  16.          //2.Source
  17.         DataStream< String> ds1 = env.fromElements( "hadoop", "spark", "flink");
  18.         DataStream< String> ds2 = env.fromElements( "hadoop", "spark", "flink");
  19.         DataStream<Long> ds3 = env.fromElements( 1L, 2L, 3L);
  20.          //3.Transformation
  21.         DataStream< String> result1 = ds1.union(ds2); //合并但不去重 https://blog.csdn.net/valada/article/details/104367378
  22.         ConnectedStreams< String, Long> tempResult = ds1.connect(ds3);
  23.          //interface CoMapFunction<IN1, IN2, OUT>
  24.         DataStream< String> result2 = tempResult.map( new CoMapFunction< String, Long, String>() {
  25.              @Override
  26.              public String map1( String value) throws Exception {
  27.                  return "String->String:" + value;
  28.             }
  29.              @Override
  30.              public String map2(Long value) throws Exception {
  31.                  return "Long->String:" + value.toString();
  32.             }
  33.         });
  34.          //4.Sink
  35.         result1.print();
  36.         result2.print();
  37.          //5.execute
  38.         env.execute();
  39.     }
  40. }

 

​​​​​​​split、select和Side Outputs

  • API

Split就是将一个流分成多个流

Select就是获取分流后对应的数据

注意:split函数已过期并移除

 

Side Outputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中

 

  • 需求:

对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

 

  • 代码实现:

  
  1. package cn.itcast.transformation;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.typeinfo.TypeInformation;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.functions.ProcessFunction;
  9. import org.apache.flink.util.Collector;
  10. import org.apache.flink.util.OutputTag;
  11. /**
  12.  * Author itcast
  13.  * Desc
  14.  */
  15. public class TransformationDemo03 {
  16.      public static void main( String[] args) throws Exception {
  17.          //1.env
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  20.          //2.Source
  21.         DataStreamSource< Integer> ds = env.fromElements( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  22.          //3.Transformation
  23.          /*SplitStream<Integer> splitResult = ds.split(new OutputSelector<Integer>() {
  24.             @Override
  25.             public Iterable<String> select(Integer value) {
  26.                 //value是进来的数字
  27.                 if (value % 2 == 0) {
  28.                     //偶数
  29.                     ArrayList<String> list = new ArrayList<>();
  30.                     list.add("偶数");
  31.                     return list;
  32.                 } else {
  33.                     //奇数
  34.                     ArrayList<String> list = new ArrayList<>();
  35.                     list.add("奇数");
  36.                     return list;
  37.                 }
  38.             }
  39.         });
  40.         DataStream<Integer> evenResult = splitResult.select("偶数");
  41.         DataStream<Integer> oddResult = splitResult.select("奇数");*/
  42.          //定义两个输出标签
  43.         OutputTag< Integer> tag_even = new OutputTag< Integer>( "偶数", TypeInformation.of( Integer. class));
  44.         OutputTag< Integer> tag_odd = new OutputTag< Integer>( "奇数"){};
  45.          //对ds中的数据进行处理
  46.         SingleOutputStreamOperator< Integer> tagResult = ds.process( new ProcessFunction< Integer, Integer>() {
  47.             @Override
  48.              public void processElement( Integer value, Context ctx, Collector< Integer> out) throws Exception {
  49.                  if (value % 2 == 0) {
  50.                      //偶数
  51.                     ctx.output(tag_even, value);
  52.                 } else {
  53.                      //奇数
  54.                     ctx.output(tag_odd, value);
  55.                 }
  56.             }
  57.         });
  58.          //取出标记好的数据
  59.         DataStream< Integer> evenResult = tagResult.getSideOutput(tag_even);
  60.         DataStream< Integer> oddResult = tagResult.getSideOutput(tag_odd);
  61.          //4.Sink
  62.         evenResult. print( "偶数");
  63.         oddResult. print( "奇数");
  64.          //5.execute
  65.         env.execute();
  66.     }
  67. }

 

分区

rebalance重平衡分区

  • API

类似于Spark中的repartition,但是功能更强大,可以直接解决数据倾斜

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;

 

所以在实际的工作中,出现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散)

 

  • 代码演示:

  
  1. package cn.itcast.transformation;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FilterFunction;
  4. import org.apache.flink.api.common.functions.RichMapFunction;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. /**
  9.  * Author itcast
  10.  * Desc
  11.  */
  12. public class TransformationDemo04 {
  13.      public static void main( String[] args) throws Exception {
  14.          //1.env
  15.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC).setParallelism( 3);
  17.          //2.source
  18.         DataStream<Long> longDS = env.fromSequence( 0, 100);
  19.          //3.Transformation
  20.          //下面的操作相当于将数据随机分配一下,有可能出现数据倾斜
  21.         DataStream<Long> filterDS = longDS.filter( new FilterFunction<Long>() {
  22.             @Override
  23.              public boolean filter(Long num) throws Exception {
  24.                  return num > 10;
  25.             }
  26.         });
  27.          //接下来使用map操作,将数据转为(分区编号/子任务编号, 数据)
  28.          //Rich表示多功能的,比MapFunction要多一些API可以供我们使用
  29.         DataStream<Tuple2< Integer, Integer>> result1 = filterDS
  30.                 .map( new RichMapFunction<Long, Tuple2< Integer, Integer>>() {
  31.                     @Override
  32.                      public Tuple2< Integer, Integer> map(Long value) throws Exception {
  33.                          //获取分区编号/子任务编号
  34.                          int id = getRuntimeContext().getIndexOfThisSubtask();
  35.                          return Tuple2.of(id, 1);
  36.                     }
  37.                 }).keyBy(t -> t.f0).sum( 1);
  38.         DataStream<Tuple2< Integer, Integer>> result2 = filterDS.rebalance()
  39.                 .map( new RichMapFunction<Long, Tuple2< Integer, Integer>>() {
  40.                     @Override
  41.                      public Tuple2< Integer, Integer> map(Long value) throws Exception {
  42.                          //获取分区编号/子任务编号
  43.                          int id = getRuntimeContext().getIndexOfThisSubtask();
  44.                          return Tuple2.of(id, 1);
  45.                     }
  46.                 }).keyBy(t -> t.f0).sum( 1);
  47.          //4.sink
  48.          //result1.print();//有可能出现数据倾斜
  49.         result2. print(); //在输出前进行了rebalance重分区平衡,解决了数据倾斜
  50.          //5.execute
  51.         env.execute();
  52.     }
  53. }

 

​​​​​​​其他分区

  • API

 

 

说明:

recale分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。

举例:

上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

 

  • 需求:

对流中的元素使用各种分区,并输出

 

  • 代码实现

  
  1. package cn.itcast.transformation;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.Partitioner;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.util.Collector;
  10. /**
  11.  * Author itcast
  12.  * Desc
  13.  */
  14. public class TransformationDemo05 {
  15.      public static void main( String[] args) throws Exception {
  16.          //1.env
  17.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  19.          //2.Source
  20.         DataStream< String> linesDS = env.readTextFile( "data/input/words.txt");
  21.         SingleOutputStreamOperator<Tuple2< String, Integer>> tupleDS = linesDS.flatMap( new FlatMapFunction< String, Tuple2< String, Integer>>() {
  22.             @Override
  23.              public void flatMap( String value, Collector<Tuple2< String, Integer>> out) throws Exception {
  24.                  String[] words = value.split( " ");
  25.                  for ( String word : words) {
  26.                     out.collect(Tuple2.of(word, 1));
  27.                 }
  28.             }
  29.         });
  30.          //3.Transformation
  31.         DataStream<Tuple2< String, Integer>> result1 = tupleDS. global();
  32.         DataStream<Tuple2< String, Integer>> result2 = tupleDS.broadcast();
  33.         DataStream<Tuple2< String, Integer>> result3 = tupleDS.forward();
  34.         DataStream<Tuple2< String, Integer>> result4 = tupleDS.shuffle();
  35.         DataStream<Tuple2< String, Integer>> result5 = tupleDS.rebalance();
  36.         DataStream<Tuple2< String, Integer>> result6 = tupleDS.rescale();
  37.         DataStream<Tuple2< String, Integer>> result7 = tupleDS.partitionCustom( new Partitioner< String>() {
  38.             @Override
  39.              public int partition( String key, int numPartitions) {
  40.                  return key.equals( "hello") ? 0 : 1;
  41.             }
  42.         }, t -> t.f0);
  43.          //4.sink
  44.          //result1.print();
  45.          //result2.print();
  46.          //result3.print();
  47.          //result4.print();
  48.          //result5.print();
  49.          //result6.print();
  50.         result7. print();
  51.          //5.execute
  52.         env.execute();
  53.     }
  54. }

 


转载:https://blog.csdn.net/xiaoweite1/article/details/116212964
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场