栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark stream 3.0.0 scala版kafka消息按天统计并更新到mysql

spark stream 3.0.0 scala版kafka消息按天统计并更新到mysql

在电商等线上业务场景中,需要对新生成的数据进行实时统计,尽快体现在当天的数据统计中,以便反应出当前时间点的数据变动情况。

1. 添加依赖

        
            org.apache.spark
            spark-core_2.12
            3.0.0
        

        
            org.apache.spark
            spark-streaming_2.12
            3.0.0
        

        
            org.apache.spark
            spark-streaming-kafka-0-10_2.12
            3.1.0
        

        
            com.alibaba
            druid
            1.1.10
        
        
            mysql
            mysql-connector-java
            8.0.11
        
        
            com.fasterxml.jackson.core
            jackson-core
            2.10.1
        

    
2. 主程序代码
package com.demo.daystatis

import com.demo.PropertiesUtil
import com.demo.utils.{JdbcUtil, MyKafkaUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RealTimeApp {

  def main(args: Array[String]): Unit = {
    //1.创建 SparkConf
    val sparkConf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("RealTimeApp")
    //2.创建 StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //3.读取 Kafka 数据 1583288137305 华南 深圳 4 3
    val topic: String =
      PropertiesUtil.load("config.properties").getProperty("kafka.topic")
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(topic, ssc)
    //4.将每一行数据转换为样例类对象
    val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
      //a.取出 value 并按照" "切分
      val arr: Array[String] = record.value().split(" ")
      //b.封装为样例类对象
      Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
    })

    adsLogDStream.cache()

    //统计每天各大区各个城市广告点击总数并保存至 MySQL 中
    DayStatisHandler.saveDateAreaCityAdCountToMysql(adsLogDStream)

    //开启任务
    ssc.start()
    ssc.awaitTermination()
  }

}
3. 逻辑处理代码
package com.demo.daystatis

import com.demo.utils.JdbcUtil

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.streaming.dstream.DStream

case class Ads_log(timestamp: Long,
                   area: String,
                   city: String,
                   userid: String,
                   adid: String)

object DayStatisHandler {

  //时间格式化对象
  private val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
  
  def saveDateAreaCityAdCountToMysql(filterAdsLogDStream: DStream[Ads_log]): Unit
  = {
    //1.统计每天各大区各个城市广告点击总数
    val dateAreaCityAdToCount: DStream[((String, String, String, String), Long)] =
      filterAdsLogDStream.map(ads_log => {
        //a.取出时间戳
        val timestamp: Long = ads_log.timestamp
        //b.格式化为日期字符串
        val dt: String = sdf.format(new Date(timestamp))
        //c.组合,返回
        ((dt, ads_log.area, ads_log.city, ads_log.adid), 1L)
      }).reduceByKey(_ + _)

    //2.将单个批次统计之后的数据集合 MySQL 数据对原有的数据更新
    dateAreaCityAdToCount.foreachRDD(rdd => {
      //对每个分区单独处理
      rdd.foreachPartition(iter => {
        //a.获取连接
        val connection: Connection = JdbcUtil.getConnection
        //b.写库
        iter.foreach { case ((dt, area, city, adid), count) =>
          JdbcUtil.executeUpdate(connection,
            """
              |INSERT INTO area_city_ad_count (dt,area,city,adid,count)
              |VALUES(?,?,?,?,?)
              |ON DUPLICATE KEY
              |UPDATE count=count+?;
 """.stripMargin,
            Array(dt, area, city, adid, count, count))
        }
        //c.释放连接
        connection.close()
      })
    })
  }

}

将rdd中的数据进行汇总后,在与mysql数据库中的数据进行比对操作,如果不存在,则进行添加,如果存在,则累计统计数据。 为了提高更新效率,这里使用了批量更新操作。

3. kafka消费者工具类
package com.demo.utils

import com.demo.PropertiesUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import java.util.Properties

object MyKafkaUtil {

