飞道的博客

2021年大数据Flink(十五):流批一体API Connectors Kafka

350人阅读  评论(0)

目录

Kafka

pom依赖

参数设置

​​​​​​​参数说明

​​​​​​​Kafka命令

​​​​​​​代码实现-Kafka Consumer

​​​​​​​代码实现-Kafka Producer

​​​​​​​代码实现-实时ETL


​​​​​​​Kafka

pom依赖

Flink 里已经提供了一些绑定的 Connector,例如 kafka source 和 sink,Es sink 等。读写 kafka、es、rabbitMQ 时可以直接使用相应 connector 的 api 即可,虽然该部分是 Flink 项目源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的 connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

 

 

 

参数设置

以下参数都必须/建议设置上

 

1.订阅的主题

2.反序列化规则

3.消费者属性-集群地址

4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)

5.消费者属性-offset重置规则,如earliest/latest...

6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)

7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中

 

​​​​​​​参数说明

 

 

 

 

实际的生产环境中可能有这样一些需求,比如:

l场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。

l场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如何在不重启作业情况下动态感知新扩容的 partition?

针对上面的两种场景,首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时 FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。

l针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。

l针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的 partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。

 

注意:

开启 checkpoint 时 offset 是 Flink 通过状态 state 管理和恢复的,并不是从 kafka 的 offset 位置恢复。在 checkpoint 机制下,作业从最近一次checkpoint 恢复,本身是会回放部分历史数据,导致部分数据重复消费,Flink 引擎仅保证计算状态的精准一次,要想做到端到端精准一次需要依赖一些幂等的存储系统或者事务操作。

 

​​​​​​​Kafka命令

  ● 查看当前服务器中的所有topic

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181

  ● 创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka

  ● 查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181

  ● 删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka

  ● 通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka

  ● 通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning

  ● 修改分区

 /export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181

 

​​​​​​​代码实现-Kafka Consumer


  
  1. package cn.itcast.connectors;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  4. import org.apache.flink.api.java.tuple.Tuple;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.KeyedStream;
  8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  11. import org.apache.flink.util.Collector;
  12. import java.util.Properties;
  13. /**
  14.  * Author itcast
  15.  * Desc
  16.  * 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount
  17.  * 需要设置如下参数:
  18.  * 1.订阅的主题
  19.  * 2.反序列化规则
  20.  * 3.消费者属性-集群地址
  21.  * 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
  22.  * 5.消费者属性-offset重置规则,如earliest/latest...
  23.  * 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
  24.  * 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
  25.  */
  26. public class ConnectorsDemo_KafkaConsumer {
  27.      public static void main( String[] args) throws Exception {
  28.          //1.env
  29.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  30.          //2.Source
  31.         Properties props  = new Properties();
  32.         props.setProperty( "bootstrap.servers", "node1:9092");
  33.         props.setProperty( "group.id", "flink");
  34.         props.setProperty( "auto.offset.reset", "latest");
  35.         props.setProperty( "flink.partition-discovery.interval-millis", "5000"); //会开启一个后台线程每隔5s检测一下Kafka的分区情况
  36.         props.setProperty( "enable.auto.commit", "true");
  37.         props.setProperty( "auto.commit.interval.ms", "2000");
  38.          //kafkaSource就是KafkaConsumer
  39.         FlinkKafkaConsumer< String> kafkaSource = new FlinkKafkaConsumer<>( "flink_kafka", new SimpleStringSchema(), props);
  40.         kafkaSource.setStartFromGroupOffsets(); //设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费
  41.          //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
  42.         DataStreamSource< String> kafkaDS = env.addSource(kafkaSource);
  43.          //3.Transformation
  44.          //3.1切割并记为1
  45.         SingleOutputStreamOperator<Tuple2< String, Integer>> wordAndOneDS = kafkaDS.flatMap( new FlatMapFunction< String, Tuple2< String, Integer>>() {
  46.              @Override
  47.              public void flatMap( String value, Collector<Tuple2< String, Integer>> out) throws Exception {
  48.                  String[] words = value.split( " ");
  49.                  for ( String word : words) {
  50.                     out.collect(Tuple2.of(word, 1));
  51.                 }
  52.             }
  53.         });
  54.          //3.2分组
  55.         KeyedStream<Tuple2< String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy( 0);
  56.          //3.3聚合
  57.         SingleOutputStreamOperator<Tuple2< String, Integer>> result = groupedDS.sum( 1);
  58.          //4.Sink
  59.         result.print();
  60.          //5.execute
  61.         env.execute();
  62.     }
  63. }

 

​​​​​​​代码实现-Kafka Producer

  • 需求:

