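1. HDFS deployment (k8s)
This document is updated continuously.
Some of the deployment YAML was adapted from articles whose sources I no longer remember, but I have tested all of it myself and it works.
Thanks again to the original authors.
Each manifest can be applied on its own with kubectl create -f, or everything can be deployed together with helm.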
1.1 hdfs-conf.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: dev-flink-plat
  name: kube-hadoop-conf
data:
  HDFS_MASTER_SERVICE: hadoop-hdfs-master
  HDOOP_YARN_MASTER: hadoop-yarn-master
1.2 hdfs-service.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: dev-flink-plat
  name: hadoop-hdfs-master
spec:
  type: NodePort
  selector:
    app: hdfs-master
  ports:
    - name: rpc
      port: 9000
      targetPort: 9000
    - name: http
      port: 50070
      targetPort: 50070
      nodePort: 32007
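A Service routes traffic only to Pods whose labels contain every key/value pair of its selector. The sketch below illustrates that equality-based matching rule in plain Python; the function name and the sample label dictionaries are illustrative assumptions, not part of any Kubernetes client library.

```python
def selector_matches(selector: dict, pod_labels: dict) -> bool:
    """Return True if every selector key/value pair is present in the Pod labels."""
    return all(pod_labels.get(key) == value for key, value in selector.items())


# Selector taken from hdfs-service.yaml.
service_selector = {"app": "hdfs-master"}

# A Pod labelled app=hdfs-master is selected; extra labels do not matter.
print(selector_matches(service_selector, {"app": "hdfs-master", "tier": "storage"}))
# A Pod that only carries name=hdfs-master is not selected.
print(selector_matches(service_selector, {"name": "hdfs-master"}))
```

Running a check like this against the Pod template labels of a workload is a quick way to see whether a Service will find its backends.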
1.3 hdfs-namenode.yaml
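No secondary NameNode is configured. Instead, the replicas setting of the ReplicationController keeps the desired number of NameNode Pods running. With replicas: 1 this means a failed NameNode Pod is recreated; it does not provide a standby NameNode.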
apiVersion: v1
kind: ReplicationController
metadata:
  namespace: dev-flink-plat
  name: hdfs-master
  labels:
    app: hdfs-master
spec:
  replicas: 1
  selector:
    app: hdfs-master
  template:
    metadata:
      labels:
        # must match the selector of the hadoop-hdfs-master Service (app: hdfs-master)
        app: hdfs-master
    spec:
      containers:
        - name: hdfs-master
          image: kubeguide/hadoop:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9000
            - containerPort: 50070
          env:
            - name: HADOOP_NODE_TYPE
              value: namenode
            - name: HDFS_MASTER_SERVICE
              valueFrom:
                configMapKeyRef:
                  name: kube-hadoop-conf
                  key: HDFS_MASTER_SERVICE
            - name: HDOOP_YARN_MASTER
              valueFrom:
                configMapKeyRef:
                  name: kube-hadoop-conf
                  key: HDOOP_YARN_MASTER
      restartPolicy: Always
1.4 hdfs-datanode.yaml
apiVersion: v1
kind: Pod
metadata:
  namespace: dev-flink-plat
  name: hadoop-datanode
  labels:
    app: hadoop-datanode
spec:
  containers:
    - name: hadoop-datanode
      image: kubeguide/hadoop:latest
      imagePullPolicy: IfNotPresent
      ports:
        - containerPort: 9000
        - containerPort: 50070
      env:
        - name: HADOOP_NODE_TYPE
          value: datanode
        - name: HDFS_MASTER_SERVICE
          valueFrom:
            configMapKeyRef:
              name: kube-hadoop-conf
              key: HDFS_MASTER_SERVICE
        - name: HDOOP_YARN_MASTER
          valueFrom:
            configMapKeyRef:
              name: kube-hadoop-conf
              key: HDOOP_YARN_MASTER
  restartPolicy: Always
2. Flink deployment (k8s)
Flink can be deployed in roughly three ways:
- Flink standalone
- Flink on YARN
- Flink on k8s
We use the Flink on k8s deployment.
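2.1 Flink on k8s deployment options
The main reason for running Flink on Kubernetes is that both are built for long-running, stable operation.
① Flink: the real-time services it provides must run stably for long periods. It is commonly used for telecom network quality monitoring, real-time risk control, real-time recommendation, and other scenarios with high stability requirements.
② Kubernetes: it gives applications deployment and management capabilities and keeps them running stably. It has a rich ecosystem and integrates with many operations tools, such as Prometheus and the mainstream log collectors. It also has good scale-out and scale-in mechanisms, which can greatly improve resource utilization.
Session mode: a Flink cluster is built in advance and stays running, but it cannot scale automatically. Users submit jobs through a client to the running JobManager, and the JobManager assigns the tasks to the running TaskManagers.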
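| Advantages | Disadvantages |
|---|---|
| The Flink cluster is already running, so a submitted job can be assigned to TaskManagers immediately and starts quickly | Low resource utilization: the number of TaskManagers is fixed in advance. If jobs need few resources, many TaskManagers sit idle; conversely, TaskManager resources run short |
| | Poor job isolation: tasks from multiple jobs compete for resources and affect each other. If a failing job brings down a TaskManager, every job on that TaskManager is restarted |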
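Application mode: each job gets a dedicated Flink cluster, and the cluster is reclaimed when the job finishes.
Note:
① The Flink image must contain the classes that the job (the application) depends on.
② The main entry class must be specified when the job is started.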
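| Advantages | Disadvantages |
|---|---|
| Each job has its own cluster, so job isolation is good | Low resource utilization: the number of TaskManagers is fixed in advance. If the job needs few resources, many TaskManagers sit idle; conversely, TaskManager resources run short. In addition, the JobManager cannot be reused |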
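Native Session mode: as in Session mode, the JobManager must be built in advance. The difference is that after a user submits a job to the JobManager through the Flink client, the JobManager requests TaskManager resources directly from Kubernetes according to the number of slots the job needs, and then deploys the job to those TaskManagers.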
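| Advantages | Disadvantages |
|---|---|
| TaskManagers are created on demand at submission time, so resource utilization is higher | It takes longer before the job is actually running, because it has to wait for the TaskManagers to be created |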
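As a rough illustration of the on-demand sizing in Native Session mode, the number of TaskManagers that must exist follows from the slots a job needs and the slots each TaskManager offers (taskmanager.numberOfTaskSlots). The helper below is a minimal sketch of that arithmetic; the function name is hypothetical, and it assumes the slot demand of the job is already known.

```python
def taskmanagers_needed(required_slots: int, slots_per_taskmanager: int) -> int:
    """Smallest number of TaskManagers that provides at least required_slots slots."""
    if required_slots < 0 or slots_per_taskmanager <= 0:
        raise ValueError("required_slots must be >= 0 and slots_per_taskmanager > 0")
    # Integer ceiling division, avoids floating point rounding.
    return (required_slots + slots_per_taskmanager - 1) // slots_per_taskmanager


# Example: a job that needs 120 slots, with 50 slots per TaskManager.
print(taskmanagers_needed(120, 50))
```

A larger slot count per TaskManager means fewer Pods to wait for at startup, at the price of coarser resource granularity.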
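Native Application mode: as in Application mode, each job gets a dedicated Flink cluster, and the cluster is reclaimed when the job finishes. The difference is the native integration: Flink talks to Kubernetes directly and requests resources on demand, so the user does not have to specify the number of TaskManagers.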
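| Advantages | Disadvantages |
|---|---|
| Each job has its own cluster, so job isolation is good | Each job has its own cluster, so the JobManager cannot be reused |
| Relatively high resource utilization: JobManager and TaskManagers are requested on demand | Slower job startup: the JobManager and TaskManagers are only created after the job is submitted |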
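| Mode | Isolation | Job startup time | Resource utilization | Resources created on demand |
|---|---|---|---|---|
| Session | Weak, jobs share the cluster | Short, starts immediately | Low, the cluster runs permanently | No |
| Application | Strong, each job has its own cluster | Longest, waits for the cluster to be created | Moderate, resources are released when the job ends | No |
| Native Session | Weak, jobs share the cluster | Moderate, waits for TaskManagers to be created | Fairly high, TaskManagers are requested on demand | Yes |
| Native Application | Strong, each job has its own cluster | Moderate, waits for the cluster to be created | Best, the cluster is created on demand | Yes |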
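Both Session mode and Application mode clusters support the Kubernetes high-availability service. Add the following Flink options to flink-configuration-configmap.yaml.
Note: the file system that backs the HA storage directory must be available at runtime. See the sections on custom Flink images and on enabling file system plugins for more information.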
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    ...
    kubernetes.cluster-id: <cluster-id>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    ...
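In addition, the JobManager and TaskManager Pods must be started with a service account that has permission to create, edit, and delete ConfigMaps. See how to configure service accounts for Pods for more information.
When high availability is enabled, Flink uses its own HA service for service discovery. The JobManager Pod is therefore started with its IP address, rather than the Kubernetes service name, as the value of jobmanager.rpc.address.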
2.2 flink-conf.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: fat-bigdata-cluster
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 50
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1524m
    taskmanager.memory.process.size: 4096m
    execution.target: kubernetes-session
    state.backend: filesystem
    state.checkpoints.dir: hdfs://192.168.5.131:25305/flink/cp
    state.savepoints.dir: hdfs://192.168.5.131:25305/flink/sp
    state.backend.incremental: true
    kubernetes.cluster-id: fat-bigdata-cluster-k8s-id
    classloader.resolve-order: parent-first
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs://192.168.5.131:25305/flink/recovery
    #restart-strategy: fixed-delay
    #restart-strategy.fixed-delay.attempts: 10
    #high-availability.jobmanager.port: 34560
    #metrics.internal.query-service.port: 34561
    kubernetes.namespace: fat-bigdata-cluster
    kubernetes.service-account: flink-bigdata-cluster
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
2.3 jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: fat-bigdata-cluster
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
  selector:
    app: flink
    component: jobmanager
2.4 jobmanager-rest-service.yaml
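apiVersion: v1
kind: Service
metadata:
  # a Service only selects Pods in its own namespace, so this must be the
  # namespace of the flink-jobmanager Deployment
  namespace: fat-bigdata-cluster
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
      nodePort: 30081
  selector:
    app: flink
    component: jobmanager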
2.5 jobmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: fat-bigdata-cluster
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.13.2
          env:
            - name: FLINK_PROPERTIES
              value: 'jobmanager.rpc.address: flink-jobmanager'
          workingDir: /opt/flink
          # start the JobManager, then keep the container alive by tailing its log
          command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;
            while :;
            do
              if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
                then tail -f -n +1 log/*jobmanager*.log;
              fi;
            done"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            # - name: flink-config-volume
            #   mountPath: /opt/flink/conf/
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        # - name: flink-config-volume
        #   configMap:
        #     name: flink-config
        #     items:
        #       - key: flink-conf.yaml
        #         path: flink-conf.yaml
        #       - key: log4j.properties
        #         path: log4j.properties
        - name: flink-lib-volume
          hostPath:
            path: /home/sll/lib/
            type: Directory
2.6 taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: fat-bigdata-cluster
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink:1.13.2
          workingDir: /opt/flink
          # start the TaskManager, then keep the container alive by tailing its log
          command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start;
            while :;
            do
              if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
                then tail -f -n +1 log/*taskmanager*.log;
              fi;
            done"]
          ports:
            - containerPort: 6122
              name: rpc
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: flink-lib-volume
              mountPath: /opt/flink/lib/
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
        - name: flink-lib-volume
          hostPath:
            path: /home/sll/lib
            type: Directory
2.7 serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fat-bigdata-cluster
  namespace: fat-bigdata-cluster
automountServiceAccountToken: false
2.8 configmaps-cluster-role.yaml
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: fat-bigdata-cluster
  name: configmaps-reader
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["update", "create", "get", "watch", "list"]
2.9 Bind the ClusterRole to the service account
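kubectl create clusterrolebinding flink-reader-binding --clusterrole=configmaps-reader --serviceaccount=fat-bigdata-cluster:default
The --serviceaccount value has the form <namespace>:<name>, so this binds the role to the default service account of the fat-bigdata-cluster namespace. That is the account the Deployments above run as, because they do not set serviceAccountName.
3. Flink version upgrades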
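In this Flink on k8s deployment, extra jars are added to flink/lib through a hostPath volume, so after a version upgrade the dependency jars (connectors and so on) must be updated to the same version as well.
The Flink base image is upgraded by changing the image tag in the YAML.
If I find a better way to upgrade versions online, I will add it to this document.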
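Because the jars on the hostPath have to follow the image version, a small check before rolling out a new image can catch leftovers from the previous release. The sketch below assumes the jars follow the usual flink-<artifact>-<version>.jar naming; the function name and the sample file names are hypothetical, and jars that do not start with flink- are ignored.

```python
import re

# Matches names such as flink-connector-kafka_2.12-1.13.2.jar and captures the trailing version.
_FLINK_JAR = re.compile(r"^flink-.+?-(\d+\.\d+\.\d+)\.jar$")


def mismatched_jars(jar_names, flink_version):
    """Return the Flink jars whose version suffix differs from flink_version."""
    stale = []
    for name in jar_names:
        match = _FLINK_JAR.match(name)
        if match and match.group(1) != flink_version:
            stale.append(name)
    return stale


# Hypothetical directory listing; in practice this could come from os.listdir("/home/sll/lib").
jars = [
    "flink-connector-kafka_2.12-1.13.2.jar",
    "flink-sql-connector-elasticsearch7_2.11-1.12.2.jar",
    "mysql-connector-java-8.0.25.jar",
]
print(mismatched_jars(jars, "1.13.2"))
```

An empty result means every Flink jar in the directory carries the same version as the image tag.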
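3.1 Release notes
| Version | Summary (see the official documentation for details) |
|---|---|
| Flink 1.7 | Scala 2.12 support |
| | State evolution |
| | Exactly-once S3 StreamingFileSink |
| | MATCH_RECOGNIZE support in Streaming SQL |
| | Temporal Tables and Temporal Joins in Streaming SQL |
| | Versioned REST API |
| | Kafka 2.0 Connector |
| | Local recovery |
| | Removal of Flink's legacy mode |
| Flink 1.8 | Continuous incremental cleanup of old keyed state with TTL (time-to-live) |
| | New support for schema migration when restoring savepoints |
| | Savepoint compatibility |
| | RocksDB version bump and switch to FRocksDB (FLINK-10471) |
| | Maven dependencies |
| | TaskManager configuration (FLINK-11716) |
| | Table API changes |
| | Connector changes |
| Flink 1.9 | Fine-grained batch job recovery (FLIP-1) |
| | State Processor API (FLIP-43) |
| | Stop-with-Savepoint (FLIP-34) |
| | Reworked Flink WebUI |
| | Preview of the new Blink SQL query processor |
| | Other Table API / SQL improvements |
| Flink 1.10 [major release: Blink integration completed] | Memory management and configuration improvements |
| | Unified job submission logic |
| | Native Kubernetes integration (Beta) |
| | Table API/SQL: production-ready Hive integration |
| | Other Table API/SQL improvements |
| | PyFlink: native user-defined function (UDF) support |
| | Important change FLINK-10725 |
| Flink 1.11 [major release] | Table & SQL support for Change Data Capture (CDC) |
| | Table & SQL support for JDBC Catalog |
| | Hive real-time data warehouse |
| | New Source API |
| | PyFlink ecosystem |
| | Production readiness and stability improvements |
| Flink 1.12 [major release] | Batch execution mode in the DataStream API |
| | New Data Sink API (Beta) |
| | Kubernetes-based high availability (HA) |
| | Other improvements |
| | Table API/SQL changes |
| | PyFlink: Python DataStream API |
| | Other PyFlink improvements |
| Flink 1.13 | |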
Deployment with helm
4.1 flink-web-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-web-config
data:
  flink-web.conf: |
    spring.datasource.url=jdbc:mysql://192.168.5.131:6033/flink_web?characterEncoding=UTF-8&useSSL=false
    spring.datasource.username=cnte
    spring.datasource.password=Cnte@19808
4.2 flink-web-service.yaml
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert -f docker-compose.yml -o cnte-iot -c
    kompose.version: 1.22.0 (955b78124)
  creationTimestamp: null
  labels:
    io.kompose.service: flink-web
  name: flink-web
spec:
  type: NodePort
  ports:
    # nodePort 8081 and 9084 are outside the default NodePort range (30000-32767);
    # they are only accepted if the API server's --service-node-port-range was widened
    - name: "8081"
      port: 8081
      nodePort: 8081
      targetPort: 8081
    - name: "9084"
      port: 9084
      nodePort: 9084
      targetPort: 9084
  selector:
    # must match the Pod labels in flink-web-deployment.yaml
    app: flink-web
status:
  loadBalancer: {}
4.3 flink-web-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: flink-web
  namespace: dev-bigdata
  name: flink-web
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-web
  strategy: {}
  template:
    metadata:
      labels:
        app: flink-web
    spec:
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
          image: registry.cn-hangzhou.aliyuncs.com/flink-streaming-platform-web/flink-web:flink-1.13.2
          name: flink-web
          ports:
            - containerPort: 8081
            - containerPort: 9084
          volumeMounts:
            - name: application-properties
              mountPath: /data/projects/flink-streaming-platform-web/conf
            - name: flink-client
              mountPath: /data/projects/flink-streaming-platform-web/flink
      restartPolicy: Always
      volumes:
        - name: application-properties
          configMap:
            name: flink-web-config
            items:
              - key: flink-web.conf
                path: application-docker.properties
        - name: flink-client
          hostPath:
            path: /home/sll/flink-1.12.2
            type: Directory
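5. ES index lifecycle management
5.1 Elasticsearch deployment
- 2 master nodes can be configured
- 2 hot / warm / cold data nodes can be configured
- Other node types can be configured as needed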
---
apiVersion: v1
kind: Namespace
metadata:
  name: es-cluster
  labels:
    name: ns-elasticsearch
---
apiVersion: v1
kind: ServiceAccount
metadata:
  labels:
    elastic-app: elasticsearch
  name: elasticsearch-admin
  namespace: es-cluster
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: elasticsearch-admin
  labels:
    elastic-app: elasticsearch
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
  - kind: ServiceAccount
    name: elasticsearch-admin
    namespace: es-cluster
---
kind: Deployment
# apps/v1beta2 was removed in Kubernetes 1.16; apps/v1 is the current API group
apiVersion: apps/v1
metadata:
  labels:
    elastic-app: elasticsearch
    role: master
  name: elasticsearch-master
  namespace: es-cluster
spec:
  replicas: 2
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      elastic-app: elasticsearch
      role: master
  template:
    metadata:
      labels:
        elastic-app: elasticsearch
        role: master
    spec:
      containers:
        - name: elasticsearch-master
          image: elasticsearch:7.12.0
          lifecycle:
            postStart:
              exec:
                command: ["/bin/bash", "-c", "sysctl -w vm.max_map_count=262144; ulimit -l unlimited;"]
          ports:
            - containerPort: 9200
              protocol: TCP
            - containerPort: 9300
              protocol: TCP
          env:
            - name: "cluster.name"
              value: "elasticsearch-cluster"
            - name: "bootstrap.memory_lock"
              value: "true"
            - name: "discovery.zen.ping.unicast.hosts"
              value: "elasticsearch-discovery"
            - name: "discovery.zen.minimum_master_nodes"
              value: "2"
            - name: "discovery.zen.ping_timeout"
              value: "5s"
            - name: "node.master"
              value: "true"
            - name: "node.data"
              value: "false"
            - name: "node.ingest"
              value: "false"
            - name: "ES_JAVA_OPTS"
              value: "-Xms256m -Xmx256m"
          securityContext:
            privileged: true
      serviceAccountName: elasticsearch-admin
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
---
kind: Service
apiVersion: v1
metadata:
  labels:
    elastic-app: elasticsearch
  name: elasticsearch-discovery
  namespace: es-cluster
spec:
  ports:
    - port: 9300
      targetPort: 9300
  selector:
    elastic-app: elasticsearch
    role: master
---
kind: Deployment
# apps/v1beta2 was removed in Kubernetes 1.16; apps/v1 is the current API group
apiVersion: apps/v1
metadata:
  labels:
    elastic-app: elasticsearch
    role: data
  name: elasticsearch-data-hot
  namespace: es-cluster
spec:
  replicas: 2
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      elastic-app: elasticsearch
  template:
    metadata:
      labels:
        elastic-app: elasticsearch
        role: data
    spec:
      containers:
        - name: elasticsearch-data
          image: elasticsearch:7.12.0
          lifecycle:
            postStart:
              exec:
                command: ["/bin/bash", "-c", "sysctl -w vm.max_map_count=262144; ulimit -l unlimited;"]
          ports:
            - containerPort: 9200
              protocol: TCP
            - containerPort: 9300
              protocol: TCP
          volumeMounts:
            - name: esdata
              mountPath: /usr/share/elasticsearch/data
          env:
            - name: "cluster.name"
              value: "elasticsearch-cluster"
            - name: "bootstrap.memory_lock"
              value: "true"
            - name: "discovery.zen.ping.unicast.hosts"
              value: "elasticsearch-discovery"
            - name: "node.master"
              value: "false"
            - name: "node.data"
              value: "true"
            - name: "ES_JAVA_OPTS"
              value: "-Xms256m -Xmx256m"
            # custom node attribute used for hot/cold shard allocation
            - name: "node.attr.box_type"
              value: "hot"
          securityContext:
            privileged: true
      volumes:
        - name: esdata
          emptyDir: {}
      serviceAccountName: elasticsearch-admin
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
---
kind: Deployment
# apps/v1beta2 was removed in Kubernetes 1.16; apps/v1 is the current API group
apiVersion: apps/v1
metadata:
  labels:
    elastic-app: elasticsearch
    role: data
  name: elasticsearch-data-cold
  namespace: es-cluster
spec:
  replicas: 2
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      elastic-app: elasticsearch
  template:
    metadata:
      labels:
        elastic-app: elasticsearch
        role: data
    spec:
      containers:
        - name: elasticsearch-data
          image: elasticsearch:7.12.0
          lifecycle:
            postStart:
              exec:
                command: ["/bin/bash", "-c", "sysctl -w vm.max_map_count=262144; ulimit -l unlimited;"]
          ports:
            - containerPort: 9200
              protocol: TCP
            - containerPort: 9300
              protocol: TCP
          volumeMounts:
            - name: esdata
              mountPath: /usr/share/elasticsearch/data
          env:
            - name: "cluster.name"
              value: "elasticsearch-cluster"
            - name: "bootstrap.memory_lock"
              value: "true"
            - name: "discovery.zen.ping.unicast.hosts"
              value: "elasticsearch-discovery"
            - name: "node.master"
              value: "false"
            - name: "node.data"
              value: "true"
            - name: "ES_JAVA_OPTS"
              value: "-Xms256m -Xmx256m"
            # custom node attribute used for hot/cold shard allocation
            - name: "node.attr.box_type"
              value: "cold"
          securityContext:
            privileged: true
      volumes:
        - name: esdata
          emptyDir: {}
      serviceAccountName: elasticsearch-admin
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
---
kind: Service
apiVersion: v1
metadata:
  labels:
    elastic-app: elasticsearch-service
  name: elasticsearch-service
  namespace: es-cluster
spec:
  ports:
    - port: 9200
      targetPort: 9200
      nodePort: 32210
  selector:
    elastic-app: elasticsearch
  type: NodePort
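5.2 Dynamic index names
In the options of the Flink SQL Elasticsearch connector, a {field|date_format} placeholder in the index name is replaced for each record with the formatted value of that field, so the setting below writes to one index per day.
'index' = 'index-{log_ts|yyyy-MM-dd}'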
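To make the effect of the placeholder concrete, the sketch below resolves such a pattern for a single record in plain Python. It only imitates the behaviour for illustration and is not the connector's implementation; the function name is hypothetical, and only the yyyy, MM, and dd tokens are translated.

```python
import re
from datetime import datetime

# Only the Java date tokens used in this document are translated.
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}
_PLACEHOLDER = re.compile(r"\{(\w+)\|([^}]+)\}")


def resolve_index(pattern: str, record: dict) -> str:
    """Replace each {field|format} placeholder with the formatted field value."""
    def render(match):
        field, java_format = match.group(1), match.group(2)
        strftime_format = java_format
        for java_token, py_token in _JAVA_TO_STRFTIME.items():
            strftime_format = strftime_format.replace(java_token, py_token)
        return record[field].strftime(strftime_format)

    return _PLACEHOLDER.sub(render, pattern)


row = {"log_ts": datetime(2021, 8, 5, 12, 30)}
print(resolve_index("index-{log_ts|yyyy-MM-dd}", row))
```

Records from different days therefore land in different indices, which is what makes per-day hot/cold handling possible later on.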
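5.3 Move an index to the cold nodes
The value must match the node.attr.box_type of the target data nodes, which is cold for the elasticsearch-data-cold Deployment.
PUT test_index/_settings
{
  "settings": {
    "index.routing.allocation.require.box_type": "cold"
  }
}
// A script can apply this to indices older than a few days to move them to the cold data nodes.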
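A script for this could first decide which daily indices are old enough and then issue one settings request per index. The sketch below covers only the selection and the request construction; sending the request is left out. It assumes daily indices named prefix plus yyyy-MM-dd and uses the value cold, matching the node.attr.box_type of the cold data nodes in section 5.1. All function names are hypothetical.

```python
from datetime import date, datetime, timedelta


def indices_to_move(index_names, today, keep_hot_days, prefix="index-"):
    """Return daily indices (prefix + yyyy-MM-dd) dated before today - keep_hot_days."""
    cutoff = today - timedelta(days=keep_hot_days)
    old = []
    for name in index_names:
        if not name.startswith(prefix):
            continue
        try:
            day = datetime.strptime(name[len(prefix):], "%Y-%m-%d").date()
        except ValueError:
            continue  # not a daily index, skip it
        if day < cutoff:
            old.append(name)
    return old


def cold_settings_request(index_name):
    """Build method, path and body of the allocation settings call for one index."""
    body = {"settings": {"index.routing.allocation.require.box_type": "cold"}}
    return "PUT", f"{index_name}/_settings", body


names = ["index-2021-08-05", "index-2021-08-07", "index-2021-08-09", "logs-2021-01-01", "index-foo"]
for name in indices_to_move(names, today=date(2021, 8, 10), keep_hot_days=3):
    print(cold_settings_request(name))
```

Run daily (for example from a Kubernetes CronJob), this keeps only the most recent days on the hot nodes.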
5.4 Create an index template
PUT _template/logs_template
{
  "index_patterns": ["logs-*"],
  "order": 1,
  "settings": {
    "number_of_shards": 4,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      }
    }
  }
}
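A template only takes effect for new indices whose names match one of its index_patterns. The sketch below approximates that check with shell-style wildcards, which is adequate for patterns that only use `*`; the function name is hypothetical, and this is not how Elasticsearch itself is implemented.

```python
from fnmatch import fnmatchcase


def template_applies(index_patterns, index_name):
    """Return True if the index name matches at least one wildcard pattern."""
    return any(fnmatchcase(index_name, pattern) for pattern in index_patterns)


print(template_applies(["logs-*"], "logs-2021-08-05"))
print(template_applies(["logs-*"], "index-2021-08-05"))
```

With the pattern logs-*, daily indices created under an index- prefix would not pick up the template, so the pattern and the index naming scheme need to agree.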



