
flink-cdc (Java example)


Flink CDC is an open-source Flink project. It builds on traditional CDC (change data capture) tooling and makes real-time CDC development feel native to Flink. Details below:

1: Preparation

Maven dependencies required for CDC development. Keep in mind that Flink version upgrades can make individual connectors mutually incompatible, so pin the versions carefully.


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lkr.flink</groupId>
    <artifactId>flink-cdc</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.5</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.1</scala.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <mysql.version>5.1.49</mysql.version>
        <flinkcdc.version>2.0.0</flinkcdc.version>
        <fastjson.version>1.2.75</fastjson.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Build a fat jar, excluding dependencies that the Flink runtime provides -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- strip signature files so the shaded jar is not rejected -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.lkr.flink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2: Enable MySQL binlog

 

The binlog settings normally live in /etc/my.cnf; the relevant configuration is shown below.
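A minimal sketch of the relevant `/etc/my.cnf` section (the server-id value and the database filter are assumptions; adjust them to your environment, then restart MySQL):

```ini
[mysqld]
# unique server id, required when binary logging is enabled
server-id=1
# enable binary logging; the file base name is arbitrary
log-bin=mysql-bin
# row-based logging is what Flink CDC / Debezium requires
binlog_format=ROW
# optionally limit the binlog to the test database
binlog-do-db=cdc_test
```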

After enabling the log, you can log in to MySQL and check whether binlog is on (ON means enabled).
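The check can be done with two queries in the MySQL client:

```sql
-- log_bin shows ON once binary logging is enabled
SHOW VARIABLES LIKE 'log_bin';
-- binlog_format should be ROW for CDC
SHOW VARIABLES LIKE 'binlog_format';
```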

Next, we create a test table (test_cdc) in MySQL.
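The original screenshot of the table definition is missing, so the columns below are an illustrative assumption; any table in the cdc_test database will be captured:

```sql
CREATE DATABASE IF NOT EXISTS cdc_test;
USE cdc_test;
CREATE TABLE test_cdc (
    id   INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(50),
    age  INT
);
```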

3: Create the Kafka topic that the synchronized data will eventually be sinked to
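With the ZooKeeper-based tooling of older Kafka releases, the topic can be created like this (the hadoop hostname and ZooKeeper port are assumptions matching the broker address used later):

```shell
bin/kafka-topics.sh --create \
  --zookeeper hadoop:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic test_cdc
```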

 

4: Start Kafka and ZooKeeper, and open a consumer for the topic on the target side
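Once ZooKeeper and the Kafka broker are running, a console consumer can watch the topic (hostname assumed as above):

```shell
bin/kafka-console-consumer.sh \
  --bootstrap-server hadoop:9092 \
  --topic test_cdc
```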

5: Flink CDC's built-in deserializers are not flexible enough, so we first define a custom deserialization schema
package flink_cdc;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        Struct value = (Struct) sourceRecord.value();
        Struct source = value.getStruct("source");

        // database and table the change came from
        String db = source.getString("db");
        String table = source.getString("table");

        // operation type; Debezium reports snapshot/insert rows as "create"
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if (type.equals("create")) {
            type = "insert";
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("database", db);
        jsonObject.put("table", table);
        jsonObject.put("type", type);

        // row data after the change; note: "after" is null for deletes (see step 10)
        Struct after = value.getStruct("after");
        JSONObject data = new JSONObject();
        List<Field> fields = after.schema().fields();
        for (Field field : fields) {
            data.put(field.name(), after.get(field));
        }
        jsonObject.put("data", data);

        // emit the JSON string downstream
        collector.collect(jsonObject.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
6: For convenience, create a simple Kafka utility class
package flink_cdc;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;

public class kafakUntil {
    private static final String KAFKA_SERVER = "hadoop:9092";
    private static final Properties properties = new Properties();
    static {
        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
    }

    // returns a Kafka sink that writes plain strings to the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);
    }
}
7: Entry-point class

Note: since we use the flink-connector-mysql-cdc jar, the source can only connect directly to MySQL; dependencies or jars for other databases can be downloaded from the official site.

package flink_cdc;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class demo_cdc {
    public static void main(String[] args) throws Exception {
//        Properties debeziumProperties = new Properties();
//        debeziumProperties.put("snapshot.locking.mode", "none");
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DebeziumSourceFunction<String> source = MySqlSource.<String>builder()
                .hostname("192.168.144.130")
                .port(3306)
                .databaseList("cdc_test")
                .tableList("cdc_test.*")                              // table whitelist: database.table
                .username("root")
                .password("123456")
                .deserializer(new MyDeserializationSchemaFunction())  // how CDC records are decoded
                .startupOptions(StartupOptions.initial())             // full snapshot first, then binlog
//                .debeziumProperties(debeziumProperties)
                .build();

        DataStreamSource<String> mysql_source = environment.addSource(source);
        mysql_source.addSink(kafakUntil.getKafkaSink("test_cdc"));
        environment.execute("flink-cdc");
    }
}

At this point we can start the main program and have a working real-time CDC synchronization job.

8: Insert or update one or more rows on the MySQL side
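For example, assuming the illustrative test_cdc columns from step 2:

```sql
INSERT INTO test_cdc (name, age) VALUES ('tom', 18);
UPDATE test_cdc SET age = 19 WHERE name = 'tom';
```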

9: The corresponding JSON result then shows up on the Kafka side
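Given the custom deserializer above, each message the consumer prints should look roughly like this (field values are illustrative, and the payload key is assumed to be `data`):

```json
{
  "database": "cdc_test",
  "table": "test_cdc",
  "type": "insert",
  "data": { "id": 1, "name": "tom", "age": 18 }
}
```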

10: Note that this pipeline only supports insert/update (upsert) operations; if a delete happens on the source, the Java program will throw, because the Debezium record's "after" struct is null for deletes.
Source: www.mshxw.com — https://www.mshxw.com/it/711853.html