栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink实时计算kafka数据,存储到Mysql(Scala版本)

flink实时计算kafka数据,存储到Mysql(Scala版本)

Flink实时计算kafka数据,存储到Mysql(Scala版本)

本文是在基于搭建好集群执行
需要配置如下
1、完整CDH集群
2、Mysql开启binlog日志,如没开启参考此链接:Mysql开启binlog日志
3、启动Maxwell,如没有安装参考此链接:Maxwell安装及配置

项目说明
  1. kafka实时接收Maxwell监控到的日志
  2. 使用flink实时消费kakfa数据,处理json日志并拿到想要字段进行滚动窗口计算
  3. 把计算出来的数据存入Mysql数据库(也可以换成其他数据库,比如Tidb,具体看需求)
部分kafka数据样例(插入,更新,删除三条样例数据)
{"database":"test","table":"person","type":"insert","ts":1638343178,"xid":10873875,"commit":true,"data":{"id":69,"name":"sd","age":null}}
{"database":"test","table":"person","type":"delete","ts":1638341838,"xid":10838691,"commit":true,"data":{"id":32,"name":"rr","age":3}}
{"database":"test","table":"person","type":"update","ts":1638343457,"xid":10877187,"commit":true,"data":{"id":66,"name":"de","age":null},"old":{"name":"的"}}
项目需求
  1. flink实时统计当天新增日志中有(insert)操作人数
  2. 写入到 person_count 表
  3. 包含字段(数据库名称,表名,执行操作,日期,人数)
执行成功图

架构图

代码案例一
package com.jt.flink.Count

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.kafka.clients.consumer.ConsumerConfig
import com.alibaba.fastjson.JSON
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import java.util.Properties
import com.jt.util.ConfigUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer


object FlinkCountKafkaDemo1 {
  def main(args: Array[String]): Unit = {
    //创建流处理环境
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //之后开启CheckPointing可以开启重启策略
    environment.enableCheckpointing(5000)
    //设置重启策略为,出现三次异常重启3次,隔10秒一次
    environment.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
    //系统异常退出或者人为退出,不删除checkpoint数据
    environment.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置Checkpoint模式(与Kafka整合,要设置Checkpoint模式为Exactly_Once)
    environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    environment.setParallelism(1)
//    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //配置kafka信息
    val properties = new Properties()
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "srv191:9092,srv192:9092,srv193:9092")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-3145")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    //如果没有记录偏移量,第一次从最开始消费:earliest 从最新的位置开始消费:latest
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    //kafka的消费者,不自动提交偏移量
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    // 获取topic数据
    val valueTopic: DataStream[String] = environment.addSource(new FlinkKafkaConsumer[String]("kafka", new SimpleStringSchema(), properties))
    //查看获取到得topic数据
//    valueTopic.map(t => t.toString).print()

    val dataStream: DataStreamSink[((String, String, String, String), Int)] = valueTopic.map(line => {
      val database = JSON.parseObject(line).getString("database").toString.replaceAll("\"", "")
      val table = JSON.parseObject(line).getString("table").toString.replaceAll("\"", "")
      val Type = JSON.parseObject(line).getString("type").toString.replaceAll("\"", "")
      val dataTs = JSON.parseObject(line).getString("ts").toLong
      val time = new SimpleDateFormat("yyyy-MM-dd").format(dataTs * 1000L)
      (database, table, Type, time)
    })
      .keyBy(k => k._3)
      
      .filter(_._3 == "insert")
      .map(a => (a, 1))
      
      .keyBy(0)
      //设置5秒滚动窗口,每隔5秒计算一次窗口内数据
      .timeWindow(Time.seconds(5))
      .sum(1)//聚合累加算子
      .addSink(new MysqlSink)
//      .print()

