hive做报表经常会有ip解析的需求,如解析ip为国家地区省份等作为大屏的地图展示。
方案hive udf配合免费的GeoLite2来实现,GeoLite2官网:https://dev.maxmind.com/geoip/geolite2-free-geolocation-data。
实现需要注意的是地址库GeoLite2-City.mmdb文件的存放位置,我看其他方案有放到hive classpath下,但是这种情况下使用hive on spark生成了spark job时会导致任务报FileNotFoundException,所以最好还是使用绝对路径。
package com.ms.hive.udf;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@Slf4j
public class ParseIpUDF extends GenericUDF {
private static DatabaseReader reader = null;
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
if (objectInspectors.length != 1) {
throw new UDFArgumentException("参数个数应为1");
}
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
if (deferredObjects == null || deferredObjects.length == 0) {
return null;
}
String ip = deferredObjects[0].get().toString();
if (!StringUtils.isEmpty(ip)) {
return parseIp(ip.replaceAll(" ", ""));
}
return null;
}
@Override
public String getDisplayString(String[] strings) {
return "";
}
public String parseIp(String ip) {
try {
if (reader == null) {
// 需要将文件放在集群所有节点
File database = new File("/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/etc/hive/conf.dist/GeoLite2-City.mmdb");
try {
reader = new DatabaseReader.Builder(database).build();
} catch (IOException e) {
log.error("ParseIpUDF init reader err, {}", e.getMessage());
e.printStackTrace();
}
}
InetAddress ipAddress = InetAddress.getByName(ip);
CityResponse response = reader.city(ipAddress);
Subdivision subdivision = response.getMostSpecificSubdivision();
City city = response.getCity();
Country country = response.getCountry();
String c = country.getNames().get("zh-CN");
if (c.contains("台湾")) {
c = "中国台湾";
}
String provinceName = subdivision.getNames().get("zh-CN");
String cityName = city.getNames().get("zh-CN");
return c + "," + provinceName + "," + cityName;
} catch (Exception e) {
e.printStackTrace();
log.error("evaluate err", e);
}
return null;
}
}
上传jars和创建hive function
add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/lombok-1.18.16.jar; add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/maxmind-db-2.0.0.jar; add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/geoip2-2.15.0.jar; add jar hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/ms-hive-custom-1.0.1-SNAPSHOT.jar; create temporary function parse_ip as 'com.examole.hive.udf.ParseIpUDF' using jar 'hdfs://cdh01:8020/user/hive/warehouse/test.db/jars/ms-hive-custom-1.0.1-SNAPSHOT.jar';



