小言_互联网的博客

【Flink】整合Flink与MySQL数据库：将MySQL分别作为Flink的source和sink

270人阅读  评论(0)


MySQL作为Flink的source

创建Maven工程，在pom.xml中添加依赖

<!-- pom.xml excerpt: the opening <project> element and the project coordinates
     (modelVersion, groupId, artifactId, version) are not shown in this post. -->
<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.2</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.7.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <iheart.version>1.4.3</iheart.version>
        <!-- Single place to set the fastjson version; referenced by the fastjson dependency below. -->
        <fastjson.version>1.2.60</fastjson.version>
    </properties>

    <dependencies>
        <!-- Jackson: mapping JSON/XML to objects -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.9</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.9.3</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
            <version>2.9.9</version>
        </dependency>
        <!-- Universal Kafka connector.
             NOTE(review): the 0.11-specific Kafka connector is also declared further down. The MySQL
             examples use neither, and having both on one classpath may cause conflicts; keep only
             the connector the job actually uses. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Flink streaming API for Scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink (DataSet) API for Scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hadoop client API -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <!-- When writing to HDFS, xml-apis must be excluded because it conflicts with dom4j. -->
            <exclusions>
                <exclusion>
                    <groupId>xml-apis</groupId>
                    <artifactId>xml-apis</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- MySQL JDBC driver (provides com.mysql.jdbc.Driver, used by the MySQL source and sink) -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- Kafka 0.11 connector (see the NOTE on the universal Kafka connector above) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>

            <!-- Compiles the Scala sources under src/main/scala and src/test/scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>

                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Runs test classes whose names end in Test or Suite -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <!-- Builds a fat (shaded) jar during the package phase -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        Strip signature files copied from signed dependencies, so the
                                        merged jar is not rejected with an invalid-signature error.
                                        Equivalent manual fix:
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                        -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- NOTE(review): Main-Class of the shaded jar. cn.itcast.batch.BatchWordCount is
                                     not one of the classes shown in this post; when running the MySQL
                                     examples, pass the entry class explicitly
                                     (e.g. flink run -c cn.itcast.streaming.Mysql2Flink ...) or change it here. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.itcast.batch.BatchWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

开发代码

package cn.itcast.streaming

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object Mysql2Flink {

  /**
   * Reads the MySQL `student` table through [[SqlAsSource]] and prints every row.
   *
   * Steps:
   *  1. obtain the streaming execution environment;
   *  2. add the MySQL source (a RichSourceFunction subclass that opens the JDBC connection and
   *     prepares the query in `open`, runs the query in `run`, and releases JDBC resources in `close`);
   *  3. print the records.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // 1. Obtain the streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Add the MySQL source. The wildcard import supplies DataStream and the implicit
    //    TypeInformation derivation that addSource needs.
    import org.apache.flink.streaming.api.scala._
    val mysqlDataStream: DataStream[Student] = env.addSource(new SqlAsSource)

    // 3. Print the records.
    mysqlDataStream.print()

    env.execute("Mysql2Flink")
  }

  /**
   * Source that runs `select id,name,addr,sex from student` once, emits one [[Student]] per row,
   * and then returns so the bounded job can finish.
   *
   * Deliberately non-parallel: a parallel source would run the same full-table query in every
   * parallel subtask and therefore emit every row once per subtask.
   *
   * @param url      JDBC URL of the database containing the `student` table
   * @param user     database user name
   * @param password database password
   * @param driver   JDBC driver class; loaded explicitly in `open` because JDBC's automatic driver
   *                 discovery may not see a driver that is shipped inside the job jar
   */
  class SqlAsSource(
      url: String = "jdbc:mysql://node01:3306/test",
      user: String = "root",
      password: String = "123456",
      driver: String = "com.mysql.jdbc.Driver")
    extends RichSourceFunction[Student] {

    // JDBC resources: created in open(), released in close().
    private var connection: Connection = null
    private var ps: PreparedStatement = null

    // Written by cancel() from a different thread than the one executing run(),
    // so it must be @volatile for run() to observe the change.
    @volatile private var isRunning = true

    /** Loads the JDBC driver, opens the connection and prepares the select statement. */
    override def open(parameters: Configuration): Unit = {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, user, password)
      ps = connection.prepareStatement("select id,name,addr,sex from student")
    }

    /**
     * Releases JDBC resources: the statement first, then the connection that owns it.
     * The connection is closed even if closing the statement throws.
     */
    override def close(): Unit = {
      try {
        if (ps != null) ps.close()
      } finally {
        if (connection != null) connection.close()
      }
    }

    /**
     * Executes the query once and emits one [[Student]] per row.
     * Stops early if the job is cancelled, and always closes the ResultSet.
     */
    override def run(sourceContext: SourceFunction.SourceContext[Student]): Unit = {
      val rows: ResultSet = ps.executeQuery()
      try {
        while (isRunning && rows.next()) {
          sourceContext.collect(
            Student(rows.getInt("id"), rows.getString("name"), rows.getString("addr"), rows.getString("sex")))
        }
      } finally {
        rows.close()
      }
    }

    /** Invoked by Flink on cancellation; makes run() stop emitting and return. */
    override def cancel(): Unit = {
      isRunning = false
    }
  }

  /** One row of the `student` table, printed as "stuId:…  stuName:…  stuAddr:…  stuSex:…". */
  final case class Student(id: Int, name: String, addr: String, sex: String) {
    override def toString: String =
      s"stuId:$id  stuName:$name  stuAddr:$addr  stuSex:$sex"
  }

}


