Based on a week of hands-on experience with Flink, three pieces came up: a Kafka source, and sinks to MySQL, Kudu and Kafka.
Sourcing from Kafka is the simplest part.
// imports
import scala.collection.JavaConverters._
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties
// main flow
val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092")
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafkagroup")
prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
// start from the earliest offset when the group has no committed offset
prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val ds = env.addSource(
  new FlinkKafkaConsumer[String](
    "topic_name",
    new SimpleStringSchema(),
    prop
  )
)
// transformations: the Kafka messages are JSON, so fastjson is used to parse them
// when we are not sure whether the JSON contains the keys we need, filter + containsKey drops the records that do not
// parse each message into a JSON object and keep only records that contain the key we need
val parsed = ds
  .map(x => JSON.parseObject(x))
  .filter(x => x.containsKey("report"))

// sink to MySQL: wrap the fields in a case class
val mysqlStream = parsed.map { x =>
  val appid = x.get("appid").toString
  val userid = x.get("userid").toString
  val time = x.get("time").toString
  Reading(appid, userid, time)
}

// sink to Kafka: a plain comma-separated string is enough
val kafkaStream = parsed.map { x =>
  x.get("appid").toString + "," + x.get("userid").toString + "," + x.get("time").toString
}

// sink to Kudu: the custom SinkKudu is written in Java, so the Scala Map has to be converted to a java.util.Map
val kuduStream = parsed.map { x =>
  Map("appid" -> x.get("appid").toString,
      "userid" -> x.get("userid").toString,
      "time" -> x.get("time").toString).asJava
}
case class Reading(appid: String, userid: String, time: String)
Sink MySQL
// imports needed by the JDBC sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class JDBCSink() extends RichSinkFunction[Reading] {
  // JDBC connection and prepared statement
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  // open: create the connection and prepare the insert statement once
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://IP:3306/database_name", "user", "password")
    insertStmt = conn.prepareStatement("INSERT INTO salary_table (appid, userid, create_time) VALUES (?,?,?)")
  }
  // invoke: called once per record, binds the fields and runs the insert
  override def invoke(value: Reading, context: SinkFunction.Context): Unit = {
    insertStmt.setString(1, value.appid)
    insertStmt.setString(2, value.userid)
    insertStmt.setString(3, value.time)
    insertStmt.execute()
  }
  // close: clean up the statement and the connection
  override def close(): Unit = {
    insertStmt.close()
    conn.close()
  }
}
mysqlStream.addSink(new JDBCSink())
// execute the job
env.execute("job")
Sink KUDU
package org.example;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;
public class SinkKudu extends RichSinkFunction<Map<String, String>> {
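    // The original post stops at the class declaration above; the body below is a
    // minimal sketch of what such a Kudu sink could look like, not the author's code.
    // The constructor arguments (Kudu master address and table name) and the
    // assumption that every column is a STRING column are illustrative only.
    private final String kuduMasters;
    private final String tableName;
    private transient KuduClient client;
    private transient KuduTable table;
    private transient KuduSession session;

    public SinkKudu(String kuduMasters, String tableName) {
        this.kuduMasters = kuduMasters;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new KuduClient.KuduClientBuilder(kuduMasters).build();
        table = client.openTable(tableName);
        session = client.newSession();
    }

    @Override
    public void invoke(Map<String, String> value, Context context) throws Exception {
        // one upsert per record; each map entry becomes one column value
        Upsert upsert = table.newUpsert();
        PartialRow row = upsert.getRow();
        for (Map.Entry<String, String> entry : value.entrySet()) {
            row.addString(entry.getKey(), entry.getValue());
        }
        session.apply(upsert);
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
        if (client != null) {
            client.close();
        }
    }
}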
Sink Kafka does not need a custom sink; it is supported natively.
// imports needed by the Kafka producer sink
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

val prop2 = new Properties()
prop2.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.0.20.7:9092")
prop2.setProperty(ProducerConfig.RETRIES_CONFIG,"0")
prop2.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
prop2.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
// attach the producer to the comma-separated string stream
kafkaStream.addSink(new FlinkKafkaProducer[String](
  "topic",
  new SimpleStringSchema(),
  prop2))
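For completeness, the Kudu stream built earlier can be attached to the custom sink in the same way. The constructor arguments below (Kudu master address and table name) are illustrative placeholders matching the sketched SinkKudu constructor, not values from the original job.
kuduStream.addSink(new SinkKudu("kudu_master:7051", "table_name"))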
env.execute("event_attendees_ff")