栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink实时写入Mysql数据

Flink实时写入Mysql数据

1.需要的依赖

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; 、
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

使用JDBCAppendTableSink方法创建mysqlsink写入,数据是追加到数据库的

 //创建流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        environment.setParallelism(2);
        //加载自定义数据形成数据流
        DataStreamSource personSource = environment.addSource(new DataDB());
        //转换为Row类型才能写入mysql
        DataStream personRow = personSource.map(new MapFunction() {
            @Override
            public Row map(Person person) throws Exception {
                Row row = new Row(4);
                row.setField(0, person.getPid());
                row.setField(1, person.getPname());
                row.setField(2, person.getPsex());
                row.setField(3, person.getPage());
                return row;
            }
        });
        //定义调用JDBCAppendTableSink创建mysqlSink
        JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
                //驱动程序
                .setDrivername("com.mysql.cj.jdbc.Driver")
                //连接URL地址
                .setDBUrl("jdbc:mysql://localhost:3306/school?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC")
                //用户名
                .setUsername("root")
                //密码
                .setPassword("1234")
                //写入类型
                .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    BasicTypeInfo.INT_TYPE_INFO)
                //写入语句
                .setQuery("insert into person values (?,?,?,?);")
                //一次批量写入条数
                .setBatchSize(10)
                .build();
        //调用mysqlsink写入mysql数据
        tableSink.emitDataStream(personRow);
        //执行程序
        environment.execute();

如果出现以下报错,

The server time zone value ‘�й���׼ʱ��’ is unrecognized or represents
more than one time zone. You must configure either the server or JDBC
driver (via the serverTimezone configuration property) to use a more
specifc time zone value if you want to utilize time zone support.

需要在url地址里面添加如下代码

&serverTimezone=UTC

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/720088.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号