
flink-cdc 1.13.6 reading MySQL binlog and writing to Kafka: data skew

I. Data skew when flink-cdc reads data and writes it to Kafka
public static void main(String[] args) {
    Properties pros = new Properties();
    pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
    // transaction.timeout.ms: 5 minutes, below the broker's 15-minute cap (see the note at the end)
    pros.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 5 + "");
    pros.setProperty(ProducerConfig.ACKS_CONFIG, "all");

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("node1")
            .port(3306)
            .databaseList("mydb")
            .tableList("mydb.products")
            .username("root")
            .password("root!")
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    DataStreamSource<String> mySQL_source =
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql-cdc-Source");
    //mySQL_source.setParallelism(4).print().setParallelism(1);
    mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc", new SimpleStringSchema(), pros));
    try {
        env.execute("flink-cdc-test");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

1. The code above uses the plain default serialization schema, and the sink parallelism is not set, so it defaults to 1, which causes the data skew. If the Kafka topic has three partitions, setting the sink parallelism to 3 when Flink writes to Kafka spreads the data evenly across the three partitions:

mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new SimpleStringSchema(), pros)).setParallelism(3);


2. As shown in the figure above, almost all of the data was written to partition 0, which is bound to hurt throughput.
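The skew has a simple cause: with SimpleStringSchema and no explicit partitioner, the Flink Kafka sink falls back to FlinkFixedPartitioner, which pins every record from one sink subtask to a single partition (subtask index modulo partition count). A simplified, hypothetical sketch of that selection logic (the real class lives in org.apache.flink.streaming.connectors.kafka.partitioner):

```java
public class FixedPartitionDemo {
    // Simplified version of FlinkFixedPartitioner's choice: every record
    // emitted by one sink subtask goes to the same Kafka partition.
    static int partitionFor(int subtaskIndex, int numPartitions) {
        return subtaskIndex % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Sink parallelism 1: the only subtask (index 0) maps to partition 0,
        // so every record lands on partition 0 -> the observed skew.
        System.out.println("parallelism 1: partition " + partitionFor(0, partitions));
        // Sink parallelism 3: subtasks 0, 1, 2 map to partitions 0, 1, 2.
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask + " -> partition " + partitionFor(subtask, partitions));
        }
    }
}
```

This also explains why raising the sink parallelism to 3 in the one-liner above balances the load: each of the three subtasks maps to a different partition.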

II. Solution
public static void main(String[] args) {
    Properties pros = new Properties();
    pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
    pros.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 5 + "");
    pros.setProperty(ProducerConfig.ACKS_CONFIG, "all");

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("node1")
            .port(3306)
            .databaseList("mydb")
            .tableList("mydb.products")
            .username("root")
            .password("root!")
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    DataStreamSource<String> mySQL_source =
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-Source");
    //mySQL_source.setParallelism(4).print().setParallelism(1);
    mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new KafkaSerializationSchema<String>() {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            // null key: partitioning is delegated to the Kafka client's default partitioner
            return new ProducerRecord<>("testflinkcdc2", null, element.getBytes());
        }
    }, pros, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
    //mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new SimpleStringSchema(), pros));
    try {
        env.execute("flink-cdc-test");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

1. Implemented this way, the data skew disappears and records are written evenly to every partition:

mySQL_source.addSink(new FlinkKafkaProducer<>("testflinkcdc2", new KafkaSerializationSchema<String>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        return new ProducerRecord<>("testflinkcdc2", null, element.getBytes());
    }
}, pros, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
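This works because the ProducerRecord is built with a null key and no Flink-side partitioner, so the Kafka client's own default partitioner assigns partitions; it spreads null-key records over all partitions (classic round-robin in older clients, a "sticky" per-batch variant since Kafka 2.4). A simplified round-robin stand-in (hypothetical class, not the real client code):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class NullKeyRoundRobinDemo {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Stand-in for the Kafka client's default handling of null-key records:
    // spread them evenly across all partitions (round-robin shown here).
    int partitionFor(byte[] key, int numPartitions) {
        if (key == null) {
            return counter.getAndIncrement() % numPartitions;
        }
        // Keyed records are hashed instead (real clients use murmur2).
        return Math.abs(java.util.Arrays.hashCode(key) % numPartitions);
    }

    public static void main(String[] args) {
        NullKeyRoundRobinDemo partitioner = new NullKeyRoundRobinDemo();
        // Six null-key records cycle through partitions 0, 1, 2, 0, 1, 2.
        for (int i = 0; i < 6; i++) {
            System.out.println(partitioner.partitionFor(null, 3));
        }
    }
}
```

Note the trade-off: with a null key there is no ordering guarantee per MySQL primary key across partitions; if per-key ordering matters, use the record's key instead and accept key-hash distribution.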


Note
When using FlinkKafkaProducer.Semantic.EXACTLY_ONCE, the producer's transaction.timeout.ms (ProducerConfig.TRANSACTION_TIMEOUT_CONFIG) must be set to a value no greater than 15 minutes:

pros.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 5 + "");

Kafka brokers default the maximum transaction timeout (transaction.max.timeout.ms) to 15 minutes, and a producer is not allowed to request a transaction timeout larger than that value. Flink's Kafka producer defaults transaction.timeout.ms to 1 hour in EXACTLY_ONCE mode, so if you do not lower it explicitly the job fails at startup with a transaction timeout error.
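The arithmetic behind the setting above: 1000 * 60 * 5 is 300000 ms (5 minutes), safely below the broker's default 15-minute cap, while Flink's 1-hour default exceeds it. A small check (hypothetical helper names, constants only):

```java
public class TransactionTimeoutCheck {
    // Broker default for transaction.max.timeout.ms: 15 minutes.
    static final long BROKER_MAX_MS = 15L * 60 * 1000;

    // A producer's transaction.timeout.ms must not exceed the broker cap.
    static boolean fitsBrokerCap(long producerTimeoutMs) {
        return producerTimeoutMs <= BROKER_MAX_MS;
    }

    public static void main(String[] args) {
        long configured = 1000L * 60 * 5; // value set in the job: 300000 ms = 5 minutes
        System.out.println(configured + " ms, accepted = " + fitsBrokerCap(configured));
        // Flink's EXACTLY_ONCE default of 1 hour would be rejected by the broker.
        System.out.println(fitsBrokerCap(60L * 60 * 1000));
    }
}
```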

Source: www.mshxw.com; original article at https://www.mshxw.com/it/745340.html (please credit when reposting).