栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink同步Mysql数据到Hive(不开启Binlog)

Flink同步Mysql数据到Hive(不开启Binlog)

Flink同步MySQL数据到Hive(不开启Binlog)

方式一:自定义Source和自定义Sink

缺点:每从MySQL读取一条数据插入Hive,都会触发一次MapReduce任务,速度很慢。

package com.blog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class StreamMysqlToHiveCommon {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        System.setProperty("HADOOP_USER_NAME", "root");
        DataStream> streamSource = env.addSource(new MyMysqlSource("select * from test.mysql_hive"));
        streamSource.print();
        streamSource.addSink(new MyHiveSink("insert into test.mysql_hive(id,name,age,money,todate,ts) values(?,?,?,?,?,?)"));
        env.execute();


    }

    
    public static class MyMysqlSource extends RichSourceFunction> {
        private String sql;
        private Connection conn = null;
        private PreparedStatement pstm = null;
        private ResultSet rs = null;

        public MyMysqlSource(String sql) {
            this.sql = sql;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            String driver = "com.mysql.cj.jdbc.Driver";
            String url = "jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai";
            String username = "root";
            String password = "A";
            Class.forName(driver);
            conn = DriverManager.getConnection(url, username, password);
            pstm = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext> sourceContext) throws Exception {
            rs = pstm.executeQuery();
            int count = rs.getmetaData().getColumnCount();
            ArrayList bean = new ArrayList<>();
            while (rs.next()) {
                bean.clear();
                for (int i = 1; i <= count; i++) {
                    bean.add(rs.getString(i));
                }
                sourceContext.collect(bean);
            }

        }

        @Override
        public void cancel() {

        }

        @Override
        public void close() throws Exception {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (pstm != null) {
                try {
                    pstm.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    
    public static class MyHiveSink extends RichSinkFunction> {
        private PreparedStatement pstm;
        private Connection conn;
        private String sql;

        public MyHiveSink(String sql) {
            this.sql = sql;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = getConnection();
            pstm = conn.prepareStatement(sql);

        }

        @Override
        public void invoke(List value, Context context) throws Exception {
            for (int i = 1; i <= value.size(); i++) {
                pstm.setString(i, value.get(i - 1));
            }
            pstm.executeUpdate();

        }

        @Override
        public void close() {
            if (pstm != null) {
                try {
                    pstm.close();
                } catch (SQLException e) {

                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        private static Connection getConnection() {
            Connection conn = null;
            try {
                String jdbc = "org.apache.hive.jdbc.HiveDriver";
                String url = "jdbc:hive2://192.168.88.108:10000/test";
                String user = "root";
                String password = "";
                Class.forName(jdbc);
                conn = DriverManager.getConnection(url, user, password);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
            return conn;
        }
    }
}

方式二:Flink SQL 使用 JDBC 连接器和 HiveCatalog

缺点:只生成一个文件;并且MySQL表的字段数必须大于或等于Hive表的字段数。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * Approach 2: exposes the MySQL table to Flink SQL through a JDBC table, registers a
 * {@link HiveCatalog}, and inserts all MySQL rows into the Hive table in a single job.
 */
public class MysqlToHiveSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        System.setProperty("HADOOP_USER_NAME", "root");

        // ---- Source side: MySQL table declared with the older 'connector.*' JDBC option keys.
        final String mysqlTable = "mysql_hive";
        final String flinkTable = "flink_mysql_hive";
        final String schema = "id INT ,name STRING, age INT,money DOUBLE,todate DATE,ts TIMESTAMP";
        final String ddlTemplate = "CREATE TABLE %s (%s) "
                + "WITH ("
                + "'connector.type' = 'jdbc',"
                + "'connector.url' = 'jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai',"
                + "'connector.driver' = 'com.mysql.cj.jdbc.Driver',"
                + "'connector.table' = '%s',"
                + " 'connector.username' = 'root',"
                + " 'connector.password' = 'A'"
                + " )";
        tableEnv.executeSql(String.format(ddlTemplate, flinkTable, schema, mysqlTable));
        // Built before switching catalogs, so the unqualified name resolves in the catalog
        // where the DDL above registered it.
        Table mysqlRows = tableEnv.sqlQuery("select * from " + flinkTable);

        // ---- Sink side: Hive table reached through a HiveCatalog.
        final String catalogName = "hive-test";
        final String hiveDatabase = "test";
        final String hiveTable = "mysql_hive";
        // Directory that contains Hive's hive-site.xml (relative to the working directory).
        final String hiveConfDir = "FlinkSQL/src/main/resources";
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDatabase, hiveConfDir);
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        tableEnv.useCatalog(catalogName);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.useDatabase(hiveDatabase);

        StatementSet inserts = tableEnv.createStatementSet();
        inserts.addInsert(hiveTable, mysqlRows);
        inserts.execute();
    }
}
方式三:Flink使用StreamingFileSink,直接保存到HDFS

缺点:需要自定义实体类;并且当数据量未达到文件滚动阈值时,生成的只是临时文件,Hive无法直接读取(StreamingFileSink 只有在检查点完成后才会把文件提交为正式文件)。

import com.utils.JdbcUtils;
import com.utils.SpendTimeUtils;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.*;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MysqlToHDFS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream streamSource = env.addSource(new MyMysqlSource("select * from test.mysql_hive"));
        System.setProperty("HADOOP_USER_NAME", "root");
        // Hive表的HDFS路径
        String outputbasePath = "hdfs://docker:9820/user/hive/warehouse/mysql_hive";


        StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path(outputbasePath), new SimpleStringEncoder("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(30))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 128)
                                .build())
                .withBucketAssigner(new CustomBucketAssigner("yyyyMMdd", ZoneId.of("Asia/Shanghai"), "dt="))
                .withBucketCheckInterval(1)
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartSuffix("--")
                                .withPartPrefix("part")
                                .withPartSuffix(".ext")
                                .build())
                .build();
        streamSource.addSink(sink);
        env.execute();
    }

    
    public static class MyMysqlSource extends RichSourceFunction {
        private String sql;
        private Connection conn = null;
        public MyMysqlSource(String sql) {
            this.sql = sql;
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            String driver = "com.mysql.cj.jdbc.Driver";
            String url = "jdbc:mysql://192.168.88.2:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai";
            String username = "root";
            String password = "A";
            Class.forName(driver);
            conn = DriverManager.getConnection(url, username, password);
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            List personList = JdbcUtils.queryList(conn, sql, Person.class);
            for (Person person : personList) {
                sourceContext.collect(person);
            }
        }

       

        @Override
        public void cancel() {
        }

        @Override
        public void close() throws Exception {
            Thread.sleep(3000);
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    public static class CustomBucketAssigner implements BucketAssigner {
        private static final long serialVersionUID = 1L;
        private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd";
        private final String formatString;
        private final ZoneId zoneId;
        private final String column;
        private transient DateTimeFormatter dateTimeFormatter;
        public CustomBucketAssigner() {
            this(DEFAULT_FORMAT_STRING);
        }
        public CustomBucketAssigner(String formatString) {
            this(formatString, ZoneId.systemDefault(), "");
        }
        public CustomBucketAssigner(ZoneId zoneId) {
            this(DEFAULT_FORMAT_STRING, zoneId, "");
        }
        public CustomBucketAssigner(String formatString, String column) {
            this(formatString, ZoneId.systemDefault(), column);
        }
        public CustomBucketAssigner(String formatString, ZoneId zoneId) {
            this(formatString, ZoneId.systemDefault(), "");
        }
        public CustomBucketAssigner(String formatString, ZoneId zoneId, String column) {
            this.formatString = Preconditions.checkNotNull(formatString);
            this.zoneId = Preconditions.checkNotNull(zoneId);
            this.column = Preconditions.checkNotNull(column);
        }

        @Override
        public String getBucketId(Person element, Context context) {
            if (dateTimeFormatter == null) {
                dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
            }
            return "";
        }
        @Override
        public SimpleVersionedSerializer getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
        @Override
        public String toString() {
            return "DateTimeBucketAssigner{"
                    + "formatString='"
                    + formatString
                    + '''
                    + ", zoneId="
                    + zoneId
                    + '}';
        }
    }

    public static  class CustomRollingPolicy implements RollingPolicy{
        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo partFileState) throws IOException {
            return false;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo partFileState, Person element) throws IOException {
            return false;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo partFileState, long currentTime) throws IOException {
            return false;
        }
    }
}

实体类

import java.sql.Date;
import java.sql.Timestamp;
/**
 * Mutable Java bean for one row of the {@code mysql_hive} table. It has a no-arg constructor and
 * setters so that it can be populated by property name from a JDBC result set.
 */
public class Person  {
    private int id;
    private String name;
    private int age;
    private double money;
    private Date todate;
    private Timestamp ts;

    public Person(int id, String name, int age, double money, Date todate, Timestamp ts) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.money = money;
        this.todate = todate;
        this.ts = ts;
    }

    public Person() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }

    public Date getTodate() {
        return todate;
    }

    public void setTodate(Date todate) {
        this.todate = todate;
    }

    public Timestamp getTs() {
        return ts;
    }

    public void setTs(Timestamp ts) {
        this.ts = ts;
    }

    /**
     * Renders the row as tab-separated text in the order id, name, age, money, todate, ts.
     * Null fields are rendered as {@code "null"}.
     *
     * <p>This string is what the text file sink writes as one line into the Hive table
     * directory. NOTE(review): Hive only splits it correctly if the table is declared with a
     * tab field delimiter — confirm against the table DDL.
     */
    @Override
    public String toString() {
        return id + "\t" +
                name + "\t" +
                age + "\t" +
                money + "\t" +
                todate + "\t" +
                ts;
    }
}

工具类

import org.apache.commons.beanutils.BeanUtils;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class JdbcUtils {
    public static  List queryList(Connection connection, String querySql, Class clz) throws SQLException, IllegalAccessException, InstantiationException, InvocationTargetException {
        //创建集合用于存放查询结果数据
        ArrayList resultList = new ArrayList<>();
        //预编译SQL
        PreparedStatement preparedStatement = connection.prepareStatement(querySql);
        //执行查询
        ResultSet resultSet = preparedStatement.executeQuery();
        //解析结果集resultSet
        ResultSetmetaData metaData = resultSet.getmetaData();
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            //创建泛型对象
            T t = clz.newInstance();
            //给泛型对象赋值
            for (int i = 1; i <= columnCount; i++) {
                //获取列名
                String columnName = metaData.getColumnName(i);
                //获取列值
                Object value = resultSet.getObject(i);
                BeanUtils.setProperty(t,columnName,value);
            }
            //将该对象添加至集合
            resultList.add(t);

        }
        preparedStatement.close();
        resultSet.close();
        //返回结果集合
        return resultList;

    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758908.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号