MySQL作为Flink的source
创建Maven工程,导入依赖(pom.xml)
<!-- Shared build settings and version numbers; other parts of this POM refer to them as ${name}. -->
<properties>
<!-- Java source/target level, passed to maven-compiler-plugin in the build section. -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- NOTE(review): not referenced as ${encoding} anywhere in the visible POM; presumably picked up by plugins as the "encoding" user property. Confirm. -->
<encoding>UTF-8</encoding>
<!-- Version of scala-library declared in the dependencies section. -->
<scala.version>2.11.2</scala.version>
<!-- NOTE(review): not referenced in the visible POM; artifact ids hard-code the _2.11 suffix instead. -->
<scala.compat.version>2.11</scala.compat.version>
<!-- Version of hadoop-client. -->
<hadoop.version>2.6.0</hadoop.version>
<!-- Version shared by the Flink artifacts. -->
<flink.version>1.7.2</flink.version>
<!-- NOTE(review): not referenced in the visible POM; artifact ids hard-code the _2.11 suffix instead. -->
<scala.binary.version>2.11</scala.binary.version>
<!-- NOTE(review): not referenced in the visible POM. -->
<iheart.version>1.4.3</iheart.version>
<!-- NOTE(review): not referenced in the visible POM; the fastjson dependency pins 1.2.60 directly. -->
<fastjson.version>1.2.7</fastjson.version>
</properties>
<!-- Libraries needed by the Flink streaming examples (Flink, Scala, JDBC driver, JSON helpers). -->
<dependencies>
<!-- Jackson: JSON/XML to object mapping. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.9</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.9.9</version>
</dependency>
<!-- Flink Kafka connector. Uses ${flink.version} like every other Flink artifact so that all Flink modules move together when the version is bumped. -->
<!-- NOTE(review): flink-connector-kafka-0.11 is also declared at the end of this list; the two presumably target different Kafka client versions. Confirm which one is actually needed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Scala standard library. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Flink streaming API for Scala. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink batch API for Scala. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink client API. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Hadoop client API. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<!-- xml-apis must be excluded when writing to HDFS because it conflicts with dom4j. -->
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- MySQL JDBC driver used by the source and sink examples. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- fastjson. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Flink connector for Kafka 0.11. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<!-- Build configuration: compiles the Scala sources, runs tests and packages a shaded (fat) jar. -->
<build>
<!-- Main and test sources live under the Scala source roots. -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- Java compiler; source/target level comes from the properties section. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<!-- Scala compiler for main and test sources. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<!--<arg>-make:transitive</arg>-->
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- Test runner; only classes matching the Test/Suite name patterns below are included. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- Builds the shaded jar during the package phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
Signature files of the bundled dependencies are excluded from the shaded jar.
Manual equivalent on an already built jar:
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
-->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Writes the Main-Class entry of the jar manifest. -->
<!-- NOTE(review): this class is not one of the programs shown in this article (cn.itcast.streaming.Mysql2Flink / Flink2Mysql); presumably left over from another project. Confirm, or pass the entry class explicitly when submitting the job. -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.itcast.batch.BatchWordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
开发代码
package cn.itcast.streaming
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * Example streaming job: reads the rows of the MySQL `student` table through a
 * custom JDBC source function and prints every row.
 */
object Mysql2Flink {

  /**
   * Builds and runs the job:
   *  1. obtain the stream execution environment,
   *  2. attach [[SqlAsSource]] as the data source,
   *  3. print every emitted [[Student]].
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Obtain the stream execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Attach the MySQL source. The wildcard import brings DataStream and the
    //    implicit TypeInformation needed by addSource into scope.
    import org.apache.flink.streaming.api.scala._
    // Every parallel instance of SqlAsSource runs the same full-table query, so the
    // source is pinned to a single instance; otherwise each row would be emitted
    // once per subtask.
    val mysqlDataStream: DataStream[Student] =
      env.addSource(new SqlAsSource).setParallelism(1)

    // 3. Print the rows and start the job.
    mysqlDataStream.print()
    env.execute("Mysql2Flink")
  }

  /**
   * Source function that runs `select id,name,addr,sex from student` once and
   * emits one [[Student]] per row.
   *
   * The JDBC connection and statement are created in `open` and released in
   * `close`. After the result set is exhausted the source stays idle (polling
   * the running flag once per second) until `cancel` is called; the query is
   * not executed again.
   */
  class SqlAsSource extends RichParallelSourceFunction[Student] {
    // Created in open(), released in close(); null until open() has run.
    private var connection: Connection = null
    private var ps: PreparedStatement = null
    // Written by cancel() from a different thread than the one executing run(),
    // hence volatile so the change is visible to the run loop.
    @volatile private var isRunning = true

    /**
     * Loads the MySQL driver, opens the connection and prepares the query.
     *
     * @param parameters Flink configuration passed to the function (unused)
     */
    override def open(parameters: Configuration): Unit = {
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://node01:3306/test"
      val user = "root"
      val password = "123456"
      Class.forName(driver)
      connection = DriverManager.getConnection(url, user, password)
      val sql = "select id,name,addr,sex from student"
      ps = connection.prepareStatement(sql)
    }

