- 7 - Clickstream Data Analysis Project: Data Preprocessing
- 1. Dataset introduction
- Raw data sample
- The Pageviews table (page-level clickstream model) to be generated
- The Visits table (clickstream model) to be generated
- Data cleaning
- 2. Collecting log data into HDFS
- Create the directory
- Write the script
- Script contents
- Grant the script execute permission
- Run the script
- 3. Data preprocessing with MapReduce: filtering static resources
- Create the Maven project
- Write the log entity class
- Write the data-cleaning utility class
- Write the preprocessing Mapper class
- Write the preprocessing Driver class
- 4. Data preprocessing with MapReduce: building the pageviews data model
- How pageviews works
- Create the pageviews Mapper class
- Create the pageviews Reducer class
- Create the pageviews Driver class
- 5. Data preprocessing with MapReduce: building the visits data model
- How the visits table works
- Generating the clickstream visit data
- Visits MR: entity class PageViewsBean
- Visits MR: entity class VisitBean
- Visits MR: Mapper class ClickStreamVisitMapper
- Visits MR: Reducer class ClickStreamVisitReducer
- Visits MR: Driver class ClickStreamVisitDriver
- Import into HDFS
- Summary
1. Dataset introduction
The dataset is introduced in https://blog.csdn.net/m0_38139250/article/details/122181337
Dataset download:
https://download.csdn.net/download/m0_38139250/72088781
In that article we gave an overview of the project; the data-related parts are excerpted below.
Raw data sample:
194.237.142.21 - - [18/Sep/2021:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
The Pageviews table (page-level clickstream model) to be generated:
A session is one user's single visit to the site.
(page-view records grouped by session)
The Visits table (clickstream model) to be generated:
This is the clickstream model. Once web logs have been converted into clickstream data, many web-analytics metrics become much simpler to compute; that is the "magic" of clickstream data. From it we can derive many common web-analytics metrics.
Data cleaning
Records matching any of the following rules are treated as invalid and filtered out (a distilled sketch of the rules follows this list):
- Invalid time format
- Response status code >= 400 (HTTP errors)
- Requests for static resources (js, css, and similar assets)
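Before diving into the MapReduce code, here is a minimal sketch of the three rules in plain Java. The class and method names (CleaningRules, isValidRecord) are illustrative only and do not appear in the project code below; the same checks are spread across WebLogParser and the preprocessing Mapper later in this article.
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class CleaningRules {
    static final SimpleDateFormat NGINX_TIME =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);

    // a record is kept only if all three rules pass
    static boolean isValidRecord(String timeLocal, String status,
                                 String request, Set<String> pages) {
        try {
            NGINX_TIME.parse(timeLocal);          // rule 1: the time must parse
        } catch (ParseException e) {
            return false;
        }
        if (Integer.parseInt(status) >= 400) {    // rule 2: no HTTP errors
            return false;
        }
        return pages.contains(request);           // rule 3: only real pages,
                                                  // filtering js/css/images
    }
}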
2. Collecting log data into HDFS
Create the directory:
# create the /sx/clickstream/process/input directory
hadoop fs -mkdir -p /sx/clickstream/process/input
Write the script:
touch mv2clickstreamprocessinput.sh
Script contents:
#!/bin/bash
#
# ===========================================================================
# Program name:
# Description : move log files into the preprocessing working directory
# Input param : run date
# Target path : /sx/clickstream/process/input
# Data source : /home/ubuntu/Code/sx
# Reviewed by :
# Modified by :
# Modified on :
# Reason      :
# Change list :
# ===========================================================================
# directory on the lsn host where the uploaded log file is stored
log_dir=/home/ubuntu/Code/sx
# log file name
log_name=access.log.fensi
# working directory of the preprocessing job
log_pre_input=/sx/clickstream/process/input
# check the log directory for files that need to be uploaded
#files=`hadoop fs -ls $log_dir | grep $day_01 | wc -l`
files=`ls $log_dir | wc -l`
hadoop fs -mkdir -p ${log_pre_input}
if [ $files -gt 0 ]; then
hadoop fs -put ${log_dir}/${log_name} ${log_pre_input}
echo "success moved ${log_dir}/${log_name} to ${log_pre_input} ....."
fi
Grant the script execute permission:
chmod u+x mv2clickstreamprocessinput.sh
chmod is the permission-management command, short for "change the permissions mode of a file".
u stands for the file's owner, x for execute permission, and + means add the permission.
So chmod u+x file.sh grants the owner of file.sh in the current directory execute permission.
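Incidentally, the same upload can also be done programmatically through the HDFS Java API instead of shelling out to hadoop fs. A minimal sketch, assuming a NameNode at hdfs://localhost:8020 (a placeholder; use your cluster's fs.defaultFS):
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Upload2Hdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // placeholder NameNode address; adjust to your cluster
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf);
        Path input = new Path("/sx/clickstream/process/input");
        fs.mkdirs(input);                       // same as: hadoop fs -mkdir -p
        fs.copyFromLocalFile(                   // same as: hadoop fs -put
                new Path("/home/ubuntu/Code/sx/access.log.fensi"), input);
        fs.close();
    }
}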
Run the script:
sh mv2clickstreamprocessinput.sh
3. Data preprocessing with MapReduce: filtering static resources
Create the Maven project
Modify the pom file:
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.1.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.mystudy.hadoopPro.APP</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.1</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Recall the raw data:
194.237.142.21 - - [18/Sep/2021:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
Write the log entity class
edu.sx.clickstream.pre.WebLogBean
The code is as follows:
package edu.sx.clickstream.pre;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class WebLogBean implements Writable {
private boolean valid = true;// whether the record is valid
private String remote_addr;// client IP address
private String remote_user;// client user name; "-" when absent
private String time_local;// access time and time zone
private String request;// requested URL and HTTP protocol
private String status;// request status; 200 means success
private String body_bytes_sent;// size of the response body sent to the client
private String http_referer;// the page from which the visitor followed a link
private String http_user_agent;// client browser information
public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return this.time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append(" 01").append(this.getRemote_addr());
sb.append(" 01").append(this.getRemote_user());
sb.append(" 01").append(this.getTime_local());
sb.append(" 01").append(this.getRequest());
sb.append(" 01").append(this.getStatus());
sb.append(" 01").append(this.getBody_bytes_sent());
sb.append(" 01").append(this.getHttp_referer());
sb.append(" 01").append(this.getHttp_user_agent());
return sb.toString();
}
@Override
public void readFields(DataInput in) throws IOException {
this.valid = in.readBoolean();
this.remote_addr = in.readUTF();
this.remote_user = in.readUTF();
this.time_local = in.readUTF();
this.request = in.readUTF();
this.status = in.readUTF();
this.body_bytes_sent = in.readUTF();
this.http_referer = in.readUTF();
this.http_user_agent = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.valid);
out.writeUTF(null==remote_addr?"":remote_addr);
out.writeUTF(null==remote_user?"":remote_user);
out.writeUTF(null==time_local?"":time_local);
out.writeUTF(null==request?"":request);
out.writeUTF(null==status?"":status);
out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
out.writeUTF(null==http_referer?"":http_referer);
out.writeUTF(null==http_user_agent?"":http_user_agent);
}
}
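Since WebLogBean implements Hadoop's Writable interface, the framework serializes it with write() and rebuilds it with readFields() during the shuffle. A quick standalone round-trip check (an illustrative sketch, assumed to sit in the same package as WebLogBean; not part of the project code):
import java.io.*;

public class WebLogBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        WebLogBean in = new WebLogBean();
        in.set(true, "194.237.142.21", "-", "2021-09-18 06:49:18",
                "/about", "304", "0", "-", "Mozilla/4.0 (compatible;)");

        // serialize exactly as Hadoop would during the shuffle
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // deserialize into a fresh bean and print it
        WebLogBean out = new WebLogBean();
        out.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(out);   // should print the same \001-joined fields
    }
}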
Write the data-cleaning utility class
edu.sx.clickstream.pre.WebLogParser
The code is as follows:
package edu.sx.clickstream.pre;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;
public class WebLogParser {
public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
//split the line on spaces, then stitch tokens that belong to one field back together
//222.66.59.174 - - [18/Sep/2021:06:53:30 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0"
String[] arr = line.split(" ");
if (arr.length > 11) {
webLogBean.setRemote_addr(arr[0]);
webLogBean.setRemote_user(arr[1]);
//convert the timestamp into the conventional format
// [18/Sep/2021:06:52:32 +0000]
// 18/Sep/2021:06:52:32 ------> 2021-09-18 06:52:32
String time_local = formatDate(arr[3].substring(1));
if(null==time_local || "".equals(time_local)) {
time_local="-invalid_time-";
}
webLogBean.setTime_local(time_local);
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
//if the user agent contains spaces it is split across several tokens;
//an array longer than 12 means the last field was split up, so stitch everything from index 11 onward back into the user-agent field
// "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; MDDR; InfoPath.2; .NET4.0C)"
if (arr.length > 12) {
StringBuilder sb = new StringBuilder();
for (int i = 11; i < arr.length; i++) {
sb.append(arr[i]);
}
webLogBean.setHttp_user_agent(sb.toString());
} else {
webLogBean.setHttp_user_agent(arr[11]);
}
if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// >= 400 means an HTTP error
webLogBean.setValid(false);
}
//if the time could not be parsed, the record is also treated as invalid
if("-invalid_time-".equals(webLogBean.getTime_local())){
webLogBean.setValid(false);
}
} else {
//58.215.204.118 - - [18/Sep/2021:06:52:33 +0000] "-" 400 0 "-" "-"
//fewer than 11 tokens means the record is incomplete, so drop it
webLogBean=null;
}
return webLogBean;
}
public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
if (!pages.contains(bean.getRequest())) {
bean.setValid(false);
}
}
//helper that reformats the timestamp
public static String formatDate(String time_local) {
try {
return df2.format(df1.parse(time_local));
} catch (ParseException e) {
return null;
}
}
}
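A quick way to verify the parser is to feed it the sample line from the dataset introduction. A standalone sketch (the demo class name is illustrative): since the status 304 is below 400 and the time parses, the bean stays valid at this stage; it is the static-resource filter in the Mapper below that would mark this .png request invalid.
public class WebLogParserDemo {
    public static void main(String[] args) {
        String line = "194.237.142.21 - - [18/Sep/2021:06:49:18 +0000] "
                + "\"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" "
                + "304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
        WebLogBean bean = WebLogParser.parser(line);
        // prints: true -> /wp-content/uploads/2013/07/rstudio-git3.png
        System.out.println(bean.isValid() + " -> " + bean.getRequest());
    }
}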
Write the preprocessing Mapper class
edu.sx.clickstream.pre.WeblogPreProcessMapper
The code is as follows:
package edu.sx.clickstream.pre;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // holds the site's real page URLs
    Set<String> pages = new HashSet<String>();
    Text k = new Text();
    NullWritable v = NullWritable.get();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // define the set of known page URLs
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // one line of input
        String line = value.toString();
        WebLogBean webLogBean = WebLogParser.parser(line);
        if (webLogBean != null) {
            // filter out js/images/css and other static resources
            WebLogParser.filtStaticResource(webLogBean, pages);
            if (!webLogBean.isValid()) return;
            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }
}
Write the preprocessing Driver class
edu.sx.clickstream.pre.WeblogEtlPreProcessDriver
The code is as follows:
package edu.sx.clickstream.pre;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class WeblogEtlPreProcessDriver {
static {
try {
// set the HADOOP_HOME directory
System.setProperty("hadoop.home.dir", "D:/hadoop");
// load the native library
System.load("D:/hadoop/bin/hadoop.dll");
} catch (UnsatisfiedLinkError e) {
System.err.println("Native code library failed to load.\n" + e);
System.exit(1);
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
FileInputFormat.addInputPath(job,new Path("file:///D:\hadoop\clickstreaminput1"));
// FileInputFormat.addInputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/input/2021-12-10/11-36_192.168.137.128"));
job.setInputFormatClass(TextInputFormat.class);
FileOutputFormat.setOutputPath(job,new Path("file:///D:\hadoop\clickstreamoutput1"));
// FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/weblogPreOut"));
job.setOutputFormatClass(TextOutputFormat.class);
job.setJarByClass(WeblogEtlPreProcessDriver.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
}
}
4. Data preprocessing with MapReduce: building the pageviews data model
How pageviews works
In Google Analytics, every load of a page counts as one PV (page view). For example, if a user visits page A -> page B -> page A and then leaves the site, that visit totals 3 PVs.
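On top of PV counting, the pageviews model groups hits into sessions: two consecutive hits by the same user belong to one session only if they are less than 30 minutes apart. A minimal sketch of that rule (the class and method names are illustrative; the real logic lives in ClickStreamReducer below):
import java.util.Date;

public class SessionRule {
    static final long SESSION_GAP_MS = 30 * 60 * 1000; // 30 minutes

    // true if the gap between two consecutive hits starts a new session
    static boolean isNewSession(Date previous, Date current) {
        return current.getTime() - previous.getTime() >= SESSION_GAP_MS;
    }
}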
Create the pageviews Mapper class
edu.sx.clickstream.pageviews.ClickStreamMapper
Code:
package edu.sx.clickstream.pageviews;

import edu.sx.clickstream.pre.WebLogBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
    Text k = new Text();
    WebLogBean v = new WebLogBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\001");
        if (fields.length < 9) return;
        // set the split-out fields on the WebLogBean
        v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
        // only valid records go on to the next stage
        if (v.isValid()) {
            // use the IP address to identify a user: with the IP as the key,
            // all records of the same IP are sent to the same reducer
            k.set(v.getRemote_addr());
            context.write(k, v);
        }
    }
}
Create the pageviews Reducer class
edu.sx.clickstream.pageviews.ClickStreamReducer
Code:
package edu.sx.clickstream.pageviews;

import edu.sx.clickstream.pre.WebLogBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
    Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
        ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
        // iterate over the values; they all belong to the same user (IP)
        for (WebLogBean bean : values) {
            // why can't the iterated bean be added to the list directly?
            // Hadoop reuses the value object, so copy its properties into a
            // fresh bean each time; otherwise every element gets overwritten
            // (the classic Java deep-vs-shallow copy issue)
            WebLogBean webLogBean = new WebLogBean();
            try {
                BeanUtils.copyProperties(webLogBean, bean);
            } catch (Exception e) {
                e.printStackTrace();
            }
            beans.add(webLogBean);
        }
        // sort the beans chronologically, then compute the gap between each
        // record and the previous one: a gap under 30 minutes means the same
        // session, a gap over 30 minutes starts a new session
        Collections.sort(beans, new Comparator<WebLogBean>() {
            @Override
            public int compare(WebLogBean o1, WebLogBean o2) {
                try {
                    Date d1 = toDate(o1.getTime_local());
                    Date d2 = toDate(o2.getTime_local());
                    if (d1 == null || d2 == null)
                        return 0;
                    return d1.compareTo(d2);
                } catch (Exception e) {
                    e.printStackTrace();
                    return 0;
                }
            }
        });

        int step = 1;
        // a UUID serves as the session id
        String session = UUID.randomUUID().toString();
        // after sorting, the records are in chronological order
        for (int i = 0; i < beans.size(); i++) {
            WebLogBean bean = beans.get(i);
            // if there is only one record, output it directly
            if (1 == beans.size()) {
                // default stay time: 60s
                v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001"
                        + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001"
                        + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                context.write(NullWritable.get(), v);
                session = UUID.randomUUID().toString();
                break;
            }
            // with more than one record, skip the first; it is output while handling the second
            if (i == 0) {
                continue;
            }
            // time difference between this record and the previous one
            long timeDiff = 0;
            try {
                timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
            } catch (ParseException e) {
                e.printStackTrace();
            }
            if (timeDiff < 30 * 60 * 1000) {
                // gap under 30 minutes: output the previous page view as part of the current session
                v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001"
                        + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001"
                        + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                        + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001"
                        + beans.get(i - 1).getStatus());
                context.write(NullWritable.get(), v);
                step++;
            } else {
                // gap over 30 minutes: output the previous page view, then reset
                // step and session to mark the start of a new visit
                v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001"
                        + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001"
                        + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                        + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001"
                        + beans.get(i - 1).getStatus());
                context.write(NullWritable.get(), v);
                // after emitting the previous record, reset the step counter
                step = 1;
                session = UUID.randomUUID().toString();
            }
            // the last record is output directly
            if (i == beans.size() - 1) {
                // default stay time: 60s
                v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001"
                        + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001"
                        + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                context.write(NullWritable.get(), v);
            }
        }
    }

    private String toStr(Date date) {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        return df.format(date);
    }

    private Date toDate(String timeStr) throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        return df.parse(timeStr);
    }

    private long timeDiff(String time1, String time2) throws ParseException {
        Date d1 = toDate(time1);
        Date d2 = toDate(time2);
        return d1.getTime() - d2.getTime();
    }

    private long timeDiff(Date time1, Date time2) throws ParseException {
        // Date.getTime() returns the time in milliseconds
        return time1.getTime() - time2.getTime();
    }
}
Create the pageviews Driver class
edu.sx.clickstream.pageviews.ClickStreamDriver
Code:
package edu.sx.clickstream.pageviews;
import edu.sx.clickstream.pre.WebLogBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class ClickStreamDriver {
static {
try {
// set the HADOOP_HOME directory
System.setProperty("hadoop.home.dir", "D:/hadoop");
// load the native library
System.load("D:/hadoop/bin/hadoop.dll");
} catch (UnsatisfiedLinkError e) {
System.err.println("Native code library failed to load.\n" + e);
System.exit(1);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//
FileInputFormat.addInputPath(job,new Path("file:///D:\hadoop\clickstreamoutput1\part-m-00000"));
// FileInputFormat.addInputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/input/2021-12-10/11-36_192.168.137.128"));
job.setInputFormatClass(TextInputFormat.class);
FileOutputFormat.setOutputPath(job,new Path("file:///D:\hadoop\clickstreamoutput2"));
//
job.setJarByClass(ClickStreamDriver.class);
job.setMapperClass(ClickStreamMapper.class);
job.setReducerClass(ClickStreamReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WebLogBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
The output will be written to D:\hadoop\clickstreamoutput2.
5. Data preprocessing with MapReduce: building the visits data model
How the visits table works
The input is the pageviews data:
It records each step of a session's (user's) visit.
The visits data then describes each user's visit as a whole:
- It is keyed by session
- It records the data of the session's entry page
- It records the data of the session's exit page
- It is derived from the page-level clickstream data (pageviews)
How the data format is derived (a short sketch follows this list):
- session is the key
- step is the sort field
- Entry-page data = the record whose step is 1
- Exit-page data = the record at index (number of pages in the session - 1), i.e. the last record after sorting
- Page count = the size of the session's list of page views
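The list above boils down to a few lines of Java. This sketch assumes the PageViewsBean and VisitBean classes defined below; the VisitDerivation class and toVisit helper are illustrative names, and the real implementation is ClickStreamVisitReducer further down:
import java.util.Comparator;
import java.util.List;

public class VisitDerivation {
    // builds a VisitBean from one session's page views, following the rules above
    static VisitBean toVisit(String session, List<PageViewsBean> pvs) {
        pvs.sort(Comparator.comparingInt(PageViewsBean::getStep)); // step is the sort field
        PageViewsBean first = pvs.get(0);               // step 1: entry page
        PageViewsBean last = pvs.get(pvs.size() - 1);   // last record: exit page
        VisitBean visit = new VisitBean();
        visit.set(session, first.getRemote_addr(), first.getTimestr(), last.getTimestr(),
                first.getRequest(), last.getRequest(), first.getReferal(), pvs.size());
        return visit;
    }
}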
Visits MR: entity class PageViewsBean
Create the entity class
edu.sx.clickstream.visits.PageViewsBean
Code:
package edu.sx.clickstream.visits;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PageViewsBean implements Writable {
private String session;
private String remote_addr;
private String timestr;
private String request;
private int step;
private String staylong;
private String referal;
private String useragent;
private String bytes_send;
private String status;
public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) {
this.session = session;
this.remote_addr = remote_addr;
this.useragent = useragent;
this.timestr = timestr;
this.request = request;
this.step = step;
this.staylong = staylong;
this.referal = referal;
this.bytes_send = bytes_send;
this.status = status;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getTimestr() {
return timestr;
}
public void setTimestr(String timestr) {
this.timestr = timestr;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public int getStep() {
return step;
}
public void setStep(int step) {
this.step = step;
}
public String getStaylong() {
return staylong;
}
public void setStaylong(String staylong) {
this.staylong = staylong;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public String getUseragent() {
return useragent;
}
public void setUseragent(String useragent) {
this.useragent = useragent;
}
public String getBytes_send() {
return bytes_send;
}
public void setBytes_send(String bytes_send) {
this.bytes_send = bytes_send;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public void readFields(DataInput in) throws IOException {
this.session = in.readUTF();
this.remote_addr = in.readUTF();
this.timestr = in.readUTF();
this.request = in.readUTF();
this.step = in.readInt();
this.staylong = in.readUTF();
this.referal = in.readUTF();
this.useragent = in.readUTF();
this.bytes_send = in.readUTF();
this.status = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(session);
out.writeUTF(remote_addr);
out.writeUTF(timestr);
out.writeUTF(request);
out.writeInt(step);
out.writeUTF(staylong);
out.writeUTF(referal);
out.writeUTF(useragent);
out.writeUTF(bytes_send);
out.writeUTF(status);
}
}
Visits MR: entity class VisitBean
Create the entity class
edu.sx.clickstream.visits.VisitBean
Code:
package edu.sx.clickstream.visits;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class VisitBean implements Writable {
private String session;
private String remote_addr;
private String inTime;
private String outTime;
private String inPage;
private String outPage;
private String referal;
private int pageVisits;
public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
this.session = session;
this.remote_addr = remote_addr;
this.inTime = inTime;
this.outTime = outTime;
this.inPage = inPage;
this.outPage = outPage;
this.referal = referal;
this.pageVisits = pageVisits;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getInTime() {
return inTime;
}
public void setInTime(String inTime) {
this.inTime = inTime;
}
public String getOutTime() {
return outTime;
}
public void setOutTime(String outTime) {
this.outTime = outTime;
}
public String getInPage() {
return inPage;
}
public void setInPage(String inPage) {
this.inPage = inPage;
}
public String getOutPage() {
return outPage;
}
public void setOutPage(String outPage) {
this.outPage = outPage;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public int getPageVisits() {
return pageVisits;
}
public void setPageVisits(int pageVisits) {
this.pageVisits = pageVisits;
}
@Override
public void readFields(DataInput in) throws IOException {
this.session = in.readUTF();
this.remote_addr = in.readUTF();
this.inTime = in.readUTF();
this.outTime = in.readUTF();
this.inPage = in.readUTF();
this.outPage = in.readUTF();
this.referal = in.readUTF();
this.pageVisits = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(session);
out.writeUTF(remote_addr);
out.writeUTF(inTime);
out.writeUTF(outTime);
out.writeUTF(inPage);
out.writeUTF(outPage);
out.writeUTF(referal);
out.writeInt(pageVisits);
}
@Override
public String toString() {
return session + " 01" + remote_addr + " 01" + inTime + " 01" + outTime + " 01" + inPage + " 01" + outPage + " 01" + referal + " 01" + pageVisits;
}
}
Visits MR: Mapper class ClickStreamVisitMapper
Create the Mapper class
edu.sx.clickstream.visits.ClickStreamVisitMapper
Code:
package edu.sx.clickstream.visits;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
    PageViewsBean pvBean = new PageViewsBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\001");
        int step = Integer.parseInt(fields[5]);
        // (String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)
        // e.g. 299d6b78-9571-4fa9-bcc2-f2567c46df34 72.46.128.140 - 2013-09-18 07:58:50 /hadoop-zookeeper-intro/ 1 60 "https://www.google.com/" "Mozilla/5.0" 14722 200
        pvBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]);
        // use the session as the key so that all page views of the same session
        // arrive at the same reducer as one group
        k.set(pvBean.getSession());
        context.write(k, pvBean);
    }
}
Visits MR: Reducer class ClickStreamVisitReducer
Create the Reducer class
edu.sx.clickstream.visits.ClickStreamVisitReducer
Code:
package edu.sx.clickstream.visits;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

public class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
    @Override
    protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {
        // collect the pvBeans so they can be sorted by step
        ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
        for (PageViewsBean pvBean : pvBeans) {
            // copy each reused value object into a fresh bean
            PageViewsBean bean = new PageViewsBean();
            try {
                BeanUtils.copyProperties(bean, pvBean);
                pvBeansList.add(bean);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
            @Override
            public int compare(PageViewsBean o1, PageViewsBean o2) {
                return o1.getStep() > o2.getStep() ? 1 : -1;
            }
        });
        // take the first and last page view of this visit and put them into a VisitBean
        VisitBean visitBean = new VisitBean();
        // the first record of the visit
        visitBean.setInPage(pvBeansList.get(0).getRequest());
        visitBean.setInTime(pvBeansList.get(0).getTimestr());
        // the last record of the visit
        visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
        visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
        // number of pages visited
        visitBean.setPageVisits(pvBeansList.size());
        // the visitor's IP
        visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
        // the visit's referal
        visitBean.setReferal(pvBeansList.get(0).getReferal());
        visitBean.setSession(session.toString());
        context.write(NullWritable.get(), visitBean);
    }
}
Visits MR: Driver class ClickStreamVisitDriver
Create the Driver class
edu.sx.clickstream.visits.ClickStreamVisitDriver
Code:
package edu.sx.clickstream.visits;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class ClickStreamVisitDriver {
static {
try {
// set the HADOOP_HOME directory
System.setProperty("hadoop.home.dir", "D:/hadoop");
// load the native library
System.load("D:/hadoop/bin/hadoop.dll");
} catch (UnsatisfiedLinkError e) {
System.err.println("Native code library failed to load.\n" + e);
System.exit(1);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// TextInputFormat.addInputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/pageViewOut"));
// TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.137.128:8020/data/weblog/preprocess/clickStreamVisit2"));
FileInputFormat.addInputPath(job,new Path("file:///D:\hadoop\clickstreamoutput2\part-r-00000"));
FileOutputFormat.setOutputPath(job,new Path("file:///D:\hadoop\clickstreamoutput3"));
//
job.setJarByClass(ClickStreamVisitDriver.class);
job.setMapperClass(ClickStreamVisitMapper.class);
job.setReducerClass(ClickStreamVisitReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PageViewsBean.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(VisitBean.class);
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
The results:
The preprocessed data with static-resource requests filtered out is in D:\hadoop\clickstreamoutput1\part-m-00000; make a copy and rename it clickstreamdata-pre.
The pageviews-model data is in D:\hadoop\clickstreamoutput2\part-r-00000; rename it clickstreamdata-pageviews.
The visits-model data is in D:\hadoop\clickstreamoutput3\part-r-00000; rename it clickstreamdata-visits.
Import into HDFS
Upload the three files above to the lsn host (default path /home/ubuntu/Code), then push them from the virtual machine into HDFS under /sx/clickstream:
hadoop fs -mkdir /sx/clickstream
hadoop fs -put /home/ubuntu/Code/clickstreamdata-pre /sx/clickstream
hadoop fs -put /home/ubuntu/Code/clickstreamdata-pageviews /sx/clickstream
hadoop fs -put /home/ubuntu/Code/clickstreamdata-visits /sx/clickstream
Summary
In this part we read the log data, preprocessed it, and built the pageviews and visits data models. The next step is to load these results into Hive.



