Tested version: FlinkX 1.10
Summary of test results:
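Parallelism is set via speed.channel in the job JSON file.
TaskManager memory is set via taskmanager.memory.process.size in flink-conf.yaml.
The number of slots is set via taskmanager.numberOfTaskSlots in flink-conf.yaml.
With -confProp, only jobmanager.memory.mb takes effect; no other setting passed this way has any effect.
YARN configuration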
yarn.scheduler.minimum-allocation-mb=1G
yarn.scheduler.minimum-allocation-vcores=1
yarn.scheduler.maximum-allocation-mb=12G
yarn.scheduler.maximum-allocation-vcores=12
Flink configuration
jobmanager.rpc.address: server001
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 2
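To reason about these settings numerically, it helps to turn the memory strings into megabytes. The following is a minimal sketch, not anything shipped with Flink or FlinkX: `parse_flat_conf` and `parse_memory_mb` are hypothetical helper names, and the parser only handles flat `key: value` lines with `m` or `g` suffixes, which is all the configuration above uses.

```python
import re

# Hypothetical helpers (not part of Flink or FlinkX).
_UNIT_TO_MB = {"m": 1, "g": 1024}


def parse_memory_mb(value: str) -> int:
    """Convert a size such as '1024m' or '2g' into whole megabytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([mg])b?\s*", value.lower())
    if match is None:
        raise ValueError(f"unrecognised memory size: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_TO_MB[unit]


def parse_flat_conf(text: str) -> dict:
    """Parse flat 'key: value' lines, skipping blanks and '#' comments."""
    conf = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        key, _, val = line.partition(":")
        conf[key.strip()] = val.strip()
    return conf


conf = parse_flat_conf("""
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 2
""")
print(parse_memory_mb(conf["taskmanager.memory.process.size"]))  # 1024
```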
Test 1
mysql2hive.json configuration
"splitPk": "id"
"channel": 2
FlinkX job launch script
/usr/local/src/flinkx-1.10/bin/flinkx \
  -mode yarnPer \
  -job /usr/local/src/flinkx-1.10/job/mysql2hive.json \
  -queue root.default
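Result as expected. The mysql2hive job is configured with 2 read/write channels, which means a parallelism of 2. With 1 slot per TaskManager, that gives 2 TaskManagers, 3 containers (2 TaskManagers plus the JobManager), 3 CPU cores, and 3 GB of memory.
Open question: does FlinkX need a split key (such as splitPk) to be set before the parallelism takes effect? The next test checks this.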
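The resource figures observed in Test 1 follow from simple arithmetic, which can be sketched in Python. `estimate_footprint` and `YarnFootprint` are hypothetical names, not part of Flink or FlinkX. The formulas are the ones used in these notes (one extra container and one extra core for the JobManager); rounding up when the parallelism is not a multiple of the slot count is an assumption that none of the tests here exercise.

```python
import math
from dataclasses import dataclass


@dataclass
class YarnFootprint:
    taskmanagers: int
    containers: int
    vcores: int
    memory_mb: int


def estimate_footprint(parallelism: int, slots_per_tm: int,
                       jm_memory_mb: int, tm_memory_mb: int) -> YarnFootprint:
    # TaskManagers needed to host every parallel subtask
    # (rounding up is an assumption for the non-divisible case).
    taskmanagers = math.ceil(parallelism / slots_per_tm)
    # One container per TaskManager plus one for the JobManager.
    containers = taskmanagers + 1
    # One core per slot plus one core for the JobManager.
    vcores = taskmanagers * slots_per_tm + 1
    memory_mb = jm_memory_mb + taskmanagers * tm_memory_mb
    return YarnFootprint(taskmanagers, containers, vcores, memory_mb)


# Test 1: channel = 2, one slot per TaskManager, 1024m for each process.
test_1 = estimate_footprint(parallelism=2, slots_per_tm=1,
                            jm_memory_mb=1024, tm_memory_mb=1024)
print(test_1)
# YarnFootprint(taskmanagers=2, containers=3, vcores=3, memory_mb=3072)
```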
Test 2: stream_stream.json
{
  "job" : {
    "content" : [ {
      "reader" : {
        "parameter" : {
          "column" : [ {
            "name" : "id",
            "type" : "id"
          }, {
            "name" : "string",
            "type" : "string"
          } ],
          "sliceRecordCount" : [ "10000" ]
        },
        "name" : "streamreader"
      },
      "writer" : {
        "parameter" : {
          "print" : true
        },
        "name" : "streamwriter"
      }
    } ],
    "setting" : {
      "speed" : {
        "channel" : 2
      }
    }
  }
}
FlinkX launch script
/usr/local/src/flinkx-1.10/bin/flinkx \
  -mode yarnPer \
  -job /usr/local/src/flinkx-1.10/docs/example/stream_stream.json \
  -queue root.default
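I had assumed that only relational database sources could be read with a configured parallelism. That turns out not to be the case.
Parallelism works as expected: number of TaskManagers = parallelism / slots = 2 / 1 = 2.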
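Since the parallelism comes from speed.channel in the job file, it can be read straight from the JSON before submitting. This is a minimal sketch: `read_channel` is a hypothetical helper, and falling back to 1 when the key is absent is an assumption made for the example, not documented FlinkX behaviour.

```python
import json


def read_channel(job_json: str, default: int = 1) -> int:
    """Return job.setting.speed.channel from a FlinkX job definition."""
    job = json.loads(job_json)
    speed = job.get("job", {}).get("setting", {}).get("speed", {})
    # The default value is an assumption for this sketch.
    return int(speed.get("channel", default))


# Trimmed-down version of stream_stream.json with only the relevant keys.
example = """
{
  "job": {
    "content": [],
    "setting": { "speed": { "channel": 2 } }
  }
}
"""
print(read_channel(example))  # 2
```

With one slot per TaskManager, the value returned here is also the number of TaskManagers the job requests.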
Test 3: continuing from the configuration above, set parallelism.default: 1 and taskmanager.numberOfTaskSlots: 2 in flink-conf.yaml
Flink configuration
parallelism.default: 1
taskmanager.numberOfTaskSlots: 2
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
FlinkX JSON configuration
"channel" : 2
FlinkX launch script
/usr/local/src/flinkx-1.10/bin/flinkx \
  -mode yarnPer \
  -job /usr/local/src/flinkx-1.10/docs/example/stream_stream.json \
  -queue root.default
Expected: number of TaskManagers = parallelism / slots = 2 / 2 = 1, which means 2 containers; CPU cores = TaskManagers * slots + 1 = 1 * 2 + 1 = 3; memory = 2 GB.
Result as expected.
Test 4: continuing from the configuration above, set speed.channel to 4 in the job JSON
Flink configuration
parallelism.default: 1
taskmanager.numberOfTaskSlots: 2
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
FlinkX JSON configuration
"speed" : { "channel" : 4 }
FlinkX launch script
/usr/local/src/flinkx-1.10/bin/flinkx \
  -mode yarnPer \
  -job /usr/local/src/flinkx-1.10/docs/example/stream_stream.json \
  -queue root.default
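Expected: number of TaskManagers = parallelism / slots = 4 / 2 = 2, which means 3 containers; CPU cores = TaskManagers * slots + 1 = 2 * 2 + 1 = 5; memory used = jobmanager.heap + TaskManagers * taskmanager.memory = 1024 + 2 * 1024 = 3072 MB = 3 GB.
Result as expected.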
Test 5: continuing from the configuration above, set taskmanager.memory.process.size: 2048m
Flink configuration
parallelism.default: 1
taskmanager.numberOfTaskSlots: 2
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 2048m
FlinkX JSON configuration
"speed" : { "channel" : 4 }
FlinkX launch script
/usr/local/src/flinkx-1.10/bin/flinkx \
  -mode yarnPer \
  -job /usr/local/src/flinkx-1.10/docs/example/stream_stream.json \
  -queue root.default
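Expected: number of TaskManagers = parallelism / slots = 4 / 2 = 2, which means 3 containers; CPU cores = TaskManagers * slots + 1 = 2 * 2 + 1 = 5; memory used = jobmanager.heap + TaskManagers * taskmanager.memory = 1024 + 2 * 2048 = 5120 MB = 5 GB.
Result as expected.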
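As a consistency check, the figures stated for each run can be recomputed from its inputs and compared. This is only a sketch: the `RUNS` table and `recompute` are hypothetical names, the inputs and stated values are copied from the tests above, and Test 2 is left out because only its TaskManager count was recorded.

```python
# Each entry: (label, channel, slots per TaskManager, JobManager MB, TaskManager MB,
#              stated TaskManagers, stated containers, stated cores, stated GB)
RUNS = [
    ("Test 1", 2, 1, 1024, 1024, 2, 3, 3, 3),
    ("Test 3", 2, 2, 1024, 1024, 1, 2, 3, 2),
    ("Test 4", 4, 2, 1024, 1024, 2, 3, 5, 3),
    ("Test 5", 4, 2, 1024, 2048, 2, 3, 5, 5),
]


def recompute(channel, slots, jm_mb, tm_mb):
    taskmanagers = channel // slots  # every run above divides evenly
    containers = taskmanagers + 1    # plus the JobManager container
    cores = taskmanagers * slots + 1
    memory_gb = (jm_mb + taskmanagers * tm_mb) / 1024
    return (taskmanagers, containers, cores, memory_gb)


mismatches = []
for label, channel, slots, jm_mb, tm_mb, *stated in RUNS:
    computed = recompute(channel, slots, jm_mb, tm_mb)
    if tuple(stated) != computed:
        mismatches.append((label, tuple(stated), computed))
    print(label, computed)

print("mismatches:", mismatches)
```

An empty mismatch list means the stated TaskManager, container, core, and memory figures all agree with the formulas.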
The remaining task is to work out how to make the TaskManager memory and slot settings passed via -confProp take effect.



