This article uses:
flink-1.10.2-bin-scala_2.11.tgz, on a three-node cluster (master, slave1, slave2), deployed in standalone mode
- Hadoop cluster already set up in ordinary distributed mode (omitted)
- /etc/hosts and passwordless SSH already configured (omitted)
- Extract: tar -zxf flink-1.10.2-bin-scala_2.11.tgz -C /usr/apps
- Rename: mv flink-1.10.2-bin-scala_2.11 flink-1.10.2
- Edit the configuration files:
Ⅰ. vim ./conf/masters
[root@master flink-1.10.2]# vim ./conf/masters
# set the contents to:
master:8081
Ⅱ. vim ./conf/slaves
[root@master flink-1.10.2]# vim ./conf/slaves
# set the contents to:
slave1
slave2
Ⅲ. vim ./conf/flink-conf.yaml
[root@master flink-1.10.2]# vim ./conf/flink-conf.yaml
# change the following settings:
# entry point for job submission (JobManager RPC address)
jobmanager.rpc.address: master
# JobManager heap size (job scheduling)
jobmanager.heap.size: 1024m
# total process memory of each TaskManager (increase for heavier workloads)
taskmanager.memory.process.size: 1728m
# number of task slots per TaskManager (degree of parallelism; 1 means single-threaded)
taskmanager.numberOfTaskSlots: 2
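As a quick sanity check on the slot setting: the cluster's total number of processing slots is the number of TaskManagers multiplied by taskmanager.numberOfTaskSlots. A minimal sketch with the two workers used in this article:

```shell
# Total slots = TaskManagers x slots per TaskManager.
# With the two workers (slave1, slave2) and numberOfTaskSlots: 2:
taskmanagers=2
slots_per_tm=2
echo "total slots: $((taskmanagers * slots_per_tm))"
```

A job with parallelism higher than this total will wait for slots and eventually fail to be scheduled.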
- Distribute to the workers
scp -r /usr/apps/flink-1.10.2 slave1:/usr/apps/
scp -r /usr/apps/flink-1.10.2 slave2:/usr/apps/
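The two copies can also be written as a loop, which scales better if more workers are added later. A sketch using the host names from this article; the leading echo is left in so the sketch only prints the commands:

```shell
# Sketch: distribute the Flink install to every worker in one loop.
# Drop the leading 'echo' to actually perform the copies.
FLINK_HOME=/usr/apps/flink-1.10.2
for host in slave1 slave2; do
  echo scp -r "$FLINK_HOME" "$host:/usr/apps/"
done
```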
- Start the cluster
# master
[root@master flink-1.10.2]# ./bin/start-cluster.sh
Starting cluster.
[INFO] 1 instance(s) of standalonesession are already running on master.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host slave1.
Starting taskexecutor daemon on host slave2.
[root@master flink-1.10.2]# jps
23952 StandaloneSessionClusterEntrypoint
12967 NameNode
13751 Master
13048 ResourceManager
25032 Jps
13227 DataNode
24268 TaskManagerRunner
13246 NodeManager
[root@master flink-1.10.2]#
# slave1
[root@slave1 ~]# jps
3746 NodeManager
3989 Worker
3687 DataNode
14024 Jps
13962 TaskManagerRunner
3933 SecondaryNameNode
[root@slave1 ~]#
# slave2
[root@slave2 ~]# jps
3840 Worker
3603 DataNode
11850 TaskManagerRunner
11898 Jps
3662 NodeManager
[root@slave2 ~]#

HA-mode deployment
- Edit the ZooKeeper configuration file zoo.cfg
[root@master flink-1.10.2]# vim ./conf/zoo.cfg
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
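A ZooKeeper ensemble of n servers stays available as long as a majority (n/2 + 1) of them is up, so the three servers above tolerate one failure. A quick sketch of the arithmetic:

```shell
# Majority needed for a ZooKeeper ensemble of n servers.
n=3
majority=$((n / 2 + 1))
echo "quorum: $majority of $n, tolerates $((n - majority)) failure(s)"
```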
- Edit masters
[root@master flink-1.10.2]# vim ./conf/masters
master1:8081
master2:8081
- Edit slaves
[root@master flink-1.10.2]# vim ./conf/slaves
master1
master2
slave1
- Edit flink-conf.yaml
[root@master flink-1.10.2]# vim ./conf/flink-conf.yaml
jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m

# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 2

# The parallelism used for programs that did not specify any other parallelism.
parallelism.default: 1

# The default file system scheme and authority.
#
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, NFS, ...)
high-availability.storageDir: hdfs://jh/flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
high-availability.zookeeper.quorum: node7-1:2181,node7-2:2181,node7-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flinkCluster

# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
state.checkpoints.dir: hdfs://node7-1/flink/checkpoints

# Default target directory for savepoints, optional.
state.savepoints.dir: hdfs://node7-1/flink/checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false

#==============================================================================
# Web Frontend
#==============================================================================

# The address under which the web-based runtime monitor listens.
#
#web.address: 0.0.0.0

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
rest.port: 8081
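After editing, a quick grep can confirm the HA keys are present and uncommented. This is only a sketch: it checks a sample file written inline; on a real node you would set CONF=./conf/flink-conf.yaml instead.

```shell
# Sketch: sanity-check that the HA settings are active (uncommented).
# CONF points at a sample written here for illustration; on a real node:
#   CONF=./conf/flink-conf.yaml
CONF=$(mktemp)
cat > "$CONF" <<'EOF'
high-availability: zookeeper
high-availability.storageDir: hdfs://jh/flink/ha/
high-availability.zookeeper.quorum: node7-1:2181,node7-2:2181,node7-3:2181
EOF
missing=0
for key in high-availability high-availability.storageDir high-availability.zookeeper.quorum; do
  if grep -q "^$key:" "$CONF"; then
    echo "$key: ok"
  else
    echo "$key: MISSING"
    missing=1
  fi
done
rm -f "$CONF"
```

A commented-out `# high-availability: zookeeper` line would not match, so this catches the common mistake of editing the example lines without uncommenting them.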
- Edit config.sh in the bin directory
[root@master flink-1.10.2]# vim ./bin/config.sh
as follows:
- Upload the required Hadoop jar into Flink's lib directory:
flink-shaded-hadoop2-uber-1.7.2.jar
- Verification:
Kill the JobManager process on master1; if master2 takes over as the active JobManager, HA is working.
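The kill step can be scripted by pulling the JobManager PID out of jps output. A hedged sketch: here 'listing' is the captured jps sample from the standalone section above; on the live master1 you would use listing=$(jps) and drop the echo guard.

```shell
# Sketch: find the JobManager PID and kill it to trigger failover.
# 'listing' is a captured jps sample; on the live node use: listing=$(jps)
listing="23952 StandaloneSessionClusterEntrypoint
12967 NameNode
24268 TaskManagerRunner"
pid=$(echo "$listing" | awk '/StandaloneSessionClusterEntrypoint/ {print $1}')
echo "would run: kill -9 $pid"   # drop the echo to actually kill the process
```

After the kill, refresh the web UI on the other master's port 8081 to confirm it has become the active JobManager.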