将Flink集合中的数据通过自定义Sink保存到Kafka

  • 代码实现

  
  1. package cn.itcast.connectors;
  2. import com.alibaba.fastjson.JSON;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  12. import java.util.Properties;
  13. /**
  14.  * Author itcast
  15.  * Desc
  16.  * 使用自定义sink-官方提供的flink-connector-kafka_2.12-将数据保存到Kafka
  17.  */
  18. public class ConnectorsDemo_KafkaProducer {
  19.      public static void main( String[] args) throws Exception {
  20.          //1.env
  21.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22.          //2.Source
  23.         DataStreamSource<Student> studentDS = env.fromElements( new Student( 1, "tonyma", 18));
  24.          //3.Transformation
  25.          //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
  26.          //可以直接调用Student的toString,也可以转为JSON
  27.         SingleOutputStreamOperator< String> jsonDS = studentDS.map( new MapFunction<Student, String>() {
  28.              @Override
  29.              public String map(Student value) throws Exception {
  30.                  //String str = value.toString();
  31.                  String jsonStr = JSON.toJSONString(value);
  32.                  return jsonStr;
  33.             }
  34.         });
  35.          //4.Sink
  36.         jsonDS.print();
  37.          //根据参数创建KafkaProducer/KafkaSink
  38.         Properties props = new Properties();
  39.         props.setProperty( "bootstrap.servers", "node1:9092");
  40.         FlinkKafkaProducer< String> kafkaSink = new FlinkKafkaProducer<>( "flink_kafka",   new SimpleStringSchema(),  props);
  41.         jsonDS.addSink(kafkaSink);
  42.          //5.execute
  43.         env.execute();
  44.          // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
  45.     }
  46.      @Data
  47.      @NoArgsConstructor
  48.      @AllArgsConstructor
  49.      public static class Student {
  50.          private Integer id;
  51.          private String name;
  52.          private Integer age;
  53.     }
  54. }

 

​​​​​​​代码实现-实时ETL


  
  1. package cn.itcast.connectors;
  2. import org.apache.flink.api.common.RuntimeExecutionMode;
  3. import org.apache.flink.api.common.functions.FilterFunction;
  4. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  9. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  10. import java.util.Properties;
  11. /**
  12.  * Author itcast
  13.  * Desc 演示Flink-Connectors-KafkaComsumer/Source + KafkaProducer/Sink
  14.  */
  15. public class KafkaETLDemo {
  16.      public static void main( String[] args) throws Exception {
  17.          //TODO 0.env
  18.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  19.         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
  20.          //TODO 1.source
  21.          //准备kafka连接参数
  22.         Properties props  = new Properties();
  23.         props.setProperty( "bootstrap.servers", "node1:9092"); //集群地址
  24.         props.setProperty( "group.id", "flink"); //消费者组id
  25.         props.setProperty( "auto.offset.reset", "latest"); //latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费 /earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
  26.         props.setProperty( "flink.partition-discovery.interval-millis", "5000"); //会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
  27.         props.setProperty( "enable.auto.commit", "true"); //自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)
  28.         props.setProperty( "auto.commit.interval.ms", "2000"); //自动提交的时间间隔
  29.          //使用连接参数创建FlinkKafkaConsumer/kafkaSource
  30.         FlinkKafkaConsumer< String> kafkaSource = new FlinkKafkaConsumer< String>( "flink_kafka", new SimpleStringSchema(), props);
  31.          //使用kafkaSource
  32.         DataStream< String> kafkaDS = env.addSource(kafkaSource);
  33.          //TODO 2.transformation
  34.         SingleOutputStreamOperator< String> etlDS = kafkaDS.filter( new FilterFunction< String>() {
  35.              @Override
  36.              public boolean filter( String value) throws Exception {
  37.                  return value.contains( "success");
  38.             }
  39.         });
  40.          //TODO 3.sink
  41.         etlDS.print();
  42.         Properties props2 = new Properties();
  43.         props2.setProperty( "bootstrap.servers", "node1:9092");
  44.         FlinkKafkaProducer< String> kafkaSink = new FlinkKafkaProducer<>( "flink_kafka2", new SimpleStringSchema(), props2);
  45.         etlDS.addSink(kafkaSink);
  46.          //TODO 4.execute
  47.         env.execute();
  48.     }
  49. }
  50. //控制台生成者 ---> flink_kafka主题 --> Flink -->etl ---> flink_kafka2主题--->控制台消费者
  51. //准备主题 /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
  52. //准备主题 /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka2
  53. //启动控制台生产者发送数据 /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
  54. //log:2020-10-10 success xxx
  55. //log:2020-10-10 success xxx
  56. //log:2020-10-10 success xxx
  57. //log:2020-10-10 fail xxx
  58. //启动控制台消费者消费数据 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2 --from-beginning
  59. //启动程序FlinkKafkaConsumer
  60. //观察控制台输出结果

 


转载:https://blog.csdn.net/xiaoweite1/article/details/116245568
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场