栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink自定义sink端SinkFunction的应用

Flink自定义sink端SinkFunction的应用

场景说明
从kafka里面读取数据,经过处理写入到mysql里面。在flink里面就是从source到sink的过程,那么本篇文章说明一下,mysqlsink的单条数据插入和批量数据插入操作。
1、先说明一下SinkFunction的继承关系


通过API我们可以看到sinkFunction接口的实现类有很多。比如常用的RichSinkFunction。

2、mysql单条数据插入
public class SinkToMysql extends RichSinkFunction> {
public void invoke(Tuple2 value, Context context) throws Exception {
        conn = getConnection();
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);";
        ps = this.conn.prepareStatement(sql);
        ps.setString(1, value.f0);
        ps.setString(2, value.f0 + "1");
//            ps.addBatch();
        int resultBatch = ps.executeUpdate();
        System.out.println("成功插入数据 " + resultBatch + "行");
        this.close();
    }
}	

上面的类实现RichSikFunction类,重写invoke方法,该方法用来执行与数据库的交互,插入数据 。该方法是上游有一条数据就会执行一次。实际开发的时候效率低。

3、MySQL 批量插入

当程序到sink的时候,我们可以把上游的数据先积攒一段时间,在按批一起发送到sink的下游。

public class SinkToMysqlBatch implements SinkFunction> {
    PreparedStatement ps;
    Connection conn;

    @Override
    public void invoke(ArrayList value, Context context) throws Exception {
        conn = SinkToMysql.getConnection();
        conn.setAutoCommit(false);
        String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);";
        ps = this.conn.prepareStatement(sql);
        for (String s: value) {
            ps.setString(1, s);
            ps.setString(2,s+1);
            ps.addBatch();
        }
        int[] resultBatch = ps.executeBatch();
        System.out.println("当前窗口插入数据:" + resultBatch.length+ " 行");
        conn.commit();
        try {
            if (ps != null) {
                ps.close();
            }
            if (conn != null) {
                conn.close();
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

}

上面的批量操作,自定义类实现了SinkFunction接口,重写了invoke方法。泛型是集合类型,表示上游传递过来的数据是一个集合。我们需要在invoke方法中去进行遍历。一个batch写入到mysql。

实际开发遇到的问题是


在这段代码中,addSink(中自定义的mysql批量类是继承了RichSinkFunction类,但是这边会报错说必须要实现了SinkFunction接口)。这边有疑问。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/674112.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号