flink elasticsearch sink 目前我这边电脑资源不够用,写入虚拟机很慢,数据有差异.
1.data 数据
sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718207,36.3 sensor_1,1547718209,32.8 sensor_1,1547718212,37.1 sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718207,36.3 sensor_1,1547718209,32.8 sensor_1,1547718212,37.1
2.pom.xml
groupId: org.apache.flink, artifactId: flink-connector-elasticsearch6_2.11, version: 1.10.1
3.java代码
public class sink_es {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource txtSink = env.readTextFile("D:\ideaProject\flink-java\flink-java-api\src\main\resources\data.txt");
DataStream streamOperator = txtSink.map(new MapFunction() {
public SensorReading map(String line) throws Exception {
String[] split = line.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
}
});
streamOperator.print();
//定义 httphost
ArrayList list = new ArrayList();
list.add(new HttpHost("192.168.174.204",9200));
list.add(new HttpHost("192.168.174.205",9200));
ElasticsearchSink readingElasticsearchSink = new ElasticsearchSink.Builder(list, new ElasticsearchSinkFunction() {
public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
//定义写入的数据source
HashMap map = new HashMap();
map.put("id", sensorReading.getId());
map.put("temp", sensorReading.getTemperature().toString());
map.put("ts", sensorReading.getTimestamp().toString());
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest().index("book").type("serson").source(map);
//用 requestIndexer 发送最后的请求
requestIndexer.add(indexRequest);
}
}).build();
streamOperator.addSink(readingElasticsearchSink);
env.execute();
}
}



