// ------------------------------ reduce-side join ------------------------------
package demo06.reducejoin;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Map phase of the reduce-side join.
 *
 * <p>Every input record is emitted unchanged as the value, keyed by its join
 * column, so that records sharing a key meet in the same reduce call.
 * Records from a file whose name starts with {@code "p"} are keyed by
 * column 0; records from any other file are keyed by column 2.
 */
public class ReduceJoinMap extends Mapper<LongWritable, Text, Text, Text> {

    /** Output key, reused across {@code map} calls to avoid one allocation per record. */
    final Text k2 = new Text();

    /**
     * Emits {@code (joinKey, originalLine)} for one comma-separated input line.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one comma-separated record
     * @param context task context; used to find the source file and to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The name of the file this split belongs to decides which table the record is from.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        Path path = inputSplit.getPath();
        String pathName = path.getName();

        String[] splits = value.toString().split(",");

        // Files starting with "p" carry the join key in column 0, all others in column 2.
        int joinColumn = pathName.startsWith("p") ? 0 : 2;
        k2.set(splits[joinColumn]);
        context.write(k2, value);
    }
}
package demo06.reducejoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Reduce phase of the reduce-side join.
 *
 * <p>All values for one key are split into the product record (the value
 * starting with {@code "p"}) and the order records (everything else). Each
 * order record is written joined to the product record with a tab.
 */
public class ReduceJoinReduce extends Reducer<Text, Text, Text, NullWritable> {

    /**
     * Joins every order record of one key with that key's product record.
     *
     * @param key     the join key shared by all {@code values}
     * @param values  the product and order records grouped under {@code key}
     * @param context task context used to emit the joined rows
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> orderLines = new ArrayList<>();
        String productLine = "";

        for (Text text : values) {
            // Copy to String right away: the framework may reuse the Text instance
            // between iterations, so holding on to the reference is not safe.
            String record = text.toString();
            if (record.startsWith("p")) {
                productLine = record;
            } else {
                // Keep every order; several orders can share one join key.
                orderLines.add(record);
            }
        }

        // A key with no order record still yields one row with an empty order part,
        // matching the output produced before orders were collected in a list.
        if (orderLines.isEmpty()) {
            orderLines.add("");
        }

        for (String orderLine : orderLines) {
            context.write(new Text(orderLine + "\t" + productLine), NullWritable.get());
        }
    }
}
-----------------------------------------------------map端join-------------------------------------------
在主程序（Driver）里、提交作业之前添加缓存文件，MapJoinMap 的 setup 方法会读取第一个缓存文件：
// Register our cache file: adds the HDFS URI below to the job configuration's cache-file list.
DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cachefile/pdts.txt"),configuration);
package demo07.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

/**
 * Map-side join: the cached file is loaded into memory once per task in
 * {@link #setup}, and every input record is joined against it in {@link #map}.
 */
public class MapJoinMap extends Mapper<LongWritable, Text, Text, NullWritable> {

    /** Cached records, keyed by their first comma-separated column; filled in {@code setup}. */
    HashMap<String, String> map;

    /**
     * Reads the first registered cache file and indexes each line by its first column.
     *
     * @param context task context providing the job configuration
     * @throws IOException          if no cache file is registered or it cannot be read
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        map = new HashMap<>();

        Configuration configuration = context.getConfiguration();

        // Only one cache file is expected; fail with a clear message instead of an NPE.
        URI[] caches = DistributedCache.getCacheFiles(configuration);
        if (caches == null || caches.length == 0) {
            throw new IOException("No distributed cache file registered for the map-side join");
        }
        URI cacheFile = caches[0];

        // The FileSystem itself is deliberately not closed: FileSystem.get may hand out
        // a shared cached instance that other code in the task still uses.
        FileSystem fileSystem = FileSystem.get(cacheFile, configuration);

        try (FSDataInputStream fsDataInputStream = fileSystem.open(new Path(cacheFile));
             BufferedReader bufferedReader = new BufferedReader(
                     new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8))) {
            String line;
            // Read from the very first line; nothing is consumed before the loop.
            while ((line = bufferedReader.readLine()) != null) {
                String[] lineArray = line.split(",");
                map.put(lineArray[0], line);
            }
        }
    }

    /**
     * Appends the cached record matching column 2 of the input line, separated by a tab.
     *
     * <p>If no cached record matches, {@code map.get} returns {@code null} and the
     * literal text {@code null} is appended.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one comma-separated input record
     * @param context task context used to emit the joined row
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String orderLine = value.toString();
        String[] splits = orderLine.split(",");

        // Column 2 of the input line is the lookup key into the cached table.
        String product = map.get(splits[2]);

        context.write(new Text(orderLine + "\t" + product), NullWritable.get());
    }
}



