- Flink on YARN Mode Configuration
- Introduction
- I. Install the JDK
- II. Install Hadoop
- III. Install Zookeeper
- IV. Install Flink
Introduction

Flink relies on YARN for high availability; YARN in turn depends on Hadoop, and Hadoop depends on the JDK, so the components are installed in that order.
Prepare three machines:

```
1.1.1.1 node1
1.1.1.2 node2
1.1.1.3 node3
```
I. Install the JDK

1. Download and extract:

```bash
tar -xvf jdk-8u271-linux-x64.tar.gz -C /usr/local
mv /usr/local/jdk1.8.0_271 /usr/local/jdk
```

2. Configure environment variables:

```bash
export JAVA_HOME=/usr/local/jdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
```

3. Verify:

```bash
java -version
```
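The export lines above only last for the current shell. A minimal sketch of making them permanent, assuming a system-wide /etc/profile is acceptable on these machines:

```bash
# Append the JDK variables to /etc/profile so they survive reboots
# (assumption: a system-wide profile is acceptable here).
cat >> /etc/profile <<'EOF'
export JAVA_HOME=/usr/local/jdk
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
EOF
source /etc/profile
java -version   # should report 1.8.0_271
```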
II. Install Hadoop

1. Configure /etc/hosts to map hostnames to IP addresses, on every machine:

```bash
vi /etc/hosts
```

Add the following:

```
1.1.1.1 node1
1.1.1.2 node2
1.1.1.3 node3
```

2. Configure passwordless SSH login:

```bash
ssh-keygen
ssh-copy-id node2
ssh-copy-id node3
```

3. Extract the Hadoop tarball:

```bash
tar -xvf hadoop-2.10.1.tar.gz -C /usr/local
mv /usr/local/hadoop-2.10.1 /usr/local/hadoop
```

4. Configure environment variables:

```bash
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
```
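Before going further, it is worth confirming that key-based login actually works; a quick check, assuming the keys were copied as above:

```bash
# Each command should print the remote hostname without prompting for a password.
for h in node2 node3; do
  ssh "$h" hostname
done
```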
5. Configure the HDFS cluster

1) hadoop-env.sh — add the JDK path:

```bash
export JAVA_HOME=/usr/local/jdk
```

2) core-site.xml:

```xml
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>file:/usr/local/hadoop/data/hdfs/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>io.file.buffer.size</name>
    <value>131072</value>
  </property>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ns</value>
  </property>
  <property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>ha.zookeeper.quorum</name>
    <value>node1:2181,node2:2181,node3:2181</value>
  </property>
</configuration>
```

3) hdfs-site.xml:

```xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.block.size</name>
    <value>134217728</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///usr/local/hadoop/data/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:///usr/local/hadoop/data/hdfs/datanode</value>
  </property>
  <property>
    <name>dfs.namenode.edits.dir</name>
    <value>file:///usr/local/hadoop/data/hdfs/nn/edits</value>
  </property>
  <property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/usr/local/hadoop/data/hdfs/journal</value>
  </property>
  <property>
    <name>dfs.nameservices</name>
    <value>ns</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.ns</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns.nn1</name>
    <value>node1:9000</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.ns.nn2</name>
    <value>node2:9000</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.ns.nn1</name>
    <value>node1:50070</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.ns.nn2</name>
    <value>node2:50070</value>
  </property>
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://node1:8485;node2:8485;node3:8485/ns</value>
  </property>
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>node1:9001</value>
  </property>
  <property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled.ns</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence</value>
  </property>
  <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>~/.ssh/id_rsa</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.ns</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>
```

4) mapred-site.xml:

```xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.address</name>
    <value>node1:10200</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>node1:19888</value>
  </property>
</configuration>
```
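The local paths referenced in these files do not exist yet; a small sketch that creates them on each node, with the directory layout taken directly from the config values above:

```bash
# Create the local directories referenced by core-site.xml and hdfs-site.xml.
mkdir -p /usr/local/hadoop/data/hdfs/{tmp,namenode,datanode,journal}
mkdir -p /usr/local/hadoop/data/hdfs/nn/edits
```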
6. Configure the YARN cluster — yarn-site.xml:

```xml
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>ns</value>
  </property>
  <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>node1</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>node2</value>
  </property>
  <property>
    <name>yarn.resourcemanager.webapp.address.rm1</name>
    <value>node1:8088</value>
  </property>
  <property>
    <name>yarn.resourcemanager.webapp.address.rm2</name>
    <value>node2:8088</value>
  </property>
  <property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
  </property>
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
  </property>
  <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>node1:2181,node2:2181,node3:2181</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>4096</value>
  </property>
  <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>yarn.client.failover-proxy-provider</name>
    <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
  </property>
  <property>
    <name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>10</value>
  </property>
</configuration>
```
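The original walkthrough does not mention it, but for the one-command start-dfs.sh / start-yarn.sh used below to reach all workers, Hadoop 2.x reads the worker list from etc/hadoop/slaves. A sketch, assuming all three nodes run DataNode/NodeManager as in this guide:

```bash
# /usr/local/hadoop/etc/hadoop/slaves — one worker hostname per line
cat > /usr/local/hadoop/etc/hadoop/slaves <<'EOF'
node1
node2
node3
EOF
```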
7. Distribute the /usr/local/hadoop directory to node2 and node3:

```bash
scp -r hadoop root@node2:/usr/local/
scp -r hadoop root@node3:/usr/local/
```

8. Start the cluster. (ZooKeeper, set up in Section III, must already be running, because the first step stores HA failover state in it.)

```bash
# 1) On node1
hdfs zkfc -formatZK
# 2) On all three nodes
hadoop-daemon.sh start journalnode
# 3) On node1
hdfs namenode -format
hadoop-daemon.sh start namenode
# 4) On node2
hdfs namenode -bootstrapStandby
hadoop-daemon.sh start namenode
# 5) On node1 and node2
hadoop-daemon.sh start zkfc
# 6) On all three nodes
hadoop-daemon.sh start datanode
# 7) On node1 and node2
yarn-daemon.sh start resourcemanager
# 8) On all three nodes
yarn-daemon.sh start nodemanager
```

9. Verify: run jps on each node. You should see NameNode, DFSZKFailoverController, and ResourceManager on node1 and node2, and JournalNode, DataNode, and NodeManager on all three nodes.

Daily startup: start the JournalNodes on all three nodes (hadoop-daemon.sh start journalnode) and the ZKFCs on node1 and node2 (hadoop-daemon.sh start zkfc), or use the one-command scripts:

```bash
start-dfs.sh
start-yarn.sh
```
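Besides jps, the HA state itself can be queried; these commands use the nameservice and ResourceManager ids defined earlier (ns, nn1/nn2, rm1/rm2):

```bash
# One NameNode should report "active" and the other "standby".
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
# Likewise for the two ResourceManagers.
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2
```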
III. Install Zookeeper

1. Download and extract:

```bash
tar -xvf apache-zookeeper-3.5.9-bin.tar.gz -C /usr/local
mv /usr/local/apache-zookeeper-3.5.9-bin /usr/local/zookeeper
```

2. Fix the owner and group:

```bash
chown -R root:root /usr/local/zookeeper/
```

3. Configure environment variables:

```bash
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
```

4. Edit the configuration file:

```bash
cd /usr/local/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
```

zoo.cfg:

```
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial synchronization phase can take
initLimit=10
# The number of ticks that can pass between sending a request
# and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored;
# do not use /tmp for storage, /tmp here is just for example's sake
dataDir=/usr/local/zookeeper/tmp/data/zookeeper
dataLogDir=/usr/local/zookeeper/tmp/log/zookeeper
# the port at which the clients will connect
clientPort=2181
autopurge.purgeInterval=1
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
# note: the 1 in server.1 is the server id, which must match the id
# in that node's myid file
```

Repeat the steps above on every node.

5. Set the server id:

```bash
mkdir -p /usr/local/zookeeper/tmp/data/zookeeper
echo 1 > /usr/local/zookeeper/tmp/data/zookeeper/myid
# echo 2 on node2, echo 3 on node3
```

6. Start the server:

```bash
zkServer.sh start
```

7. Connect a client:

```bash
zkCli.sh -server node1:2181
```
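To confirm that the ensemble formed correctly, check each node's role; with three servers, one should report "Mode: leader" and the other two "Mode: follower":

```bash
# Run on every node (assumes zkServer.sh is on PATH as configured above).
zkServer.sh status
```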
IV. Install Flink

1. Download and extract:
```bash
tar -xvf flink-1.13.2-bin-scala_2.11.tgz -C /usr/local/
mv /usr/local/flink-1.13.2 /usr/local/flink
```
2. Configure environment variables:

```bash
export HADOOP_CLASSPATH=`/usr/local/hadoop/bin/hadoop classpath`
export FLINK_HOME=/usr/local/flink
export PATH=$PATH:$FLINK_HOME/bin
```
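Flink discovers the Hadoop jars through HADOOP_CLASSPATH, so it is worth confirming the variable actually expands; a quick check after sourcing the profile:

```bash
# Should print a long list of Hadoop jar and conf paths, one per line.
echo "$HADOOP_CLASSPATH" | tr ':' '\n' | head -n 5
```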
3. Edit the configuration file:

```bash
vi /usr/local/flink/conf/flink-conf.yaml
```

```yaml
# JobManager memory consists of four parts: JVM Heap, Off-Heap Memory,
# JVM Metaspace, and JVM Overhead.
# With total process memory set to 2048m, JVM Overhead is derived from a
# fraction of 0.1, i.e. 204.8m, rounded to about 205m.
# JVM Metaspace defaults to 256m; Off-Heap Memory defaults to 128m.
# The JVM Heap is therefore inferred as 2048m - 205m - 256m - 128m = 1459m,
# i.e. about 1.42g.
# The GC also reserves a small fixed amount of non-heap memory, roughly 0.05g,
# so the heap actually available is about 1.38g.
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
# JobManager JVM heap size; depends mainly on the number of running jobs,
# their structure, and the demands of user code
jobmanager.heap.size: 1024m
# total process memory
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
# number of task slots offered by each TaskManager;
# matching the number of CPU cores is recommended
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
env.hadoop.conf.dir: /usr/local/hadoop/etc/hadoop
high-availability: zookeeper
# maximum number of attempts when Flink restarts on YARN
yarn.application-attempts: 10
high-availability.storageDir: hdfs://ns/flink/recovery
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
# backend used to store state and checkpoints
state.backend: filesystem
state.checkpoints.dir: hdfs://ns/flink/checkpoints
state.savepoints.dir: hdfs://ns/flink/savepoints
# failover strategy
jobmanager.execution.failover-strategy: region
rest.port: 8081
# whether to enable job submission through the web UI
web.submit.enable: true
io.tmp.dirs: /usr/local/flink/data/tmp
env.log.dir: /usr/local/flink/data/logs
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.refresh-interval: 10000
```
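As a sanity check on the memory arithmetic in the comments above, the heap figure can be reproduced directly; the 205m overhead and the metaspace/off-heap defaults are the assumptions stated there:

```bash
# 2048m total - 205m JVM overhead - 256m metaspace - 128m off-heap = 1459m heap
total=2048; overhead=205; metaspace=256; offheap=128
echo "JVM heap = $((total - overhead - metaspace - offheap))m"   # prints 1459m
```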
4. Edit conf/masters:

```
node1:8081
node2:8081
```

5. Edit conf/workers:

```
node1
node2
node3
```

6. Edit conf/zoo.cfg:

```
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/flink/data/tmp/zookeeper/dataDir
dataLogDir=/usr/local/flink/data/tmp/zookeeper/dataLogDir
clientPort=2181
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
```

7. Add the jar flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to /usr/local/flink/lib.

8. Start Flink in YARN session mode:

```bash
yarn-session.sh
```
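Once the session is up, a quick way to exercise the whole stack is to submit one of the example jobs that ship with Flink; a sketch, assuming the session was started from the same shell so the client picks it up automatically:

```bash
# Submit the bundled WordCount example to the running YARN session.
flink run /usr/local/flink/examples/streaming/WordCount.jar
# The job and its status are then visible in the YARN web UI (node1:8088)
# and in the session's Flink web UI.
```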



