- 环境
- 解析
- 完整代码
- 源码下载
简单实现从一个数据源获取数据,根据特定标识字段,将数据sink到不同mysql表中。
| 组件 | 版本 |
|---|---|
| scala | 2.12 |
| netcat | * |
| kafka | * |
| flink | 1.13.3 |
创建环境、输入流,从socket获取输入数据
// Create the streaming execution environment and read raw text lines
// from a socket source (host "server120", port 9999). Each line is one
// comma-separated record.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.socketTextStream("server120", 9999)
测试表、测试数据。将stu开头的数据插入到stu表,project开头的数据插入到project表。
--测试表
create table stu(sid int,name varchar(25))charset=utf8;
create table project(pid int,sid int,pro varchar(25),score int)charset=utf8;
--测试数据
stu,1001,张三
stu,1002,李四
project,10001,1001,语文,90
project,10002,1002,语文,99
project,10003,1001,数学,79
project,10004,1002,数学,120
加工数据,根据数据的第一个字符串,将数据拆分到不同的侧输出流中
// Route each record into a side output chosen by its first comma-separated
// field. Records with an unknown prefix are dropped instead of crashing the
// job (the original match had no default case, so e.g. "foo,1" would throw
// scala.MatchError and fail the task).
val resultStream = inputStream
  .filter(x => {
    x != null && !"".equals(x)
  }) // filter operator: discard null/empty lines
  .map(v => {
    v
  }) // map operator: identity, kept only to demonstrate the operator
  .process(new ProcessFunction[String, String]() { // process operator: String in, String out
    // One tag instance per side output, created once per operator instance
    // instead of once per record.
    private val stuTag = new OutputTag[String]("stu")
    private val projectTag = new OutputTag[String]("project")

    override def processElement(v: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
      val vs: Array[String] = v.split(",") // comma-separated record
      vs(0).toLowerCase match {
        case "stu"     => ctx.output(stuTag, v)     // student record -> "stu" side output
        case "project" => ctx.output(projectTag, v) // project record -> "project" side output
        case _         => // unknown prefix: ignore rather than fail with MatchError
      }
    }
  })
获取侧输出流
// Extract each side output from the processed stream. An OutputTag matches
// by its id (and element type), so these pair up with the tags emitted
// inside processElement.
val stu = new OutputTag[String]("stu")
val stuStream = resultStream.getSideOutput(stu)

val project = new OutputTag[String]("project")
val projectStream = resultStream.getSideOutput(project)
定义stu数据流的sink
//定义Jdbc sink
// JDBC sink for student records of the form "stu,<sid>,<name>".
stuStream.addSink(JdbcSink.sink("insert into stu (sid,name) values(?,?)",
  new JdbcStatementBuilder[String]() {
    override def accept(t: PreparedStatement, u: String): Unit = {
      // Split once per record instead of once per column (the original
      // re-ran u.split(",") for every parameter).
      val fields = u.split(",")
      t.setInt(1, Integer.parseInt(fields(1)))
      t.setString(2, fields(2))
    }
  },
  JdbcExecutionOptions.builder()
    .withBatchSize(1000)      // flush after 1000 buffered rows...
    .withBatchIntervalMs(200) // ...or after 200 ms, whichever comes first
    .withMaxRetries(5)
    .build(),
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() // database connection settings
    .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8") // charset is required for non-ASCII data
    .withDriverName("com.mysql.jdbc.Driver")
    .withUsername("flink_test")
    .withPassword("flink_test")
    .build()))
