// Imports below assume Flink CDC 2.x (com.ververica) and a Flink version that
// still ships the legacy FlinkKafkaProducer; adjust packages to your versions.
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public static void main(String[] args) {
    Properties pros = new Properties();
    pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
    // Producer transaction timeout: 5 minutes (must not exceed the broker's transaction.max.timeout.ms)
    pros.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(1000 * 60 * 5));
    pros.setProperty(ProducerConfig.ACKS_CONFIG, "all");

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("node1")
            .port(3306)
            .databaseList("mydb")
            .tableList("mydb.products")
            .username("root")
            .password("root!")
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);

    DataStreamSource<String> mySQL_source =
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-cdc-Source");
    //mySQL_source.setParallelism(4).print().setParallelism(1);
    mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc", new SimpleStringSchema(), pros));

    try {
        env.execute("flink-cdc-test");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
1. The code above uses the plain default serializer (SimpleStringSchema) and never sets the sink parallelism, so it defaults to 1, which causes data skew: this FlinkKafkaProducer constructor uses FlinkFixedPartitioner by default, and that partitioner pins each sink subtask to a single Kafka partition. If the Kafka topic has three partitions, setting the sink parallelism to 3 when Flink writes to Kafka spreads the data evenly across all three partitions:

mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new SimpleStringSchema(), pros)).setParallelism(3);

2. As the screenshot above shows, almost all of the data was written to partition 0, which clearly hurts throughput.
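To see why the default sink is skewed, here is a minimal sketch of FlinkFixedPartitioner's assignment rule (the class name and partition counts are illustrative, not part of the original code): every record from sink subtask i goes to partition i % numPartitions, so with parallelism 1 everything lands on partition 0.

```java
// Illustrative sketch of FlinkFixedPartitioner's assignment rule:
// each sink subtask always writes to the same single partition.
public class FixedPartitionerSketch {
    static int assign(int subtaskIndex, int numPartitions) {
        return subtaskIndex % numPartitions;
    }

    public static void main(String[] args) {
        // Sink parallelism 1: the only subtask (index 0) always hits partition 0.
        System.out.println("parallelism=1 -> partition " + assign(0, 3));
        // Sink parallelism 3: subtasks 0..2 cover partitions 0..2 evenly.
        for (int i = 0; i < 3; i++) {
            System.out.println("subtask " + i + " -> partition " + assign(i, 3));
        }
    }
}
```

This is exactly why matching the sink parallelism to the topic's partition count (3 here) removes the skew while still using SimpleStringSchema.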
public static void main(String[] args) {
    Properties pros = new Properties();
    pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
    // Producer transaction timeout: 5 minutes (must not exceed the broker's transaction.max.timeout.ms)
    pros.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(1000 * 60 * 5));
    pros.setProperty(ProducerConfig.ACKS_CONFIG, "all");

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("node1")
            .port(3306)
            .databaseList("mydb")
            .tableList("mydb.products")
            .username("root")
            .password("root!")
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);

    DataStreamSource<String> mySQL_source =
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-Source");
    //mySQL_source.setParallelism(4).print().setParallelism(1);

    // Emit records with a null key: the Kafka client's own partitioner then
    // spreads them across all partitions instead of pinning them to one.
    mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new KafkaSerializationSchema<String>() {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            return new ProducerRecord<>("testflinkcdc2", null, element.getBytes());
        }
    }, pros, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
    //mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new SimpleStringSchema(), pros));

    try {
        env.execute("flink-cdc-test");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
1. Implemented this way, there is no data skew: the records carry a null key, so the data is written evenly across every partition.

mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new KafkaSerializationSchema<String>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        return new ProducerRecord<>("testflinkcdc2", null, element.getBytes());
    }
}, pros, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
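Why the null key fixes the skew: with no key, the Kafka client's partitioner chooses the partition itself. Older clients rotate per record; clients since Kafka 2.4 batch with a sticky partitioner but still cycle across partitions, so records end up spread out either way. A rough sketch of the per-record round-robin choice (class name and the partition count of 3 are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Rough sketch of a round-robin partition choice for null-key records:
// a shared counter advanced per record, taken modulo the partition count.
public class RoundRobinSketch {
    private static final AtomicInteger counter = new AtomicInteger();

    static int choosePartition(int numPartitions) {
        // Math.floorMod keeps the result non-negative even if the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        // Six null-key records over 3 partitions land on 0 1 2 0 1 2.
        for (int i = 0; i < 6; i++) {
            System.out.print(choosePartition(3) + " ");
        }
    }
}
```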
Note

When using FlinkKafkaProducer.Semantic.EXACTLY_ONCE, set the producer's transaction.timeout.ms to a value under 15 minutes, e.g.:

pros.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(1000 * 60 * 5));

Kafka brokers cap the transaction timeout with transaction.max.timeout.ms, which defaults to 15 minutes, and a producer is not allowed to request a transaction timeout larger than that cap. FlinkKafkaProducer, however, defaults transaction.timeout.ms to 1 hour, so without this setting the broker rejects the producer with a transaction-timeout error.
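The arithmetic behind that property value can be spelled out: 1000 * 60 * 5 is 300,000 ms (5 minutes), comfortably under the broker's 900,000 ms (15 minute) default cap. A tiny illustrative check (class name is hypothetical):

```java
// Illustrative check that the producer's transaction timeout (5 min)
// stays under the broker's transaction.max.timeout.ms default (15 min).
public class TransactionTimeoutSketch {
    static final long PRODUCER_TIMEOUT_MS = 1000L * 60 * 5;  // 300000 ms, as set above
    static final long BROKER_MAX_MS = 1000L * 60 * 15;       // 900000 ms, broker default

    public static void main(String[] args) {
        System.out.println(PRODUCER_TIMEOUT_MS);                  // 300000
        System.out.println(PRODUCER_TIMEOUT_MS <= BROKER_MAX_MS); // true
    }
}
```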