栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

reduce端的join算法和map端的join算法

reduce端的join算法和map端的join算法

------------------------------------reduce端join-------------------------------
package demo06.reducejoin;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class ReduceJoinMap extends Mapper {
    Text k2 = new Text();

    
    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        //获取文件路径名
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        Path path = inputSplit.getPath();
        String pathName = path.getName();
        
        String line = value.toString();

        //逻辑处理
        if(pathName.startsWith("p")){
            //商品表数据
            String[] splits = line.split(",");
            k2.set(splits[0]);
            context.write(k2,value);
        }else{
            String[] splits = line.split(",");
            k2.set(splits[2]);
            context.write(k2,value);
        }
    }
}

package demo06.reducejoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceJoinReduce extends Reducer {

    
    @Override
    protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
        String orderLine = "";
        String productLine = "";
        for (Text text: values) {
            if(text.toString().startsWith("p")){
                productLine=text.toString();
            } else{
                orderLine=text.toString();
            }

        }

        context.write(new Text(orderLine+"t"+productLine),NullWritable.get());
    }
}

-----------------------------------------------------map端join-------------------------------------------

在主程序里添加缓存文件

//添加我们的缓存文件
DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cachefile/pdts.txt"),configuration);

package demo07.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMap extends Mapper {
    //定义一个setup结果集保存数据
    HashMap map;
    
    @Override
    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        map = new HashMap();
        //从context中获取configuration
        Configuration configuration = context.getConfiguration();
        //我们只有一个缓存文件
        URI[] caches = DistributedCache.getCacheFiles(configuration);
        URI cacheFile = caches[0];
        //cacheFile是hdfs://node01:8020/.....
        //获取一个文件系统
        FileSystem fileSystem = FileSystem.get(cacheFile,configuration);
        //获取文件的输入流
        FSDataInputStream fsDataInputStream = fileSystem.open(new Path(cacheFile));
        //流读取为字符串
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream));
        String line = bufferedReader.readLine();
        String lineStr = null;
        while ((line = bufferedReader.readLine())!=null){
            String[] lineArray = line.split(",");
            map.put(lineArray[0],line);
        }


    }

    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        String[] splits = value.toString().split(",");
        //获取我们商品表的数据
        String product = map.get(splits[2]);
        //将商品表和订单表中的数据进行拼接
        context.write(new Text(value.toString()+"t"+product),NullWritable.get());
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/601817.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号