source(读数据),transformation(处理数据),sink(写数据)
2、source段的代码(4类)(1)基于本地集合的 source
(2)基于文件的 source
(3)基于网络套接字的 source
(4)自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source
自定义Source:实现SourceFunction接口,实现里面的run方法,执行对应的逻辑
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
/**
 * Demo entry point that wires a custom source into a Flink streaming job
 * and prints every emitted element.
 *
 * The commented-out snippets show the other source flavours (collection,
 * text file, socket) and are kept for reference only.
 */
object Demo01Source {

  def main(args: Array[String]): Unit = {
    // Obtain the Flink streaming execution environment.
    val streamEnv: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // (1) Collection-based source:
    // val listDS: DataStream[Int] = streamEnv.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
    // listDS.print()

    // (2) File-based source, counting records per value of the fifth CSV field:
    // val stuDS: DataStream[String] = streamEnv.readTextFile("flinkproject/data/students.txt")
    // stuDS.map(stu => (stu.split(",")(4), 1))
    //   .keyBy(_._1)
    //   .sum(1)
    //   .print()

    // (3) Socket-based source:
    // val socketDS: DataStream[String] = streamEnv.socketTextStream("master", 8888)
    // socketDS.print()

    // (4) Custom source. NOTE(review): MySource is not defined in this section of the file.
    val customStream = streamEnv.addSource(new MySource)
    customStream.print()

    // Submit the job; nothing runs until execute() is called.
    streamEnv.execute()
  }
}
4、自定义source类读取mysql中的数据
自定义类实现SourceFunction接口,实现run方法,run方法只会执行一次
—自定义类实现ParallelSourceFunction接口,并行的source,有多少个并行度就有多少个source,数据会重复
—自定义类实现RichSourceFunction接口,多了open和close方法,可以在执行逻辑前执行open方法,之后执行close方法
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
/**
 * Demo entry point that streams rows produced by [[MySources]] and prints them.
 */
object Demo02Mysql {

  def main(args: Array[String]): Unit = {
    // Obtain the Flink streaming execution environment.
    val streamEnv: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment

    // Default parallelism for the operators of this job.
    streamEnv.setParallelism(2)

    // Attach the JDBC-backed source and send every tuple to stdout.
    val userTags: DataStream[(String, String, String, Int, String)] =
      streamEnv.addSource(new MySources)
    userTags.print()

    // Submit the job; nothing runs until execute() is called.
    streamEnv.execute()
  }
}
/**
 * Flink source that reads rows from the MySQL table `usertag` and emits each
 * row as a `(mdn, name, gender, age, trmnl_brand)` tuple.
 *
 * The JDBC connection is created in `open` and released in `close`; the
 * statement and result set used by `run` are released as soon as the query
 * has been consumed, failed, or been cancelled.
 */
class MySources extends RichSourceFunction[(String, String, String, Int, String)] {

  // Assigned in open(); stays null if open() fails before the connection is established.
  var conn: Connection = _

  // Flipped by cancel(), which may be called from a different thread than run().
  @volatile private var running: Boolean = true

  /**
   * Loads the MySQL driver and opens the JDBC connection.
   *
   * @param parameters Flink configuration object (not used here)
   */
  override def open(parameters: Configuration): Unit = {
    // NOTE(review): URL and credentials are hard-coded; consider externalising them.
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://master:3306/tour", "root", "123456")
  }

  /** Closes the JDBC connection if one was opened. */
  override def close(): Unit = {
    // Guard against open() having failed before conn was assigned, so the
    // original failure is not masked by a NullPointerException here.
    if (conn != null) {
      conn.close()
    }
  }

  /**
   * Runs the query once and emits every returned row until the result set is
   * exhausted or the source is cancelled.
   *
   * @param ctx context used to emit tuples downstream
   */
  override def run(ctx: SourceFunction.SourceContext[(String, String, String, Int, String)]): Unit = {
    val statement: PreparedStatement = conn.prepareStatement("select * from usertag limit 2")
    try {
      val rows: ResultSet = statement.executeQuery()
      try {
        // Check the flag first so a cancelled source stops fetching rows.
        while (running && rows.next()) {
          val mdn: String = rows.getString("mdn")
          val name: String = rows.getString("name")
          val gender: String = rows.getString("gender")
          val age: Int = rows.getInt("age")
          val trmnlBrand: String = rows.getString("trmnl_brand")
          ctx.collect((mdn, name, gender, age, trmnlBrand))
        }
      } finally {
        rows.close()
      }
    } finally {
      statement.close()
    }
  }

  /** Signals `run` to stop emitting at the next loop iteration. */
  override def cancel(): Unit = {
    running = false
  }
}