MySQL作为Flink的sink

开发代码

package cn.itcast.streaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Flink2Mysql {

  /**
   * Writes three sample [[Student]] records into the MySQL `student` table via [[SqlAsSink]].
   *
   * Steps:
   *  1. obtain the streaming execution environment;
   *  2. build a small in-memory stream of students;
   *  3. attach the MySQL sink and run the job.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // 1. Obtain the streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Build the sample records; the wildcard import supplies the implicit TypeInformation.
    import org.apache.flink.api.scala._
    val stuDataStream: DataStream[Student] = env.fromElements(
      Student(1, "zhangsan", "beijing", "male"),
      Student(2, "lisi", "shanghai", "female"),
      Student(3, "wangwu", "chengdu", "male")
    )

    // 3. Attach the MySQL sink.
    stuDataStream.addSink(new SqlAsSink)

    env.execute("Flink2Mysql")
  }

  /** One row of the `student` table (columns id, name, addr, sex). */
  final case class Student(id: Int, name: String, addr: String, sex: String)

  /**
   * Sink that inserts each incoming [[Student]] as one row of the MySQL `student` table.
   *
   * Best effort: a record whose insert fails with a non-fatal error is reported on stdout and
   * skipped. Fatal errors (e.g. InterruptedException, OutOfMemoryError) propagate and fail the task.
   *
   * @param url      JDBC URL of the database containing the `student` table
   * @param userName database user name
   * @param password database password
   * @param driver   JDBC driver class; loaded explicitly in `open` because JDBC's automatic driver
   *                 discovery may not see a driver that is shipped inside the job jar
   */
  class SqlAsSink(
      url: String = "jdbc:mysql://node01:3306/test",
      userName: String = "root",
      password: String = "123456",
      driver: String = "com.mysql.jdbc.Driver")
    extends RichSinkFunction[Student] {

    // JDBC resources: created in open(), released in close().
    private var connection: Connection = null
    private var ps: PreparedStatement = null

    /** Runs once at initialization: loads the driver, opens the connection and prepares the insert. */
    override def open(parameters: Configuration): Unit = {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, userName, password)
      // Explicit column list so each value lands in the intended column regardless of table column order.
      ps = connection.prepareStatement("insert into student(id,name,addr,sex) values(?,?,?,?)")
    }

    /**
     * Releases JDBC resources: the statement first, then the connection that owns it.
     * The connection is closed even if closing the statement throws.
     */
    override def close(): Unit = {
      try {
        if (ps != null) ps.close()
      } finally {
        if (connection != null) connection.close()
      }
    }

    /** Called once per incoming element: binds the record's fields and executes the insert. */
    override def invoke(stu: Student, context: SinkFunction.Context[_]): Unit = {
      import scala.util.control.NonFatal
      try {
        ps.setInt(1, stu.id)
        ps.setString(2, stu.name)
        ps.setString(3, stu.addr)
        ps.setString(4, stu.sex)
        // executeUpdate runs a DML statement (INSERT/UPDATE/DELETE) or a statement that returns nothing.
        ps.executeUpdate()
      } catch {
        // Report which record was dropped; NonFatal lets interrupts and VM errors propagate.
        case NonFatal(e) => println(s"Failed to insert $stu: ${e.getMessage}")
      }
    }
  }

}


转载:https://blog.csdn.net/CODEROOKIE_RUN/article/details/106009350
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场