栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据之flink案例

大数据之flink案例

一、关联 MySQL 查询维度数据

地理位置信息:使用 HttpClient 调用高德地图逆地理编码接口,根据经纬度查询省、市

DimDemo

package cn._51doit.flink.day05;

import cn._51doit.flink.day05.func.GeoRichMapFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * Flink streaming job that enriches JSON log records with dimension data.
 *
 * <p>Pipeline: read text lines from a socket on {@code localhost:8888}, parse each line
 * into a {@link LogBean}, drop lines that could not be parsed, look up the category name
 * in the MySQL table {@code tb_category} by {@code cid}, add province/city through
 * {@link GeoRichMapFunction}, and print the result.
 */
public class DimDemo {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        // Explicit type arguments are required: with raw types the @Override on
        // map(String) does not override anything and the class fails to compile.
        SingleOutputStreamOperator<LogBean> logBeanDataStream = lines.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String value) throws Exception {
                LogBean bean = null;
                try {
                    bean = JSON.parseObject(value, LogBean.class);
                } catch (Exception e) {
                    // Best effort: a malformed line is reported and yields null,
                    // which the filter below removes instead of failing the job.
                    e.printStackTrace();
                }
                return bean;
            }
        });

        // Drop records that could not be parsed.
        SingleOutputStreamOperator<LogBean> filtered = logBeanDataStream.filter(e -> e != null);

        // Join dimension data: resolve the category name for each record's cid.
        SingleOutputStreamOperator<LogBean> logBeanWithNameDataStream = filtered.map(new RichMapFunction<LogBean, LogBean>() {

            private transient Connection connection;
            private transient PreparedStatement prepareStatement;

            @Override
            public void open(Configuration parameters) throws Exception {
                // NOTE(review): connection URL and credentials are hard-coded; consider
                // passing them in through job parameters.
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8", "root", "123456");
                prepareStatement = connection.prepareStatement("select id, name from tb_category where id = ?");
            }

            @Override
            public LogBean map(LogBean value) throws Exception {
                String name = null;
                // A record without a cid cannot be looked up; leave name null rather
                // than crash on auto-unboxing a null Integer.
                if (value.cid != null) {
                    prepareStatement.setInt(1, value.cid);
                    // try-with-resources closes the ResultSet even if reading it throws.
                    try (ResultSet resultSet = prepareStatement.executeQuery()) {
                        if (resultSet.next()) {
                            name = resultSet.getString(2);
                        }
                    }
                }
                value.name = name;
                return value;
            }

            @Override
            public void close() throws Exception {
                // Close the connection even if closing the statement fails.
                try {
                    if (prepareStatement != null) {
                        prepareStatement.close();
                    }
                } finally {
                    if (connection != null) {
                        connection.close();
                    }
                }
            }
        });

        // Look up province/city from longitude and latitude.
        // NOTE(review): the API key is hard-coded in source; consider externalizing it.
        SingleOutputStreamOperator<LogBean> result = logBeanWithNameDataStream.map(new GeoRichMapFunction("4924f7ef5c86a278f5500851541cdcff"));

        result.print();

        env.execute();
    }
}

LogBean

package cn._51doit.flink.day05;

/**
 * Mutable data holder for one log record and the dimension attributes joined onto it.
 *
 * <p>Fields are public and there is an implicit no-argument constructor; other classes
 * in this example read and write the fields directly.
 */
public class LogBean {

    /** Order identifier. */
    public String oid;

    /** Category identifier; used as the lookup key for {@link #name}. */
    public Integer cid;

    /** Monetary amount of the record. */
    public Double money;

    /** Longitude of the record's location. */
    public Double longitude;

    /** Latitude of the record's location. */
    public Double latitude;

    /** Category name, filled in after the record is created. */
    public String name;

    /** Province, filled in after the record is created. */
    public String province;

    /** City, filled in after the record is created. */
    public String city;

    /**
     * Returns a debug representation listing every field; string fields are wrapped
     * in single quotes.
     *
     * @return text of the form {@code LogBean{oid='..', cid=.., ...}}
     */
    @Override
    public String toString() {
        // '\'' is the single-quote char literal; the bare ''' form does not compile.
        return "LogBean{" +
                "oid='" + oid + '\'' +
                ", cid=" + cid +
                ", money=" + money +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", name='" + name + '\'' +
                ", province='" + province + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
}

GeoRichMapFunction

package cn._51doit.flink.day05.func;

import cn._51doit.flink.day05.LogBean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

/**
 * Map function that fills {@link LogBean#province} and {@link LogBean#city} by sending
 * the bean's longitude/latitude to the AMap {@code /v3/geocode/regeo} HTTP endpoint.
 *
 * <p>The HTTP client is created in {@link #open(Configuration)} and released in
 * {@link #close()}; it is {@code transient} so it is not serialized with the function.
 */
public class GeoRichMapFunction extends RichMapFunction<LogBean, LogBean> {

    /** API key appended to every request URL. */
    private final String key;

    /**
     * @param key API key sent as the {@code key} query parameter
     */
    public GeoRichMapFunction(String key) {
        this.key = key;
    }

    private transient CloseableHttpClient httpclient;

    /** Creates the HTTP client used by {@link #map(LogBean)}. */
    @Override
    public void open(Configuration parameters) throws Exception {
        httpclient = HttpClients.createDefault();
    }

    /**
     * Looks up province and city for the bean's coordinates and stores them on the bean.
     *
     * <p>The bean is returned unchanged when it has no coordinates, when the HTTP status
     * is not 200, or when the response contains no usable {@code regeocode} /
     * {@code addressComponent} object.
     *
     * @param bean record to enrich; modified in place
     * @return the same {@code bean} instance
     * @throws Exception if the HTTP request or response parsing fails
     */
    @Override
    public LogBean map(LogBean bean) throws Exception {
        // Without both coordinates there is nothing to look up; also avoids an NPE
        // from auto-unboxing a null Double.
        if (bean.longitude == null || bean.latitude == null) {
            return bean;
        }
        double longitude = bean.longitude;
        double latitude = bean.latitude;
        HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location="+ longitude+"," +latitude+ "&key=" + key);
        // try-with-resources releases the response even if parsing throws.
        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
            if (response.getStatusLine().getStatusCode() == 200) {
                // Read the JSON body; fall back to UTF-8 when the response declares no
                // charset, so non-ASCII place names are not decoded incorrectly.
                String result = EntityUtils.toString(response.getEntity(), "UTF-8");
                JSONObject jsonObj = JSON.parseObject(result);
                JSONObject regeocode = jsonObj == null ? null : jsonObj.getJSONObject("regeocode");
                if (regeocode != null && !regeocode.isEmpty()) {
                    JSONObject address = regeocode.getJSONObject("addressComponent");
                    if (address != null) {
                        bean.province = address.getString("province");
                        bean.city = address.getString("city");
                    }
                }
            }
        }
        return bean;
    }

    /** Closes the HTTP client if it was created. */
    @Override
    public void close() throws Exception {
        if (httpclient != null) {
            httpclient.close();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734352.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号