1、pom.xml
注意:
datahub-connector
此依赖需要手动加载到maven仓库,具体方式参见:读取DataHub数据示例 - 实时计算Flink版 - 阿里云
也可以参考本人小记:
Blink-DataHub connector Maven依赖转化_大数据00的博客-CSDN博客
4.0.0 com.alibaba.blink blink-udx-3.x1.0-SNAPSHOT 2.11.12 2.11 blink-3.3.0 1.8 1.8 1.8 com.alibaba.blink flink-core${blink.version} provided com.alibaba.blink flink-streaming-java_${scala.binary.version}${blink.version} provided com.alibaba.blink flink-table_2.11${blink.version} provided junit junit4.12 test org.scala-lang scala-library2.11.12 provided com.aliyun.datahub aliyun-sdk-datahub2.12.2-public provided org.slf4j slf4j-apiorg.slf4j slf4j-log4j12org.slf4j jcl-over-slf4jorg.slf4j jul-slf4jlog4j log4jcom.alibaba.flink datahub-connector0.1-SNAPSHOT jar-with-dependencies org.postgresql postgresql42.1.1 org.apache.maven.plugins maven-assembly-plugin3.1.1 com.alibaba.blink.demo.stream_demo jar-with-dependencies make-assembly package single
2、开发入口类
package com.alibaba.blink.demo;
import com.alibaba.blink.mojo.Record;
import com.alibaba.blink.sink.HoloSink;
import com.alibaba.flink.connectors.datahub.datastream.source.DatahubSourceFunction;
import com.aliyun.datahub.client.model.RecordEntry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.List;
public class stream_demo {
private static String endPoint = "your endpoint";//内网访问。
//private static String endPoint ="public endpoint";//公网访问(填写内网Endpoint,就不用填写公网Endpoint)。
private static String projectName = "your projectName ";
private static String topicSourceName = "your topicSourceName";
private static String accessId = "your accessId ";
private static String accessKey = "your accessKey";
private static Long datahubStartInMs = 0L;//设置消费的启动位点对应的时间。
private static Long datahubEndInMs=Long.MAX_VALUE;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource> listDataStreamSource = env.addSource(
new DatahubSourceFunction(endPoint,
projectName,
topicSourceName,
accessId,
accessKey,
datahubStartInMs,
datahubEndInMs,
20L,
1000L,
1000));
SingleOutputStreamOperator result = listDataStreamSource
.flatMap((FlatMapFunction, Record>) (ls, collector) -> {
for (RecordEntry recordEntry : ls) {
Record record = new Record(recordEntry);
collector.collect(record);
}
}).returns(Record.class)
.filter(s -> (s.getoneid() != null))
.filter(s -> (s.getEvent() != null))
;
// result.print();
// .addSink((SinkFunction)new HoloSink())
// .setParallelism(2);
result.addSink(new HoloSink()).setParallelism(10);
env.execute();
}
}
3、开发自定义HoloSink
package com.alibaba.blink.sink;

import com.alibaba.blink.mojo.Record;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.UUID;

/**
 * Upsert sink for the Hologres table {@code holo_sink} over the PostgreSQL
 * JDBC protocol: if a row with the record's oneid exists, its event column is
 * updated; otherwise a new row is inserted with a freshly generated oneid
 * (the incoming oneid is deliberately NOT reused on insert, per the original
 * design comment).
 */
public class HoloSink extends RichSinkFunction<Record> {

    private static String url = "jdbc:postgresql://ip:port/database?tcpKeepAlive=true";
    private static String username = "username ";
    private static String password = "password ";
    private static String postgresdriver = "org.postgresql.Driver";

