顺应近年来,科学社会的不断发展,人们对于海量数据的挖掘以及实际运用以及重视,互联网是面向全社会公众信息交流的平台,已经成为了收集信息的最佳最有效率的渠道并成为当前主流。伴随大数据技术的创新与实际应用,进一步为人民进行大数据统计分析提供了极大的便利。
大数据信息的统计分析可为企业决策者提供充实可行的依据支持,例如:通过对于移动APP的下载数据量进行统计分析,可以得出应用程序的受欢迎程序,还可以通过访问的ip编号,区域,时间段,下载方式等,进行下一步更深层次的数据分析,为运营分析与推广决策提供强有力的数据支持。
本项目对于某个网站产生的流量日志数据进行统计分析,可以得出网站的日访问量,从而得出网站的欢迎程度,对于访问网站ip的地区统计分析,可以得出对于此网站,某地区的点击热度,并对其进行实现可视化界面等。
本项目的主要流程为:首先通过提供的ip日志数据,进行数据预处理,并将处理后的数据上传到分布式文件管理系统(HDFS)上,再导入到Hive数据库中,然后使用Navicat工具连接Mysql数据库,并使用Sqoop将数据导出,然后搭建SSM+Spring框架,自定义传导至前端的web接口,使用Tomcat、HTML语言、SQL语言等相关知识进行操作,最后实现数据的最终可视化界面。
数据预处理过程:
- 根据源数据的数据字段创建相应的属性变量,部分操作见图2;
图2 建立与源数据字段相对应的属性
(2) 重写toString()方法,使用Hive默认分隔符进行分隔,为后期导入Hive表提供便利、设置初始化方法(详见图3),加载网站需要分析的url分类数据,存储到MapTask的内存中,用来对日志数据进行过滤,如果用户请求的资源是以下列形式,就表示用户请求的是合法资源(详见图4)
图3 重写toString方法
图4 过滤日志数据及存储到MapTask内存
(3)数据预处理后效果展示(见图5)
部分核心代码:
(1)WebLogBean.java
package cn.edu.fjjxu.mr.bean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class WebLogBean implements Writable {
private Boolean valid = false;
private String remote_addr;
private String remote_user;
private String time_local;
private String requset;
private String status;
private String body_byes_sent;
private String http_referer;
private String http_user_agent;
public void setBean(boolean valid, String remote_addr,
String remote_user, String time_loacl,
String requset, String status, String body_byyes_sent,
String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_loacl;
this.requset = requset;
this.status = status;
this.body_byes_sent = body_byyes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public Boolean getValid() {
return valid;
}
public void setValid(Boolean valid) {
this.valid = valid;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequset() {
return requset;
}
public void setRequset(String requset) {
this.requset = requset;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_byes_sent() {
return body_byes_sent;
}
public void setBody_byes_sent(String body_byes_sent) {
this.body_byes_sent = body_byes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append(this.valid);
sb.append(" 01").append(this.getRemote_addr());
sb.append(" 01").append(this.getRemote_user());
sb.append(" 01").append(this.getTime_local());
sb.append(" 01").append(this.getRequset());
sb.append(" 01").append(this.getStatus());
sb.append(" 01").append(this.getBody_byes_sent());
sb.append(" 01").append(this.getHttp_referer());
sb.append(" 01").append(this.getHttp_user_agent());
return sb.toString();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeBoolean(this.valid);
dataOutput.writeUTF(null==remote_addr?"":remote_addr);
dataOutput.writeUTF(null==remote_user?"":remote_user);
dataOutput.writeUTF(null==time_local?"":time_local);
dataOutput.writeUTF(null==requset?"":requset);
dataOutput.writeUTF(null==status?"":status);
dataOutput.writeUTF(null==body_byes_sent?"":body_byes_sent);
dataOutput.writeUTF(null==http_referer?"":http_referer);
dataOutput.writeUTF(null==http_user_agent?"":http_user_agent);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.valid = dataInput.readBoolean();
this.remote_addr = dataInput.readUTF();
this.remote_user = dataInput.readUTF();
this.time_local = dataInput.readUTF();
this.requset = dataInput.readUTF();
this.status = dataInput.readUTF();
this.body_byes_sent = dataInput.readUTF();
this.http_referer = dataInput.readUTF();
this.http_user_agent = dataInput.readUTF();
}
}
(2)WebLogParser.java
package cn.itcast.mr.weblog.preprocess;
import cn.itcast.mr.weblog.bean.WebLogBean;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;
public class WebLogParser {
//定义时间格式
public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
//把一行数据以空格字符切割并存入数组arr中
String[] arr = line.split(" ");
//如果数组长度小于等于11,说明这条数据不完整,因此可以忽略这条数据
if (arr.length > 11) {
//满足条件的数据逐个赋值给webLogBean对象
webLogBean.setRemote_addr(arr[0]);
webLogBean.setRemote_user(arr[1]);
String time_local = formatDate(arr[3].substring(1));
if(null==time_local || "".equals(time_local)) time_local="-invalid_time-";
webLogBean.setTime_local(time_local);
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
//如果useragent元素较多,拼接useragent
if (arr.length > 12) {
StringBuilder sb = new StringBuilder();
for(int i=11;i= 400) {// 大于400,HTTP错误
webLogBean.setValid(false);
}
if("-invalid_time-".equals(webLogBean.getTime_local())){
webLogBean.setValid(false);
}
} else {
webLogBean=null;
}
return webLogBean;
}
//添加标识
public static void filtStaticResource(WebLogBean bean, Set pages) {
if (!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}
//格式化时间方法
public static String formatDate(String time_local) {
try {
return df2.format(df1.parse(time_local));
} catch (ParseException e) {
return null;
}
}
}
(3)WeblogPreProcess.java
package cn.itcast.mr.weblog.preprocess;
import cn.itcast.mr.weblog.bean.WebLogBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class WeblogPreProcess {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WeblogPreProcess.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\HadoopTest\MapReduce\WebLog\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\HadoopTest\MapReduce\WebLog\output"));
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
public static class WeblogPreProcessMapper extends Mapper {
// 用来存储网站url分类数据
Set pages = new HashSet();
Text k = new Text();
NullWritable v = NullWritable.get();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
pages.add("/about");
pages.add("/black-ip-list/");
pages.add("/cassandra-clustor/");
pages.add("/finance-rhive-repurchase/");
pages.add("/hadoop-family-roadmap/");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro/");
pages.add("/hadoop-mahout-roadmap/");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据
String line = value.toString();
//调用解析类WebLogParser解析日志数据,最后封装为WebLogBean对象
WebLogBean webLogBean = WebLogParser.parser(line);
if (webLogBean != null) {
// 过滤js/图片/css等静态资源
WebLogParser.filtStaticResource(webLogBean, pages);
k.set(webLogBean.toString());
context.write(k, v);
}
}
}
}
- 数据分析
数据仓库中的数据分析与数据导出:
(1)插入项目操作所需的数据表,详见图6;
图6 hive数据源表
(2)把预处理结果上传到hadoop3:
(3)并在分布式文件管理系统(HDFS)上查看数据的上传情况,如图7所示;
图7 HDFS上传文件内容显示
(4)在hive中查看表中的结构以及插入的数据,查询结果如图8、图9、图10所示;
图8 ods_weblog_origin 部分数据
图9 dw_pvs_erverdays数据内容
图10 dw_avgpv_user_erverdays数据内容
(4) 使用Sqoop将hive的dw_avgpv_user_erverdays表导入到Mysql数据库中,导入代码如下;
sqoop export
--connect jdbc:mysql://hadoop01:3306/sqoopdb
--username root
--password 123456
--table t_pv_num
--columns
"dw_pvs_everyday,dw_pvs_everyday.month,dw_pvs_everyday.day"
--fields-terminated-by ' 01'
--export-dir /user/hive/warehouse/weblog.db/dw_pvs_everyday
(5)使用Navicat工具连接Mysql数据库,查看所导出的数据表,查看结果如图11所示;
图11 日平均PV量
(1)inedex.jsp(最近7天日平均PV量):(2)index.jsp(折线图): (3)index.jsp(饼状图):
- 数据可视化
SSM+Spring框架实现的数据可视化:
(1)通过IDEA创建相关目标项目,部分代码如图12所示;
图12 SSM+Spring框架部分代码展示
(2)借用Tomcat、HTML语言、SQL语言等相关知识实现目标操作;
- 通过数据来源dw_pvs_everday表获取访问次数总量
- 在hive数据仓库中建立临时表tmp用于存储目标数据
- 借助sqoop将hive中的tmp表导出到Mysql数据库中(如图15所示)
图15 日访问PV总量
结果可视化(如图16):
图16 日PV总量图
部分核心代码:
(1)TpvNumMapper.xml:
parameterType="String"> select * from t_pv_num where dateStr between #{0} and #{1} order by dateStr asc;
(2)IndexController.java:
package cn.itcast.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.itcast.service.PvService;
@Controller
public class IndexController {
@Autowired
private PvService pvService;
@RequestMapping("/index")
public String showIndex() {
return "index";
}
@RequestMapping(value = "/ PvNum", produces = "application/json;charset=UTF-8")
@ResponseBody
public String getChart() {
System.out.println("获取pv数据..");
String data = pvService.get PvNumByDates("2013-09-18", "2013-09-24");
return data;
}
}
- 首先根据数据源表清洗出所有的记录的IP插入到hive仓库的临时表中;
- 借用sqoop将临时表中的数据导出到Mysql数据库中,如图17所示;
图17 清洗出IP数据表
- 使用Python访问IP定位的API接口对Mysql中的IP记录进行位置定位,获取出每个IP对应的位置信息相关操作如图18所示(API接口涉及个人隐私不方便显示);
图18 定位IP位置
- 统计中国省份的访问总量并通过自定义SSM框架接口,实现可视化展示到项目页面的操作,具体操作如图19、图20、图21;
图19 自定义框架传导操作1
图20 自定义框架传导操作2
图21 自定义框架传导操作3
结果可视化(如图22、图23):
图22 ip归属地区总量图1
图23 ip归属地区总量图2
部分核心代码:
(1)IndexController.java:
@RequestMapping(value = "/LocationPvnum", produces = "application/json;charset=UTF-8")
@ResponseBody
public String getLocationPvnum(){
System.out.println("获取地区访问数据..");
String data = lpnService.getTLocationPvNum();
return data;
}
}
(2)TLocationPvNumMapper.xml:
(3)LocationPvNumToPageBean.java:
package cn.itcast.pojo;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class LocationPvNumToPageBean {
private List datas;
public List getDatas() {
return datas;
}
public void setDatas(List datas) {
this.datas = datas;
}
}
(4)TLocationPvNum.java
package cn.itcast.pojo;
public class TLocationPvNum {
private String province;
private Integer num;
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
}
(5)ip定位操作:
import csv
import requests
import pymysql
import time
def get_data():
# 连接数据库
db = pymysql.connect(host="hadoop01", user='root', password="123456", database="sqoopdb")
# 使用cursor()方法创建一个游标对象
cursor = db.cursor()
# 使用execute()方法执行SQL语句
cursor.execute('SELECT * FROM ip_total')
# 使用fetall()获取全部数据
data = cursor.fetchall()
# 打印获取到的数据
ip = data
# 关闭游标和数据库的连接
cursor.close()
db.close()
return ip
def get_location(ip):
//此处api地址为个人隐私不方便透露出处,所以进行删除处理
l_location = list()
t = 0
for i in ip:
t += 1
if t == 1000:
print("time sleep!!1")
t = 0
time.sleep(10)
print(l_location)
try:
r = requests.get(s + i[0])
l_location.append(r.text.split(',')[0][8:-1])
except Exception as e:
print(e)
dic = dict()
for i in l_location:
dic[i] = l_location.count(i)
return dic
def write_data(d):
with open("1.csv", "a", encoding="utf-8", newline='') as csv_file:
writer = csv.writer(csv_file)
for key, value in d.items():
writer.writerow([key, value])
if __name__ == '__main__':
data = get_data()
start = time.time()
write_data(get_location(data))
end = time.time()
print(end - start)
- 根据之前所统计出的地区次数,借助SQL语句查询出数量前四的地区并将剩下的地区访问量进行统计
- 根据查询的结果并将其进行可视化操作,最终以饼状图形式展现
结果可视化(如图24):
图24 地区Top4饼状图
- 首先根据数据源,借用mapreduce清洗并统计出每日对应的true、false状态各自的访问数量,预处理结果表数据展示如图25
图25 每日访问状态类型总量
- 在SSM框架中自定义传到到web前端的接口,具体实现如图26、图27、图28所示
图26 自定义框架传导操作1
图27 自定义框架传导操作2
图28 自定义框架传导操作3
- 以时间为横坐标,状态数量为纵坐标,绘制可视化图形,突出正确错误的访问次数(并提供web前端多种可视化图形展示类型具体如图29、图30所示)
图中虚线表示为平均值,图中红色方框内可以自由切换显示图形!
图29 切换以折线图来体现
图30 切换以柱状图来体现
(1)IndexController.java
@RequestMapping(value = "/TypeNum", produces = "application/json;charset=UTF-8")
@ResponseBody
public String getTypeNum() {
System.out.println("获取访问状态数据..");
String data = tyService.getTypeNumByDates("2013-09-18","2013-09-24");
return data;
}
(2)TTypeNumMapper.xml
select *
from t_typeNum_everyday
where dateStr between #{0} and #{1} order by dateStr asc;
(3)TTypeNum.java
package cn.itcast.pojo;
public class TTypeNum {
private String datestr;
private String true_num;
private String false_num;
public String getDatestr() {
return datestr;
}
public void setDatestr(String datestr) {
this.datestr = datestr;
}
public String getTrue_num() {
return true_num;
}
public void setTrue_num(String true_num) {
this.true_num = true_num;
}
public String getFalse_num() {
return false_num;
}
public void setFalse_num(String false_num) {
this.false_num = false_num;
}
}
(4)TypeNumToPageBean.java
package cn.itcast.pojo;
public class TypeNumToPageBean {
private String[] dates;
private String[] true_nums;
private String[] false_nums;
public String[] getDates() {
return dates;
}
public void setDates(String[] dates) {
this.dates = dates;
}
public String[] getTrue_nums() {
return true_nums;
}
public void setTrue_nums(String[] true_nums) {
this.true_nums = true_nums;
}
public String[] getFalse_nums() {
return false_nums;
}
public void setFalse_nums(String[] false_nums) {
this.false_nums = false_nums;
}
}
最终项目系统页面可视化结果(如图31所示):
图31 项目全局界面