定义project流的sink
//定义Jdbc sink
// JDBC sink for project records of the form "project,<pid>,<sid>,<pro>,<score>".
projectStream.addSink(JdbcSink.sink("insert into project (pid,sid,pro,score) values(?,?,?,?)",
  new JdbcStatementBuilder[String]() {
    override def accept(t: PreparedStatement, u: String): Unit = {
      // Split once per record instead of four times (the original re-ran
      // u.split(",") for every parameter).
      val fields = u.split(",")
      t.setInt(1, Integer.parseInt(fields(1)))
      t.setInt(2, Integer.parseInt(fields(2)))
      t.setString(3, fields(3))
      t.setInt(4, Integer.parseInt(fields(4)))
    }
  },
  JdbcExecutionOptions.builder()
    .withBatchSize(1000)      // flush after 1000 buffered rows...
    .withBatchIntervalMs(200) // ...or after 200 ms, whichever comes first
    .withMaxRetries(5)
    .build(),
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() // database connection settings
    .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8") // charset is required for non-ASCII data
    .withDriverName("com.mysql.jdbc.Driver")
    .withUsername("flink_test")
    .withPassword("flink_test")
    .build()))
阻塞进程,等待数据
env.execute()
完整代码
package com.z.sink
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import java.sql.PreparedStatement
object SocketSinkMysql {
  /**
   * Reads comma-separated lines from a socket, routes them by their first
   * field ("stu" / "project") into side outputs, and writes each side
   * output to its own MySQL table through the Flink JDBC sink.
   *
   * Expected record formats:
   *   stu,<sid>,<name>
   *   project,<pid>,<sid>,<pro>,<score>
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.socketTextStream("server120", 9999)

    // Define each side-output tag exactly once and share the instance
    // between processElement and getSideOutput. (OutputTag matches by id
    // and type, but reusing the instances avoids per-record allocation and
    // keeps the ids in a single place.)
    val stuTag = new OutputTag[String]("stu")
    val projectTag = new OutputTag[String]("project")

    val resultStream = inputStream
      .filter(x => x != null && !"".equals(x)) // discard null/empty lines
      .process(new ProcessFunction[String, String]() { // String in, String out
        override def processElement(v: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
          val vs: Array[String] = v.split(",") // comma-separated record
          vs(0).toLowerCase match {
            case "stu"     => ctx.output(stuTag, v)     // student record
            case "project" => ctx.output(projectTag, v) // project record
            case _         => // unknown prefix: ignore rather than fail with MatchError
          }
        }
      })

    // Extract the two side outputs.
    val stuStream = resultStream.getSideOutput(stuTag)
    val projectStream = resultStream.getSideOutput(projectTag)

    // JDBC sink for the stu table.
    stuStream.addSink(JdbcSink.sink("insert into stu (sid,name) values(?,?)",
      new JdbcStatementBuilder[String]() {
        override def accept(t: PreparedStatement, u: String): Unit = {
          val fields = u.split(",") // split once per record, not per column
          t.setInt(1, Integer.parseInt(fields(1)))
          t.setString(2, fields(2))
        }
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)      // flush after 1000 buffered rows...
        .withBatchIntervalMs(200) // ...or after 200 ms, whichever comes first
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() // database connection settings
        .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8") // charset is required for non-ASCII data
        .withDriverName("com.mysql.jdbc.Driver")
        .withUsername("flink_test")
        .withPassword("flink_test")
        .build()))

    // JDBC sink for the project table.
    projectStream.addSink(JdbcSink.sink("insert into project (pid,sid,pro,score) values(?,?,?,?)",
      new JdbcStatementBuilder[String]() {
        override def accept(t: PreparedStatement, u: String): Unit = {
          val fields = u.split(",") // split once per record, not per column
          t.setInt(1, Integer.parseInt(fields(1)))
          t.setInt(2, Integer.parseInt(fields(2)))
          t.setString(3, fields(3))
          t.setInt(4, Integer.parseInt(fields(4)))
        }
      },
      JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() // database connection settings
        .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8") // charset is required for non-ASCII data
        .withDriverName("com.mysql.jdbc.Driver")
        .withUsername("flink_test")
        .withPassword("flink_test")
        .build()))

    // Blocks and runs the job until cancelled.
    env.execute()
  }
}
flink jdbc依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>
flink 依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
源码下载
https://download.csdn.net/download/sinat_25528181/44038825



