Result Showcase
(Screenshot of the final dashboard; the original image link in the post is broken.)
My general approach:
1. Write a UDF that maps an IP address to its province, based on the instructor-provided ip.txt (which lists many IP ranges and their corresponding locations).
2. Register the UDF in Hive, resolve each IP with it, then group by the resolved province and count().
3. Use a script to run this query and push the result into MySQL.
4. Schedule that script periodically with Azkaban so the result keeps refreshing.
Difficulties Encountered
1. Writing the IP-to-province conversion itself.
2. Refreshing the data without interference from the previous run: the Hive `show` table is dropped and recreated before each query, and the Sqoop export into the MySQL `show` table is meant to directly overwrite the existing data.
3. Producing the animated effect on the Spring MVC display page: following the same approach the instructor showed in class, one list is split into two lists, which makes the dynamic display possible.
Spring MVC Code
DAO layer
package com.qf.bigdata.dao;
import com.qf.bigdata.pojo.Show;
import java.util.List;
public interface ShowMapper {
List<Show> select();
}
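The post does not show the SQL bound to ShowMapper. As a minimal sketch, assuming MyBatis annotation mapping and that the MySQL table the Sqoop export targets is named show with columns province and cnt (the original project may equally well use an XML mapper), the interface could be wired like this:

package com.qf.bigdata.dao;
import com.qf.bigdata.pojo.Show;
import org.apache.ibatis.annotations.Select;
import java.util.List;
public interface ShowMapper {
// show is a reserved word in MySQL, hence the backticks; the table and
// column names are assumptions mirroring the Sqoop export further below
@Select("select province, cnt from `show`")
List<Show> select();
}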
POJO layer (two classes)
package com.qf.bigdata.pojo;
public class Show {
private String province;
private Integer cnt;
public Show() {
}
public Show(String province, Integer cnt) {
this.province = province;
this.cnt = cnt;
}
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public Integer getCnt() {
return cnt;
}
public void setCnt(Integer cnt) {
this.cnt = cnt;
}
@Override
public String toString() {
return "Show{" +
"province='" + province + '\'' +
", cnt=" + cnt +
'}';
}
}
package com.qf.bigdata.pojo;
import java.util.List;
public class ShowVo {
private List<String> province;
private List<Integer> cnt;
public List<String> getProvince() {
return province;
}
public void setProvince(List<String> province) {
this.province = province;
}
public List<Integer> getCnt() {
return cnt;
}
public void setCnt(List<Integer> cnt) {
this.cnt = cnt;
}
public ShowVo() {
}
public ShowVo(List<String> province, List<Integer> cnt) {
this.province = province;
this.cnt = cnt;
}
@Override
public String toString() {
return "ShowVo{" +
"province=" + province +
", cnt=" + cnt +
'}';
}
}
Service layer
package com.qf.bigdata.service;
import com.qf.bigdata.pojo.ShowVo;
public interface ShowService {
ShowVo select();
}
package com.qf.bigdata.service.impl;
import com.qf.bigdata.dao.ShowMapper;
import com.qf.bigdata.pojo.Show;
import com.qf.bigdata.pojo.ShowVo;
import com.qf.bigdata.service.ShowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class ShowServiceImpl implements ShowService {
@Autowired
private ShowMapper showMapper;
@Override
public ShowVo select() {
// 1. Query the aggregated rows
List<Show> list = showMapper.select();
// 2. Repackage the result into the VO
ShowVo vo = new ShowVo();
ArrayList<String> province = new ArrayList<>();
ArrayList<Integer> cnts = new ArrayList<>();
for (Show show : list) {
province.add(show.getProvince());
cnts.add(show.getCnt());
}
vo.setProvince(province);
vo.setCnt(cnts);
return vo;
}
}
Controller layer
package com.qf.bigdata.web.servlet;
import com.qf.bigdata.pojo.ShowVo;
import com.qf.bigdata.service.ShowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping("/show")
public class ShowController {
@Autowired
private ShowService showService;
@RequestMapping("/select")
@ResponseBody
public ShowVo select(){
return showService.select();
}
}
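With @ResponseBody, a GET request to /show/select returns the ShowVo serialized as JSON (assuming a JSON converter such as Jackson is on the classpath), roughly of the shape {"province": [...], "cnt": [...]}; this is the payload the ECharts page consumes.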
JSP
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
(Only the page directive and the page title "echarts" survive here; the rest of the JSP markup is missing from the post. The page loads ECharts, fetches the province/cnt lists from /show/select, and renders them as a chart, with the two-list split described above producing the dynamic effect.)
UDF Code
package com.qf.bigdata
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
// A class (not an object) so that Hive can instantiate it via
// create temporary function ... as 'com.qf.bigdata.Demo1'
class Demo1 extends UDF {
// Build the sorted IP-range table once per UDF instance; creating a
// SparkContext inside evaluate() would redo this work on every row
private lazy val ipArr: Array[(String, Long, Long)] = {
val sc = new SparkContext(new SparkConf().setAppName("Demo1").setMaster("local[*]"))
val ipLogRDD: RDD[String] = sc.textFile("src/main/resources/ip.txt")
val province2IPRDD: RDD[(String, Long, Long)] = ipLogRDD.map(log => {
val fields: Array[String] = log.split("\\|")
val startIP: Long = IPUtils.ip2Long(fields(0))
val endIP: Long = IPUtils.ip2Long(fields(1))
val province: String = fields(6)
(province, startIP, endIP)
})
// Sort the ranges by start IP so that binary search works
val sorted = province2IPRDD.collect().sortWith {
case ((_, startIP1, _), (_, startIP2, _)) => startIP1 < startIP2
}
sc.stop()
sorted
}
def evaluate(ip: String): String = {
// Fall back to a default province for null or empty input
if (ip == null || ip.isEmpty) "浙江"
else IPUtils.ip2address(ip, ipArr)
}
}
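A note on the design: loading the range table in a lazy val means Spark starts and ip.txt is scanned only once per UDF instance rather than once per row. Spark is arguably heavier than needed for reading a single local file (plain scala.io.Source would do the same job), but it matches the course setup the project is based on.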
package com.qf.bigdata
object IPUtils {
def ip2address(ip: String, ipDatabase: Array[(String, Long, Long)]): String = {
// 1. Convert the dotted-quad string to a decimal number
val ipNum: Long = ip2Long(ip)
// 2. Find which range in ipDatabase (if any) contains ipNum, and at which index
val index = binarySearch(ipNum, ipDatabase)
// 3. Return the province of that range, or "" when nothing matched
if (index != -1) ipDatabase(index)._1
else ""
}
def ip2Long(ip: String): Long = {
// 1. Split the IP string on dots (the dot must be escaped in the regex)
val fields: Array[String] = ip.split("\\.")
// 2. Fold each octet in: shift the accumulator left by 8 bits, then OR in the octet
var ipNum = 0L
for (field <- fields) {
ipNum = field.toLong | ipNum << 8
}
ipNum
}
def binarySearch(ip: Long, ipDatabase: Array[(String, Long, Long)]): Int = {
var start = 0
var end = ipDatabase.length - 1
while (start <= end) {
val mid = (start + end) / 2 // middle index
val startIp = ipDatabase(mid)._2
val endIp = ipDatabase(mid)._3
if (ip >= startIp && ip <= endIp) return mid // ip falls within this range
else if (ip < startIp) end = mid - 1
else start = mid + 1
}
-1
}
}
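As a quick sanity check of ip2Long: "1.2.3.4" becomes ((1 × 256 + 2) × 256 + 3) × 256 + 4 = 16909060, i.e. each octet occupies 8 bits of a 32-bit number, which is what makes the start/end range comparison in binarySearch meaningful.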
Scripts
init.hql
-- Register the UDF jar
add jar /data/jars/ipparse.jar;
create temporary function `ipparse` as 'com.qf.bigdata.Demo1';
-- Drop the show table first so that the data gets refreshed
drop table if exists `ods_news`.`show`;
-- Recreate the show table in Hive
create table if not exists `ods_news`.`show`(
province string,
cnt int
)
row format delimited fields terminated by '\001';
-- Resolve each IP to a province and count per province
insert into `ods_news`.`show`
select ipparse(ip), count(1) from news_parquet group by ipparse(ip);
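'\001' (Ctrl-A) is Hive's default field delimiter; whatever delimiter is used here has to match the --input-fields-terminated-by option of the Sqoop export in init.sh below, or the exported columns will not line up.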
init.sh
#!/bin/bash
echo "Initializing the show table in Hive --------------"
hive -f /data/scripts/init.hql
echo "show table in Hive initialized -------------"
SQOOP_HOME=/opt/apps/sqoop-1.4.7
JDBC_URL=jdbc:mysql://10.31.162.72:3306/learning?characterEncoding=utf-8
USERNAME=root
PASSWORD=li1579026891
echo "Exporting data to MySQL --------------"
$SQOOP_HOME/bin/sqoop export \
--connect $JDBC_URL \
--username $USERNAME \
--password $PASSWORD \
--table show \
--export-dir /user/hive/warehouse/ods_news.db/show \
--columns province,cnt \
--update-mode allowinsert \
--input-fields-terminated-by '\001' \
--num-mappers 1
echo "Export finished ----------------"
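One caveat: in Sqoop, --update-mode allowinsert only has an effect in combination with --update-key; without it the export is plain insert, so each scheduled run appends new rows to the MySQL show table rather than overwriting them. Assuming province is unique per run, adding --update-key province (a hypothetical addition, not in the script above) would turn the export into an upsert; truncating the MySQL table before each export would work as well.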
Azkaban Scheduling
init.flow
## filename : init.flow
## author : lifuwei
## date : 20211224
## version : 1.0
config:
  param.script_path_prefix: /data/scripts
nodes:
  # Start node
  - name: START
    type: noop
  # Export the data to MySQL
  - name: SQOOP_MYSQL
    type: command
    dependsOn:
      - START
    config:
      command: sh ${param.script_path_prefix}/init.sh
  # End node
  - name: END
    type: noop
    dependsOn:
      - SQOOP_MYSQL
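Note that the .flow file only describes the job DAG; the periodic trigger mentioned in the approach is configured separately in Azkaban, typically as a cron schedule in the web UI after uploading the project.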
init.project
#filename: news.project
azkaban-flow-version: 2.0