从kafka里面读取数据,经过处理写入到mysql里面。在flink里面就是从source到sink的过程,那么本篇文章说明一下,mysqlsink的单条数据插入和批量数据插入操作。1、先说明一下SinkFunction的继承关系
通过API我们可以看到sinkFunction接口的实现类有很多。比如常用的RichSinkFunction。
public class SinkToMysql extends RichSinkFunction> { public void invoke(Tuple2 value, Context context) throws Exception { conn = getConnection(); String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);"; ps = this.conn.prepareStatement(sql); ps.setString(1, value.f0); ps.setString(2, value.f0 + "1"); // ps.addBatch(); int resultBatch = ps.executeUpdate(); System.out.println("成功插入数据 " + resultBatch + "行"); this.close(); } }
上面的类实现RichSikFunction类,重写invoke方法,该方法用来执行与数据库的交互,插入数据 。该方法是上游有一条数据就会执行一次。实际开发的时候效率低。
3、MySQL 批量插入当程序到sink的时候,我们可以把上游的数据先积攒一段时间,在按批一起发送到sink的下游。
public class SinkToMysqlBatch implements SinkFunction> { PreparedStatement ps; Connection conn; @Override public void invoke(ArrayList value, Context context) throws Exception { conn = SinkToMysql.getConnection(); conn.setAutoCommit(false); String sql = "insert into dm_stu(stuname, stuaddr) values(?,?);"; ps = this.conn.prepareStatement(sql); for (String s: value) { ps.setString(1, s); ps.setString(2,s+1); ps.addBatch(); } int[] resultBatch = ps.executeBatch(); System.out.println("当前窗口插入数据:" + resultBatch.length+ " 行"); conn.commit(); try { if (ps != null) { ps.close(); } if (conn != null) { conn.close(); } }catch (Exception e) { e.printStackTrace(); } } }
上面的批量操作,自定义类实现了SinkFunction接口,重写了invoke方法。泛型是集合类型,表示上游传递过来的数据是一个集合。我们需要在invoke方法中去进行遍历。一个batch写入到mysql。
实际开发遇到的问题是
在这段代码中,addSink(中自定义的mysql批量类是继承了RichSinkFunction类,但是这边会报错说必须要实现了SinkFunction接口)。这边有疑问。



