飞道的博客

Flink DataSet API

389人阅读  评论(0)

一.简介

DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理。Flink先将接入数据(如可以通过读取文本或从本地集合)来创建转换成DataSet数据集,并行分布在集群的每个节点上;然后将DataSet数据集进行各种转换操作(map,filter,union,group等)最后通过DataSink操作将结果数据集输出到外部系统。

流程

  • 获得一个执行环境(ExecutionEnvironment)
  • 加载/创建初始数据 (Source)
  • 指定转换算子操作数据(Transformation)
  • 指定存放结果位置(Sink)

二.示例

广播变量

flink 支持广播变量,就是将数据广播到具体taskManager上,数据存储在内存中,这样可以减缓大量的shuffle操作。

def setBroadcast(env: ExecutionEnvironment): Unit ={
   
  import org.apache.flink.api.scala._
  val toBroadcast  = env.fromElements(1,2,3)
  val data = env.fromElements("a","b")
  /**
   * RichMapFunction 富函数上下文信息
   */
  val result = data.map(new RichMapFunction[String,String](){
   
    var mList: mutable.Buffer[String] = _
    override def open(config: Configuration): Unit = {
   
      import scala.collection.JavaConverters._
      mList  = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
    }
    override def map(in: String): String = {
   
      in +"--->广播数据"+mList.toString()
    }
  }).withBroadcastSet(toBroadcast,"broadcastSetName")
  result.print()
}

读文件

def main(args: Array[String]): Unit = {
   
  val env = ExecutionEnvironment.getExecutionEnvironment

  val filePath = "D:\\workspace\\open_source\\flinkmoduletest\\src\\main\\resources\\file.txt"
  val line =  env.readTextFile(filePath)
  import org.apache.flink.api.scala._
  val value = line.flatMap(x=>{
   
    x.split(" ")
  })

  println("函数")
  line.flatMap(new MyFun).collect().foreach(println(_))
}
class MyFun extends FlatMapFunction[String, String]{
   
  override def flatMap(value: String, out: Collector[String]): Unit = {
   
    val s = value.split(" ")
    for (e <- s){
   
      out.collect(e)
    }
  }
}
def fromCollection(env: ExecutionEnvironment) ={
   
  import org.apache.flink.api.scala._
  val data = 1 to 10
  env.fromCollection(data).print()
}

存储文件

val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = 1 to 10
val text = env.fromCollection(data)
val filePath = "fileResult.txt"
text.writeAsText(filePath,WriteMode.OVERWRITE)
env.execute("SinkApp")

常用转换算子

/**
 * 笛卡尔积
 * @param env
 */
def crossFunction(env:ExecutionEnvironment): Unit ={
   
  val info1 = List("曼联","曼城")
  val info2 = List(3,1,0)
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}
/**
 *  全连接
 * @param env
 */
def outerFullOuterJoinFunction(env:ExecutionEnvironment): Unit ={
   
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1,"PK哥"))
  info1.append((2,"J哥"))
  info1.append((3,"小队长"))
  info1.append((4,"猪头呼"))
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "成都"))
  info2.append((5, "杭州"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
   
    if (first == null) {
   
      (second._1, "-", second._2)
    } else if (second == null) {
   
      (first._1, first._2, "-")
    } else {
   
      (first._1, first._2, second._2)
    }
  }).print()
}
/**
 * 左外连接
 * @param env
 */
def outerLeftOuterJoinFunction(env:ExecutionEnvironment): Unit ={
   
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1,"PK哥"))
  info1.append((2,"J哥"))
  info1.append((3,"小队长"))
  info1.append((4,"猪头呼"))
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "成都"))
  info2.append((5, "杭州"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
   
    if(second == null){
   
      (first._1,first._2,"-")
    }else{
   
      (first._1,first._2,second._2)
    }
  }).print()
}
/**
 * 右外连接
 * @param env
 */
def outerRightOuterJoin(env:ExecutionEnvironment): Unit ={
   
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1,"PK哥"))
  info1.append((2,"J哥"))
  info1.append((3,"小队长"))
  info1.append((4,"猪头呼"))
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "成都"))
  info2.append((5, "杭州"))
  import org.apache.flink.api.scala._
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first,second)=>{
   
    if(first == null){
   
      (second._1,"-",second._2)
    }else{
   
      (first._1,first._2,second._2)
    }
  }).print()
}
/**
 * 内连接
 * @param env
 */
