Starting from each city's direct-sale rejection rate, compute the direct-sale rejection rate of each province, sort by rejection rate in descending order, take the top 8, and write the result into Hive.
Data format

Implementation code and procedure

(1) Write the Bean class
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class bean implements WritableComparable<bean> {

    private String province;
    private double norate;

    public bean() {
        super();
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public double getNorate() {
        return norate;
    }

    public void setNorate(double norate) {
        this.norate = norate;
    }

    @Override
    public String toString() {
        return province + "\t" + norate;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(province);
        dataOutput.writeDouble(norate);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.province = dataInput.readUTF();
        this.norate = dataInput.readDouble();
    }

    // Sort from largest to smallest: a bean with a larger norate compares
    // as "smaller", so it comes first in natural (ascending) order.
    @Override
    public int compareTo(bean o) {
        int res;
        if (o.getNorate() > norate) {
            res = 1;
        } else if (o.getNorate() < norate) {
            res = -1;
        } else {
            res = 0;
        }
        return res;
    }
}
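A quick way to see the effect of this inverted compareTo is to drop a few beans into a java.util.TreeMap, the same container the Reducer uses below: firstKey() then holds the largest rejection rate and lastKey() the smallest. This is a minimal standalone sketch; the sample provinces and rates are made up for illustration only:

import org.apache.hadoop.io.NullWritable;

import java.util.TreeMap;

public class CompareToDemo {
    public static void main(String[] args) {
        TreeMap<bean, NullWritable> map = new TreeMap<bean, NullWritable>();
        // Hypothetical rejection rates, inserted in no particular order
        for (double rate : new double[]{0.12, 0.48, 0.30}) {
            bean b = new bean();
            b.setProvince("province-" + rate);
            b.setNorate(rate);
            map.put(b, NullWritable.get());
        }
        System.out.println(map.firstKey()); // province-0.48  0.48 -> largest rate first
        System.out.println(map.lastKey());  // province-0.12  0.12 -> smallest rate last
    }
}

(2) Write the Mapper class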
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class m4 extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line and split it into fields
        String[] words = value.toString().split(",");
        // Skip the record if a required field is empty, "null", or the header row
        if (words[24].equals("null") || words[24].equals("")
                || words[24].equals("城市直销拒单率")
                || words[4].equals("") || words[4].equals("null")) {
            return;
        } else {
            // Key: province and city; value: the direct-sale rejection rate,
            // converted from a percentage string to a fraction
            context.write(new Text(words[3] + "\t" + words[4]),
                    new DoubleWritable(Double.parseDouble(words[24].replace("%", "")) / 100));
        }
    }
}
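The value side of the map output turns a percentage string from the CSV into a fraction. A minimal sketch of just that conversion, using a made-up sample value "3.5%":

public class RateParseDemo {
    public static void main(String[] args) {
        // "3.5%" (hypothetical) -> strip '%' -> 3.5 -> divide by 100 -> 0.035
        double rate = Double.parseDouble("3.5%".replace("%", "")) / 100;
        System.out.println(rate); // prints 0.035
    }
}

(3) Write the Reducer class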
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class r4 extends Reducer<Text, DoubleWritable, bean, NullWritable> {

    // Container holding the running top-8 results across all reduce() calls
    private TreeMap<bean, NullWritable> treeMap = new TreeMap<bean, NullWritable>();

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        bean bean = new bean();
        // sum accumulates the rejection rates in this group
        Double sum = 0.0;
        // count accumulates the number of hotels in this group
        Double count = 0.0;
        for (DoubleWritable value : values) {
            sum = value.get() + sum;
            count++;
        }
        // v is the province's direct-sale rejection rate
        double v = sum / count;
        bean.setNorate(v);
        // The province is the second field of the tab-separated key
        bean.setProvince(key.toString().split("\t")[1]);
        // Add the result
        treeMap.put(bean, NullWritable.get());
        // Once there are more than 8 entries, drop the smallest rejection rate
        if (treeMap.size() > 8) {
            treeMap.remove(treeMap.lastKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Walk the TreeMap and emit the top 8 in descending order of rejection rate
        Iterator<bean> iterator = treeMap.keySet().iterator();
        while (iterator.hasNext()) {
            bean next = iterator.next();
            context.write(next, NullWritable.get());
        }
    }
}
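One caveat worth flagging: the top-8 TreeMap lives inside a single Reducer instance, so the ranking is only globally correct when the job runs exactly one reduce task (the MapReduce default). If the cluster configuration sets more reducers, the count could be pinned explicitly in the driver. This one-line fragment is an addition for safety, not part of the original driver below; it would go in d4's main before waitForCompletion:

// Force a single reduce task so one TreeMap sees every group
job.setNumReduceTasks(1);

(4) Write the Driver class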
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class d4 {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Get the job object
        Job job = Job.getInstance(new Configuration());
        // Set the Mapper/Reducer classes
        job.setMapperClass(m4.class);
        job.setReducerClass(r4.class);
        // Set the jar by locating the driver class
        job.setJarByClass(d4.class);
        // Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Set the final output key/value types
        job.setOutputKeyClass(bean.class);
        job.setOutputValueClass(NullWritable.class);
        // Set the input/output paths
        FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/file3_1/jd_4706.csv"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/bbbbbb"));
        job.waitForCompletion(true);
    }
}

(5) Package the project and submit it to the Hadoop cluster
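The original showed this step only as screenshots. Assuming a Maven project and a jar named five.jar (both names are hypothetical), the package-and-submit commands would look roughly like:

mvn clean package          # build the jar (assumes a Maven project)
hadoop jar five.jar d4     # submit the job; d4 is the driver class above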
(6) View the output file contents
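The result can be inspected directly on HDFS. The output directory is the one set in the driver; part-r-00000 is the standard file name for a single-reducer job:

hdfs dfs -cat /bbbbbb/part-r-00000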
(7) Create the database five and the table five_tb in Hive
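The exact DDL was shown only as a screenshot. A plausible reconstruction, assuming the table columns mirror the bean's fields (province, norate) and the tab delimiter produced by its toString():

CREATE DATABASE IF NOT EXISTS five;
USE five;
CREATE TABLE five_tb (
    province STRING,
    norate   DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';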
(8) Load the data into the table five_tb
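Loading the MapReduce output into the table can be done with LOAD DATA; the path below matches the driver's output directory (note that LOAD DATA INPATH moves the file into the Hive warehouse):

LOAD DATA INPATH 'hdfs://master:9000/bbbbbb/part-r-00000' INTO TABLE five.five_tb;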
(9) View the table five_tb
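A simple query confirms the eight provinces and their rates; since the Reducer wrote them in descending order, the rows come back highest rate first:

SELECT * FROM five.five_tb;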
Just writing code, plain and simple.



