Flink Custom Sink Series: PHOENIX
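Preface
This post defines a custom sink by implementing SinkFunction (here through its RichSinkFunction subclass), so that a Flink job can write to HBase through Phoenix.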
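1. Code Implementation

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class HbaseSink<T> extends RichSinkFunction<T> {

        // JDBC connections are not serializable, so the connection is created in open()
        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Read the connection settings from the global job parameters
            ParameterTool parameterTool =
                    (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            String phoenixDriver = parameterTool.get("PHOENIX_DRIVER");
            String phoenixServer = parameterTool.get("PHOENIX_SERVER");
            String hbaseSchema = parameterTool.get("Hbase_SCHEMA");
            // Load the driver and open the connection
            Class.forName(phoenixDriver);
            connection = DriverManager.getConnection(phoenixServer);
            connection.setSchema(hbaseSchema);
        }

        @Override
        public void invoke(T value, Context context) throws Exception {
            // Prepare the precompiled SQL (left empty here; supply your own statement)
            try (PreparedStatement preparedStatement = connection.prepareStatement("")) {
                // Bind the parameters
                // preparedStatement.setObject();
                // Execute
                preparedStatement.execute();
                // Phoenix connections do not auto-commit by default
                connection.commit();
            }
        }

        @Override
        public void close() throws Exception {
            // Release the connection when the task shuts down
            if (connection != null) {
                connection.close();
            }
        }
    }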
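The SQL passed to prepareStatement in the sink is left empty. Phoenix writes rows with UPSERT rather than INSERT, so one way to fill it in is to build a parameterized UPSERT statement from a table name and a column list. The following is only a minimal sketch: the class PhoenixUpsertSql, its build method, and the table and column names are hypothetical and are not part of Flink or Phoenix.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not a Flink or Phoenix API): builds a parameterized
// Phoenix UPSERT statement with one "?" placeholder per column.
final class PhoenixUpsertSql {

    private PhoenixUpsertSql() {
    }

    static String build(String table, List<String> columns) {
        if (columns == null || columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        // Join the column names and an equal number of "?" placeholders
        String columnList = String.join(", ", columns);
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "UPSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // USER_EVENTS, ID and NAME are made-up names for illustration
        System.out.println(build("USER_EVENTS", Arrays.asList("ID", "NAME")));
        // prints: UPSERT INTO USER_EVENTS (ID, NAME) VALUES (?, ?)
    }
}
```

The resulting string could be passed to connection.prepareStatement(...) in invoke, with one preparedStatement.setObject(index, value) call per column, using 1-based indexes. The table and column names are concatenated into the SQL text, so they should come from trusted configuration rather than from the stream data.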
Summary
This is a short record of a very simple custom sink implementation.



