栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink自定义Sink系列2

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink自定义Sink系列2

Flink自定义Sink系列之JDBC

文章目录

Flink自定义Sink系列之JDBC前言一、代码如下总结


前言

使用flink的官方JDBC连接器创建数据库的连接Sink

一、代码如下

代码如下(示例):

public class JDBCSink {

    public  SinkFunction getSinkFunction(String sql){
        return JdbcSink.sink(sql,
                new JdbcStatementBuilder() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, T t) throws SQLException {
                        Class aClass = t.getClass();
                        Field[] fields = aClass.getDeclaredFields();
                        Object o=null;
                        for (int i = 0; i < fields.length; i++) {
                            try {
                                fields[i].setAccessible(true);
                                o = fields[i].get(t);
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            }
                            preparedStatement.setObject(i+1,o);
                        }
                    }
                }, JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build()
        ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
        .withDriverName("org.postgresql.Driver")
        .withUsername("someUser")
        .withPassword("somePassword")
        .build());
    }
}

注意这里可以点进去源码看到,后面的实例不是new出来的,而是通过Build来创建的,但是这两个Build是有区别的,一个是static final,一个是staic,通过static final修饰,其实原理是一样的都是通过内部类来实现的

public static class JdbcConnectionOptionsBuilder {
		private String url;
		private String driverName;
		private String username;
		private String password;

		public JdbcConnectionOptionsBuilder withUrl(String url) {
			this.url = url;
			return this;
		}

		public JdbcConnectionOptionsBuilder withDriverName(String driverName) {
			this.driverName = driverName;
			return this;
		}

		public JdbcConnectionOptionsBuilder withUsername(String username) {
			this.username = username;
			return this;
		}

		public JdbcConnectionOptionsBuilder withPassword(String password) {
			this.password = password;
			return this;
		}

		public JdbcConnectionOptions build() {
			return new JdbcConnectionOptions(url, driverName, username, password);
		}
	}

第二种

    public static Builder builder() {
		return new Builder();
	}

	
	public static final class Builder {
		private long intervalMs = DEFAULT_INTERVAL_MILLIS;
		private int size = DEFAULT_SIZE;
		private int maxRetries = DEFAULT_MAX_RETRY_TIMES;

		public Builder withBatchSize(int size) {
			this.size = size;
			return this;
		}

		public Builder withBatchIntervalMs(long intervalMs) {
			this.intervalMs = intervalMs;
			return this;
		}

		public Builder withMaxRetries(int maxRetries) {
			this.maxRetries = maxRetries;
			return this;
		}

		public JdbcExecutionOptions build() {
			return new JdbcExecutionOptions(intervalMs, size, maxRetries);
		}
	}

对于内部类的使用是通过
new 主类.内部类().xxxx这样去实例化的。

总结

以上就是对flink jdbcsink的简单记录!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/785716.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号