栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

flinksql自定义mysql sink

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

flinksql自定义mysql sink

因为我接下来想尝试搭建基于 Docker 的 MySQL 集群,所以这里连接的是 Docker 版的 MySQL。过程中遇到了两个小问题:一是需要关闭 SSL 认证(在连接串中加上 useSSL=false);二是需要把 Maven 中 mysql-connector-java 的版本提升到 8.0.11,否则会报 caching_sha2_password 相关的错误。

 
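        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <version>8.0.11</version>
        </dependency>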

回归正题,自定义jdbcsink难度不大,核心代码如下

/**
 * Flink sink that writes [[item_count]] records to the MySQL table `Item_detail`.
 *
 * For each record it first runs an UPDATE that increments `sale_count` for the
 * record's `item_id`; if that UPDATE reports zero rows, the record is INSERTed.
 */
class MyJdbcSinkFunc() extends RichSinkFunction[item_count]{
  // JDBC connection and prepared statements: created in open(), released in close()
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _
  // Not used anywhere in this class; kept so the class's members stay unchanged.
  var count = 0

  /**
   * Opens the MySQL connection and prepares the UPDATE and INSERT statements.
   *
   * @param parameters Flink configuration (not read here)
   */
  override def open(parameters: Configuration): Unit = {
    // NOTE(review): Connector/J 8.x names its driver com.mysql.cj.jdbc.Driver and treats
    // this legacy class name as deprecated — confirm against the connector version in use.
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3309/alibaba?characterEncoding=utf8&useSSL=false", "root", "root")
    insertStmt = conn.prepareStatement("insert into Item_detail (item_id,cate_id,price,sale_count,save_count) values (?, ?,?,?,?)")
    updateStmt = conn.prepareStatement("update Item_detail set sale_count =sale_count +1  where item_id = ?")
  }

  /**
   * Writes one record: update first, insert when the update touched no row.
   *
   * @param value record to persist
   */
  override def invoke(value: item_count): Unit = {
    // Try the update first; executeUpdate() returns the row count directly.
    updateStmt.setLong(1, value.item_id)
    val updatedRows = updateStmt.executeUpdate()

    // No row was updated, so insert the record as a new row.
    if (updatedRows == 0) {
      insertStmt.setLong(1, value.item_id)
      insertStmt.setLong(2, value.cate_id)
      insertStmt.setDouble(3, value.price)
      insertStmt.setInt(4, value.sale_count)
      insertStmt.setInt(5, value.save_count)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the statements and the connection.
   *
   * Each field may still be null when open() failed part-way, and the nested
   * try/finally makes sure a failure closing one resource does not leak the others.
   */
  override def close(): Unit = {
    try {
      if (insertStmt != null) insertStmt.close()
    } finally {
      try {
        if (updateStmt != null) updateStmt.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  }
}

全部代码如下

package redo.source



import bag.day01.State.RichMapper
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.util.Collector

import java.sql.{Connection, DriverManager, PreparedStatement, Statement}
import java.util.{Date, Properties}
import scala.util.Random


/**
 * Per-user record carrying a user id, item id, category id, four integer
 * counters and a timestamp.
 *
 * NOTE(review): presumably mirrors one row of the ClickHouse `user` table, with
 * pv/fav/cart/buy as behaviour counters — confirm; no visible code uses this class.
 */
case class User(
  user_id: Long,
  item_id: Long,
  cate_id: Long,
  pv: Int,
  fav: Int,
  cart: Int,
  buy: Int,
  times: Date
)
/**
 * Record consumed by `MyJdbcSinkFunc`; its five fields are written to the
 * identically named columns of the MySQL table `Item_detail`.
 */
case class item_count(
  item_id: Long,
  cate_id: Long,
  price: Double,
  sale_count: Int,
  save_count: Int
)
/** Item id / category id pair emitted by the ClickHouse source `mysource_funtions`. */
case class fl(
  item_id: Long,
  cate_id: Long
)
/**
 * Job entry point: reads (item_id, cate_id) pairs from the ClickHouse source,
 * turns each into an [[item_count]] with a random price, prints the stream and
 * writes it to MySQL through `MyJdbcSinkFunc`.
 */
object ClickHouseSources {
  /**
   * Builds and runs the streaming pipeline.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run the whole pipeline with parallelism 1.
    env.setParallelism(1)

    val sourceStream = env.addSource(new mysource_funtions())
    val flu: DataStream[item_count] = sourceStream.map { x =>
      // Random price in [0, 1000).
      val price = scala.util.Random.nextDouble()

      // The first field is item_id: pass the item's own id, not the category id,
      // because the sink keys its update/insert on item_id.
      item_count(x.item_id, x.cate_id, price * 1000, 1, 1000)
    }
    flu.print("avg")
    flu.addSink(new MyJdbcSinkFunc())

    env.execute("connect clickhouse")
  }
}
/**
 * Flink sink that writes [[item_count]] records to the MySQL table `Item_detail`.
 *
 * For each record it first runs an UPDATE that increments `sale_count` for the
 * record's `item_id`; if that UPDATE reports zero rows, the record is INSERTed.
 */
class MyJdbcSinkFunc() extends RichSinkFunction[item_count]{
  // JDBC connection and prepared statements: created in open(), released in close()
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _
  // Not used anywhere in this class; kept so the class's members stay unchanged.
  var count = 0

  /**
   * Opens the MySQL connection and prepares the UPDATE and INSERT statements.
   *
   * @param parameters Flink configuration (not read here)
   */
  override def open(parameters: Configuration): Unit = {
    // NOTE(review): Connector/J 8.x names its driver com.mysql.cj.jdbc.Driver and treats
    // this legacy class name as deprecated — confirm against the connector version in use.
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3309/alibaba?characterEncoding=utf8&useSSL=false", "root", "root")
    insertStmt = conn.prepareStatement("insert into Item_detail (item_id,cate_id,price,sale_count,save_count) values (?, ?,?,?,?)")
    updateStmt = conn.prepareStatement("update Item_detail set sale_count =sale_count +1  where item_id = ?")
  }

  /**
   * Writes one record: update first, insert when the update touched no row.
   *
   * @param value record to persist
   */
  override def invoke(value: item_count): Unit = {
    // Try the update first; executeUpdate() returns the row count directly.
    updateStmt.setLong(1, value.item_id)
    val updatedRows = updateStmt.executeUpdate()

    // No row was updated, so insert the record as a new row.
    if (updatedRows == 0) {
      insertStmt.setLong(1, value.item_id)
      insertStmt.setLong(2, value.cate_id)
      insertStmt.setDouble(3, value.price)
      insertStmt.setInt(4, value.sale_count)
      insertStmt.setInt(5, value.save_count)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Releases the statements and the connection.
   *
   * Each field may still be null when open() failed part-way, and the nested
   * try/finally makes sure a failure closing one resource does not leak the others.
   */
  override def close(): Unit = {
    try {
      if (insertStmt != null) insertStmt.close()
    } finally {
      try {
        if (updateStmt != null) updateStmt.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  }
}
/**
 * Flink source that runs `select item_id,cate_id from user` against ClickHouse
 * once and emits every row as an [[fl]] record, then finishes.
 */
class mysource_funtions extends  SourceFunction[fl]{
  // Written by cancel() and read by the loop in run(); @volatile makes a cancel
  // issued from another thread visible to the reading loop.
  @volatile var running = true

  /** Asks run() to stop emitting rows. */
  override def cancel(): Unit = running = false

  /**
   * Executes the query and forwards each row to the source context until the
   * result set is exhausted or the source is cancelled.
   *
   * @param ctx context used to emit records
   */
  override def run(ctx: SourceFunction.SourceContext[fl]): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    // Open the connection; the nested try/finally blocks below release the result
    // set, statement and connection even when the query or collect() throws.
    val connection: Connection = DriverManager.getConnection( "jdbc:clickhouse://hadoop102:8123/alibaba")
    try {
      val statement: Statement = connection.createStatement()
      try {
        // Run the query.
        val sql = "select item_id,cate_id from user "
        val rows = statement.executeQuery(sql)
        try {
          // Check the flag on every row so cancel() takes effect mid-scan.
          while (running && rows.next()) {
            val item_id = rows.getLong(1)
            val cate_id = rows.getLong(2)
            ctx.collect(fl(item_id, cate_id))
          }
        } finally {
          rows.close()
        }
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
    // All rows have been emitted (or the source was cancelled): mark it stopped.
    running = false
  }
}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/299306.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号