##关闭flink集群 [root@hadoop01 flink-1.10.1]# bin/stop-cluster.sh ### zookeeper bin/zkServer.sh start ##关闭所有服务 [root@hadoop01 flink-1.10.1]# stop-all.sh
- 数据下沉后为什么会生成多个文件?作业完成后生成的文件数与什么有关?并行度如何设置?优先级?
批,流一体化的框架
批:离线处理
流:实时处理
tar-xvzf
-x 从档案文件中释放文件。
-v 详细报告tar处理的文件信息。如无此选项,tar不报告文件信息。
-z 用gzip来压缩/解压缩文件,加上该选项后可以将档案文件进行压缩,
但还原时也一定要使用该选项进行解压缩。
-f 使用档案文件或设备,这个选项通常是必选的。
上传Flink 压缩包到指定目录
解压缩flink 到/opt/servers 目录
cd /opt/servers tar -xvzf flink-1.10.1-bin-scala_2.12.tgz -C ../servers/修改配置文件
修改安装目录下conf 文件夹内的flink-conf.yaml 配置文件,指定JobManager
cd /opt/servers/flink-1.10.1/conf/ ##创建目录 mkdir -p /opt/servers/flink-1.10.1/tmp
修改配置文件:flink-conf.yaml
#配置Master 的机器名(IP 地址) jobmanager.rpc.address: hadoop01 #配置每个taskmanager 生成的临时文件夹 taskmanager.tmp.dirs: /opt/servers/flink-1.10.1/tmp
修改安装目录下conf 文件夹内的slave 配置文件,指定TaskManager
hadoop01 hadoop02 hadoop03
使用vi 修改/etc/profile 系统环境变量配置文件,添加HADOOP_CONF_DIR 目录
export HADOOP_CONF_DIR=/opt/servers/hadoop-2.7.7/etc/hadoop
YARN_CONF_DIR 或者HADOOP_CONF_DIR 必须将环境变量设置为读取YARN 和HDFS 配置
新版本需要增加hadoop的附加组件,下载一个jar包放在Flink的lib目录下
下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/
分发/etc/profile 到其他两个节点
scp -r /etc/profile hadoop02:/etc scp -r /etc/profile hadoop03:/etc
每个节点重新加载环境变量
source /etc/profile
将配置好的Flink 目录分发给其他的两台节点
cd /opt/servers scp -r flink-1.10.1/ hadoop02:$PWD scp -r flink-1.10.1/ hadoop03:$PWD
启动Flink 集群
cd /opt/servers/flink-1.10.1 bin/start-cluster.sh
启动HDFS 集群
start-all.sh1.4 使用 Flink 测试词频统计
在HDFS 中创建/test/input 目录
hadoop fs -mkdir -p /test/input
上传wordcount.txt 文件到HDFS /test/input 目录
hadoop fs -put /root/wordcount.txt /test/input
并运行测试任务
bin/flink run examples/batch/WordCount.jar --input hdfs://hadoop01:8020/test/input/wordcount.txt --output hdfs://hadoop01:8020/test/output/001
不output直接打印控制台
浏览Flink Web UI 界面http://hadoop01:8081
启动flink集群
cd /opt/servers/flink-1.10.1 bin/start-cluster.sh高可用HA
修改slots
taskmanager.numberOfTaskSlots:
2 2 4
cd /opt/servers/flink-1.10.1/conf/ [root@hadoop03 conf]# vim flink-conf.yaml
org.apache.flink flink-core 1.10.1 org.apache.flink flink-java 1.10.1 org.apache.flink flink-streaming-java_2.12 1.10.1
package cn.tedu;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
//构建批处理的运行环境
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
//读取文件
DataSource source = env.readTextFile("flink_java/data/wordcount.txt");
// source.print();
//词频统计 <输入类型,输出类型>
FlatMapOperator> wordAndOne = source.flatMap(new FlatMapFunction>() {
//每来一条数据,调用一次flatmap处理数据
@Override
public void flatMap(String value, Collector> out) throws Exception {
//获取每个词的数组
String[] words = value.split(" ");
//循环输出
for (String word : words) {
out.collect(new Tuple2(word, 1));
}
}
});
// wordAndOne.print(); (hello,1)(hadoop,1)
UnsortedGrouping> groupeWords = wordAndOne.groupBy(0);
AggregateOperator> result = groupeWords.sum(1);
//数据sink保存
result.print();
}
}
2.2 scala
数据下沉后为什么会生成多个文件?
package cn.tedu.batch
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode
object BatchWordCount {
def main(args: Array[String]): Unit = {
//构建Flink批处理的运行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//读取数据
//val source: DataSet[String] = env.readTextFile("FlinkSCALA/data/wordcount.txt")
val source: DataSet[String]
= env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
//source.print()
//hello flink kafka flink kafka flink --> hello ,1
//数据转化
//需要导入隐式转换
import org.apache.flink.api.scala._
val words: DataSet[String] = source.flatMap(_.split(" "))
val wordAndOne = words.map((_, 1))
//分组
val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
val result = groupedDataSet.sum(1)
//val result = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
//数据下沉
//result.print()
result.writeAsText("hdfs://hadoop01:8020/test/output/102")
//执行批处理,flink中是惰性加载
env.execute()
}
}
文件数 根据计算机处理器数生成
作业完成后生成的文件数与什么有关?
package cn.tedu.batch
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode
object BatchWordCount {
def main(args: Array[String]): Unit = {
//构建Flink批处理的运行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//读取数据
//val source: DataSet[String] = env.readTextFile("FlinkSCALA/data/wordcount.txt")
val source: DataSet[String]
= env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
//source.print()
//hello flink kafka flink kafka flink --> hello ,1
//数据转化
//需要导入隐式转换
import org.apache.flink.api.scala._
val words: DataSet[String] = source.flatMap(_.split(" "))
val wordAndOne = words.map((_, 1))
//分组
val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
val result = groupedDataSet.sum(1)
//val result = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
//数据下沉
//result.print()
result.writeAsText("hdfs://hadoop01:8020/test/output/101",WriteMode.OVERWRITE)
//执行批处理,flink中是惰性加载
env.execute()
}
}
打包
上传服务器
[root@hadoop01 flink-1.10.1]# bin/flink run -m yarn-cluster /opt/data/flink_scala-1.0-SNAPSHOT.jar
并行度设置为2,submit
算子的并行度 > env环境的并行度 > 运行参数的并行度 > 配置文件的默认并行度
2.3 源的读取(scala)package cn.tedu.batch.source
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object BatchFromFile {
def main(args: Array[String]): Unit = {
//构建数据源
val env=ExecutionEnvironment.getExecutionEnvironment
//1.读取本地文件
val source1: DataSet[String]= env.readTextFile("flink_scala/data/wordcount.txt")
// source1.print()
//2.读取hdfs文件
val source2: DataSet[String] = env.readTextFile("hdfs://hadoop01:8020/test/input/wordcount.txt")
// source2.print()
//3.读取csv文件
import org.apache.flink.api.scala._
val source3: DataSet[Subject] = env.readCsvFile[Subject]("flink_scala/data/subject.csv")
// source3.print()
//4.读取压缩文件
val source4: DataSet[String]= env.readTextFile("flink_scala/data/wordcount.txt.gz")
source4.print()
}
case class Subject(id:Int,subjectName:String)
}



