[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0W6AIRw9-1647863509976)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317201723092.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YRcE2NNq-1647863509978)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317201811115.png)]
1.2. 官方介绍 Apache Flink - 数据流上的有状态计算
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jXZh15k3-1647863509979)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317201932826.png)]
1.3. 组件栈[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tKdhrmyL-1647863509981)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317202149651.png)]
1.4. 应用场景所有的流式处理
2. Flink的安装部署 2.1. local 本地模式 2.1.1. 原理- Flink程序由JobClient进行提交JobClient将作业提交给JobManagerJobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManagerTaskManager启动一一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成。作业执行完成后,结果将发送回客户端(obClient)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bZ7xomA6-1647863509989)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317202931510.png)]
2.1.2. 操作下载安装包
https://archive.apache.org/dist/flink/
上传文件flink-1.12.0-bin-scala_2.12.tgz到指定位置
解压
tar -zxvf flink-1.12.0-bin-scala_ _2.12.tgz
如果出现权限问题,需要修改权限
chown -R root:root /export/server/flink-1.12.0
改名或创建软链接
mv flink-1.12.0 flink In -s /export/server/flink-1.12.0 /export/server/flink
准备文件/usr/local/soft/flink/data/words.txt
vim /usr/local/soft/flink/data/words.txt
hello me you her hello me you hello me hello
数据随便放一点进去,只是为了一个测试
启动flink本地“集群”
/usr/local/soft/flink/flink/bin/start-cluster.sh
使用jps查看以下两个进程
-TaskManagerRunner -StandaloneSessionClusterEntrypoint
访问Flink的Web UI
Ip:8081/#/overview
执行官方示例
/usr/local/soft/flink/flink/bin/flink run /usr/local/soft/flink/flink/examples/batch/WordCount.jar --input /usr/local/soft/flink/data/words.txt --output /usr/local/soft/flink/data/out
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uYc1LTPl-1647863509992)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317212440606.png)]
停止flink
/usr/local/soft/flink/flink/bin/stop-cluster.sh
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GSBsCwN8-1647863509994)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317212701681.png)]
启动shell交互窗口(目前所有的scala 2.12版本的安装包暂时都不支持Scala shell)
/usr/local/soft/flink/flink/bin/start-scala-shell.sh local
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Mb1r2Mal-1647863509997)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317213220899.png)]
执行如下命令
退出shell
:quit2.2. Standalone独立集群模式 2.2.1. 原理
- client客户端提交任务给JobManagerJobManager负责申请任务运行所需要的资源并管理任务和资源JobManager分发任务给TaskManager执行TaskManager定期向JobManager汇报状态
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QSJF8YqJ-1647863509998)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220317213341593.png)]
2.2.2. 操作1.集群规划: .
服务器:master(Master + Slave): JobManager + TaskManager
服务器:node1(Slave): TaskManager+ TaskManager服务器:node2(Slave): TaskManager
- 修改配置文件
2.1修改 flink-conf.yaml
vim /usr/local/soft/flink/flink/conf/flink-conf.yaml jobmanager.rpc.address: master taskmanager.numberofTaskSlots: 2 web.submit.enable: true #历史服务器 jobmanager.archive.fs.dir: hdfs://master:8082/flink/completed-jobs/ historyserver.web.address: node1 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://master:8082/flink/completed-jobs/
2.2 修改masters
vim /usr/local/soft/flink/flink/conf/masters master:8081
2.3修改slaves
master node1 node2
2.4添加HADOOP_CONF_DIR环境变量
vim /etc/profile export HADOOP_CONF_DIR=/usr/local/soft/hadoop/hadoop-2.7.7/etc/hadoop
2.5分发
scp -r /usr/local/soft/flink/flink node1:/usr/local/soft/flink/ scp -r /usr/local/soft/flink/flink node2:/usr/local/soft/flink/2.2.3.测试
- 启动集群,在master上面直接执行下面的命令
/usr/local/soft/flink/flink/bin/start-cluster.sh 或者单独启动
- 启动历史服务器
/usr/local/soft/flink/flink/bin/historyserver.sh start
测试案例
/usr/local/soft/flink/flink/bin/flink run /usr/local/soft/flink/flink/examples/batch/WordCount.jar
停止集群
/usr/local/soft/flink/flink/bin/stop-cluster.sh2.3. Standalone–HA高可用集群模 式 2.3.1 原理
从之前的架构中我们可以很明显的发现JobManager有明显的单点问题(SPOF, single point of failure)。
JobManager肩负着任务调度以及资源分配,一旦JobManager出现意外,其后果可想而知。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ku5O1ipG-1647863510002)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220318002639644.png)]
2.3.2 操作1.集群规划: .
服务器:master(Master + Slave): JobManager + TaskManager
服务器:node1(Slave): TaskManager+ TaskManager服务器:node2(Slave): TaskManager
启动ZooKeeper
zkServer.sh status zkServer.sh stop zkServer.sh start
启动HDFS
hadoop/sbin/start-dfs.sh
停止Flink集群
flink/bin/stop-cluster.sh
修改flink-conf.yaml
vim /usr/local/soft/flink/flink/conf/flink-conf.yaml
增加如下配置
state.backend: filesystem state.backend.fs.checkpointdir: hdfs://master:8081/flink-checkpoints high-availability: zookeeper high-availability.storageDir: hdfs://master:8020/flink/ha/ high-availability.zookeeper.quorum: master:2181,node1:2181,node2:2181
在conf文件中修改master
master:8081 node1:8081
- 同步,分发
scp -r conf/ node2:/usr/local/soft/flink/flink scp -r conf/ node1:/usr/local/soft/flink/flink2.3.3. 测试 2.4. Flink-On-Yarn 2.4.1. 原理
原理
为什么使用flink on yarn ?
在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
-1.Yarn的资源可以按需使用,提高集群的资源利用率
-2.Yarn的任务有优先级,根据优先级运行作业
-3.基于Yarn调度系统,能够自动化地处理各个角色的Failover(容错)
O JobManager进程和TaskManager进程都由Yarn NodeManager监控
O如果JobManager进程异常退出,则Yarn ResourceManager会重新调度JobManager到其他机器
O如果TaskManager进程异常退出,JobManager 会收到消息并重新向Yarn ResourceManager申请资源,重新启
动TaskManager
Flink如何和Yarn进行交互
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-58ZqsazZ-1647863510003)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320102650362.png)]
- Flink ON Yarn模式
3.1 session模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1yasnvyc-1647863510004)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320102959250.png)]
3.2 Per-Job模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-M6pHC8GU-1647863510005)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320103326813.png)]
2.4.2. 操作1、配置HADOOP_CONF_DIR
vim /etc/profile export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/
2、将hadoop依赖jar上传到flink lib目录
flink-shaded-hadoop-2-uber-2.6.5-10.0
flink和spark一样都是粗粒度资源申请
flink启动方式
1、yarn-session 在yarn里面启动一个flink集群 jobManager(ApplicationMaster) yarn-session是所有任务共享同一个jobmanager 先启动hadoop yarn-session.sh -jm 1024m -tm 1096m
提交任务 任务提交的是偶根据并行度动态申请taskmanager 1、在web页面提交任务 2、同flink命令提交任务 flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar 3、rpc方式提交任务 关闭yarn-session yarn application -kill application_1647657435495_0001
2、直接提交任务到yarn 每一个任务都会有一个
jobManager flink run -m yarn-cluster -yjm 1024m -ytm 1096m -c com.shujia.flink.core.Demo1WordCount flink-1.0.jar
杀掉yarn上的任务 yarn application -kill application_1599820991153_0005 查看日志 yarn logs -applicationId application_1647657435495_0002
yarn-session先在yarn中启动一个jobMansager ,所有的任务共享一个jobmanager (提交任务更快,任务之间共享jobmanager , 相互有影响) 直接提交任务模型,为每一个任务启动一个joibmanager (每一个任务独立jobmanager , 任务运行稳定)3. Flink入门介绍 3.1. 前置说明
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AW9rs1Aq-1647863510007)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320104118153.png)]
注意:入门案例使用DataSet后续就不再使用,而是统一使用流批一体的DataStream
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GTPmmeYS-1647863510009)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320124018188.png)]
3.1.1编程模型Flink应用程序结构主要包含三部分,Source/Transformation/Sink,如图三部分
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CFQFS65T-1647863510015)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320124233273.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pEz917YC-1647863510018)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320124402897.png)]
3.2. 准备环境创建maven项目
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mwGhdoU1-1647863510019)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320131126488.png)]
pom依赖文件
3.3.代码实现小案例,WordCountorg.apache.flink flink-walkthrough-common_${scala.binary.version}${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version}${flink.version} org.apache.flink flink-clients_${scala.binary.version}${flink.version} org.apache.logging.log4j log4j-slf4j-impl${log4j.version} org.apache.logging.log4j log4j-api${log4j.version} org.apache.logging.log4j log4j-core${log4j.version} mysql mysql-connector-java5.1.40 org.apache.maven.plugins maven-compiler-plugin3.1 1.8 1.8 org.scala-tools maven-scala-plugin2.15.2 compile testCompile
示例代码
不会写就先照葫芦画瓢,这里是官网的代码示例
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WKlFxIFD-1647863510022)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320131349179.png)]
自己实现
package blockhouse
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala._
object DemoWordCount {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(2)
//这里使用socket来读取数据
val dataDS: DataStream[String] = env.socketTextStream("master", 9999)
//打印一下结果
//dataDS.print()
//把数据展开
val wordsDS: DataStream[String] = dataDS.flatMap(line => line.split(","))
//装成kv格式
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
//按照单词进行分组,和groupBy比较类似,kv => kv._1制定一个字段进行分组
//指定完成后,这里变成了KeyedStream流
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
//统计数量,对value进行求和,这里的value就是1,指定下标进行聚合。
val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
//打印最终的结果
countDS.print()
//这里的jobname是在启动flink的时候指定的,但是spark中是在启动环境的时候启动的
env.execute("workcount")
}
}
spark和flink的本质区别
主要是在suffer端不太一样
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iGAxQqwq-1647863510025)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320143540274.png)]
数据从上游的task发送到下游task的时候有缓冲
时间超过200毫秒数据量达到32kb
并不是一条数据发送一次,这样的话性能会很差。上面两个条件满足一个,就会把数据发送到下游
3.4. Source:数据源Flink在流处理和批处理上的source大概有4类
基于本地集合的source–>有界流
基于文件的source–>有界流
基于网络套接字的source–>无界流
自定义的source。自定义的source常见的有Apache kafka、Amazon Kinesis Streams、RabbitMQ、 Twitter Streaming API、Apache Nifi 等,当然你也可以定义自己的source。–>无界流
- 这个是通过fromCollection(List"“))这个方法来创建一个有界流的数据这个
listDS.flatMap(_.split(","))
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()
求WordCount
package blockhouse.source
import org.apache.flink.streaming.api.scala._
object DemoListSource {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val listDS: DataStream[String] = env.fromCollection(List("java,java","spark","hadoop","javba","hadoop,java,java,hadoop"))
listDS.flatMap(_.split(","))
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()
env.execute("WordCount")
}
}
这个是通过readTextFile方法来读取本地文件的有界流数据
下面的处理仍然是求WordCount
package blockhouse.source
import org.apache.flink.streaming.api.scala._
object DemoFIleDemo {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val fileDS: DataStream[String] = env.readTextFile("data/students.txt")
fileDS.map(stu => {
val clazz: String = stu.split(",")(4)
(clazz,1)
})
.keyBy(_._1)
.sum(1)
.print()
env.execute("DemoFIleDemo")
}
}
自定义的Source,这里需要继承SourceFunction方法,重写run方法和cancel方法
package blockhouse.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
object DemoSourceFunction {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val myDS: DataStream[Int] = env.addSource(new MySource)
myDS.print()
env.execute()
}
}
class MySource extends SourceFunction[Int]{
override def run(ctx: SourceFunction.SourceContext[Int]): Unit ={
var i = 0
while(i < 100) {
i += 1
ctx.collect(i)
}
}
//任务被取消的时候执行,一般用于回收资源
override def cancel(): Unit = {
}
}
- 自定义source的好处很明显,我们可以自定义的读取任何来源的数据,这里简单的介绍一下读取mysql的数据。
package blockhouse.source
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
object DemoMysqlSource {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val result: DataStream[(Int, String, String, String)] = env.addSource(new MysqlSource)
result.print()
env.execute("MysqlSource")
}
}
class MysqlSource extends RichSourceFunction[(Int,String,String,String)]{
var connDS: Connection = _
override def open(parameters: Configuration): Unit = {
//使用jdbc读取mysql
Class.forName("com.mysql.jdbc.Driver")
//建立连接
connDS = DriverManager.getConnection("jdbc:mysql//192.168.3.67:3306/kayleigh", "root", "123456")
}
override def close(): Unit ={
connDS.close()
}
override def run(ctx: SourceFunction.SourceContext[(Int,String,String,String)]): Unit = {
//查询数据
val sta1: PreparedStatement = connDS.prepareStatement("select * from runoob_tbl")
//执行查询
val resultSet: ResultSet = sta1.executeQuery()
while(resultSet.next()){
val id: Int = resultSet.getInt("runoob_id")
val title: String = resultSet.getString("runoob_title")
val author: String = resultSet.getString("runoob_author")
val date: String = resultSet.getString("submission_date")
//将数据发送到下游
ctx.collect((id,title,author,date))
}
}
override def cancel(): Unit = {
}
}
4. DataStream常用算子
上面讲的是Source数据源层,这里来介绍TransFormation,数据转换的各种操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Zl4gMbgy-1647863510027)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320214809664.png)]
在Flink应用程序中,无论你的应用程序是批程序,还是流程序,都是上图这种模型,有数据源(source),有数据下游(sink) ,我们写的应用程序多是对数据源过来的数据做一系列操作, 总结如下。
● Source:数据源,Flink在流处理和批处理.上的source大概有4类:基于本地集合的source、 基于文件的
source、基于网络套接字的source、自定义的source。自定义的source常见的有Apache kafka、Amazon
Kinesis Streams、RabbitMQ、 Twitter Streaming API、Apache NiFi等,当然你也可以义自己的
source。
●Transformation:数据转换的各种操作,有Map / FlatMap/ Filter 1 KeyBy / Reduce / Fold / Aggregations /Window / WindowAll / Union / Window join / Split / Select/ Project等,操作很多,可以将数据转换计算成
你想要的数据。
●Sink:接收器,Sink是指Flink将转换计算后的数据发送的地点,你可能需要存储下来。Flink常见的Sink大概有如下几类:写入文件、打印出来、写入Socket、自定义的Sink。 自定义的sink常见的有Apache kafka、RabbitMQ、 MySQL、ElasticSearch、 Apache Cassandra、Hadoop FileSystem 等,同理你也可以
定义自己的Sink.
Map 算子的输入流是 DataStream,经过 Map 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成一个元素,举个例子:
SingleOutputStreamOperatormap = employeeStream.map(new MapFunction () { @Override public Employee map(Employee employee) throws Exception { employee.salary = employee.salary + 5000; return employee; } }); map.print();
新的一年给每个员工的工资加 5000。
4.2. FlatMapFlatMap 算子的输入流是 DataStream,经过 FlatMap 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成零个、一个或多个元素,举个例子:
SingleOutputStreamOperatorflatMap = employeeStream.flatMap(new FlatMapFunction () { @Override public void flatMap(Employee employee, Collector out) throws Exception { if (employee.salary >= 40000) { out.collect(employee); } } }); flatMap.print();
将工资大于 40000 的找出来。
4.3. FilterSingleOutputStreamOperator filter = ds.filter(new FilterFunction() { @Override public boolean filter(Employee employee) throws Exception { if (employee.salary >= 40000) { return true; } return false; } }); filter.print();
对每个元素都进行判断,返回为 true 的元素,如果为 false 则丢弃数据,上面找出工资大于 40000 的员工其实也可以用 Filter 来做:
4.4. KeyBy[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E8DeEuhd-1647863510030)(/Users/kayleigh/Library/Application Support/typora-user-images/image-20220320222244115.png)]
KeyBy 在逻辑上是基于 key 对流进行分区,相同的 Key 会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。举个例子:
KeyedStreamkeyBy = productStream.keyBy(new KeySelector () { @Override public Integer getKey(ProductEvent product) throws Exception { return product.shopId; } }); keyBy.print();
根据商品的店铺 id 来进行分区。
4.5. ReduceReduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average、sum、min、max、count,使用 Reduce 方法都可实现。
SingleOutputStreamOperatorreduce = employeeStream.keyBy(new KeySelector () { @Override public Integer getKey(Employee employee) throws Exception { return employee.shopId; } }).reduce(new ReduceFunction () { @Override public Employee reduce(Employee employee1, Employee employee2) throws Exception { employee1.salary = (employee1.salary + employee2.salary) / 2; return employee1; } }); reduce.print();
上面先将数据流进行 keyby 操作,因为执行 Reduce 操作只能是 KeyedStream,然后将员工的工资做了一个求平均值的操作。
4.6. AggregationsDataStream API 支持各种聚合,例如 min、max、sum 等。 这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。
KeyedStream.sum(0)
KeyedStream.sum("key")
KeyedStream.min(0)
KeyedStream.min("key")
KeyedStream.max(0)
KeyedStream.max("key")
KeyedStream.minBy(0)
KeyedStream.minBy("key")
KeyedStream.maxBy(0)
KeyedStream.maxBy("key")
max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。



