栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink on Kubernetes 部署模式

Flink on Kubernetes 部署模式

flink on k8s 部署模式

flink官网提供两种在k8s的部署模式
分别为 standalone模式 和 native模式
两种模式又分别分session模式和Application模式。
standalone的session模式和standalone的Application模式,以及 native的session模式和native的Application模式。
考虑到频繁更改业务代码带来的影响,首先考虑使用session模式。
native模式相对于Standalone模式更有优势,但目前尚处于实验阶段,稳定性不及standalone模式。
因此,生产环节优先考虑使用standalone的session模式。

环境准备
  • 预先安装部署好k8s 集群,且k8s version >= 1.9
  • 创建flink集群的当前服务器需要部署有k8s客户端 kubectl
  • 当前flink版本 1.14.0
standalone模式 standalone session模式部署 创建namespace
kubectl create ns flink-standalone
编辑编排文件

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: flink-standalone
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

** jobmanager-deployment.yaml**

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: flink-standalone
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.14.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$Flink_HOME/bin/jobmanager.sh start;
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

taskmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: flink-standalone
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.14.0
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$Flink_HOME/bin/taskmanager.sh start; 
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  namespace: flink-standalone
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
创建 Flink 集群

通过 kubectl create -f 命令创建

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml

或者在k8s dashboard 创建

创建完成flink集群jobmanager和taskmanager都已经预先创建完成。

端口转发
kubectl -n flink-standalone port-forward --address 0.0.0.0 service/flink-jobmanager 8081:8081
运行flink job

打开flink web UI 通过 【Submit New Job】去提交job,taskmanager已经预先创建完成,超过预先创建taskmanager slot的job提交失败。

原生模式(native mode) 原生session模式部署 创建namespace、用户和赋权(RBAC)
# create namespace
kubectl create ns flink-native
# 设置命名空间首选项
kubectl config set-context --current --namespace=flink-native
# create serviceaccount 
kubectl create serviceaccount flink
# 赋权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink
启动flink集群
./bin/kubernetes-session.sh 
  -Dkubernetes.cluster-id=test1 
  -Dkubernetes.namespace=flink-native
  -Dkubernetes.service-account=flink 
  -Dkubernetes.rest-service.exposed.type=ClusterIP 
  -Dtaskmanager.memory.process.size=4096m 
  -Dkubernetes.taskmanager.cpu=2 
  -Dtaskmanager.numberOfTaskSlots=4 
  -Dresourcemanager.taskmanager-timeout=3600000 
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

创建完成flink集群只有jobmanager,只有在提交任务的时候才去创建taskmanager。
k8s配置项参考。

查看日志

kubectl logs deployment/test1()

端口转发
# 将本机默认路由上的8082端口转发到service test1-rest 中的8081端口上
kubectl -n flink-native port-forward --address 0.0.0.0 service/test1-rest 8082:8081

接下来可以通过 http://ip:8082/ 访问flink web UI

运行flink job

打开flink web UI 通过 【Submit New Job】去提交job,flink集群会按需去k8s集群申请pod,并在每个pod中启动一个taskmanager。
有一点不太好的是:当前flink版本(1.14.0)还不能自动释放空闲的taskmanager

参考:
Flink On Standalone Kubernetes Reference
Flink On Native Kubernetes Reference
Flink On K8S终极实现方案
Flink on Kubernetes 部署模式

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/422396.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号