目录
1.需求背景
2.解决方案
3.代码pom引用
4.代码集成UDF
5.编译jar包
6.使用说明
7.将jar包放到hdfs
8.创建持久化function
9.测试
10.结语
1.需求背景
用户行为埋点数据中存在经度、维度,需要通过hive分析数据所在地址信息。
2.解决方案
通过HIVE的自定义UDF函数,自己实现经纬度转换成地址信息集成百度接口查询。
3.代码pom引用
4.0.0
com.dd.xinwen.hive.udf
dd-hiveudf
1.0-SNAPSHOT
dd-hiveudf
cloudera
https://repository.cloudera.com/artifactory/cloudera-repos/
UTF-8
1.8
1.8
2.3.3
1.2.28
jar
org.apache.hive
hive-exec
${hive.version}
junit
junit
4.11
test
com.alibaba
fastjson
${fastjson.version}
org.apache.maven.plugins
maven-compiler-plugin
3.0
1.8
1.8
UTF-8
org.apache.maven.plugins
maven-shade-plugin
2.2
package
shade
*:*
meta-INFRSA
4.代码集成UDF
package com.dd.xinwen.hive.udf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
public class MapBaiduAddressUDF extends UDF {
public Text evaluate (String lat, String lng){
String s="";
if (null==lat || null==lng){
return null;
}
try {
s=getLocationByBaiduMap(lng, lat);
} catch (Exception e) {
e.printStackTrace();
}
return new Text(s);
}
public static void main(String[] args) {
String lat="31.931";
String lng="120.961";
Text evaluate = new MapBaiduAddressUDF().evaluate(lat,lng);
System.out.println(evaluate.toString());
}
public static String getLocationByBaiduMap(String longitude,String latitude) throws Exception {
String ak = "你的百度地图AK,申请一个吧";
String locJson = geturl("http://api.map.baidu.com/geoconv/v1/?coords=" + longitude + "," +latitude + "&from=1&to=5&ak=" + ak);
System.out.println(locJson);
JSonObject jobject = JSON.parseObject(locJson);
JSonArray jsonArray = jobject.getJSonArray("result");
String lat=jsonArray.getJSonObject(0).getString("y");
String lng=jsonArray.getJSonObject(0).getString("x");
//System.out.println(lat);
String addrJson = geturl("http://api.map.baidu.com/reverse_geocoding/v3/?ak="+ ak +"&location=" + lat + "," + lng + "&output=json&pois=1");
System.out.println(addrJson);
JSonObject jobjectaddr = JSON.parseObject(addrJson);
JSonObject rJsonObject = jobjectaddr.getJSonObject("result");
System.out.println(rJsonObject.getJSonObject("addressComponent").getString("city"));
System.out.println(rJsonObject.getJSonObject("addressComponent").getString("province"));
String addr=jobjectaddr.getJSonObject("result").getString("formatted_address");
return addr;
}
private static String geturl(String geturl) throws Exception {
//请求的webservice的url
URL url = new URL(geturl);
//创建http链接,得到connection对象
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
//设置请求的方法类型
httpURLConnection.setRequestMethod("POST");
//设置请求的内容类型
httpURLConnection.setRequestProperty("Content-type", "application/x-www-form-urlencoded");
//设置发送数据
httpURLConnection.setDoOutput(true);
//设置接受数据
httpURLConnection.setDoInput(true);
//发送数据,使用输出流
OutputStream outputStream = httpURLConnection.getOutputStream();
//发送的soap协议的数据
String content = "user_id="+ URLEncoder.encode("用户Id", "utf-8");
//发送数据
outputStream.write(content.getBytes());
//接收数据
InputStream inputStream = httpURLConnection.getInputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
StringBuffer buffer = new StringBuffer();
String line = "";
while ((line = in.readLine()) != null){
buffer.append(line);
}
String str = buffer.toString();
return str;
}
}
package com.dd.xinwen.hive.udf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
public class MapBaiduAddressUDF extends UDF {
public Text evaluate (String lat, String lng){
String s="";
if (null==lat || null==lng){
return null;
}
try {
s=getLocationByBaiduMap(lng, lat);
} catch (Exception e) {
e.printStackTrace();
}
return new Text(s);
}
public static void main(String[] args) {
String lat="31.931";
String lng="120.961";
Text evaluate = new MapBaiduAddressUDF().evaluate(lat,lng);
System.out.println(evaluate.toString());
}
public static String getLocationByBaiduMap(String longitude,String latitude) throws Exception {
String ak = "你的百度地图AK,申请一个吧";
String locJson = geturl("http://api.map.baidu.com/geoconv/v1/?coords=" + longitude + "," +latitude + "&from=1&to=5&ak=" + ak);
System.out.println(locJson);
JSonObject jobject = JSON.parseObject(locJson);
JSonArray jsonArray = jobject.getJSonArray("result");
String lat=jsonArray.getJSonObject(0).getString("y");
String lng=jsonArray.getJSonObject(0).getString("x");
//System.out.println(lat);
String addrJson = geturl("http://api.map.baidu.com/reverse_geocoding/v3/?ak="+ ak +"&location=" + lat + "," + lng + "&output=json&pois=1");
System.out.println(addrJson);
JSonObject jobjectaddr = JSON.parseObject(addrJson);
JSonObject rJsonObject = jobjectaddr.getJSonObject("result");
System.out.println(rJsonObject.getJSonObject("addressComponent").getString("city"));
System.out.println(rJsonObject.getJSonObject("addressComponent").getString("province"));
String addr=jobjectaddr.getJSonObject("result").getString("formatted_address");
return addr;
}
private static String geturl(String geturl) throws Exception {
//请求的webservice的url
URL url = new URL(geturl);
//创建http链接,得到connection对象
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
//设置请求的方法类型
httpURLConnection.setRequestMethod("POST");
//设置请求的内容类型
httpURLConnection.setRequestProperty("Content-type", "application/x-www-form-urlencoded");
//设置发送数据
httpURLConnection.setDoOutput(true);
//设置接受数据
httpURLConnection.setDoInput(true);
//发送数据,使用输出流
OutputStream outputStream = httpURLConnection.getOutputStream();
//发送的soap协议的数据
String content = "user_id="+ URLEncoder.encode("用户Id", "utf-8");
//发送数据
outputStream.write(content.getBytes());
//接收数据
InputStream inputStream = httpURLConnection.getInputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
StringBuffer buffer = new StringBuffer();
String line = "";
while ((line = in.readLine()) != null){
buffer.append(line);
}
String str = buffer.toString();
return str;
}
}
仿照实现另外2个java类即可MapBaiduCityUDF,MapBaiduProvinceUDF。
5.编译jar包
dd-hiveudf-1.0-SNAPSHOT.jar
大约85.5MB
6.使用说明
我们需要创建持久函数(Permanent Functions),并且使用hdfs上的jar。
生产环境建议此种方式,便于管理和使用。
7.将jar包放到hdfs
比较放到warehouse/dd/auxlib/dd-hiveudf-1.0-SNAPSHOT.jar
8.创建持久化function
在hive中执行
create function gotBaiduAddr as 'com.dd.xinwen.hive.udf.MapBaiduAddressUDF' USING JAR 'hdfs://xxx:4007/warehouse/dd/auxlib/dd-hiveudf-1.0-SNAPSHOT.jar'; create function gotBaiduCity as 'com.dd.xinwen.hive.udf.MapBaiduCityUDF' USING JAR 'hdfs://xxx:4007/warehouse/dd/auxlib/dd-hiveudf-1.0-SNAPSHOT.jar'; create function gotBaiduProvince as 'com.dd.xinwen.hive.udf.MapBaiduProvinceUDF' USING JAR 'hdfs://xxx:4007/warehouse/dd/auxlib/dd-hiveudf-1.0-SNAPSHOT.jar';
9.测试
在hive中执行
select gotbaiduaddr('31.931','120.961');
select gotbaiducity('31.931','120.961');
select gotbaiduprovince('31.931','120.961');
10.结语
创建临时函数(Temporary Functions)
官方参考:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateFunction
缺点:Temporary Functions只对当前 session(窗口)有效
示例:在Hive的Shell中执行
ADD JAR /home/hadoop/lib/g6-hadoop-udf.jar;
CREATE TEMPORARY FUNCTION sayHello AS 'com.ruozedata.hadoop.udf.HelloUDF';
11.遇到的问题问题1:hive创建永久函数失败
参考CSDN贴主教程打jar包即可
hive创建永久函数失败,Failed to register youmeng.finderrorcount using class com.jinghang.hive.MyCoustom_lkm0522的博客-CSDN博客