    /**
     * Releases the JDBC resources. The statement is closed before the
     * connection that owns it, and the connection is closed even if closing
     * the statement fails.
     */
    override def close(): Unit = {
      try {
        if (ps != null) {
          ps.close()
        }
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }

    /**
     * Executes the prepared query and emits every row as a [[Student]].
     *
     * @param sourceContext context used to emit records downstream
     */
    override def run(sourceContext: SourceFunction.SourceContext[Student]): Unit = {
      val queryRequest: ResultSet = ps.executeQuery()
      try {
        // Stop emitting as soon as the job is cancelled, even in the middle of
        // the result set.
        while (isRunning && queryRequest.next()) {
          val stu = Student(
            queryRequest.getInt("id"),
            queryRequest.getString("name"),
            queryRequest.getString("addr"),
            queryRequest.getString("sex")
          )
          sourceContext.collect(stu)
        }
      } finally {
        queryRequest.close()
      }
      // Keep the source (and therefore the job) alive until it is cancelled.
      while (isRunning) {
        Thread.sleep(1000)
      }
    }

    /** Signals the run loop to stop; invoked when the job is cancelled. */
    override def cancel(): Unit = {
      isRunning = false
    }
  }

  /**
   * One row of the `student` table.
   *
   * @param id   value of the `id` column
   * @param name value of the `name` column
   * @param addr value of the `addr` column
   * @param sex  value of the `sex` column
   */
  case class Student(id: Int, name: String, addr: String, sex: String) {
    /** Renders the row as `stuId:<id> stuName:<name> stuAddr:<addr> stuSex:<sex>`. */
    override def toString: String =
      s"stuId:$id stuName:$name stuAddr:$addr stuSex:$sex"
  }
}
MySQL作为Flink的sink
开发代码
package cn.itcast.streaming
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
 * Example streaming job: creates a small in-memory stream of students and
 * writes each element into the MySQL `student` table through a custom JDBC sink.
 */
object Flink2Mysql {

  /**
   * Builds and runs the job:
   *  1. obtain the stream execution environment,
   *  2. create the input stream from three fixed [[Student]] elements,
   *  3. attach [[SqlAsSink]] and execute.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Obtain the stream execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Produce the input data. The wildcard import supplies the implicit
    //    TypeInformation required by fromElements.
    import org.apache.flink.api.scala._
    val stuDataStream: DataStream[Student] = env.fromElements(
      Student(1, "zhangsan", "beijing", "male"),
      Student(2, "lisi", "shanghai", "female"),
      Student(3, "wangwu", "chengdu", "male")
    )

    // 3. Attach the MySQL sink and start the job.
    stuDataStream.addSink(new SqlAsSink)
    env.execute("Flink2Mysql")
  }

  /**
   * Record written to the `student` table; the fields are bound to the four
   * insert parameters in declaration order.
   */
  case class Student(id: Int, name: String, addr: String, sex: String)

  /**
   * Sink function that inserts every incoming [[Student]] into the `student`
   * table with a prepared statement.
   *
   * The JDBC connection and statement are created in `open` and released in
   * `close`.
   */
  class SqlAsSink extends RichSinkFunction[Student] {
    // Created in open(), released in close(); null until open() has run.
    private var connection: Connection = null
    private var ps: PreparedStatement = null

    /**
     * Opens the connection and prepares the insert statement. Called before
     * any element is passed to `invoke`.
     *
     * @param parameters Flink configuration passed to the function (unused)
     */
    override def open(parameters: Configuration): Unit = {
      val url = "jdbc:mysql://node01:3306/test"
      val userName = "root"
      val password = "123456"
      // Load the driver explicitly, as the MySQL source example does, rather
      // than relying on automatic driver registration.
      Class.forName("com.mysql.jdbc.Driver")
      connection = DriverManager.getConnection(url, userName, password)
      // Insert statement; one placeholder per Student field.
      val sql = "insert into student values(?,?,?,?);"
      ps = connection.prepareStatement(sql)
    }

    /**
     * Releases the JDBC resources. The statement is closed before the
     * connection that owns it, and the connection is closed even if closing
     * the statement fails.
     */
    override def close(): Unit = {
      try {
        if (ps != null) {
          ps.close()
        }
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    }

    /**
     * Inserts one element. Called once per stream element.
     *
     * Failures are handled best-effort: the error message is printed and the
     * element is dropped, so the job keeps running.
     *
     * @param stu     the record to insert
     * @param context sink context supplied by Flink (unused)
     */
    override def invoke(stu: Student, context: SinkFunction.Context[_]): Unit = {
      import scala.util.control.NonFatal
      try {
        ps.setInt(1, stu.id)
        ps.setString(2, stu.name)
        ps.setString(3, stu.addr)
        ps.setString(4, stu.sex)
        // executeUpdate runs a DML statement (INSERT, UPDATE or DELETE) or a
        // statement that returns nothing, such as DDL.
        ps.executeUpdate()
      } catch {
        // NonFatal lets interrupts and fatal JVM errors propagate instead of
        // being swallowed here.
        case NonFatal(e) => println(e.getMessage)
      }
    }
  }
}
转载:https://blog.csdn.net/CODEROOKIE_RUN/article/details/106009350
查看评论