    //提交flink任务job
    environment.execute()
  }
  class MysqlSink extends RichSinkFunction[((String, String, String,String), Int)] {

    //获取配置文件
    val driver = ConfigUtil.getString("mysql-driver")
    val url = ConfigUtil.getString("mysql-url")
    val user = ConfigUtil.getString("mysql-user")
    val password = ConfigUtil.getString("mysql-password")

    private var connection: Connection = null
    private var ps: PreparedStatement = null

    override def open(parameters: Configuration): Unit = {
      //1:加载驱动
      Class.forName(driver)
      //2:创建连接
      connection = DriverManager.getConnection(url, user, password)
      //3:获得执行语句
      val sql = "insert into person_count(db,TableName,TypeTable,timeStamp,person_count) values(?,?,?,?,?);"
      ps = connection.prepareStatement(sql)
    }

    override def invoke(value: ((String, String, String,String), Int)): Unit = {
      try {
        //4.组装数据,执行插入操作
        ps.setString(1, value._1._1)
        ps.setString(2, value._1._2)
        ps.setString(3, value._1._3)
        ps.setString(4, value._1._4)
        ps.setInt(5, value._2)
        ps.executeUpdate()
      } catch {
        case e: Exception => println(e.getMessage)
      }
    }

    //关闭连接操作
    override def close(): Unit = {
      if (connection != null) {
        connection.close()
      }
      if (ps != null) {
        ps.close()
      }
    }
  }
}
代码案例二
package com.jt.flink.Count

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties
import com.jt.util.ConfigUtil
import org.apache.flink.streaming.util.serialization.JSonKeyValueDeserializationSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkCountKafkaDemo2 {
  def main(args: Array[String]): Unit = {
    //创建flink流计算执行环境
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //开启CheckPointing可以开启重启策略
    environment.enableCheckpointing(5000)
    //设置重启策略为,出现三次异常重启3次,隔10秒一次
    environment.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
    //系统异常退出或者人为退出,不删除checkpoint数据
    environment.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //设置Checkpoint模式(与Kafka整合,要设置Checkpoint模式为Exactly_Once)端到端的一致性
    environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //测试环境设置并行度为1,生产环境可以调大也可以不设置
    environment.setParallelism(1)
    //创建事件创建时间
    //    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //配置kafka信息
    val properties = new Properties()
    //配置kafkaIP地址以及kafka端口号
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "srv191:9092,srv192:9092,srv193:9092")
    //设置kakfa消费者组名称
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-3145")
    //开启序列化配置
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    //开启反序列化配置
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    //如果没有记录偏移量,第一次从最开始消费:earliest 最新消息进行消费:latest
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    //kafka的消费者,不自动提交偏移量
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    //添加数据来源为kafka
    val valueTopic: DataStream[ObjectNode] = environment.addSource(new FlinkKafkaConsumer[ObjectNode]("kafka", new JSONKeyValueDeserializationSchema(false), properties))
    
//    valueTopic.map(t => t.toString).print()
    val dataStream: DataStreamSink[((String, String, String, String), Int)] =
      valueTopic.map(t =>{
        
        val node = t.get("value")
        val database = node.get("database").toString.replaceAll("\"", "")
        val table = node.get("table").toString.replaceAll("\"", "")
        val Type = node.get("type").toString.replaceAll("\"", "")
        val ts = node.get("ts").asLong()
        val time = new SimpleDateFormat("yyyy-MM-dd").format(ts * 1000L)
        (database, table, Type, time)
      })
        //根据Type进行分组把相同的Key放进同一个里面
        .keyBy(k => k._3)
        //过滤除insert以外所有的类型操作
        .filter(_._3 == "insert")
        //计数
        .map(a => (a, 1))
        // 分组算子  0或1代表的是下标,就是上面DataStreamSink返回的二元组下标
        // 0代表上面返回元组中的数据
        // 1代表的是元组数据中出现的次数
        .keyBy(0)
        //定义一个5秒钟滚动窗口,每5秒钟统计一次
        .timeWindow(Time.seconds(5))
        //聚合
        .sum(1)
        //添加自定义Mysqlsink
//        .addSink(new MysqlSink)
        .print()

    //提交flink任务job
    environment.execute()

  }
}
class MysqlSink extends RichSinkFunction[((String, String, String, String), Int)] {
  //获取配置文件
  val driver = ConfigUtil.getString("mysql-driver")
  val url = ConfigUtil.getString("mysql-url")
  val user = ConfigUtil.getString("mysql-user")
  val password = ConfigUtil.getString("mysql-password")