  //1.创建配置信息对象
  private val properties: Properties = PropertiesUtil.load("config.properties")
  //2.用于初始化链接到集群的地址
  val broker_list: String = properties.getProperty("kafka.broker.list")
  //3.kafka 消费者配置
  val kafkaParam = Map(
    "bootstrap.servers" -> broker_list,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    //消费者组
    "group.id" -> "commerce-consumer-group",
    //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
    //可以使用这个配置,latest 自动重置偏移量为最新的偏移量
    "auto.offset.reset" -> "latest",
    //如果是 true,则这个消费者的偏移量会在后台自动提交,但是 kafka 宕机容易丢失数据
    //如果是 false,会需要手动维护 kafka 偏移量
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  // 创建 DStream,返回接收到的输入数据
  // LocationStrategies:根据给定的主题和集群地址创建 consumer
  // LocationStrategies.PreferConsistent:持续的在所有 Executor 之间分配分区
  // ConsumerStrategies:选择如何在 Driver 和 Executor 上创建和配置 Kafka Consumer
  // ConsumerStrategies.Subscribe:订阅一系列主题
  def getKafkaStream(topic: String, ssc: StreamingContext):
  InputDStream[ConsumerRecord[String, String]] = {
    val dStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,
          String](Array(topic), kafkaParam))
    dStream
  }

}
4. mysql读写工具类
package com.demo.utils

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties
import javax.sql.DataSource
import com.alibaba.druid.pool.DruidDataSourceFactory
import com.demo.PropertiesUtil


object JdbcUtil {

  //初始化连接池
  var dataSource: DataSource = init()
  //初始化连接池方法
  def init(): DataSource = {
    val properties = new Properties()
    val config: Properties = PropertiesUtil.load("config.properties")
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", config.getProperty("jdbc.url"))
    properties.setProperty("username", config.getProperty("jdbc.user"))
    properties.setProperty("password", config.getProperty("jdbc.password"))
    properties.setProperty("maxActive",
      config.getProperty("jdbc.datasource.size"))
    DruidDataSourceFactory.createDataSource(properties)
  }
  //获取 MySQL 连接
  def getConnection: Connection = {
    dataSource.getConnection
  }
  //执行 SQL 语句,单条数据插入
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int
  = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }
  //执行 SQL 语句,批量数据插入
  def executeBatchUpdate(connection: Connection, sql: String, paramsList:
  Iterable[Array[Any]]): Array[Int] = {
    var rtn: Array[Int] = null
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      for (params <- paramsList) {
        if (params != null && params.length > 0) {
          for (i <- params.indices) {
            pstmt.setObject(i + 1, params(i))
          }
          pstmt.addBatch()
        }
      }
      rtn = pstmt.executeBatch()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }
  //判断一条数据是否存在
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean =
  {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    flag
  }
  //获取 MySQL 的一条数据
  def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]):
  Long = {
    var result: Long = 0L
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      val resultSet: ResultSet = pstmt.executeQuery()
      while (resultSet.next()) {
        result = resultSet.getLong(1)
      }
      resultSet.close()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    result
  }
  //主方法,用于测试上述方法
  def main(args: Array[String]): Unit = {
  }

}
6. 配置信息(config.properties)
#jdbc 配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://localhost:3306/spark2020?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=username
jdbc.password=password

# Kafka 配置
kafka.broker.list=localhost:9092
kafka.topic=test
7. mysql建表脚本
CREATE TABLE area_city_ad_count (
dt VARCHAr(255),
area VARCHAr(255),
city VARCHAr(255),
adid VARCHAr(255),
 count BIGINT,
PRIMARY KEY (dt,area,city,adid)
);
8. 通用工具类(PropertiesUtil.scala)
package com.demo

import java.io.InputStreamReader
import java.util.Properties


object PropertiesUtil {

  def load(propertiesName:String): Properties ={
    val prop=new Properties()
    prop.load(new
        InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName) , "UTF-8"))
    prop
  }

}
9. 执行测试

在kafka队列中持续输入类似下面数据的情况下。

1645164248400 华东 上海 1 1
1645164248400 华东 上海 5 4
1645164248400 华北 北京 3 1
1645164248400 华北 天津 6 1
1645164248400 华南 深圳 4 3
1645164248400 华南 深圳 5 1
1645164248400 华北 北京 3 4
1645164248400 华北 北京 6 5
1645164248400 华北 北京 4 3
1645164248400 华东 上海 4 2
1645164248400 华北 北京 2 3
1645164248400 华南 深圳 3 5

可以在数据库表中,看到统计的数据。

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741930.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号