To install Docker and docker-compose on CentOS 7, refer to this article.
1.2 Run Docker as a non-root user
Running Docker as root is a security risk, so it is best to let a non-root user operate Docker. To do this, add a regular user to the docker group:
```bash
# Check whether the docker group exists; if not, create it with: sudo groupadd docker
sudo cat /etc/group | grep docker
# Add user tmp to the docker group
sudo gpasswd -a tmp docker
# Reload group membership
newgrp docker
```
1.3 Configure the containers
The containers to start are:
- Flink jobmanager: submits and monitors jobs
- Flink taskmanager: executes jobs
- kafka: buffers and distributes data
- zookeeper: coordinates the cluster
- almond: runs Jupyter with a Scala kernel
- redis: stores data
The docker-compose.yml is as follows:
```yaml
version: '3.8'
services:
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: xxx.xx.xx.xx
      KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  jobmanager:
    container_name: jobmanager
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    container_name: taskmanager
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  almond:
    container_name: almond
    image: almondsh/almond:latest
    ports:
      - "8888:8888"
  redis:
    container_name: redis
    image: bitnami/redis:latest
    environment:
      # ALLOW_EMPTY_PASSWORD is recommended only for development.
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
    ports:
      - '6379:6379'
    volumes:
      - '/data/docker/redis:/bitnami/redis/data'
```
Notes:
- Kafka's KAFKA_ADVERTISED_HOST_NAME must be set to the IP of the machine running the service.
- Kafka's KAFKA_CREATE_TOPICS parameter pre-creates topics. With the setting above, Topic1 gets 1 partition and 3 replicas, and Topic2 gets 1 partition and 1 replica.
- jobmanager exposes port 8081, which serves the Flink web UI. If 8081 is already taken, change the ports entry to - "xxxx:8081", where xxxx is a free port.
- almond exposes port 8888, which serves the Jupyter page with the Scala kernel.
First change into the directory containing docker-compose.yml, then run the commands below.
```bash
# Start the cluster
docker-compose up -d
# Stop the cluster
docker-compose stop
# Add Kafka brokers (scale to 3)
docker-compose scale kafka=3
```
2. Reading and Writing Kafka with Flink (using Scala)
2.1 Flink reading from Kafka
```scala
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties

def readKafka(
    env: StreamExecutionEnvironment,
    topic: String,
    groupId: String,
    offset: String = "latest"
) = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "xxx.xxx.xx.xx:9092")
  props.setProperty("group.id", groupId)
  props.setProperty("auto.offset.reset", offset)
  // No (de)serializer properties are needed here: the connector deserializes
  // records with the SimpleStringSchema passed to FlinkKafkaConsumer.
  env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), props))
}
```
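A minimal usage sketch; the topic name Topic1, the group id, and the print sink are illustrative:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = readKafka(env, topic = "Topic1", groupId = "test-group")
stream.print()  // print each record to the taskmanager logs
env.execute("read-kafka-demo")
```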
2.2 Flink writing to Kafka
Wrap the producer in a class:
```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, Callback, RecordMetadata}
import java.util.Properties

class MyKafkaProducer(val topic: String) {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "xxx.xxx.xx.xx:9092")
  // A producer needs serializers; deserializer settings belong to consumers.
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  def send(data: String): Unit = {
    val record = new ProducerRecord[String, String](topic, data)
    producer.send(record, new ProducerCallBack)
    producer.flush()
  }

  // Callback that surfaces any send failure
  class ProducerCallBack extends Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception != null) exception.printStackTrace()
    }
  }
}
```
Use the producer class:
```scala
val producer = new MyKafkaProducer("Test")
producer.send("data")
```
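Note that MyKafkaProducer wraps the plain Kafka client and flushes after every record. Inside a Flink job it is usually more idiomatic to attach the Kafka connector's own sink instead. A sketch, where the DataStream[String] named stream and the topic "Test" are assumptions:

```scala
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "xxx.xxx.xx.xx:9092")
// FlinkKafkaProducer(topic, serialization schema, producer config)
stream.addSink(new FlinkKafkaProducer[String]("Test", new SimpleStringSchema(), producerProps))
```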
3. Flink writing to Redis
```scala
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

def getRedisSink(dbSeq: Int, additionalKey: String): RedisSink[(String, String)] = {
  // loadConf is a user-defined helper that reads the Redis host from configuration
  val host = loadConf("host")
  val conf = new FlinkJedisPoolConfig.Builder().setHost(host).setPort(6379).setDatabase(dbSeq).build()
  new RedisSink[(String, String)](conf, new MyMapper(additionalKey))
}

// Describes how each tuple is written to Redis
class MyMapper(additionalKey: String) extends RedisMapper[(String, String)] {
  // Write with HSET, using additionalKey as the hash name
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, additionalKey)

  // Field of the hash entry
  override def getKeyFromData(data: (String, String)): String = data._1

  // Value of the hash entry
  override def getValueFromData(data: (String, String)): String = data._2
}
```
```scala
// Write a stream to Redis; stream is a DataStream[(String, String)]
val env = StreamExecutionEnvironment.getExecutionEnvironment
val redisSink = getRedisSink(dbSeq = 0, additionalKey = "Test")
stream.addSink(redisSink)
env.execute()
```
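Putting the pieces together, a minimal end-to-end sketch that reads from Kafka and writes to Redis; the topic name, group id, and the comma-separated "key,value" record format are illustrative assumptions:

```scala
import org.apache.flink.api.common.functions.MapFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Turn each "key,value" line from Kafka into a (key, value) tuple
val tuples = readKafka(env, topic = "Topic1", groupId = "redis-writer")
  .map(new MapFunction[String, (String, String)] {
    override def map(line: String): (String, String) = {
      val parts = line.split(",", 2)
      (parts(0), parts(1))
    }
  })
tuples.addSink(getRedisSink(dbSeq = 0, additionalKey = "Test"))
env.execute("kafka-to-redis")
```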
Error handling
1. Could not start cluster entrypoint StandaloneSessionClusterEntrypoint
Download the required dependency jars and put them into the ./lib directory; see the official documentation for details.
2. Could not create the DispatcherResourceManagerComponent
The port is already in use; in ./conf/flink-conf.yaml change the port (the rest.port setting) from 8081 to an unused port.


