栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

Flink Mysql 操作(scalikejdbc)

Flink Mysql 操作(scalikejdbc)

Flink Mysql 操作

pom文件

            
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.28</version>
            </dependency>
            
-- Demo table queried by the `select * from student` statements in this article.
CREATE TABLE IF NOT EXISTS `student`(
   `id` INT ,
   `name` VARCHAR(100) NOT NULL,
   `age` INT
);

-- Seed a single row. `id` is an INT column, so it is passed as a numeric literal;
-- the string value uses single quotes so it also works when ANSI_QUOTES is enabled.
INSERT INTO student
(id, name, age)
VALUES
(2, 'fy', 25);
LOW B 版本

MySQL utils

import java.sql.{Connection, DriverManager}

/** Minimal JDBC helpers for opening and closing MySQL connections. */
object MySQLUtils {

  /**
   * Loads the MySQL driver class and opens a connection through `DriverManager`.
   *
   * NOTE(review): the JDBC URL and password literals are empty strings here —
   * presumably placeholders that must be filled in before use; confirm.
   *
   * @return the connection handed back by `DriverManager.getConnection`
   */
  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    val opened = DriverManager.getConnection("", "root", "")
    opened
  }

  /**
   * Closes the given connection.
   *
   * @param connection connection to close; a `null` reference is ignored
   */
  def closeConnection(connection: Connection): Unit =
    Option(connection).foreach(_.close())
}

定义一个类

/** Namespace for the record types exchanged between the sources and the job. */
object Domain {

  /**
   * A single student record.
   *
   * @param id   numeric identifier
   * @param name student name
   * @param age  student age
   */
  case class Student(id: Int, name: String, age: Int)
}

cityosMySQLSource

import com.cityos.bean.Domain.Student
import com.cityos.utils.MySQLUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import java.sql.{Connection, PreparedStatement}

/**
 * Flink source that runs `select * from student` once over plain JDBC and emits
 * every row as a [[Student]].
 *
 * The connection and statement are created in `open` and released in `close`;
 * `cancel` stops the row loop in `run` instead of throwing.
 */
class cityosMySQLSource  extends  RichSourceFunction[Student]{

  var connection:Connection = _
  var pstmt:PreparedStatement = _

  // Cleared by cancel(). Volatile because cancel() is typically invoked from a
  // different thread than the one executing run().
  @volatile private var running = true

  /** Opens the JDBC connection and prepares the query. */
  override def open(parameters: Configuration): Unit = {
    connection = MySQLUtils.getConnection()
    pstmt = connection.prepareStatement("select * from student")
  }

  /** Releases the prepared statement (if one was created) and then the connection. */
  override def close(): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } finally {
      // Always hand the connection back, even if closing the statement failed.
      MySQLUtils.closeConnection(connection)
    }
  }

  /**
   * Executes the prepared query and forwards each row to the collector until the
   * result set is exhausted or the source has been cancelled.
   */
  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    val rs = pstmt.executeQuery()
    try {
      while (running && rs.next()) {
        val student = Student(rs.getInt("id"), rs.getString("name"), rs.getInt("age"))
        ctx.collect(student)
      }
    } finally {
      rs.close()
    }
  }

  /** Requests `run` to stop after the row currently being processed. */
  override def cancel(): Unit = {
    running = false
  }
}

sourceApp

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/** Entry point that streams the records produced by `cityosMySQLSource` to stdout. */
object sourceApp {

  /**
   * Assembles the pipeline source -> map(toString) -> print, echoing the
   * parallelism of the source stream and of the mapped stream while the job
   * graph is being built, and then executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    val students = environment.addSource(new cityosMySQLSource)
    println(students.parallelism)

    val rendered = students.map(student => student.toString)
    println(rendered.parallelism)

    rendered.print()

    // The job is named after this object's runtime class.
    val jobName = getClass.getCanonicalName
    environment.execute(jobName)
  }
}

高端版本
        
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>3.4.0</version>
        </dependency>
        

application.conf

# JDBC settings for the connection pool named "default".
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop001:3306/ruozedata"
db.default.user="root"
db.default.password="csz"

# Connection Pool settings
db.default.poolInitialSize=10
db.default.poolMaxSize=20
db.default.connectionTimeoutMillis=1000

# Connection Pool settings
# NOTE(review): poolInitialSize and poolMaxSize are assigned a second time below. In HOCON a later
# assignment to the same key replaces the earlier one, so 5 / 7 presumably take effect — confirm
# which set of values is intended and drop the other.
db.default.poolInitialSize=5
db.default.poolMaxSize=7
db.default.poolConnectionTimeoutMillis=1000

sourceApp

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/** Entry point that streams the records produced by `cityMySQLScalikeJDBC` to stdout. */
object sourceApp {

  /**
   * Assembles the pipeline source (parallelism 1) -> map(toString) -> print,
   * echoing the parallelism of the source stream and of the mapped stream while
   * the job graph is being built, and then executes the job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // The source is pinned to a single subtask.
    val students = environment.addSource(new cityMySQLScalikeJDBC).setParallelism(1)
    println(students.parallelism)

    val rendered = students.map(student => student.toString)
    println(rendered.parallelism)

    rendered.print()

    // The job is named after this object's runtime class.
    val jobName = getClass.getCanonicalName
    environment.execute(jobName)
  }
}

scalikeJDBC

import com.cityos.bean.Domain.Student
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs

/**
 * Flink source that reads the `student` table once through ScalikeJDBC and emits
 * every row as a [[Student]].
 *
 * Every parallel instance runs the same query, so with a parallelism above 1
 * each row is emitted once per instance.
 */
class cityMySQLScalikeJDBC extends RichParallelSourceFunction[Student]{

  // Cleared by cancel(). Volatile because cancel() is typically invoked from a
  // different thread than the one executing run().
  @volatile private var running = true

  /**
   * Sets up the ScalikeJDBC connection pools, then streams each row of the query
   * to the collector. Rows are no longer emitted once the source is cancelled.
   */
  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {

    DBs.setupAll()
    DB.readOnly { implicit session =>
      // foreach hands each row to the collector as it is read, instead of
      // emitting as a side effect of map(...) and building a throwaway List[Unit].
      SQL("SELECt * FROM student").foreach { rs =>
        if (running) {
          ctx.collect(Student(rs.int("id"), rs.string("name"), rs.int("age")))
        }
      }
    }
  }

  /** Requests `run` to stop emitting rows. */
  override def cancel(): Unit = {
    running = false
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278870.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号