    private Connection connection;
    // One prepared statement of each kind per invoking thread, so concurrent
    // task threads never share a PreparedStatement.
    private ThreadLocal<PreparedStatement> pstmt;    // insert
    private ThreadLocal<PreparedStatement> querymt;  // select by oneid
    private ThreadLocal<PreparedStatement> updatemt; // update event by oneid

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName(postgresdriver);
            conn = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.connection = getConnection();
        this.pstmt = new ThreadLocal<>();
        this.querymt = new ThreadLocal<>();
        this.updatemt = new ThreadLocal<>();
    }

    @Override
    public void invoke(Record record, Context context) throws Exception {
        if (null == record || record.getoneid() == null) {
            System.out.println("record is null!!!");
            return;
        }
        // Lazily prepare the per-thread statements on first use.
        if (this.querymt.get() == null) {
            this.querymt.set(this.connection.prepareStatement(
                    "select oneid,event from holo_sink where oneid=?"));
        }
        if (this.updatemt.get() == null) {
            // The original source had this SQL literal broken across a line
            // break (invalid Java); restored as a single-line literal.
            this.updatemt.set(this.connection.prepareStatement(
                    "update holo_sink set event=? where oneid=?"));
        }
        if (this.pstmt.get() == null) {
            this.pstmt.set(this.connection.prepareStatement(
                    "insert into holo_sink(oneid,event) values (?,?)"));
        }
        this.querymt.get().setString(1, record.getoneid());
        // try-with-resources so the ResultSet is always closed
        // (the original leaked it).
        try (ResultSet resultSet = this.querymt.get().executeQuery()) {
            if (resultSet.next()) {
                // oneid already present: update its event.
                this.updatemt.get().setString(1, record.getEvent());
                this.updatemt.get().setString(2, record.getoneid());
                this.updatemt.get().executeUpdate();
                System.out.println("update " + record.toString()
                        + ",threadId:" + Thread.currentThread().getId());
            } else {
                // oneid not present: insert with a newly generated oneid.
                this.pstmt.get().setString(1, UUID.randomUUID().toString());
                this.pstmt.get().setString(2, record.getEvent());
                this.pstmt.get().executeUpdate();
                System.out.println("insert " + record.toString()
                        + ",threadId:" + Thread.currentThread().getId());
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // NOTE(review): only the closing thread's statements are closed here;
        // statements held by other threads are released when the shared
        // connection closes below.
        if (this.pstmt != null && this.pstmt.get() != null) this.pstmt.get().close();
        if (this.querymt != null && this.querymt.get() != null) this.querymt.get().close();
        if (this.updatemt != null && this.updatemt.get() != null) this.updatemt.get().close();
        if (this.connection != null) this.connection.close();
    }
}
4、自定义mojo类
package com.alibaba.blink.mojo;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
public class Record {
private String oneid;
private String event;
public Record(RecordEntry recordEntry) {
this.oneid = getString(recordEntry, "oneid");
this.event = getString(recordEntry, "event");
}
private String getString(RecordEntry recordEntry, String field) {
Object o = ((TupleRecordData)recordEntry.getRecordData()).getField(field);
if (o == null)
return "null";
return o.toString();
}
public String getoneid() {
return oneid;
}
public void setoneid(String oneid) {
this.oneid = oneid;
}
public String getEvent() {
return event;
}
public void setEvent(String event) {
this.event = event;
}
public Record(String oneid, String event) {
this.oneid = oneid;
this.event = event;
}
public Record() {
}
@Override
public String toString() {
return "Record{" +
"oneid='" + oneid + ''' +
", event='" + event + ''' +
'}';
}
}
5、辅助类 DataHub Writer
package com.alibaba.blink.datahub;
import com.alibaba.blink.utils.producer_with_random;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.PutRecordsResult;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.TupleRecordData;
import java.util.ArrayList;
import java.util.List;
/**
 * Helper producer: writes 100 tuple records (oneid, event) to a DataHub
 * topic, retrying the failed subset a bounded number of times.
 */
public class datahubwriter {

    public static void main(String[] args) throws InterruptedException {
        // Endpoint example for region cn-hangzhou; fill in real values.
        String endpoint = "endpoint ";
        String accessId = "accessId ";
        String accessKey = "accessKey ";
        // Build the DataHub client.
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        // 'true' enables binary transfer (server >= 2.12);
                        // try 'false' if this fails on a dedicated cloud.
                        new DatahubConfig(endpoint,
                                new AliyunAccount(accessId, accessKey), true))
                // HttpConfig is optional; LZ4 compression is recommended for
                // read/write traffic.
                .setHttpConfig(new HttpConfig()
                        .setCompressType(HttpConfig.CompressType.LZ4)
                        .setConnTimeout(10000))
                .build();
        String project = "projectName";
        String topic = "topic ";
        int retryTimes = 10;
        tupleExample(datahubClient, project, topic, retryTimes);
    }

    /** Generates and writes 100 tuple records, pausing 1s every 10 records. */
    public static void tupleExample(DatahubClient datahubClient, String project,
                                    String topic, int retryTimes) throws InterruptedException {
        // Fetch the topic schema to build tuple records against.
        RecordSchema recordSchema = datahubClient.getTopic(project, topic).getRecordSchema();
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (int i = 0; i < 100; ++i) {
            RecordEntry recordEntry = new RecordEntry();
            // Optional per-record attributes (ip, hostname, ...); writing
            // succeeds without them.
            recordEntry.addAttribute("key2", "value2");
            if (i % 10 == 0) {
                Thread.sleep(1000);
            }
            String records = producer_with_random.get_records();
            // NOTE(review): "+ 1 +" here is string concatenation, not
            // addition — the event value is "<random>1<timestamp>".
            // Preserved as-is; confirm intended.
            String v = "" + Math.random() * 100 + 1 + System.currentTimeMillis();
            System.out.println(records + "=>" + v);
            TupleRecordData data = new TupleRecordData(recordSchema);
            data.setField("oneid", records);
            data.setField("event", v);
            recordEntry.setRecordData(data);
            if (recordEntries.add(recordEntry)) {
                System.out.println("数据生成成功!!!!");
            }
        }
        try {
            PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
            if (result.getFailedRecordCount() > 0) {
                retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
            }
        } catch (DatahubClientException e) {
            // Restored the tab escape that was garbled to a bare "t" in the
            // pasted source.
            System.out.println("requestId:" + e.getRequestId()
                    + "\tmessage:" + e.getErrorMessage());
        }
    }

    /**
     * Retries writing the failed records, re-submitting only the subset that
     * failed in the previous attempt, up to {@code retryTimes} attempts.
     * (Replaces the original loop-plus-recursion hybrid — whose while loop
     * always broke after one pass — with a single bounded loop.)
     */
    public static void retry(DatahubClient client, List<RecordEntry> records,
                             int retryTimes, String project, String topic) {
        List<RecordEntry> pending = records;
        while (retryTimes > 0) {
            retryTimes--;
            PutRecordsResult recordsResult = client.putRecords(project, topic, pending);
            if (recordsResult.getFailedRecordCount() == 0) {
                return; // everything written
            }
            pending = recordsResult.getFailedRecords();
        }
        System.out.println("retryFailure");
    }
}
6、方法类
package com.alibaba.blink.utils;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Test-data helper: returns a random oneid drawn from a fixed pool of 20 ids
 * (entry 20 is regenerated randomly each class load).
 */
