## Stream Computing Test (Storm)
Configure the Kafka disks. On each server hosting a Kafka node, attach the disks and format each one as ext4:

    mkfs.ext4 /dev/nvme0n1

Create one directory per disk (under /mnt):

    mkdir -p t0 t1 t2 t3

Mount the disks:

    mount /dev/nvme0n1 /mnt/t0
    mount /dev/nvme1n1 /mnt/t1
    mount /dev/nvme2n1 /mnt/t2
    mount /dev/nvme3n1 /mnt/t3
    …
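The per-disk format/mkdir/mount steps above can be collapsed into a loop. The sketch below is a dry run that only prints the commands (the nvme0n1..nvme3n1 device names are taken from the example; run the printed commands as root on each Kafka node):

```shell
# Dry run: print one format+mount command line per disk instead of executing,
# so the sketch is safe to run anywhere. Device names follow the example above.
cmds=$(for i in 0 1 2 3; do
    echo "mkfs.ext4 /dev/nvme${i}n1 && mkdir -p /mnt/t$i && mount /dev/nvme${i}n1 /mnt/t$i"
done)
echo "$cmds"
ncmds=$(echo "$cmds" | wc -l)
```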
1. Prepare the data: /dev/shm/data

    mkdir /mnt/t0/powertest
    cd /mnt/t0/powertest
    vim data.sh
    ./data.sh

Contents of data.sh; its purpose is to write a large amount of data under /dev/shm/data:

    #!/bin/bash
    data=",测试测试测试测试测试测试测试测试,qw54234,safasdg,sadgasdg,235412351235,sagsags,32412342134,1,234,1.2314123,---,asfs,2142,afdsf"
    for i in $(seq 1 10000)
    do
        echo ${i}$data >> /dev/shm/data1
    done
    for j in $(seq 1 1000)
    do
        cat /dev/shm/data1 >> /dev/shm/data
    done
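At full scale, the two loops above produce 10000 numbered seed lines and then replicate the seed file 1000 times, i.e. 10,000,000 records in /dev/shm/data. A tiny-scale dry run (assumed sizes 10 × 5, and a temp dir instead of /dev/shm) shows how the two loops multiply the record count:

```shell
#!/bin/bash
# Tiny-scale version of data.sh: 10 seed lines replicated 5 times,
# written to a temp dir so the sketch runs anywhere.
set -e
tmp=$(mktemp -d)
data=",测试,qw54234,safasdg"
for i in $(seq 1 10); do
    echo ${i}$data >> "$tmp/data1"      # seed file: one numbered record per line
done
for j in $(seq 1 5); do
    cat "$tmp/data1" >> "$tmp/data"     # replicate the seed file wholesale
done
seed_count=$(wc -l < "$tmp/data1")
total_count=$(wc -l < "$tmp/data")
echo "seed=$seed_count total=$total_count"
rm -rf "$tmp"
```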
Create a symbolic link under /mnt/t0/ pointing to /dev/shm/data:

    ln -s /dev/shm/data /mnt/t0/data

Note: if there are multiple partitions, modify the script to generate multiple files under /dev/shm/, and create a symbolic link in each of the /mnt/t0~ directories under /mnt, as in the example above.

2. Create and list topics
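For the multi-partition case, the note above amounts to a loop like the following. The sketch runs in a temp dir so it can be exercised anywhere; in production the link targets would be per-disk data files under /dev/shm and the links would live under /mnt/t0../mnt/t3 (those exact names are an assumption extrapolated from the single-disk example):

```shell
# Create one data symlink per mount point, using stand-in paths in a temp dir.
tmp=$(mktemp -d)
for i in 0 1 2 3; do
    mkdir -p "$tmp/mnt/t$i"
    : > "$tmp/shm_data$i"                        # stand-in for a /dev/shm data file
    ln -s "$tmp/shm_data$i" "$tmp/mnt/t$i/data"  # link as /mnt/tN/data
done
nlinks=$(find "$tmp" -type l | wc -l)
echo "$nlinks"
rm -rf "$tmp"
```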
    cd /usr/hdp/2.6.0.3-8/kafka/bin/

- Create a topic:

      ./kafka-topics.sh --create --topic test-01 --partition 50 --replication-factor 1 --zookeeper 10.40.25.196:2181

- List topic names:

      ./kafka-topics.sh --zookeeper localhost:2181 --list

- Check whether the new topic contains data:

      ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-01 --from-beginning

- Describe a given topic:

      ./kafka-topics.sh --topic test --describe --zookeeper loongson:2181
Sample output. The console consumer below hangs and must be exited with Ctrl+C, which indicates the topic has no data yet:

    [root@loongson bin]# ./kafka-topics.sh --create --topic test-01 --partition 50 --replication-factor 1 --zookeeper 10.40.25.196:2181
    Created topic "test-01".
    [root@loongson bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
    test-01
    [root@loongson bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-01 --from-beginning
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    {metadata.broker.list=loongson.com:6667, request.timeout.ms=30000, client.id=console-consumer-69686, security.protocol=PLAINTEXT}
Edit /mnt/t0/powertest/files.properties, adding each file and its path; additional entries follow on separate lines:

    file1=/mnt/t0/data
    ...

Edit /mnt/t0/powertest/producer.properties, changing the domain name to your own:

    schema=string,string,string,string,string,string,string,string,string,string,string,string,string,string,string
    metadata.broker.list=loongson.com:6667
    request.required.acks=-1
    producer.type=sync
    serializer.class=kafka.serializer.DefaultEncoder
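One sanity check worth doing: the schema line must list exactly 15 "string" entries, matching the 15 columns each record carries (the sequence number plus the 14 fields in data.sh's $data string) and the c0..c14 Avro schema the producer prints later:

```shell
# Count the comma-separated entries in the schema line; 14 commas -> 15 fields.
schema="string,string,string,string,string,string,string,string,string,string,string,string,string,string,string"
nfields=$(echo "$schema" | tr ',' '\n' | wc -l)
echo "$nfields"
```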
Load data into the topic. Partition count rule: use a multiple of (3 machines × 4 disks).

Normal case:

    cd /mnt/t0/powertest
    java -cp kafkaproducer.jar com.iie.kafka.AvroProducer test-01 560 1000

Command explanation: run on a Kafka broker node to load data into the topic. The partition count matches the topic that was created; the file can be loaded into the topic multiple times to saturate the traffic.

    java -cp kafkaproducer.jar com.iie.kafka.AvroProducer <topic name> <partition count> 1000   # 1000 = records per message
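As an illustration of the partition-count rule only: one way to honor "a multiple of (machines × disks)" is to round a desired count up to the nearest such multiple. The helper and the concrete numbers below are assumptions for illustration, not part of the original procedure:

```shell
# Round a desired partition count up to a multiple of (machines x disks).
machines=3
disks=4
desired=50
unit=$((machines * disks))
partitions=$(( (desired + unit - 1) / unit * unit ))   # ceiling to a multiple of unit
echo "$partitions"
```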
Sample run:

    schema : {"name":"avrodata","type":"record","fields":[{"name":"c0","type":"string"},{"name":"c1","type":"string"},{"name":"c2","type":"string"},{"name":"c3","type":"string"},{"name":"c4","type":"string"},{"name":"c5","type":"string"},{"name":"c6","type":"string"},{"name":"c7","type":"string"},{"name":"c8","type":"string"},{"name":"c9","type":"string"},{"name":"c10","type":"string"},{"name":"c11","type":"string"},{"name":"c12","type":"string"},{"name":"c13","type":"string"},{"name":"c14","type":"string"}]}
    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Edit the Storm configuration file storm.properties:

    cd /mnt/t0/powertest

    numkafkaconsumer=4    #numkafkaconsumer=20 (spouts, kafka-read)
    numkafkaproducer=4    #numkafkaproducer=40 (bolts)
    numfilter=10          #numfilter=10 (bolts)
    numworkers=1
    # parallelism of match
    nummatch=40
    # parallelism of count
    numcount=20

Kafka configuration files: consumer.properties, kafkabolt.properties, kafkaspout.properties.
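A back-of-the-envelope tally of the values above can help when sizing numworkers. Summing the component parallelisms this way is an assumption for rough capacity planning, not a Storm API:

```shell
# Sum the per-component parallelism values from storm.properties above.
numkafkaconsumer=4; numkafkaproducer=4; numfilter=10; nummatch=40; numcount=20
total=$((numkafkaconsumer + numkafkaproducer + numfilter + nummatch + numcount))
echo "$total"
```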
    cd /mnt/t0/powertest

consumer.properties:

    vim consumer.properties

    # the Kafka topic created earlier
    topic=test-01
    schema=string,string,string,string,string,string,string,string,string,string,string,string,string,string,string
    # your own domain name
    bootstrap.servers=masterhdp1.com:6667
    group.id=loongson
    enable.auto.commit=true
    auto.commit.interval.ms=1000
    session.timeout.ms=30000
    key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    auto.offset.reset=earliest
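These .properties files are easy to break with stray line wraps (e.g. "ByteArrayDe serializer"). A small, hypothetical lint helper that counts non-comment lines that are not key=value can catch that; demonstrated here on a throwaway file:

```shell
# Count non-comment, non-blank lines that do not look like key=value.
check_props() {
    grep -vE '^(#|$)' "$1" | grep -cvE '^[A-Za-z0-9._]+=' || true
}
tmp=$(mktemp)
printf 'topic=test-01\ngroup.id=loongson\nauto.offset.reset=earliest\n' > "$tmp"
bad=$(check_props "$tmp")
echo "$bad"    # 0 means every line is well-formed
rm -f "$tmp"
```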
kafkabolt.properties:

    vim kafkabolt.properties

    # topic that stores the results
    topic=result-topic
    # change to the host name of any server where Kafka is installed
    bootstrap.servers=master.com:6667
    acks=0
    retries=0
    batch.size=16384
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
kafkaspout.properties:

    vim kafkaspout.properties

    # the Kafka topic created earlier
    topic=test-01
    schema=string,string,string,string,string,string,string,string,string,string,string,string,string,string,string
    # change to the host name of any server where Kafka is installed
    bootstrap.servers=master.com:6667
    # no need to change
    group.id=loongson-20190815174200
    enable.auto.commit=true
    auto.commit.interval.ms=1000
    session.timeout.ms=30000
    key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    auto.offset.reset=earliest

Build power
power is the test case. If the versions differ or the test-case source code has been changed, power.zip needs to be rebuilt:

    unzip power.zip
    apt-get install maven   # or: yum install maven
    cd power
    mvn package
    cp target/storm-1.0-SNAPSHOT-jar-with-dependencies.jar /mnt/t0/powertest
Build problem encountered:

    [ERROR] /home/loongson/storm_test/power-zmh/src/main/java/com/iie/util/SplitUtils.java:[7,31] error: package org.apache.commons.lang3 does not exist

Solution: add the following dependency to /home/loongson/storm_test/power/pom.xml:

    vim pom.xml

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.0</version>
    </dependency>
Single-field string-match filtering test

Check whether any Storm topology is running:

    storm list

Stop any running Storm topology:

    storm kill [topology name shown by storm list]

Run the test
Test script contents:

    vim /mnt/t0/powertest/test.sh

    #!/bin/bash
    timestamp=`date +%Y%m%d%H%M%S`
    task=loongsonjjli-$timestamp
    kafka_config=kafkaspout.properties
    echo "start running task:$task"
    sed -i "s/group.id=*.*$/group.id=$task/g" $kafka_config
    echo -n group.id change to :
    grep group.id $kafka_config
    #storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.iie.storm.AvroTest $task 123
    #storm jar storm-1.0-SNAPSHOT.jar com.iie.storm.AvroTest $task 123
    storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.iie.storm.FilterTest $task 123
    storm list

Make it executable and run it:

    chmod +x test.sh

    node1:/home/storm/jjli# ls
    kafkabolt.properties  kafkaspout.properties  storm-1.0-SNAPSHOT-jar-with-dependencies.jar  storm.properties
    node1:/home/storm/jjli# ./test.sh

    [root@loongson powertest]# pwd
    /mnt/t0/powertest
    [root@loongson powertest]# ls
    consumer.properties  kafkabolt.properties   producer.properties                           test.sh
    data.sh              kafkaproducer.jar      storm-1.0-SNAPSHOT-jar-with-dependencies.jar
    files.properties     kafkaspout.properties  storm.properties
    [root@loongson powertest]# ./test.sh
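The key line in test.sh is the sed substitution that rewrites group.id, so each run consumes the topic from the beginning under a fresh consumer group. What it does can be shown on a throwaway copy (the task name below is just an example value):

```shell
# Apply test.sh's sed line to a small stand-in for kafkaspout.properties.
tmp=$(mktemp)
printf 'topic=test-01\ngroup.id=loongson-old\nauto.offset.reset=earliest\n' > "$tmp"
task=loongsonjjli-20190815174200           # example task name
sed -i "s/group.id=*.*$/group.id=$task/g" "$tmp"
newgroup=$(grep '^group.id=' "$tmp")
echo "$newgroup"
rm -f "$tmp"
```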
Storm command breakdown:

    storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.iie.storm.StormAvroTest $task 123

- Run a jar: storm jar
- Path of the topology jar: storm-1.0-SNAPSHOT-jar-with-dependencies.jar
- Topology entry class (main function): com.iie.storm.StormAvroTest
- Arguments to the main function: $task 123
- Use dstat to watch NIC traffic

About one minute after the script is launched, noticeably higher traffic appears. The main columns to watch are cpu usr, dsk read/writ, and net recv/send:

    dstat -cdmn
    [root@master kafka]# dstat -cdmn
    ----total-cpu-usage---- -dsk/total- ------memory-usage----- -net/total-
    usr sys idl wai hiq siq| read  writ| used  buff  cach  free| recv  send
     82  14   2   0   0   2|   0   152k|14.0G  421M 12.0G 5111M|9893k  121k
     80  14   4   0   0   2|   0  8192B|14.1G  421M 12.0G 5017M|  11M   83k
     87  11   1   0   0   1|   0    16k|14.1G  421M 12.0G 5004M|1564k   30k
     87   9   2   0   0   2|   0   104k|14.2G  421M 12.0G 4983M|6754k  117k
     88  10   1   0   0   2|   0    80k|14.2G  421M 12.0G 4971M|4177k  107k
     88  10   0   0   0   2|   0    80k|14.2G  421M 12.0G 4964M|4657k   89k
     89   9   1   0   0   1|   0    16k|14.2G  421M 12.0G 4959M|2111k   41k
     83  12   2   0   0   2|   0   100k|14.2G  421M 12.0G 4946M|7818k  143k
     81  12   4   0   0   3|   0     0 |14.3G  421M 12.0G 4803M|  10M  137k
     87  11   0   0   0   2|   0    84k|14.4G  421M 12.0G 4791M|7710k  125k
     84  13   0   0   0   3|   0   180k|14.4G  421M 12.0G 4790M|  11M  182k
     87  12   1   0   0   1|   0   172k|14.4G  421M 12.0G 4776M| 970k   70k
     90  10   0   0   0   1|   0     0 |14.4G  421M 12.0G 4759M|  46k   79k
     86  10   2   0   0   2|   0   104k|14.4G  421M 12.0G 4749M|6782k  127k
     88   9   2   0   0   2|   0    96k|14.4G  421M 12.0G 4744M|  12M   96k
     86  10   3   0   0   1|   0  1164k|14.4G  421M 12.0G 4694M|1958k  127k
     89  10   0   0   0   1|   0    16k|14.5G  421M 12.0G 4671M|  82k  128k
     91   8   0   0   0   2|   0   108k|14.5G  421M 12.0G 4665M|8552k   43k
     90   9   0   0   0   1|   0     0 |14.5G  421M 12.0G 4662M|8519k   54k
     89   9   1   0   0   2|   0    80k|14.5G  421M 12.0G 4617M|5478k  131k
     92   7   1   0   0   1|   0  8376k|14.6G  421M 12.0G 4586M|  59k  117k
     88  11   0   0   0   1|   0  4096B|14.6G  421M 12.0G 4543M|  44k   67k
     92   8   0   0   0   1|   0     0 |14.6G
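When a run is saved to a file, the usr column can be summarized instead of eyeballed. The helper below is a hedged sketch that assumes the `dstat -cdmn` column layout shown above (usr is the first field before the first pipe):

```shell
# Average the "usr" CPU column of a dstat -cdmn capture fed on stdin.
avg_usr() {
    awk -F'|' '/^ *[0-9]/ { split($1, c, " "); sum += c[1]; n++ }
               END { if (n) printf "%d\n", sum / n }'
}
# Simulated capture: two sample rows taken from the output above.
usr_avg=$(printf ' 82  14   2   0   0   2|   0  152k|14.0G  421M 12.0G 5111M|9893k  121k\n 88  10   0   0   0   2|   0   80k|14.2G  421M 12.0G 4964M|4657k   89k\n' | avg_usr)
echo "$usr_avg"
```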
- Check the result topic. Data processed by Storm is written to the result topic; use kafka-console-consumer.sh to view the processed results. The result topic name is set in the script.

Sample run:

    [root@master kafka]# cd /usr/hdp/2.6.0.3-8/kafka
    [root@master kafka]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
    __consumer_offsets
    result-loongson
    result-loongson20190821160846
    test-03
    [root@master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic result-loongson --from-beginning
    ……
    123
    123
    123
    123
    ……

Filter-then-aggregate test

Check whether any Storm topology is running:

    storm list

Stop any running Storm topology:

    storm kill [topology name shown by storm list]
Write the test script:

    vim /mnt/t0/powertest/test2.sh

    #!/bin/bash
    mytime=`date +%Y%m%d%H%M%S`
    user=loongson
    groupid=${user}-${mytime}
    result=result-loongson${mytime}
    name=${user}-${mytime}
    # sip: if the first field of a raw record equals sip, pass the record to the next bolt for grouping.
    sip=2001
    # word: group on the third field; if the third field equals word, output the count and content of that field to the Kafka result topic.
    word=qw54234
    testtype=$1
    echo $groupid
    sed -i "4c group.id=$groupid" ./kafkaspout.properties
    sed -i "2c topic=$result" ./kafkabolt.properties
    echo "storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.iie.storm.$testtype $name $sip $word"
    storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.iie.storm.$testtype $name $sip $word
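Unlike test.sh's pattern substitution, test2.sh uses sed's "c" (change) command to replace a fixed line number wholesale: line 4 of kafkaspout.properties holds group.id and line 2 of kafkabolt.properties holds topic, per the file layouts shown earlier. Demonstrated on a throwaway copy (the group id below is an example value):

```shell
# Replace line 4 of a stand-in kafkaspout.properties, as test2.sh does.
tmp=$(mktemp)
printf 'topic=test-01\nschema=string\nbootstrap.servers=master.com:6667\ngroup.id=old\n' > "$tmp"
groupid=loongson-20190821160846            # example group id
sed -i "4c group.id=$groupid" "$tmp"       # "4c": change line 4 to the given text
line4=$(sed -n '4p' "$tmp")
echo "$line4"
rm -f "$tmp"
```

Note that this approach silently depends on the line order of the .properties files; reordering them would make the script rewrite the wrong key.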
Run the test:

    [root@loongson powertest]# pwd
    /mnt/t0/powertest
    [root@loongson powertest]# ls
    consumer.properties   kafkaproducer.jar      producer.properties                           test2.sh
    data.sh               kafkaspout.properties  storm-1.0-SNAPSHOT.jar                        test.sh
    files.properties      power.zip              storm-1.0-SNAPSHOT-jar-with-dependencies.jar
    kafkabolt.properties  power-zmh              storm.properties
    [root@loongson powertest]# ./test2.sh GroupbyTest
    7877 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: loongson-20190819175903

Check the test results
- Check with dstat

About one minute after the script is launched, noticeably higher traffic appears. The main columns to watch are cpu usr, dsk read/writ, and net recv/send:

    dstat -cdmn
     90   7   1   0   0   2|   0     0 |13.4G  421M 11.9G 5874M|9336k   95k
     92   6   1   0   0   2|   0   164k|13.5G  421M 11.9G 5860M|  26M  197k
     90   7   0   0   0   2|   0   252k|13.5G  421M 11.9G 5814M|4230B 2642B
     86   7   7   0   0   0|   0    72k|13.5G  421M 11.9G 5804M|  11k   24k
     84  13   3   0   0   0|   0     0 |13.5G  421M 11.9G 5797M|  18k  136k
     88   8   1   0   0   3|   0    12k|13.5G  421M 11.9G 5793M|  16M  149k
     87   9   1   0   0   3|   0   100k|13.5G  421M 11.9G 5794M|  14M  152k
     88   9   1   0   0   2|   0     0 |13.5G  421M 11.9G 5796M|  19M  158k
     87   6   5   0   0   2|   0  1884k|13.5G  421M 11.9G 5794M|6456k  275k
     86   9   2   0   0   3|   0  4096B|13.6G  421M 11.9G 5749M|  16M  187k
     89   8   0   0   0   3|   0     0 |13.6G  421M 11.9G 5741M|  17M  161k
     89   9   0   0   0   2|   0    96k|13.6G  421M 11.9G 5689M|8539k  214k
     86  12   1   0   0   1|   0  4096B|13.6G  421M 11.9G 5681M|8062k  249k
     84  12   1   0   0   3|   0   308k|13.6G  421M 11.9G 5685M|  17M  142k
     84  13   3   0   0   1|   0     0 |13.6G  421M 11.9G 5749M|2715k   29k
    ----total-cpu-usage---- -dsk/total- ------memory-usage----- -net/total-
    usr sys idl wai hiq siq| read  writ| used  buff  cach  free| recv  send
     85   8   5   0   0   2|   0   104k|13.6G  421M 11.9G 5739M|7257k  113k
     90   6   3   0   0   1|   0    92k|13.6G  421M 11.9G 5730M|7732k  134k
     87   8   4   0   0   2|   0    24k|13.6G  421M 11.9G 5737M|  11M  233k
     87   9   2   0   0   2|   0   204k|13.6G  421M 11.9G 5750M|8462k   68k
     90   7   3   0   0   0|   0  4096B|13.6G  421M 11.9G 5746M|  67k   12k
     88   8   1   0   0   2|   0    96k|13.6G  421M 11.9G 5745M|  14M  105k
     87  10   3   0   0   1|   0    72k|13.6G  421M 11.9G 5744M|1982k   34k
     91   6   3   0   0   1|   0    16k|13.6G  421M 11.9G 5742M| 830k   25k
     84   7   7   0   0   2|   0  1940k|13.6G  421M 11.9G 5738M|  13M  129k
     91   4   4   0   0   0|   0  4096B|13.6G  421M 11.9G 5728M| 274k   18k
     88   5   5   0   0   1|   0   172k|13.6G  421M 11.9G 5724M|8476k  102k
- Check the result topic. Data processed by Storm is written to the result topic; use kafka-console-consumer.sh to view the processed results. The result topic name is set in the script.

Sample run:

    [root@master kafka]# cd /usr/hdp/2.6.0.3-8/kafka
    [root@master kafka]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
    __consumer_offsets
    result-loongson
    result-loongson20190821160846
    test-03
    [root@master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic result-loongson20190821160846 --from-beginning
    ……
    1:qw54234
    1:qw54234
    1:qw54234
    1:qw54234
    ……
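The groupby results above are "count:word" records. A quick way to post-process consumer output is to split each record with shell parameter expansion (the record shape is taken from the sample output; the consumer itself is simulated here with a fixed string):

```shell
# Split one "count:word" result record into its two parts.
rec="1:qw54234"
count=${rec%%:*}   # everything before the first ":"
word=${rec#*:}     # everything after the first ":"
echo "$count $word"
```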



