栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

11.1.3、flink概述

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

11.1.3、flink概述

1、flink的代码分段(编程模型)

source(读数据),transmation(处理数据),sink(写数据)

2、source段的代码(4类)

(1)基于本地集合的 source
(2)基于文件的 source
(3)基于网络套接字的 source
(4)自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source

3、代码实现

自定义Scorce,实现ScorceFunction接口,实现里面的run方法,执行对应的逻辑

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo01Source {

  def main(args: Array[String]): Unit = {
    //创建flink的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)
    
//    val listDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
//    listDS.print()

    
//    val stuDS: DataStream[String] = env.readTextFile("flinkproject/data/students.txt")
//    stuDS.map(stu => (stu.split(",")(4), 1))
//      .keyBy(_._1)
//      .sum(1)
//      .print()

    
//    val scoketDS: DataStream[String] = env.socketTextStream("master", 8888)
//    scoketDS.print()

    
    env.addSource(new MySource).print()


    //启动flink
    env.execute()
  }

}
4、自定义source类读取mysql中的数据

自定义类实现SourceFunction接口,实现run方法,run方法只会执行一次
—自定义类实现ParallelSourceFunction接口,并行的source,有多少个并行度就有多少个source,数据会重复
—自定义类实现RichSourceFunction接口,多了open和close方法,可以在执行逻辑前执行open方法,之后执行close方法

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object Demo02Mysql {

  def main(args: Array[String]): Unit = {
    //创建flink的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度
    env.setParallelism(2)
    env.addSource(new MySources)
      .print()
    //启动flink
    env.execute()
  }
}


class MySources extends RichSourceFunction[(String, String, String, Int, String)] {
  var conn: Connection = _

  //parameters flink配置文件对象
  override def open(parameters: Configuration): Unit = {
    //建立JDBC的连接
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://master:3306/tour", "root", "123456")

  }

  override def close(): Unit = {
    conn.close()
  }

  override def run(ctx: SourceFunction.SourceContext[(String, String, String, Int, String)]): Unit = {

    val state: PreparedStatement = conn.prepareStatement("select * from usertag limit 2")
    val res: ResultSet = state.executeQuery()

    while (res.next()) {
      val mdn: String = res.getString("mdn")
      val name: String = res.getString("name")
      val gender: String = res.getString("gender")
      val age: Int = res.getInt("age")
      val trmnl_brand: String = res.getString("trmnl_brand")
      ctx.collect((mdn, name, gender, age, trmnl_brand))
    }

  }
  override def cancel(): Unit = {}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/687194.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号