public class producer_with_random {

    // Pool of candidate oneids, keyed 1..20. Built in a static initializer
    // rather than the double-brace anonymous-subclass idiom.
    static Map<Integer, String> map = new HashMap<>();

    static {
        map.put(1, "0000048e-fb42-4898-ad1c-4b2aca517165");
        map.put(2, "00000842-448e-415b-808a-1f3682c64b1b");
        map.put(3, "00000d2d-7c7b-46b7-bd99-2e224cd194f9");
        map.put(4, "0000110b-efb9-4d03-9438-f4e58dbd6f11");
        map.put(5, "00001245-aa6c-4ddc-94be-b99ee7b51deb");
        map.put(6, "00001406-c496-4bdf-9fbc-76bbbc053423");
        map.put(7, "00001755-c010-49ef-b063-313bf0833fc1");
        map.put(8, "00001a92-8c10-4a73-9480-13941e2483db");
        map.put(9, "00001b17-8fff-440f-b9eb-5c9a5477d533");
        map.put(10, "00001d09-66ad-4ad7-a042-c8bdc578676d");
        map.put(11, "00001d67-d753-43d9-a087-2556548ca7c3");
        map.put(12, "00001da8-0290-4bed-8872-8a014543e91d");
        map.put(13, "00001f8d-6aa6-4483-9c75-b4c98c2d05cb");
        map.put(14, "000022b8-623d-404b-9a56-4f9243adc32c");
        map.put(15, "000023f4-c9e5-43fd-aa33-56551d6aebee");
        map.put(16, "00002651-df13-4cb1-a107-93623e18734c");
        map.put(17, "000033a1-6a4f-4b95-ae5a-40b7f8616498");
        map.put(18, "00003d5f-6360-4344-94c1-d67bedbd1709");
        map.put(19, "00003dcc-5b4c-4519-af41-484704079680");
        map.put(20, String.valueOf(UUID.randomUUID()));
    }

