某大型电商公司从后台服务器收集到30W条的日志用户行为数据,经过数据初步清洗得到数据如下表sale_user.zip,假如你是该公司一员开发工程师,需要你利用大数据集群为公司存储、计算、分析这些数据,并给出分析结果。需求如下:
1.在Linux系统下将用户行为数据sale_user.zip将解压(解压后文件为sale_user.csv)。(8分)
采用MR程序完成数据清洗:
a) 将数据集中的标题行删除;
b) 在数据集中添加一个表示省份的列province,同时为每一行在该列上生成一个随机省份值,如"北京",“上海”,“广州”,“深圳”;(2分)
c) 将数据集中的time字段中的小时数字去掉,只保留年-月-日;(2分)
d) 输出路径为hdfs上:hdfs://master:9000/员工姓名,例如zhangsan (root用户需要给输出目录赋予权限,参考命令如:hadoop fs -chmod -R 777 /员工姓名 )。(2分)
2.Hive****数据分析。
a) 创建hive外部表user_action_external_hive,location指向第一步处理后数据的存储目录hdfs://master:9000/员工姓名;(3分)
b) 使用Hive分析统计不同地区的用户在网站上各种行为的次数,即浏览总次数、加入购物车总次数、收藏总次数、购买总次数,并将结果写入一个新建的hive表,表名为user_action_stat。(5分)
3.将数据从Hive导出到元数据库MySQL。
a) 在MySQL中创建一个以员工名(如zhangsan)命名的数据库,在该库中创建一个表action_stat。利用Sqoop工具,将Hive中的user_action_stat中数据导入action_stat表中;(6分)
b) 在MySQL中,查询action_stat表中前10条记录。(2分)
提示:
sale_user.zip部分数据如下(具体样本数据,考试时发放),共5个字段:
| user_id | item_id | behavior_type | item_category | time |
|---|---|---|---|---|
| 10001082 | 285259775 | 1 | 4076 | 2014-12-08 18 |
| 10001082 | 4368907 | 1 | 5503 | 2014-12-12 12 |
| 10001082 | 4368907 | 1 | 5503 | 2014-12-12 12 |
| 10001082 | 53616768 | 1 | 9762 | 2014-12-02 15 |
数据集结构为:
user_id:用户id
item_id:商品id
behavior_type:用户行为类型,包括浏览、收藏、加购物车、购买,对应值分别为1,2,3,4。
item_category:商品分类。
time:用户操作时间(格式为:年-月-日 小时)。
数据源:「hive数据分析—电商数据分析」https://www.aliyundrive.com/s/4UaUqNVaoXR
项目实施:先将源数据进行解压处理,将文件放到要执行的文件位置。
MR程序代码: mapper部分package com.cqcvc.hive; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class LMapper extends Mapperreduce部分:{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { // 设置偏移量第一行为0则直接不等于0 if (key.get() != 0) { context.write(value, NullWritable.get()); } } }
package com.cqcvc.hive; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class LReduce extends ReducerDriver部分:{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { String s = key.toString(); // 添加随机数据:建立数组然后使用random()函数获取随机下标添加到数据集的后面去。 String[] provinces = { "北京", "上海", "广州", "深圳" }; String[] arr = s.split(","); String time = arr[4]; // 修改时间以及添加provinces数据 try { SimpleDateFormat d1 = new SimpleDateFormat("yyyy-mm-dd hh"); Date dd = d1.parse(time.toString()); SimpleDateFormat d2 = new SimpleDateFormat("yyyy年mm月dd日"); String dd1 = d2.format(dd); String k = arr[0] + "," + arr[1] + "," + arr[2] + "," + arr[3] + "," + dd1 + ","+ provinces[(int) (Math.random() * 4)]; context.write(new Text(k), NullWritable.get()); } catch (ParseException e) { e.printStackTrace(); } } }
package com.cqcvc.hive;
import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException,
InterruptedException {
//实例化conf
Configuration conf = new Configuration();
//链接hdfs
conf.set("fs.defaultFS", "hdfs://192.168.87.201:9000");
System.setProperty("HADOOP_USER_NAME", "root");
//配置MR运行的模式 local和YARN
//conf.set("mapreduce.framework.name", "local");
//实例化job
Job job = Job.getInstance(conf);
//关联编写的三个class程序方便jar包的导出
job.setJarByClass(LDriver.class);
job.setMapperClass(LMapper.class);
job.setReducerClass(LReduce.class);
//
job.setNumReduceTasks(1);
//设置map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//设置最终输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//文件上传地址
Path[] pathsArr = { new Path("/data/sale_user.csv") };
FileInputFormat.setInputPaths(job, pathsArr);
//文件结果输出地址
FileOutputFormat.setOutputPath(job, new Path("/yuangong"));
//提交文件任务
job.waitForCompletion(true);
}
}
执行方法_pro:
导入集群执行MR的jar包文件:
这里可以将编写好的MR放到Hadoop的集群上去使用也可以直接在eclipse上使用,打包的方法:
eclipse–>Export–>java–>JAR file
找到需要打包的三个mapreduce的java程序,勾选上然后找到select the export destination:
设置jar包导出的位置,点击Next两次然后找到select the class ot the application entry point:
选择导出的包的main执行的那个class程序也就是我们的Driver。
打包后可以直接将jar包文件上传到hadoop集群上去,使用命令hadoop jar 包文件的名称.jar来执行包文件。
hive部分 建立外部表create EXTERNAL table user1( user_id String, item_id String, behavior_type String, item_category string, time string, province string) row format delimited fields terminated by ',' ;
这里出现了一个问题:
FAILED: ParseException line 1:22 cannot recognize input near 'user' '(' 'user_id' in table name
解决方法:
换一个数据表的名字如user1,因为识别问题导致程序识别的时候一直显示出错是在user附近。
导入数据使用hive sql命令:
load data inpath '/yuangong' into table user1;
将数据导入到user1数据表中去,如果出现错误,就将hadoop集群上的/yuangong文件删除,重新运行一次外部表数据会自动导入到数据表中去。
hive sql部分:题目:
使用Hive分析统计不同地区的用户在网站上各种行为的次数,即浏览总次数、加入购物车总次数、收藏总次数、购买总次数,并将结果写入一个新建的hive表,表名为user_action_stat。
先建立需要存储数据的表:
create table user_action_stat( province string, user_id int, Look int, Collection int, Cart int, Buy int) row format delimited fields terminated by ',';
然后书写insert语句将内容插入到表中去:
insert overwrite table user_action_stat select province, user_id,sum(if(behavior_type=1,1,0)), sum(if(behavior_type=2,1,0)), sum(if(behavior_type=3,1,0)), sum(if(behavior_type=4,1,0)) from user1 group by province,user_id;MySQL部分:
将得到的表存放到MySQL数据库中去(我们使用的是Sqoop工具):
sqoop export --connect jdbc:mysql://slave1/zhangsan?characterEncoding=utf8 --username root --password 123456 --table action_stat --export-dir /Hive_WH/chengzhi.db/user_action_stat
最后在MySQL中编写sql查询语句:
SELECt * FROM action_stat LIMIT 10;



