栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka压测

kafka压测

实时ETL流程测试
文档编号版本号V1.0
名称实时ETL流程测试文档
总页数正文
编写日期审批
目录

1. 测试目的

本次测试主要对基于Flink的实时ETL系统各个环节跑通

测试各个组件功能可用性测试数据一致性、实时性、完整性 2. 测试方法

采用python脚本模拟生成数据,通过Kafka作为消息队列,Flink完成实时抽取转换,输出数据到HDFS

测试过程主要分为以下3阶段:

    模拟实时生产数据
      程序目录:scdh03 /tmp/pycharm_myrs主要逻辑:每秒发送十条模拟数据到kafka
    获取数据并转换输出
      Flink集群管理平台测试项目地址:git:my-sc-slaughter-flink-test
        程序入口:org.myrs.consumer.StreamingJob
      主要逻辑:实时消费kafka数据,对数据进行转换处理,存入HDFS
    数据存储
      HDFS存储路径:/data/ods/test/存储规则:按小时划分目录,已log文件形式存储查看方法:scdh01 hadoop fs -ls /data/ods/test
3. 测试结果
    组件功能可用性
      Kafka接收发送消息正常,无积压,集群运行稳定无异常Flink数据采集、转换、输出正常,集群运行稳定无异常
    数据验证
      实时性:数据生产和落地无延迟完整性:源数据和落地数据数据量一致,数据内容相同
实时大数据平台压测方案 1. 压测目的

本次性能测试在正式环境下单台服务器上Kafka处理消息能力及Flink承载能力进行压力测试。测试包括对Kafka写入消息和消费消息进行压力测试,根据不同量级的消息处理结果,评估Kafka的处理性能是否满足项目需求,Flink处理速度是否会产生背压

2. 测试范围及方法

测试使用测试脚本,通过Kafka发起大量写入请求。模拟不同数量级的消息写入和消费场景,查看Kafka处理不同数量级的消息数时的处理能力,包括每秒生成消息数、吞吐量、消息延迟时间

2.1 测试方法

Kafka消息写入创建的topic命名为myrs_consumer,Kafka消费读取的topic也是该topic,使用命令发起消费该topic的请求,针对不同的测试指标,本次我们采用固定其他值,动态变化测量值的方式来进行

2.2 准备工作

测试之前,先用linux命令去测试磁盘的读写速度,具体命令如下:

1.测试IO读
    hdparm -t --direct /dev/scda3
2.测试IO写
    sync;/usr/bin/time -p bash -c "(dd if=/dev/zero of=test.dd  bs=1M count=20000)"

磁盘读在163m/s-206m/s之间,而写速度是163m/s。后续评测我们以该磁盘测试为基准来

2.3 测试环境
    硬件资源:3台32C-CPU、32G内存、500G硬盘的虚拟机操作系统:Centos7程序版本:Kafka(2.11) Flink(1.12.5)
2.4 Kafka参数

zookeeper.version=3.4.5-cdh6.2.0--1

java.version=1.8.0_181

user.dir=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/kafka

Maximum Message Size=1000kb

Segment File Size=1G

Data Retention Time=7天

Data Directories=/var/local/kafka/data

Additional Broker Java Options:-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinmetaspaceFreeRatio=50 -XX:MaxmetaspaceFreeRatio=80 -XX:+DisableExplicitGC -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.host=127.0.0.1 -Dcom.sun.management.jmxremote.local.only=true -Djava.rmi.server.hostname=127.0.0.1

3. 测试过程 3.1 producer测试

测试目的:测试kafka producer吞吐量测试脚本

./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=10000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=20000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=40000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=60000   --throughput 30000
./kafka-producer-perf-test.sh  --topic myrs_consumer --num-records 100000000 --record-size 687  --producer-props   bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092  batch.size=80000   --throughput 30000

测试结果

测试结论

发现在消息未压缩的前提下,20000条一批次之后吞吐稳定在30000条/s,而数据量在19.65M/s本次测试对数据的存储块大小未测,但在之前的测试中发现压缩以及解压的情况也是lz4算法最优,==lz4压缩最大时可以达到30w+/s的吞吐,而不压缩为12w/s,snappy最大为16w/s,gzip最大为5.8w/s==;故后续生产消息时建议采用lz4压缩,不仅可以节省磁盘,也可以大幅度增加我们的吞吐 3.2consumer测试

测试目的:测试consumer消费情况

./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 1 --hide-header --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 4 --hide-header  --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 7 --hide-header  --num-fetch-threads 1
./kafka-consumer-perf-test.sh bootstrap.servers=10.106.58.117:9092,10.106.58.118:9092,10.106.58.119:9092 --topic myrs_consumer --fetch-size 1048576  --messages 100000  --threads 10 --hide-header  --num-fetch-threads 1

测试结论:在threads为4时,消费速度最好达到24.1w/s,而后续慢慢平稳 4. 测试结果

producer方面,在主从同步选取1时性能和稳定性适中,压缩方面,我们选择lz4压缩方式,而批大小我们可以选择100w左右,并发保持在60,消息的大小建议在4k左右较好,分区数在3-5个,副本数为3个既可以保证性能也能维持高可用;consumer的处理线程我们选择4个,抓取消息大小则设置在400w条左右,抓取线程设置为10个即可broker的参数方面,replica.fetcher设置为服务器core的个数时较好,io.threads 则设置为core个数的3倍,network.threads保持和core个数相等即可,interval.messages数设置为2w,interval.ms则设置为10000 ms 5. 测试异常

    Kafka集群宕机
      现象:集群监控显示scdh2节点失去联系,查阅日志监控服务读取信息失败,提示空间不足解决方案:对Kafka存储空间扩容,并增加监控告警,避免极端情况下大量数据写入影响集群运行
6. 性能优化

    Kafka缓冲磁盘扩容

      kafka数据目录:scdh02 /var/local/kafka/data

      默认数据目录大小57G,增加磁盘 /root/data/kafka/data

    Flink集群配置高可用

      修改 /usr/app/flink-1.12.5/conf/flink-conf.yaml

        jobmanager.rpc.address: scdh01

      修改 /usr/app/flink-1.12.5/conf/workers

      scdh02
      scdh03

      修改flink-conf.yaml配置文件,增加高可用配置

      state.backend: filesystem
      state.checkpoints.dir: hdfs://scdh02:9000/flink-checkpoints
      state.savepoints.dir: hdfs://scdh02:9000/flink-savepoints
      
      high-availability: zookeeper
      high-availability.storageDir: hdfs://scdh02:9000/flink/ha/
      high-availability.zookeeper.quorum: scdh01:2181,scdh02:2181,scdh03:2181

      重启Flink集群

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745967.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号