Flink supports several deployment modes:
- Local: single-machine mode, for learning and testing
- Standalone: Flink's own standalone cluster, for development and test environments
- Standalone HA: Flink's own standalone cluster with high availability, for development and test environments
- On Yarn: compute resources are managed by Hadoop YARN, for production
1. Local mode
1.1 How it works
- The Flink program is submitted by the JobClient
- The JobClient submits the job to the JobManager
- The JobManager coordinates resource allocation and job execution; once resources are allocated, tasks are handed to the appropriate TaskManager
- The TaskManager starts a thread to begin execution, and reports state changes back to the JobManager (started, in progress, finished)
- When the job completes, the result is sent back to the client (JobClient)
1.2 Installation
1. Download the release
Index of /dist/flink: https://archive.apache.org/dist/flink/
2. Upload flink-1.14.0-bin-scala_2.11.tgz to the target directory on node1 (the Scala 2.12 build is also available: https://archive.apache.org/dist/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.12.tgz)
3. Extract it
tar -zxvf flink-1.14.0-bin-scala_2.11.tgz
4. Fix ownership if you run into permission problems
chown -R root:root flink-1.14.0
5. Configure environment variables
export JAVA_HOME=/data/soft/jdk1.8.0_201
export Flink_HOME=/data/soft/flink-1.14.0
export PATH=$Flink_HOME/bin:$JAVA_HOME/bin:$PATH
6. Start Flink in local mode
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
[root@node1 flink-1.14.0]# jps
29090 NameNode
29286 DataNode
24182 Jps
23785 StandaloneSessionClusterEntrypoint
24106 TaskManagerRunner
29614 SecondaryNameNode
Flink's web UI listens on port 8081 by default:
http://192.168.56.15:8081/#/overview
A slot in Flink can be thought of as a resource group: Flink runs a program in parallel by splitting it into subtasks and assigning those subtasks to slots.
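The slot budget is simple arithmetic: total slots = number of TaskManagers × slots per TaskManager, and a job's parallelism has to fit into that budget. A minimal sketch with illustrative numbers (they mirror the three-worker cluster configured later with taskmanager.numberOfTaskSlots: 4, not a live cluster):

```shell
#!/bin/sh
# Illustrative slot arithmetic: total slots = TaskManagers x slots per TM.
# The numbers are assumptions for the example, not read from a cluster.
TASKMANAGERS=3
SLOTS_PER_TM=4
TOTAL_SLOTS=$((TASKMANAGERS * SLOTS_PER_TM))
echo "$TOTAL_SLOTS" > /tmp/total_slots.txt

# A job fits only if its parallelism does not exceed the slot budget.
PARALLELISM=10
if [ "$PARALLELISM" -le "$TOTAL_SLOTS" ]; then
  echo "parallelism $PARALLELISM fits into $TOTAL_SLOTS slots"
else
  echo "not enough slots: need $PARALLELISM, have $TOTAL_SLOTS"
fi
```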
7. Run the official example
First create the input file:
vim /data/soft/words.txt
hello me you her hello me you hello me hello
Then submit the WordCount job:
[root@common soft]# /data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar --input /data/soft/words.txt --output /data/soft/out.txt
Job has been submitted with JobID 3a0f1f4c39da135aa6f994c78942a33b
Program execution finished
Job with JobID 3a0f1f4c39da135aa6f994c78942a33b has finished.
Job Runtime: 1976 ms
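WordCount is just split / group / count, so the expected output for this words.txt can be sanity-checked with plain coreutils before comparing against the job's --output file:

```shell
#!/bin/sh
# Reproduce the expected counts for words.txt with coreutils, as a
# sanity check for the Flink job's output. A scratch copy is used
# so this runs anywhere.
printf 'hello me you her hello me you hello me hello\n' > /tmp/words.txt

tr ' ' '\n' < /tmp/words.txt | sort | uniq -c | sort -rn
# hello appears 4 times, me 3, you 2, her 1
```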
2. Standalone mode
2.1 How it works
- The client submits the job to the JobManager
- The JobManager requests the resources the job needs, and manages tasks and resources
- The JobManager distributes tasks to the TaskManagers for execution
- The TaskManagers periodically report their status to the JobManager
Cluster layout:
- node1 (master + worker): JobManager + TaskManager
- node2 (worker): TaskManager
- node3 (worker): TaskManager
If you need a Hadoop environment (HDFS/YARN), install Hadoop and YARN first.
Installation tutorial:
Hadoop distributed cluster deployment (CSDN blog): download the latest Hadoop from http://hadoop.apache.org/, extract it on Linux, cd into /mine/hadoop-3.3.0/etc/hadoop, and edit the files there ((1) hadoop-env.sh: set JAVA_HOME to the concrete path; (2) core-site.xml; and so on), edit /etc/profile, then repeat on node2 and node3 (the daemons can also be started individually).
Caution: the startup log may report:
Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
Since Flink 1.8, the official release no longer bundles the HDFS integration jar. Fix: download the Flink 1.14.0 / Hadoop 3.x compatibility jar, flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar (https://download.csdn.net/download/wtl1992/41702901), put it under $Flink_HOME/lib, and distribute it to node2 and node3 as well.
Web UIs:
http://192.168.56.10:8081/#/overview
http://192.168.56.10:8082/#/overview
The TaskManager page shows how many TaskManagers the current Flink cluster has, and each TaskManager's slots, memory, and CPU cores.

3. Standalone HA mode
3.1 How it works
The architecture above clearly gives the JobManager a single point of failure (SPOF). Since the JobManager handles task scheduling and resource allocation, the consequences of losing it are easy to imagine.
With ZooKeeper's help, a standalone Flink cluster runs several live JobManagers at once, only one of which is active while the rest stand by. When the active JobManager is lost (e.g. the machine goes down or the process crashes), ZooKeeper elects a new JobManager from the standbys to take over the cluster.
Cluster layout:
- node1 (master + worker): JobManager + TaskManager
- node2 (master + worker): JobManager + TaskManager
- node3 (worker): TaskManager
ZooKeeper cluster installation tutorial: https://blog.csdn.net/wtl1992/article/details/120392696 (PrettyZoo, a ZooKeeper GUI: https://github.com/vran-dev/PrettyZoo/releases)
The HA configuration is steps 5-9 below; step 9 restarts the Flink cluster from node1. Caution: jps may then show no Flink processes at all. This is the missing-Hadoop-jar problem described above; the same flink-shaded-hadoop jar under $Flink_HOME/lib, distributed to node2 and node3, fixes it.
To verify HA, both masters' configuration pages should be reachable:
http://192.168.56.10:8081/#/job-manager/config
http://192.168.56.11:8081/#/job-manager/config

In real-world development Flink mostly runs in Flink On Yarn mode (section 4), because:
1. YARN resources are allocated on demand, which raises cluster utilization
2. YARN tasks have priorities, and jobs run according to them
3. The YARN scheduler automatically handles failover for each role:
- both the JobManager process and the TaskManager processes are monitored by the YARN NodeManager
- if the JobManager process exits abnormally, the YARN ResourceManager reschedules it onto another machine
- if a TaskManager process exits abnormally, the JobManager is notified and requests resources from the YARN ResourceManager again to restart the TaskManager
Submission flow on YARN:
1. The client uploads the jar and configuration files to the HDFS cluster
2. The client submits the job to the YARN ResourceManager and requests resources
3. The ResourceManager allocates a container and starts the ApplicationMaster, which loads Flink's jar and configuration, builds the environment, and starts the JobManager. The JobManager and ApplicationMaster run in the same container, so once they are up the ApplicationMaster knows the JobManager's address (its own machine). It then generates a new Flink configuration file for the TaskManagers (so they can connect to the JobManager) and uploads it to HDFS too. The ApplicationMaster container also serves Flink's web interface. All ports allocated by YARN are ephemeral, which lets users run several Flink clusters in parallel.
4. The ApplicationMaster requests worker resources from the ResourceManager; the NodeManagers load Flink's jar and configuration, build the environment, and start the TaskManagers
5. Each TaskManager, once started, sends heartbeats to the JobManager and waits for it to assign tasks
There are two deployment modes on YARN:
(1) Session mode
Characteristics: resources are requested up front, and the JobManager and TaskManagers are started in advance
Advantage: submissions reuse the already-allocated resources rather than requesting new ones, improving execution efficiency
Drawback: resources are not released when a job finishes, so they stay occupied
Use case: frequent submissions, many small jobs
(2) Per-Job mode
Characteristics: every submission requests resources anew
Advantage: resources are released as soon as the job finishes and are never held idle
Drawback: requesting resources costs time on every submission, hurting execution efficiency
Use case: few jobs, large jobs
Before submitting, disable YARN's virtual-memory check in yarn-site.xml (section 4 below), or YARN will kill Flink jobs that exceed the virtual-memory allowance.
Session mode is yarn-session.sh (allocate resources) plus flink run (submit jobs); per-job mode submits directly with flink run. Either way, an HDFS error may appear while files are uploaded ("the last block does not have enough replicas, the file could not be closed"); the cause and the core-site.xml fix are given with the commands in section 4. The following warning can be ignored:
WARN org.apache.hadoop.hdfs.DFSClient - Caught exception java.lang.InterruptedException

2.2 Installation
2. Edit flink-conf.yaml on node1:
vim /data/soft/flink-1.14.0/conf/flink-conf.yaml
jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 4
web.submit.enable: true
# history server
jobmanager.archive.fs.dir: hdfs://node1:9000/flink/completed-jobs/
historyserver.web.address: node1
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node1:9000/flink/completed-jobs/
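flink-conf.yaml is made of flat `key: value` lines, so any of the settings above can be read back with awk. A small sketch against a scratch copy of the settings (the real file lives at /data/soft/flink-1.14.0/conf/flink-conf.yaml):

```shell
#!/bin/sh
# Read settings back out of a flink-conf.yaml-style file with awk.
# A scratch copy is written first so the sketch is self-contained.
cat > /tmp/flink-conf.yaml <<'EOF'
jobmanager.rpc.address: node1
taskmanager.numberOfTaskSlots: 4
web.submit.enable: true
historyserver.web.port: 8082
EOF

conf_get() {
  awk -v k="$1" -F': ' '$1 == k { print $2 }' /tmp/flink-conf.yaml
}

conf_get taskmanager.numberOfTaskSlots   # prints 4
conf_get historyserver.web.port          # prints 8082
```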
3. Edit masters
vim /data/soft/flink-1.14.0/conf/masters
node1:8081
4. Edit the workers file (the worker/slave list)
vim /data/soft/flink-1.14.0/conf/workers
node1
node2
node3
5. Add the HADOOP_CONF_DIR environment variable
export HADOOP_CONF_DIR=/data/soft/hadoop-3.3.1/etc/hadoop
6. Distribute to the other nodes
rsync -av flink-1.14.0 root@node2:/data/soft
rsync -av flink-1.14.0 root@node3:/data/soft
rsync -av /etc/profile root@node2:/etc/
rsync -av /etc/profile root@node3:/etc/
source /etc/profile
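The distribution step is the same two rsync calls per node, so with more workers you would generate them from a node list. A sketch that only writes the commands to a file for review (node names as above; nothing is actually copied):

```shell
#!/bin/sh
# Generate the distribution commands for every worker node instead of
# writing them out one by one. Review the file, then run the commands.
NODES="node2 node3"
: > /tmp/distribute.sh
for node in $NODES; do
  echo "rsync -av flink-1.14.0 root@${node}:/data/soft" >> /tmp/distribute.sh
  echo "rsync -av /etc/profile root@${node}:/etc/"      >> /tmp/distribute.sh
done
cat /tmp/distribute.sh
```

Remember to run `source /etc/profile` on every node afterwards.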
2.3 Testing
1. Start the cluster; on node1 run:
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
[root@node1 flink-1.14.0]# jps
29090 NameNode
1045 StandaloneSessionClusterEntrypoint
29286 DataNode
1465 TaskManagerRunner
29614 SecondaryNameNode
1583 Jps
To start or stop a single daemon manually:
/data/soft/flink-1.14.0/bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
/data/soft/flink-1.14.0/bin/taskmanager.sh start|start-foreground|stop|stop-all
[root@node1 flink-1.14.0]# jps
14336 Jps
29090 NameNode
12756 StandaloneSessionClusterEntrypoint
29286 DataNode
13111 TaskManagerRunner
29614 SecondaryNameNode
2. Start the history server
/data/soft/flink-1.14.0/bin/historyserver.sh start
3. Start HDFS (the history server's archive directory is on HDFS)
/data/soft/hadoop-3.3.1/sbin/start-dfs.sh
4. Stop the Flink cluster
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 1465) on host node1.
Stopping taskexecutor daemon (pid: 26295) on host node2.
Stopping taskexecutor daemon (pid: 26675) on host node3.
Stopping standalonesession daemon (pid: 1045) on host node1.
5. Edit flink-conf.yaml to enable high availability (Standalone HA)
vim /data/soft/flink-1.14.0/conf/flink-conf.yaml
# use the filesystem as the state backend (snapshot storage)
state.backend: filesystem
# checkpoint directory, so snapshots can be saved to HDFS
state.backend.fs.checkpointdir: hdfs://node1:9000/flink-checkpoints
# use ZooKeeper for high availability
high-availability: zookeeper
# store JobManager metadata on HDFS
high-availability.storageDir: hdfs://node1:9000/flink/ha/
# ZooKeeper cluster addresses
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
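high-availability.zookeeper.quorum is a comma-separated host:port list, and a malformed entry is an easy typo to make. A quick sketch that splits the quorum string and checks each entry looks like host:port (pure string checking with the same node1..node3 quorum; no ZooKeeper connection is made):

```shell
#!/bin/sh
# Sanity-check a ZooKeeper quorum string: split on commas and verify
# each entry is host:port with a numeric port.
QUORUM="node1:2181,node2:2181,node3:2181"
COUNT=0
for entry in $(printf '%s' "$QUORUM" | tr ',' ' '); do
  host=${entry%:*}
  port=${entry#*:}
  case "$port" in
    ''|*[!0-9]*) echo "bad quorum entry: $entry"; exit 1 ;;
  esac
  echo "ok: $host on port $port"
  COUNT=$((COUNT + 1))
done
echo "$COUNT" > /tmp/quorum_members.txt
```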
6. Edit masters
vim /data/soft/flink-1.14.0/conf/masters
node1:8081
node2:8081
7. Sync to the other nodes
rsync -av /data/soft/flink-1.14.0 root@node2:/data/soft
rsync -av /data/soft/flink-1.14.0 root@node3:/data/soft
8. Edit flink-conf.yaml on node2
vim /data/soft/flink-1.14.0/conf/flink-conf.yaml
jobmanager.rpc.address: node2
9. Restart the Flink cluster; on node1:
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/stop-cluster.sh
No taskexecutor daemon (pid: 24106) is running anymore on node1.
No taskexecutor daemon (pid: 26122) is running anymore on node2.
No taskexecutor daemon (pid: 26548) is running anymore on node3.
No standalonesession daemon (pid: 23785) is running anymore on node1.
No standalonesession daemon to stop on host node2.
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node1.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 1eb0896a4ba2ba995dc88ece3dcd8fc5
Program execution finished
Job with JobID 1eb0896a4ba2ba995dc88ece3dcd8fc5 has finished.
Job Runtime: 1815 ms
Accumulator Results:
- 98d0871f23ab6fc907e47dae6d036868 (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
(bear,3)
(bodkin,1)
(bourn,1)
(but,1)
(by,2)
(calamity,1)
(cast,1)
(coil,1)
(come,1)
(conscience,1)
(consummation,1)
(contumely,1)
(country,1)
(cowards,1)
(currents,1)
(d,4)
(death,2)
(delay,1)
(despis,1)
(devoutly,1)
(die,2)
(does,1)
(dread,1)
(dream,1)
(dreams,1)
(end,2)
(enterprises,1)
(er,1)
(fair,1)
(fardels,1)
(flesh,1)
(fly,1)
(for,2)
(fortune,1)
(from,1)
(give,1)
(great,1)
(grunt,1)
(have,2)
(he,1)
(heartache,1)
(heir,1)
(himself,1)
(his,1)
(hue,1)
(ills,1)
(in,3)
(insolence,1)
(is,3)
(know,1)
(law,1)
(life,2)
(long,1)
(lose,1)
(love,1)
(make,2)
(makes,2)
(man,1)
(may,1)
(merit,1)
(might,1)
(mind,1)
(moment,1)
(more,1)
(mortal,1)
(must,1)
(my,1)
(name,1)
(native,1)
(natural,1)
(no,2)
(nobler,1)
(not,2)
(now,1)
(nymph,1)
(o,1)
(of,15)
(off,1)
(office,1)
(ophelia,1)
(opposing,1)
(oppressor,1)
(or,2)
(orisons,1)
(others,1)
(outrageous,1)
(pale,1)
(pangs,1)
(patient,1)
(pause,1)
(perchance,1)
(pith,1)
(proud,1)
(puzzles,1)
(question,1)
(quietus,1)
(rather,1)
(regard,1)
(remember,1)
(resolution,1)
(respect,1)
(returns,1)
(rub,1)
(s,5)
(say,1)
(scorns,1)
(sea,1)
(shocks,1)
(shuffled,1)
(sicklied,1)
(sins,1)
(sleep,5)
(slings,1)
(so,1)
(soft,1)
(something,1)
(spurns,1)
(suffer,1)
(sweat,1)
(take,1)
(takes,1)
(than,1)
(that,7)
(the,22)
(their,1)
(them,1)
(there,2)
(these,1)
(this,2)
(those,1)
(thought,1)
(thousand,1)
(thus,2)
(thy,1)
(time,1)
(tis,2)
(to,15)
(traveller,1)
(troubles,1)
(turn,1)
(under,1)
(undiscover,1)
(unworthy,1)
(us,3)
(we,4)
(weary,1)
(what,1)
(when,2)
(whether,1)
(whips,1)
(who,2)
(whose,1)
(will,1)
… (remaining output truncated)
3. Kill the active JobManager on node1 to test failover:
[root@node1 flink-1.14.0]# jps
25584 TaskManagerRunner
29090 NameNode
6261 HistoryServer
12485 QuorumPeerMain
11973 Jps
29286 DataNode
25179 StandaloneSessionClusterEntrypoint
29614 SecondaryNameNode
[root@node1 flink-1.14.0]# kill -kill 25179
[root@node1 flink-1.14.0]# jps
25584 TaskManagerRunner
29090 NameNode
6261 HistoryServer
12485 QuorumPeerMain
29286 DataNode
12397 Jps
29614 SecondaryNameNode
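After killing the active JobManager, you can find out which master took over by probing each configured master's REST endpoint: `/overview` is part of Flink's REST API and answers only on a live JobManager. A sketch that just assembles the probe commands from the masters list above (nothing is actually contacted):

```shell
#!/bin/sh
# Build the failover probes for every entry in conf/masters.
# The master that answers /overview after a failover is the active one.
MASTERS="node1:8081 node2:8081"
: > /tmp/ha_probes.txt
for m in $MASTERS; do
  echo "curl -s --max-time 2 http://${m}/overview" >> /tmp/ha_probes.txt
done
cat /tmp/ha_probes.txt
```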
4. Re-run WordCount; it still executes normally
[root@node2 flink-1.14.0]# /data/soft/flink-1.14.0/bin/flink run /data/soft/flink-1.14.0/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 73cad18e17d962e26ce57aed3b0bdc86
Program execution finished
Job with JobID 73cad18e17d962e26ce57aed3b0bdc86 has finished.
Job Runtime: 860 ms
Accumulator Results:
- 7ce63948462d538a891e144b030e5724 (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)
(ay,1)
(bare,1)
(be,4)
(bear,3)
(bodkin,1)
(bourn,1)
(but,1)
(by,2)
(calamity,1)
(cast,1)
(coil,1)
(come,1)
(conscience,1)
(consummation,1)
(contumely,1)
(country,1)
(cowards,1)
(currents,1)
(d,4)
(death,2)
(delay,1)
(despis,1)
(devoutly,1)
(die,2)
(does,1)
(dread,1)
(dream,1)
(dreams,1)
(end,2)
(enterprises,1)
(er,1)
(fair,1)
(fardels,1)
(flesh,1)
(fly,1)
(for,2)
(fortune,1)
(from,1)
(give,1)
(great,1)
(grunt,1)
(have,2)
(he,1)
(heartache,1)
(heir,1)
(himself,1)
(his,1)
(hue,1)
(ills,1)
(in,3)
(insolence,1)
(is,3)
(know,1)
(law,1)
(life,2)
(long,1)
(lose,1)
(love,1)
(make,2)
(makes,2)
(man,1)
(may,1)
(merit,1)
(might,1)
(mind,1)
(moment,1)
(more,1)
(mortal,1)
(must,1)
(my,1)
(name,1)
(native,1)
(natural,1)
(no,2)
(nobler,1)
(not,2)
(now,1)
(nymph,1)
(o,1)
(of,15)
(off,1)
(office,1)
(ophelia,1)
(opposing,1)
(oppressor,1)
(or,2)
(orisons,1)
(others,1)
(outrageous,1)
(pale,1)
(pangs,1)
(patient,1)
(pause,1)
(perchance,1)
(pith,1)
(proud,1)
(puzzles,1)
(question,1)
(quietus,1)
(rather,1)
(regard,1)
(remember,1)
(resolution,1)
(respect,1)
(returns,1)
(rub,1)
(s,5)
(say,1)
(scorns,1)
(sea,1)
(shocks,1)
(shuffled,1)
(sicklied,1)
(sins,1)
(sleep,5)
(slings,1)
(so,1)
(soft,1)
(something,1)
(spurns,1)
(suffer,1)
(sweat,1)
(take,1)
(takes,1)
(than,1)
(that,7)
(the,22)
(their,1)
(them,1)
(there,2)
(these,1)
(this,2)
(those,1)
(thought,1)
(thousand,1)
(thus,2)
(thy,1)
(time,1)
(tis,2)
(to,15)
(traveller,1)
(troubles,1)
(turn,1)
(under,1)
(undiscover,1)
(unworthy,1)
(us,3)
(we,4)
(weary,1)
(what,1)
(when,2)
(whether,1)
(whips,1)
(who,2)
(whose,1)
(will,1)
(wish,1)
… (remaining output truncated)
5. Stop the cluster
[root@node1 flink-1.14.0]# /data/soft/flink-1.14.0/bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 25584) on host node1.
Stopping taskexecutor daemon (pid: 15256) on host node2.
Stopping taskexecutor daemon (pid: 15288) on host node3.
No standalonesession daemon (pid: 25179) is running anymore on node1.
Stopping standalonesession daemon (pid: 14795) on host node2.
4. Flink On Yarn mode
4.1 How it works
4.1.1 Why Flink On Yarn? (on-demand resources, job priorities, and automatic failover, as outlined above)
1. Disable YARN's virtual-memory check (set yarn.nodemanager.vmem-check-enabled to false) in yarn-site.xml:
vim /data/soft/hadoop-3.3.1/etc/hadoop/yarn-site.xml
Explanation: this setting controls whether YARN runs a thread that checks the amount of virtual memory each task is using and kills tasks that exceed their allocation (default: true). It must be turned off here: under Flink on YARN it is easy to exceed the virtual-memory allowance, and YARN would otherwise kill the job.
2. Distribute the Hadoop configuration to the other nodes
rsync -av /data/soft/hadoop-3.3.1 root@node2:/data/soft
rsync -av /data/soft/hadoop-3.3.1 root@node3:/data/soft
3. Restart YARN
[root@node1 hadoop-3.3.1]# /data/soft/hadoop-3.3.1/sbin/stop-yarn.sh
Stopping nodemanagers
Last login: Sat Nov 13 16:10:02 EST 2021 from 192.168.56.1 on pts/1
Stopping resourcemanager
Last login: Sat Nov 13 16:46:03 EST 2021 on pts/0
[root@node1 hadoop-3.3.1]# /data/soft/hadoop-3.3.1/sbin/start-yarn.sh
Starting resourcemanager
Last login: Sat Nov 13 16:46:07 EST 2021 on pts/0
Starting nodemanagers
Last login: Sat Nov 13 16:46:44 EST 2021 on pts/0
Session mode: yarn-session.sh (allocate resources) + flink run (submit jobs).
1. Start a Flink session on YARN; on node1 run (this requests 2 CPUs and 1600 MB of memory):
/data/soft/flink-1.14.0/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
# -n  number of containers to request, i.e. how many TaskManagers
# -tm memory per TaskManager
# -s  number of slots per TaskManager
# -d  run detached, in the background
Caution: the session may fail while uploading files with an HDFS error, "the last block does not have enough replicas, so the file could not be closed". Stack trace:
java.io.IOException: Unable to close file because the last block does not have enough number of replicas.
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2528)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2495)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2458)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
Fix: the NameNode closes a file only after enough DataNodes have reported the last block complete, and network I/O latency or GC can delay those reports; the HDFS client retries the close several times, so increasing the client's close-retry count reduces the problem. Edit core-site.xml:
vim /data/soft/hadoop-3.3.1/etc/hadoop/core-site.xml
Per-job mode: submit the job directly
/data/soft/flink-1.14.0/bin/flink run -yjm 1024 -ytm 1024 /data/soft/flink-1.14.0/examples/batch/WordCount.jar
# -m   address of the jobmanager (not needed here; this Hadoop setup has no HA)
# -yjm memory for the jobmanager
# -ytm memory for the taskmanager
The same last-block-replica error can occur here as well; the core-site.xml retry fix above applies.
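Session mode and per-job mode differ only in who owns the cluster's lifetime, which shows up directly in the command lines. A sketch that assembles both submission styles side by side for comparison (paths and memory sizes copied from the steps above; nothing is actually submitted to YARN):

```shell
#!/bin/sh
# Write both Flink-on-YARN submission styles to a file for comparison.
# Nothing is executed against YARN here.
FLINK_BIN=/data/soft/flink-1.14.0/bin
JAR=/data/soft/flink-1.14.0/examples/batch/WordCount.jar
{
  # Session mode: one long-lived session; later jobs reuse its resources.
  echo "$FLINK_BIN/yarn-session.sh -n 2 -tm 800 -s 1 -d"
  echo "$FLINK_BIN/flink run $JAR"
  # Per-job mode: each submission gets (and releases) its own resources.
  echo "$FLINK_BIN/flink run -yjm 1024 -ytm 1024 $JAR"
} > /tmp/yarn_submit.txt
cat /tmp/yarn_submit.txt
```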