def joinFunction(env:ExecutionEnvironment): Unit ={
   
  val info1 = ListBuffer[(Int, String)]() // 编号  名字
  info1.append((1, "P哥"));
  info1.append((2, "J哥"));
  info1.append((3, "小队长"));
  info1.append((4, "猪头呼"));
  val info2 = ListBuffer[(Int, String)]() // 编号  城市
  info2.append((1, "北京"));
  info2.append((2, "上海"));
  info2.append((3, "成都"));
  info2.append((5, "杭州"));
  import org.apache.flink.api.scala._

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  //where(0).equalTo(0) 分别代表两个数据集要进行join的字段
  //(first, second) 分别代表的两个数据集
  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
   
    (first._1, first._2, second._2)
  }).print()
}
/**
 * 去重
 */
def distinctFunction(env:ExecutionEnvironment): Unit ={
   
  val  info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)

  data.flatMap(_.split(",")).distinct().print()
}
/**
 * flatMap 将一个拆分多个
 * @param env
 */
def flatMapFunction(env: ExecutionEnvironment): Unit ={
   
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
}
/**
 * 转换算子  first获取前n个元素
 * @param env
 */
def firstFunction(env: ExecutionEnvironment): Unit ={
   
  val info = ListBuffer[(Int, String)]()
  info.append((1, "Hadoop"))
  info.append((1, "Spark"))
  info.append((1, "Flink"))
  info.append((2, "Java"))
  info.append((2, "Spring Boot"))
  info.append((3, "Linux"))
  info.append((4, "VUE"))
  import org.apache.flink.api.scala._
  val data = env.fromCollection(info)
  println("first")
  data.first(5).print()
  println("first groupby1")
  data.groupBy(0).first(2).print()
  println("first groupby2")
  data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()

}
/**
 * DataSource 100个元素,把结果存储到数据库中
 * @param env
 */
def mapPartitionFunction(env: ExecutionEnvironment): Unit ={
   
  val students = new ListBuffer[String]
  for (i <- 1 to 100) {
   
    students.append("student: " + i)
  }
  import org.apache.flink.api.scala._
  //设置并行度
  val data = env.fromCollection(students).setParallelism(5)
  //每个分区获取一个connection
  data.mapPartition(x => {
   
    val connection = DBUtils.getConection()
    println(connection + "....")
    DBUtils.returnConnection(connection)
    x
  }).print()
}
/**
 * filter 转换算子
 * @param env
 */
def filterFunction(env: ExecutionEnvironment): Unit ={
   
  import org.apache.flink.api.scala._
  env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    .map(_ + 1)
    .filter(_ > 5)
    .print()
}
/**
 * map 转换算子
 */
def mapFunction(env: ExecutionEnvironment): Unit = {
   
  import org.apache.flink.api.scala._
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  // 对data中的每个元素都做一个+1的操作
  // data.map((x:Int) => x + 1).print()
  // data.map((x) => x + 1).print()
  // data.map(x => x + 1).print()
  data.map(_ + 1).print()
}

分布式缓存

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。

当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。其实分布式缓存就相当于spark的广播,把一个变量广播到所有的executor上,也可以看做是Flink的广播流,只不过这里广播的是一个文件。

def main(args: Array[String]): Unit = {
   
  val env = ExecutionEnvironment.getExecutionEnvironment
  val filePath = "fileCache.txt"
  //step1: 注册一个本地/HDFS文件
  env.registerCachedFile(filePath,"pk-scala-dc")
  import org.apache.flink.api.scala._
  val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
  data.map(new RichMapFunction[String,String] {
   
    // step2:在open方法中获取到分布式缓存的内容即可
    override def open(parameters: Configuration): Unit = {
   
      val dcFile = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")
      val lines = FileUtils.readLines(dcFile)
      /**
       * 此时会出现一个异常:java集合和scala集合不兼容的问题
       */
      import scala.collection.JavaConverters._
      for (ele <- lines.asScala) {
   
        println(ele)
      }
    }
    override def map(value: String): String = value
  }).print()
}

转载:https://blog.csdn.net/qq_19968255/article/details/108671805
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场