一、关联 MySQL 查询维度数据
地理位置信息:使用 HttpClient 调用高德地图逆地理编码接口,根据经纬度查询省市
DimDemo
package cn._51doit.flink.day05;
import cn._51doit.flink.day05.func.GeoRichMapFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
 * Streaming job that enriches JSON log lines with dimension data.
 *
 * <p>Pipeline: read text lines from a socket, parse each line into a {@link LogBean},
 * drop lines that failed to parse, look up the category name in MySQL by {@code cid},
 * then hand the record to {@link GeoRichMapFunction} to fill in province/city, and print.
 */
public class DimDemo {

    /**
     * Builds and runs the enrichment pipeline.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Parse each JSON line; a line that cannot be parsed yields null and is filtered out below.
        SingleOutputStreamOperator<LogBean> logBeanDataStream = lines.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String value) throws Exception {
                LogBean bean = null;
                try {
                    bean = JSON.parseObject(value, LogBean.class);
                } catch (Exception e) {
                    // Best effort: a malformed line must not fail the whole job.
                    e.printStackTrace();
                }
                return bean;
            }
        });

        SingleOutputStreamOperator<LogBean> filtered = logBeanDataStream.filter(e -> e != null);

        // Join dimension data: resolve the category name for each record's cid.
        SingleOutputStreamOperator<LogBean> logBeanWithNameDataStream =
                filtered.map(new RichMapFunction<LogBean, LogBean>() {
                    // JDBC handles are created in open(), so they are marked transient
                    // and never shipped with the serialized function.
                    private transient Connection connection;
                    private transient PreparedStatement prepareStatement;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // NOTE(review): connection settings are hard-coded; consider passing them in.
                        connection = DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8", "root", "123456");
                        prepareStatement = connection.prepareStatement(
                                "select id, name from tb_category where id = ?");
                    }

                    @Override
                    public LogBean map(LogBean value) throws Exception {
                        String name = null;
                        // cid is a boxed Integer; skip the lookup instead of failing on unboxing null.
                        if (value.cid != null) {
                            prepareStatement.setInt(1, value.cid);
                            // try-with-resources closes the ResultSet even if reading it throws.
                            try (ResultSet resultSet = prepareStatement.executeQuery()) {
                                if (resultSet.next()) {
                                    name = resultSet.getString(2);
                                }
                            }
                        }
                        // name stays null when there is no cid or no matching category row.
                        value.name = name;
                        return value;
                    }

                    @Override
                    public void close() throws Exception {
                        // Close the connection even when closing the statement throws.
                        try {
                            if (prepareStatement != null) {
                                prepareStatement.close();
                            }
                        } finally {
                            if (connection != null) {
                                connection.close();
                            }
                        }
                    }
                });

        // Look up the location by longitude/latitude and attach province/city.
        // NOTE(review): the API key is hard-coded here; consider reading it from configuration.
        SingleOutputStreamOperator<LogBean> result =
                logBeanWithNameDataStream.map(new GeoRichMapFunction("4924f7ef5c86a278f5500851541cdcff"));

        result.print();
        env.execute();
    }
}
LogBean
package cn._51doit.flink.day05;
/**
 * Mutable record carried through the enrichment pipeline.
 *
 * <p>Fields are public and boxed so that a missing value is represented as {@code null}.
 * {@code name}, {@code province} and {@code city} are filled in by later pipeline stages.
 */
public class LogBean {

    // presumably the order id - confirm against the producer of the JSON lines
    public String oid;

    // category id; used as the lookup key into tb_category
    public Integer cid;

    public Double money;

    // coordinates used for the reverse-geocoding lookup
    public Double longitude;
    public Double latitude;

    // category name resolved from cid
    public String name;

    // location resolved from longitude/latitude
    public String province;
    public String city;

    /**
     * Renders all fields, with string-valued fields wrapped in single quotes.
     *
     * @return a single-line description of this bean
     */
    @Override
    public String toString() {
        return "LogBean{" +
                "oid='" + oid + '\'' +
                ", cid=" + cid +
                ", money=" + money +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", name='" + name + '\'' +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
}
GeoRichMapFunction
package cn._51doit.flink.day05.func; import cn._51doit.flink.day05.LogBean; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; public class GeoRichMapFunction extends RichMapFunction{ private String key; public GeoRichMapFunction(String key) { this.key = key; } private transient CloseableHttpClient httpclient; @Override public void open(Configuration parameters) throws Exception { httpclient = HttpClients.createDefault(); } @Override public LogBean map(LogBean bean) throws Exception { double longitude = bean.longitude; double latitude = bean.latitude; HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location="+ longitude+"," +latitude+ "&key=" + key); CloseableHttpResponse response = httpclient.execute(httpGet); try { //System.out.println(response.getStatusLine) HttpEntity entity = response.getEntity(); // do something useful with the response body // and ensure it is fully consumed String province = null; String city = null; if (response.getStatusLine().getStatusCode() == 200) { //获取请求的json字符串 String result = EntityUtils.toString(entity); //转成json对象 JSONObject jsonObj = JSON.parseObject(result); //获取位置信息 JSONObject regeocode = jsonObj.getJSONObject("regeocode"); if (regeocode != null && !regeocode.isEmpty()) { JSONObject address = regeocode.getJSONObject("addressComponent"); //获取省市区 bean.province = address.getString("province"); bean.city = address.getString("city"); } } } finally { response.close(); } return bean; } @Override public void close() throws Exception { if (httpclient != null) { httpclient.close(); } } }



