栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

educoder平台项目——旅游网站之数据分析

educoder平台项目——旅游网站之数据分析

第1关:统计每个城市的宾馆平均价格

package com.processdata;

import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.util.HbaseUtil;

/**
 * 使用MapReduce程序处理Hbase中的数据并将最终结果存入到另一张表 1中
 */
public class HbaseMapReduce extends Configured implements Tool {

    public static class MyMapper extends TableMapper {
        public static final byte[] column = "price".getBytes();
        public static final byte[] family = "hotel_info".getBytes();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            String cityId = Bytes.toString(result.getValue("cityInfo".getBytes(), "cityId".getBytes()));     byte[] value = result.getValue(family, column);     Double value1 = Double.parseDouble(Bytes.toString(value));     DoubleWritable i = new DoubleWritable(value1);     String priceKey = cityId;     context.write(new Text(priceKey),i); 
		 
		 
		 
		 
		  	/********** End *********/
        }
    }

    public static class MyTableReducer extends TableReducer {
        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            /********** Begin *********/
            double sum = 0; 
            int length = 0; 
            for (DoubleWritable price : values){ 
                length++; 
                sum += price.get(); 
                } 
                Put put = new Put(Bytes.toBytes(key.toString())); 
                put.addColumn("average_infos".getBytes(),"price".getBytes(),Bytes.toBytes(String.valueOf(sum/length))); 
                context.write(null,put);
		 
		 
		 
			/********** End *********/
        }

    }

    
    
    
    
    
    public int run(String[] args) throws Exception {
        //配置Job
        Configuration conf = HbaseConfiguration.create(getConf());
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");  //hbase 服务地址
        conf.set("hbase.zookeeper.property.clientPort", "2181"); //端口号
        Scanner sc = new Scanner(System.in);
        String arg1 = sc.next();
        String arg2 = sc.next();
        //String arg1 = "t_city_hotels_info";
        //String arg2 = "average_table";
        try {
			HbaseUtil.createTable("average_table", new String[] {"average_infos"});
		} catch (Exception e) {
			// 创建表失败
			e.printStackTrace();
		}
        Job job = configureJob(conf,new String[]{arg1,arg2});
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private Job configureJob(Configuration conf, String[] args) throws IOException {
        String tablename = args[0];
        String targetTable = args[1];
        Job job = new Job(conf,tablename);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false);//在mapreduce程序中千万不要设置允许缓存
        //初始化Mapreduce程序
        TableMapReduceUtil.initTableMapperJob(tablename,scan,MyMapper.class, Text.class, DoubleWritable.class,job);
        //初始化Reduce
        TableMapReduceUtil.initTableReducerJob(
                targetTable,        // output table
                MyTableReducer.class,    // reducer class
                job);
        job.setNumReduceTasks(1);
        return job;
    }
}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699198.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号