    /** Returns a random id from the pool; never null. */
    public static String get_records() {
        // Pick uniformly in [1, 20]. The original used Math.random()*21+1,
        // which could yield key 21 — absent from the map — and return null
        // roughly once in 21 calls.
        int random = (int) (Math.random() * 20 + 1);
        System.out.println(random);
        return map.get(random);
    }

    public static void main(String[] args) {
        System.out.println(get_records());
    }
}
7、打包上传
8、创建DataStream任务
9、任务详情
--完整主类名,必填,例如com.alibaba.realtimecompute.DatastreamExample
blink.main.class=com.alibaba.blink.demo.stream_demo
--包含完整主类名的JAR包资源名称,多个JAR包时必填,例如blink_datastream.jar
--blink.main.jar=${完整主类名jar包的资源名称}
--默认state backend配置,当作业代码没有显式声明时生效
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
--默认Checkpoint配置,当作业代码没有显式声明时生效
blink.checkpoint.interval.ms=180000
--默认启用项目参数
--disable.project.config=false
--设置自定义参数,代码中获取自定义参数的方法请参考如下链接:
--https://help.aliyun.com/document_detail/127758.html?spm=a2c4g.11174283.6.677.61fb1e49NJoWTR
其他可选择设置项
3.2及以上版本开启window miniBatch方法(3.2及以上版本默认不开启window miniBatch)。 sql.exec.mini-batch.window.enabled=true -- exactly-once语义。 blink.checkpoint.mode=EXACTLY_ONCE -- checkpoint间隔时间,单位毫秒。 blink.checkpoint.interval.ms=180000 blink.checkpoint.timeout.ms=600000 -- 实时计算Flink版2.0及以上版本使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒。 state.backend.type=niagara state.backend.niagara.ttl.ms=129600000 -- 实时计算Flink版2.0及以上版本开启5秒的microbatch(窗口函数不需要设置该参数)。 blink.microBatch.allowLatencyMs=5000 -- 表示整个Job允许的延迟。 blink.miniBatch.allowLatencyMs=5000 -- 双流join节点优化参数。 blink.miniBatch.join.enabled=true -- 单个Batch的size。 blink.miniBatch.size=20000 -- local优化,实时计算Flink版2.0及以上版本默认已经开启,1.6.4版本需要手动开启。 blink.localAgg.enabled=true -- 实时计算Flink版2.0及以上版本开启partial优化,解决count distinct效率低问题。 blink.partialAgg.enabled=true -- union all优化。 blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true -- GC优化(源表为SLS时,不能设置该参数)。 blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4' -- 时区设置。 blink.job.timeZone=Asia/Shanghai
10、提交发布 启动
11、资源文件
alibaba-flink-connectors-flink-1.5.2-compatible.zip-flink文档类资源-CSDN下载
blink_udx_3x-master.zip-flink文档类资源-CSDN下载
datahub-demo-master.zip-flink文档类资源-CSDN下载



