[Scala/Flink] Building a Flink + Kafka + Redis Stream Processing System with Docker

1. Environment Setup

1.1 Install docker and docker-compose

To install docker and docker-compose on CentOS 7, refer to this article.

1.2 Run docker as a non-root user

Running docker as root is a security risk, so it is best to run docker as a non-root user. To do so, add the regular user to the docker group:

# Check whether the docker group exists; if not, create it with: sudo groupadd docker
sudo cat /etc/group | grep docker
# Add the user tmp to the docker group
sudo gpasswd -a tmp docker
# Refresh group membership for the current session
newgrp docker
1.3 Configure the containers

The containers to start are:

  • Flink jobmanager: submits and monitors jobs
  • Flink taskmanager: executes the tasks
  • kafka: buffers and distributes data
  • zookeeper: coordinates the cluster
  • almond: runs a Jupyter server with a Scala kernel
  • redis: stores the results

The contents of docker-compose.yml:

version: '3.8'

services:
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: xxx.xx.xx.xx
      KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  jobmanager:
    container_name: jobmanager
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager 

  taskmanager:
    container_name: taskmanager
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2     

  almond:
    container_name: almond
    image: almondsh/almond:latest
    ports:
      - "8888:8888"

  redis:
    container_name: redis
    image: bitnami/redis:latest
    environment:
      # ALLOW_EMPTY_PASSWORD is recommended only for development.
      - ALLOW_EMPTY_PASSWORD=yes
      - REDIS_DISABLE_COMMANDS=FLUSHDB,FLUSHALL
    ports:
      - '6379:6379'
    volumes:
      - '/data/docker/redis:/bitnami/redis/data'

Notes:

  1. KAFKA_ADVERTISED_HOST_NAME must be set to the IP of the machine the service runs on
  2. KAFKA_CREATE_TOPICS pre-creates topics: with the setting above, Topic1 gets 1 partition and 3 replicas, and Topic2 gets 1 partition and 1 replica (a verification sketch follows these notes)
  3. jobmanager exposes port 8081, which serves the Flink job management page; if 8081 is already taken, change ports to - "xxxx:8081", where xxxx is a free port
  4. almond exposes port 8888, which serves the Jupyter page with the Scala kernel
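
Once the containers are up, one quick way to confirm that the broker is reachable and that the topics from KAFKA_CREATE_TOPICS were created is to list them with the Kafka AdminClient. A minimal Scala sketch, assuming the kafka-clients jar is on the classpath and xxx.xx.xx.xx is the broker IP from the compose file:

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import java.util.Properties

val props = new Properties()
// same address as KAFKA_ADVERTISED_HOST_NAME in docker-compose.yml
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xx.xx.xx:9092")
val admin = AdminClient.create(props)
// should print [Topic1, Topic2] once the broker has finished creating them
println(admin.listTopics().names().get())
admin.close()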
1.4 Start and stop the containers

First, enter the directory containing docker-compose.yml, then run the following commands.

# start the containers in the background
docker-compose up -d
# stop the containers (use docker-compose down to also remove them)
docker-compose stop
# scale out to 3 Kafka brokers
docker-compose scale kafka=3
2. Reading and Writing Kafka from Flink (Scala)

2.1 Reading from Kafka
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties

def readKafka(
               env: StreamExecutionEnvironment,
               topic: String,
               groupId: String,
               offset: String = "latest"
             ) = {
   val props = new Properties()
   props.setProperty("bootstrap.servers", "xxx.xxx.xx.xx:9092")
   // consumer-side deserializers (serializer props belong to producers and were dropped);
   // SimpleStringSchema below also handles byte-to-String conversion on the Flink side
   props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
   props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

   props.setProperty("group.id", groupId)
   props.setProperty("auto.offset.reset", offset)

   env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), props))
}
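
A minimal usage sketch (the topic and group names are placeholders): create the execution environment, read the topic, and print each record.

val env = StreamExecutionEnvironment.getExecutionEnvironment
// "Topic1" matches KAFKA_CREATE_TOPICS above; "test-group" is an arbitrary consumer group
val stream = readKafka(env, "Topic1", "test-group")
stream.print()
env.execute("read-kafka")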
2.2 Writing to Kafka

Wrap a plain Kafka producer in a class:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, Callback, RecordMetadata}
import java.util.Properties

class MyKafkaProducer(val topic: String) {
   val props = new Properties()
   props.setProperty("bootstrap.servers", "xxx.xxx.xx.xx:9092")
   // a producer only needs serializers (deserializers are consumer-side settings)
   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

   private val producer = new KafkaProducer[String, String](props)

   def send(data: String): Unit = {
     val record = new ProducerRecord[String, String](topic, data)

     producer.send(record, new ProducerCallBack)
     // flush after every record for simplicity; batch in production code
     producer.flush()
   }

   class ProducerCallBack extends Callback {
     // log any broker-side failure; metadata is null when exception is non-null
     def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
       if (exception != null) exception.printStackTrace()
     }
   }
}

Using the producer class (MyKafkaProducer is a regular class, so it needs new):

val producer = new MyKafkaProducer("Test")
producer.send("data")
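
The class above uses the plain Kafka client, which is handy in driver code or tests. To write a DataStream to Kafka the Flink-native way, the same connector also provides FlinkKafkaProducer. A minimal sketch, assuming stream is a DataStream[String] such as the one returned by readKafka:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "xxx.xxx.xx.xx:9092")
// writes every element of the stream to Topic2 as a UTF-8 string
stream.addSink(new FlinkKafkaProducer[String]("Topic2", new SimpleStringSchema(), producerProps))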
3. Writing to Redis from Flink
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

def getRedisSink(dbSeq: Int, additionalKey: String): RedisSink[(String, String)] = {
  // loadConf is a user-defined helper that reads the Redis host from configuration
  val host = loadConf("host")
  val conf = new FlinkJedisPoolConfig.Builder().setHost(host).setPort(6379).setDatabase(dbSeq).build()
  new RedisSink[(String, String)](conf, new MyMapper(additionalKey))
}

// maps each (field, value) tuple to an HSET on the Redis hash named additionalKey
class MyMapper(additionalKey: String) extends RedisMapper[(String, String)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, additionalKey)
  }
  // the hash field
  override def getKeyFromData(data: (String, String)): String = data._1
  // the hash value
  override def getValueFromData(data: (String, String)): String = data._2
}

// write the stream; stream is assumed to be a DataStream[(String, String)] built upstream
val env = StreamExecutionEnvironment.getExecutionEnvironment
val redisSink = getRedisSink(dbSeq = 0, additionalKey = "Test")
stream.addSink(redisSink)
env.execute()
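
To check the result, the hash can be read back with any Redis client. A short sketch using the Jedis client, where the host placeholder and the "Test" hash name mirror the sink configuration above:

import redis.clients.jedis.Jedis

val jedis = new Jedis("xxx.xxx.xx.xx", 6379)
// prints all field/value pairs written under the "Test" hash
println(jedis.hgetAll("Test"))
jedis.close()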
Error Handling

1. Could not start cluster entrypoint StandaloneSessionClusterEntrypoint

Download the required dependency jars and put them in the ./lib directory; see the official documentation for details.

2. Could not create the DispatcherResourceManagerComponent

The port is already in use; in ./conf/flink-conf.yaml change the port from 8081 to a free one (e.g. rest.port: 8082).
