项目说明:本文基于已搭建好的集群执行。
需要配置如下
1、完整CDH集群
2、Mysql开启binlog日志,如没开启参考此链接:Mysql开启binlog日志
3、启动Maxwell,如没有安装参考此链接:Maxwell安装及配置
- kafka实时接收Maxwell监控到的日志
- 使用 Flink 实时消费 Kafka 数据,解析 JSON 日志并提取所需字段,进行滚动窗口计算
- 把计算出来的数据存入Mysql数据库(也可以换成其他数据库,比如Tidb,具体看需求)
{"database":"test","table":"person","type":"insert","ts":1638343178,"xid":10873875,"commit":true,"data":{"id":69,"name":"sd","age":null}}
{"database":"test","table":"person","type":"delete","ts":1638341838,"xid":10838691,"commit":true,"data":{"id":32,"name":"rr","age":3}}
{"database":"test","table":"person","type":"update","ts":1638343457,"xid":10877187,"commit":true,"data":{"id":66,"name":"de","age":null},"old":{"name":"的"}}
项目需求
- Flink 实时统计当天新增日志中 insert 操作的人数
- 写入到 person_count 表
- 包含字段(数据库名称,表名,执行操作,日期,人数)
package com.jt.flink.Count
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.kafka.clients.consumer.ConsumerConfig
import com.alibaba.fastjson.JSON
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import java.util.Properties
import com.jt.util.ConfigUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object FlinkCountKafkaDemo1 {

  /**
   * Entry point of the job.
   *
   * Reads Maxwell change-log JSON strings from the Kafka topic "kafka" and keeps only `insert`
   * events. It counts them per (database, table, operation type, yyyy-MM-dd) key in 5-second
   * tumbling windows and writes every window result to MySQL through [[MysqlSink]].
   *
   * Each row written is the count of a single 5-second window, not a running daily total.
   * To get a per-day figure, sum person_count over the same key downstream.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Streaming execution environment.
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 5 seconds (the restart strategy below relies on checkpointing being on).
    environment.enableCheckpointing(5000)
    // Restart at most 3 times, waiting 10 seconds between attempts.
    environment.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
    // Keep externalized checkpoint data when the job is cancelled.
    environment.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Exactly-once checkpointing mode for the Kafka integration.
    environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    environment.setParallelism(1)
    // environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer settings.
    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "srv191:9092,srv192:9092,srv193:9092")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-3145")
    // Consumer settings must name Deserializer classes. The key previously pointed at StringSerializer.
    // NOTE(review): FlinkKafkaConsumer may substitute its own byte-array deserializers anyway.
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // With no committed offset, start from the newest records ("earliest" would replay from the start).
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    // Disable Kafka's own periodic offset auto-commit.
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    val valueTopic: DataStream[String] =
      environment.addSource(new FlinkKafkaConsumer[String]("kafka", new SimpleStringSchema(), properties))
    // valueTopic.map(t => t.toString).print()

    valueTopic
      // Keep only well-formed insert events. Anything else is dropped instead of failing the job.
      .flatMap(line => parseInsert(line).toList)
      .map(key => (key, 1))
      // Group by the full (database, table, type, day) key.
      .keyBy(_._1)
      // 5-second tumbling window: one result per key every 5 seconds.
      .timeWindow(Time.seconds(5))
      // Sum the per-record 1s into the window count.
      .sum(1)
      .addSink(new MysqlSink)
    // .print()

    // Submit the job.
    environment.execute()
  }

  /**
   * Parses one Maxwell JSON record and extracts its grouping key.
   *
   * @param line raw Kafka message value
   * @return (database, table, "insert", yyyy-MM-dd) when the line is a JSON object whose "type"
   *         is "insert" and that carries "database", "table" and "ts"; None otherwise, including
   *         when the line is not valid JSON
   */
  private def parseInsert(line: String): Option[(String, String, String, String)] = {
    import scala.util.Try
    for {
      // Parse once per record (the original parsed the same line once per field).
      json <- Try(JSON.parseObject(line)).toOption.flatMap(Option(_))
      opType <- Option(json.getString("type")) if opType == "insert"
      database <- Option(json.getString("database"))
      table <- Option(json.getString("table"))
      ts <- Option(json.getLong("ts"))
    } yield (database, table, opType, formatDay(ts.longValue))
  }

  /**
   * Formats a Maxwell "ts" value (seconds since the epoch) as yyyy-MM-dd in the JVM default
   * time zone.
   */
  private def formatDay(epochSeconds: Long): String =
    new SimpleDateFormat("yyyy-MM-dd").format(epochSeconds * 1000L)

  /**
   * Sink that inserts each windowed count as one row of the MySQL table `person_count`.
   *
   * Element ((db, table, type, day), count) maps to the columns
   * (db, TableName, TypeTable, timeStamp, person_count). JDBC settings come from the ConfigUtil
   * keys mysql-driver / mysql-url / mysql-user / mysql-password and are read when the sink is
   * constructed. One connection and one prepared statement are opened per sink instance in open().
   */
  class MysqlSink extends RichSinkFunction[((String, String, String, String), Int)] {
    // Connection settings from config.properties.
    val driver = ConfigUtil.getString("mysql-driver")
    val url = ConfigUtil.getString("mysql-url")
    val user = ConfigUtil.getString("mysql-user")
    val password = ConfigUtil.getString("mysql-password")
    // Created in open(), released in close().
    private var connection: Connection = null
    private var ps: PreparedStatement = null

    /** Loads the JDBC driver, opens the connection and prepares the INSERT statement. */
    override def open(parameters: Configuration): Unit = {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, user, password)
      val sql = "insert into person_count(db,TableName,TypeTable,timeStamp,person_count) values(?,?,?,?,?);"
      ps = connection.prepareStatement(sql)
    }

    /**
     * Inserts one row for `value`.
     *
     * This is best-effort: a failed insert is reported on stderr with its stack trace and the
     * element is skipped. Fatal errors and interrupts propagate.
     */
    override def invoke(value: ((String, String, String, String), Int)): Unit = {
      import scala.util.control.NonFatal
      val ((database, table, opType, day), count) = value
      try {
        ps.setString(1, database)
        ps.setString(2, table)
        ps.setString(3, opType)
        ps.setString(4, day)
        ps.setInt(5, count)
        ps.executeUpdate()
      } catch {
        case NonFatal(e) =>
          System.err.println(s"MysqlSink: failed to insert $value: ${e.getMessage}")
          e.printStackTrace()
      }
    }

    /**
     * Releases JDBC resources. The statement is closed before the connection that owns it.
     * The `finally` releases the connection even if closing the statement throws.
     */
    override def close(): Unit = {
      try {
        if (ps != null) ps.close()
      } finally {
        if (connection != null) connection.close()
      }
    }
  }
}
代码案例二
package com.jt.flink.Count
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Properties

import com.jt.util.ConfigUtil
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
// Class name is upper-case "JSON"; the previous "JSon..." spelling does not resolve.
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
object FlinkCountKafkaDemo2 {

  /**
   * Entry point of the job.
   *
   * Reads Maxwell change-log records from the Kafka topic "kafka". Each record is deserialized
   * by Flink's JSONKeyValueDeserializationSchema into an ObjectNode whose "value" field holds
   * the message body. The job keeps only `insert` events and counts them per
   * (database, table, operation type, yyyy-MM-dd) key in 5-second tumbling windows.
   *
   * Results are currently printed. Replace `.print()` with `.addSink(new MysqlSink)` to write
   * them to person_count. Each result is the count of one 5-second window, not a daily total.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Streaming execution environment.
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 5 seconds (the restart strategy below relies on checkpointing being on).
    environment.enableCheckpointing(5000)
    // Restart at most 3 times, waiting 10 seconds between attempts.
    environment.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
    // Keep externalized checkpoint data when the job is cancelled.
    environment.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Exactly-once checkpointing mode for the Kafka integration.
    environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Parallelism 1 for testing; raise it (or leave it unset) in production.
    environment.setParallelism(1)
    // environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer settings.
    val properties = new Properties()
    // Kafka broker addresses.
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "srv191:9092,srv192:9092,srv193:9092")
    // Consumer group name.
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-3145")
    // Consumer settings must name Deserializer classes. The key previously pointed at StringSerializer.
    // NOTE(review): FlinkKafkaConsumer may substitute its own byte-array deserializers anyway.
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // With no committed offset, start from the earliest records ("latest" would read only new ones).
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    // Disable Kafka's own periodic offset auto-commit.
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    // Kafka source. includeMetadata = false; the message body is read from the "value" field below.
    val valueTopic: DataStream[ObjectNode] = environment.addSource(
      new FlinkKafkaConsumer[ObjectNode]("kafka", new JSONKeyValueDeserializationSchema(false), properties))
    // valueTopic.map(t => t.toString).print()

    valueTopic
      // Keep only insert events that carry every field used below. Records missing a field are
      // dropped instead of failing the job with a NullPointerException.
      .filter { record =>
        val body = record.path("value")
        body.path("type").asText() == "insert" &&
          Seq("database", "table", "ts").forall(field => body.hasNonNull(field))
      }
      // Build ((database, table, type, day), 1). asText() yields the raw string without the JSON
      // quotes, so no quote stripping is needed. "ts" is in seconds since the epoch.
      .map { record =>
        val body = record.get("value")
        val day = new SimpleDateFormat("yyyy-MM-dd").format(body.get("ts").asLong() * 1000L)
        ((body.get("database").asText(), body.get("table").asText(), body.get("type").asText(), day), 1)
      }
      // Group by the full (database, table, type, day) key.
      .keyBy(_._1)
      // 5-second tumbling window: one result per key every 5 seconds.
      .timeWindow(Time.seconds(5))
      // Sum the per-record 1s into the window count.
      .sum(1)
      // .addSink(new MysqlSink)
      .print()

    // Submit the job.
    environment.execute()
  }
}
/**
 * Flink sink that inserts each windowed count as one row of the MySQL table `person_count`.
 *
 * Element ((db, table, type, day), count) maps to the columns
 * (db, TableName, TypeTable, timeStamp, person_count). JDBC settings come from the ConfigUtil keys
 * mysql-driver / mysql-url / mysql-user / mysql-password and are read when the sink is
 * constructed. One connection and one prepared statement are opened per sink instance in open().
 */
class MysqlSink extends RichSinkFunction[((String, String, String, String), Int)] {
  // Connection settings from config.properties.
  val driver = ConfigUtil.getString("mysql-driver")
  val url = ConfigUtil.getString("mysql-url")
  val user = ConfigUtil.getString("mysql-user")
  val password = ConfigUtil.getString("mysql-password")
  // Created in open(), released in close().
  private var connection: Connection = null
  private var ps: PreparedStatement = null

  /** Loads the JDBC driver, opens the connection and prepares the INSERT statement. */
  override def open(parameters: Configuration): Unit = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, password)
    val sql = "insert into person_count(db,TableName,TypeTable,timeStamp,person_count) values(?,?,?,?,?);"
    ps = connection.prepareStatement(sql)
  }

  /**
   * Inserts one row for `value`.
   *
   * This is best-effort: a failed insert is reported on stderr with its stack trace and the
   * element is skipped. Fatal errors and interrupts propagate.
   */
  override def invoke(value: ((String, String, String, String), Int)): Unit = {
    import scala.util.control.NonFatal
    val ((database, table, opType, day), count) = value
    try {
      ps.setString(1, database)
      ps.setString(2, table)
      ps.setString(3, opType)
      ps.setString(4, day)
      ps.setInt(5, count)
      ps.executeUpdate()
    } catch {
      case NonFatal(e) =>
        System.err.println(s"MysqlSink: failed to insert $value: ${e.getMessage}")
        e.printStackTrace()
    }
  }

  /**
   * Releases JDBC resources. The statement is closed before the connection that owns it.
   * The `finally` releases the connection even if closing the statement throws.
   */
  override def close(): Unit = {
    try {
      if (ps != null) ps.close()
    } finally {
      if (connection != null) connection.close()
    }
  }
}
建议使用第二方案代码,flink自带的解析json字符串反序列化有问题
封装的驱动连接配置工具类如下:
package com.jt.util;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.log4j.Logger;
/**
 * Static access to the settings in {@code config.properties}, backed by Commons Configuration's
 * {@link PropertiesConfiguration}.
 * <p>
 * The file is reloaded automatically after it is modified ({@link FileChangedReloadingStrategy}).
 * Because auto-save is enabled, {@link #setProperty(String, Object)} also writes changes back to
 * the file.
 */
public class ConfigUtil {
    private static final Logger logger = Logger.getLogger(ConfigUtil.class);

    /** Loaded once during class initialization; never null afterwards. */
    private static final PropertiesConfiguration config = load();

    private ConfigUtil() {
    }

    /**
     * Loads {@code config.properties} and enables reload-on-change and auto-save.
     *
     * @return the loaded configuration
     * @throws IllegalStateException if the file cannot be loaded. Because this runs during class
     *         initialization, it surfaces as an ExceptionInInitializerError that carries the
     *         real cause. Previously, every getter failed later with a NullPointerException.
     */
    private static PropertiesConfiguration load() {
        try {
            PropertiesConfiguration loaded = new PropertiesConfiguration("config.properties");
            // Reload automatically after the file is modified.
            loaded.setReloadingStrategy(new FileChangedReloadingStrategy());
            // Persist setProperty() changes back to the file automatically.
            loaded.setAutoSave(true);
            return loaded;
        } catch (ConfigurationException ex) {
            // Log the exception itself so the stack trace is kept, not just the message.
            logger.error("Failed to load config.properties", ex);
            throw new IllegalStateException("Failed to load config.properties", ex);
        }
    }

    /** @return the value of {@code key}, or "" when the key is absent */
    public static String getString(String key) {
        return config.getString(key, "");
    }

    /** @return the value(s) of {@code key} as an array */
    public static String[] getStringArray(String key) {
        return config.getStringArray(key);
    }

    /** @return the value of {@code key} as an Integer, or 0 when the key is absent */
    public static Integer getInteger(String key) {
        return config.getInteger(key, 0);
    }

    /** @return the value of {@code key} as a Double, or 0.0 when the key is absent */
    public static Double getDouble(String key) {
        return config.getDouble(key, 0D);
    }

    /** @return the value of {@code key} as a Long, or 0 when the key is absent */
    public static Long getLong(String key) {
        return config.getLong(key, 0L);
    }

    /**
     * Sets {@code key} to {@code value}. With auto-save enabled, this also rewrites
     * config.properties on disk.
     */
    public static void setProperty(String key, Object value) {
        config.setProperty(key, value);
    }
}
配置文件
mysql-driver=com.mysql.jdbc.Driver
mysql-url=jdbc:mysql://xxxx.xxxx.xxxx.xxxx:3306/test01?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull
mysql-user=username
mysql-password=password
#datasource parameters
initSize=1
maxSize=4

pom.xml依赖配置
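<!-- NOTE(review): the Flink artifacts below mix versions (1.7.2 / 1.10.0 / 1.11.0 / 1.11.2) and
     Scala binary suffixes (_2.11 / _2.12). Flink modules normally must share one Flink version and
     one Scala suffix. spark-streaming_2.11 is declared twice. ConfigUtil also needs
     commons-configuration and log4j, which are not declared here. -->
<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>net.sf.json-lib</groupId>
        <artifactId>json-lib</artifactId>
        <version>2.4</version>
        <classifier>jdk15</classifier>
    </dependency>
    <dependency>
        <groupId>commons-beanutils</groupId>
        <artifactId>commons-beanutils</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.5</version>
    </dependency>
    <dependency>
        <groupId>net.sf.ezmorph</groupId>
        <artifactId>ezmorph</artifactId>
        <version>1.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.3.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.48</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>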