  private var connection: Connection = null
  private var ps: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {
    //1:加载驱动
    Class.forName(driver)
    //2:创建连接
    connection = DriverManager.getConnection(url, user, password)
    //3:获得执行语句
    val sql = "insert into person_count(db,TableName,TypeTable,timeStamp,person_count) values(?,?,?,?,?);"
    ps = connection.prepareStatement(sql)
  }

  override def invoke(value: ((String, String, String, String), Int)): Unit = {
    try {
      //4.组装数据,执行插入操作
      ps.setString(1, value._1._1)
      ps.setString(2, value._1._2)
      ps.setString(3, value._1._3)
      ps.setString(4, value._1._4)
      ps.setInt(5, value._2)
      ps.executeUpdate()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
  //关闭连接操作
  override def close(): Unit = {
    if (connection != null) {
      connection.close()
    }
    if (ps != null) {
      ps.close()
    }
  }
}

建议使用第二方案代码,flink自带的解析json字符串反序列化有问题

封装的驱动连接如下
package com.jt.util;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
import org.apache.log4j.Logger;

public class ConfigUtil {
    private static final Logger logger = Logger.getLogger(ConfigUtil.class);
    private static PropertiesConfiguration config = null;

    private ConfigUtil() {

    }

    static {
        try {
            //初始化配置
            config = new PropertiesConfiguration("config.properties");
            //文件修改之后自动加载
            config.setReloadingStrategy(new FileChangedReloadingStrategy());
            //配置文件自动保存
            config.setAutoSave(true);
        } catch (ConfigurationException ex) {
            logger.error(ex.getMessage());
        }
    }

    
    public static String getString(String key) {
        return config.getString(key, "");
    }

    
    public static String[] getStringArray(String key) {
        return config.getStringArray(key);
    }

    
    public static Integer getInteger(String key) {
        return config.getInteger(key, 0);
    }

    
    public static Double getDouble(String key) {
        return config.getDouble(key, 0D);
    }

    
    public static Long getLong(String key) {
        return config.getLong(key, 0L);
    }

    
    public static void setProperty(String key, Object value) {
        config.setProperty(key, value);
    }
}

配置文件
mysql-driver=com.mysql.jdbc.Driver
mysql-url=jdbc:mysql://xxxx.xxxx.xxxx.xxxx:3306/test01?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull
mysql-user=username
mysql-password=password

#datasource parameters
initSize=1
maxSize=4
pom.xml依赖配置


        
            com.alibaba
            fastjson
            1.2.66
        

        
            org.apache.flink
            flink-scala_2.11
            1.7.2
        

        
            org.apache.flink
            flink-streaming-scala_2.11
            1.7.2
        
        
            org.apache.flink
            flink-connector-kafka_2.11
            1.10.0
        

        
            org.apache.flink
            flink-connector-jdbc_2.12
            1.11.0
        


        
            org.apache.flink
            flink-cep_2.11
            1.11.2
        

        
        
            net.sf.json-lib
            json-lib
            2.4
            jdk15
        
        
            commons-beanutils
            commons-beanutils
            1.7.0
        
        
            commons-collections
            commons-collections
            3.1
        
        
            commons-lang
            commons-lang
            2.5
        
        
            net.sf.ezmorph
            ezmorph
            1.0.3
        



        
            org.apache.spark
            spark-sql_2.11
            2.1.0

        



        
            org.apache.spark
            spark-streaming_2.11
            ${spark.version}
        


        
            com.typesafe
            config
            1.3.3
        


        
            org.apache.kafka
            kafka-clients
            0.11.0.0
        

        
            org.apache.spark
            spark-core_2.11
            2.2.0
        
        
            mysql
            mysql-connector-java
            5.1.48
        

        
            org.apache.spark
            spark-streaming_2.11
            2.2.0
        
        
            org.scala-lang
            scala-library
            2.11.8
        
    
    
        
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.5.1
            
            
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.2
                
                    
                        
                            compile
                            testCompile
                        
                        
                            
                                -dependencyfile
                                ${project.build.directory}/.scala_dependencies
                            
                        
                    
                
            

        
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/650583.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号