The Flink documentation describes two deployment modes on Kubernetes: standalone mode and native mode. Each of these further splits into a session mode and an application mode, giving four combinations: standalone session, standalone application, native session, and native application.

Considering the impact of frequent changes to the business code, session mode is the first choice. Native mode has advantages over standalone mode, but it is still experimental and not as stable as standalone mode. For production, therefore, the standalone session mode is preferred.

**Standalone mode, session deployment**
- A Kubernetes cluster is already installed and running, with Kubernetes version >= 1.9.
- The machine used to create the Flink cluster has the Kubernetes client `kubectl` installed.
- Flink version: 1.14.0
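A quick sanity check before proceeding, assuming `kubectl` is already on the PATH and configured against the target cluster:

```shell
# Show client and server versions; the server version should be >= 1.9
kubectl version --short
# Confirm the cluster nodes are reachable and Ready
kubectl get nodes
```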
Create the namespace:

```shell
kubectl create ns flink-standalone
```

**Edit the manifests**
**flink-configuration-configmap.yaml**
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: flink-standalone
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
```
**jobmanager-deployment.yaml**
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: flink-standalone
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.14.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
```
**taskmanager-deployment.yaml**
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: flink-standalone
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.14.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start;
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
```
**jobmanager-service.yaml**
```yaml
apiVersion: v1
kind: Service
metadata:
  namespace: flink-standalone
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
**Create the Flink cluster**

Create the resources with `kubectl create -f`:

```shell
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
```
Alternatively, the resources can be created from the Kubernetes dashboard.

Once creation completes, both the JobManager and the TaskManagers of the Flink cluster are pre-allocated and running.
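You can confirm that everything came up as expected; pod names below are illustrative suffixes generated by the Deployment controller:

```shell
# One flink-jobmanager pod and two flink-taskmanager pods should be Running
kubectl -n flink-standalone get pods
# The ClusterIP service that fronts the JobManager
kubectl -n flink-standalone get svc flink-jobmanager
```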
**Port forwarding**

```shell
kubectl -n flink-standalone port-forward --address 0.0.0.0 service/flink-jobmanager 8081:8081
```
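With the forward in place, the cluster can be probed through Flink's monitoring REST API before opening the UI; `/overview` is a standard endpoint of that API:

```shell
# Query the cluster overview through the forwarded port;
# returns JSON with taskmanagers, slots-total, slots-available, etc.
curl -s http://localhost:8081/overview
```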
**Run a Flink job**

Open the Flink web UI and submit a job via [Submit New Job]. Because the TaskManagers are created in advance, a job that requires more slots than the pre-created TaskManagers provide will fail to submit.
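Jobs can also be submitted from the command line instead of the web UI; a sketch using the WordCount example bundled with the Flink 1.14.0 distribution, run from the distribution's root directory:

```shell
# Submit against the JobManager REST endpoint exposed by the port-forward
./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
```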
**Native mode**

**Deploying a native session cluster**

Create the namespace, service account, and role binding (RBAC):

```shell
# create namespace
kubectl create ns flink-native
# set the namespace preference for the current context
kubectl config set-context --current --namespace=flink-native
# create serviceaccount
kubectl create serviceaccount flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink
```

**Start the Flink cluster**
```shell
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=test1 \
  -Dkubernetes.namespace=flink-native \
  -Dkubernetes.service-account=flink \
  -Dkubernetes.rest-service.exposed.type=ClusterIP \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
```
After creation, the cluster contains only a JobManager; TaskManagers are created only when a job is submitted.

See the Kubernetes configuration options for the full list of available settings.

View the JobManager logs:

```shell
kubectl logs deployment/test1
```
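Jobs can also be submitted to the native session cluster from the CLI; the cluster id and namespace identify the target and must match the values passed to `kubernetes-session.sh` above (the WordCount jar path assumes the Flink 1.14.0 distribution):

```shell
./bin/flink run --target kubernetes-session \
  -Dkubernetes.cluster-id=test1 \
  -Dkubernetes.namespace=flink-native \
  ./examples/streaming/WordCount.jar
```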
**Port forwarding**

Forward port 8082 on the local default route to port 8081 of the `test1-rest` service:

```shell
kubectl -n flink-native port-forward --address 0.0.0.0 service/test1-rest 8082:8081
```

The Flink web UI is then reachable at http://ip:8082/.
**Run a Flink job**

Open the Flink web UI and submit a job via [Submit New Job]. The Flink cluster requests pods from Kubernetes on demand and starts one TaskManager in each pod.

One drawback: the current Flink version (1.14.0) does not automatically release idle TaskManagers.
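When the session is no longer needed, it can be shut down either by attaching to it and sending `stop`, or by deleting the JobManager deployment, which lets Kubernetes garbage-collect the resources it owns:

```shell
# Graceful stop: attach to the running session and send 'stop'
echo 'stop' | ./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=test1 \
  -Dkubernetes.namespace=flink-native \
  -Dexecution.attached=true
# Or delete the deployment directly; owned resources are cleaned up
kubectl -n flink-native delete deployment/test1
```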
References:
Flink On Standalone Kubernetes Reference
Flink On Native Kubernetes Reference
Flink On K8S终极实现方案
Flink on Kubernetes 部署模式



