I recently needed to build a feature that reads HFile files in bulk with MapReduce and writes the data, in HBase Result form, into another storage system. I hit a few pitfalls along the way and am recording them here.
Preliminary research
For MR-based reads of HBase data, the usual approach is the official TableMapper. Here, however, we need to read HFile files directly without going through HBase (a BulkLoad scenario: we only want the incremental data and do not want a full scan of the HBase table), so that approach does not fit.
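For contrast, the conventional full-scan route that was ruled out looks roughly like this. This is a minimal sketch: "my_table" and MyTableMapper are hypothetical placeholders, not part of the job built below.

// Sketch only: the standard TableMapper-based job setup (full table scan through HBase).
Job job = Job.getInstance(HBaseConfiguration.create(), "full-table-scan");
Scan scan = new Scan();
scan.setCaching(500);        // fewer RPC round trips per mapper
scan.setCacheBlocks(false);  // an MR scan should not pollute the block cache
TableMapReduceUtil.initTableMapperJob(
        "my_table",          // hypothetical source HBase table
        scan,
        MyTableMapper.class, // hypothetical Mapper extending TableMapper
        Text.class,          // map output key
        HFileRecord.class,   // map output value
        job);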
The overall workflow:
1. Walk the specified HDFS input path and collect the absolute paths of the HFile files as the MR job's input.
2. In the Mapper stage, parse each HFile into individual cells and emit them keyed by rowkey.
3. Aggregate by rowkey, combining each row's cells into an HBase Result object (turning single-row, single-column Cells into a single-row, multi-column Result), and write it to ES. Aggregating into a Result prevents the overwrite problem that occurs when cells of the same row are written to ES one by one.
1. Custom InputFormat implementation
1.1 Extend FileInputFormat
public class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, Cell> {

    // An HFile cannot be split at arbitrary byte offsets, so each file is a single split.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<ImmutableBytesWritable, Cell> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new HFileRecordReader((FileSplit) split, context.getConfiguration());
    }
}
1.2 Implement the RecordReader by extending org.apache.hadoop.mapreduce.RecordReader
public class HFileRecordReader extends RecordReader<ImmutableBytesWritable, Cell> {

    private HFile.Reader reader;
    private final HFileScanner scanner;
    private long entryNumber = 0L;

    public HFileRecordReader(FileSplit split, Configuration conf) throws IOException {
        final Path path = split.getPath();
        reader = HFile.createReader(FileSystem.get(conf), path, conf);
        // Load the HFile's file-info (metadata) block before scanning.
        reader.loadFileInfo();
        scanner = reader.getScanner(false, false);
        scanner.seekTo();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Reader and scanner are already set up in the constructor.
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        entryNumber++;
        return scanner.next();
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
        return new ImmutableBytesWritable(CellUtil.cloneRow(scanner.getCell()));
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
        return scanner.getCell();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (reader != null) {
            // Cast to float, otherwise the long division is 0 until the very end.
            return (float) entryNumber / reader.getEntries();
        }
        return 1;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}

2. A Cell cannot be used as the map-stage output, so it needs a wrapper that implements the Writable interface
public class HFileRecord implements Writable {
private byte operate;
private long timestamp;
private String row;
private String family;
private String qualifier;
private String value;
public HFileRecord () {}
public HFileRecord (byte operate, long timestamp, String row, String family, String qualifier, String value) {
this.operate = operate;
this.timestamp = timestamp;
this.row = row;
this.family = family;
this.qualifier = qualifier;
this.value = value;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(operate);
out.writeLong(timestamp);
out.writeUTF(row);
out.writeUTF(family);
out.writeUTF(qualifier);
out.writeUTF(value);
}
@Override
public void readFields(DataInput in) throws IOException {
this.operate = in.readByte();
this.timestamp = in.readLong();
this.row = in.readUTF();
this.family = in.readUTF();
this.qualifier = in.readUTF();
this.value = in.readUTF();
}
public byte getOperate() {
return this.operate;
}
public long getTimestamp() {
return this.timestamp;
}
public String getRow() {
return this.row;
}
public String getFamily() {
return this.family;
}
public String getQualifier() {
return this.qualifier;
}
public String getValue() {
return this.value;
}
}
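A quick way to sanity-check the write()/readFields() symmetry of HFileRecord outside the MR job. This is a standalone sketch, not part of the pipeline; the class name and the sample values are made up.

// Standalone round-trip check for HFileRecord serialization.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class HFileRecordRoundTripTest {
    public static void main(String[] args) throws Exception {
        // 4 is the KeyValue.Type.Put code.
        HFileRecord original = new HFileRecord((byte) 4, System.currentTimeMillis(),
                "row1", "f1", "field0", "value0");

        // Serialize to an in-memory buffer.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bos));

        // Deserialize into a fresh instance and print it back.
        HFileRecord copy = new HFileRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy.getRow() + "/" + copy.getFamily() + ":" + copy.getQualifier()
                + " = " + copy.getValue());
    }
}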
3. Mapper-stage logic
public class HFileReadMapper extends Mapper<ImmutableBytesWritable, Cell, Text, HFileRecord> {

    private static final Log LOG = LogFactory.getLog(HFileReadMapper.class);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        context.getCounter("Convert", "mapper").increment(1);
    }

    @Override
    protected void map(ImmutableBytesWritable key, Cell cell, Context context)
            throws IOException, InterruptedException {
        // Wrap the cell in a Writable record so it can be shuffled to the reducer.
        HFileRecord hFileRecord = new HFileRecord(
                cell.getTypeByte(),
                cell.getTimestamp(),
                new String(CellUtil.cloneRow(cell)),
                new String(CellUtil.cloneFamily(cell)),
                new String(CellUtil.cloneQualifier(cell)),
                new String(CellUtil.cloneValue(cell)));
        // Key by rowkey so all cells of one row arrive in the same reduce call.
        context.write(new Text(CellUtil.cloneRow(cell)), hFileRecord);
    }
}

4. Reducer-stage logic
public class HFileReadReducer extends Reducer<Text, HFileRecord, NullWritable, NullWritable> {

    private static final Log LOG = LogFactory.getLog(HFileReadReducer.class);

    private Result result;
    private List<Cell> cellAll = Lists.newArrayListWithCapacity(10);

    @Override
    protected void reduce(Text text, Iterable<HFileRecord> cellList, Context context)
            throws IOException, InterruptedException {
        // Rebuild Cell objects from the shuffled records of this row.
        for (HFileRecord hr : cellList) {
            cellAll.add(CellUtil.createCell(hr.getRow().getBytes(), hr.getFamily().getBytes(),
                    hr.getQualifier().getBytes(), hr.getTimestamp(), hr.getOperate(),
                    hr.getValue().getBytes()));
        }
        // Result.create expects the cells in CellComparator order.
        cellAll.sort(CellComparator.getInstance());
        result = Result.create(cellAll);

        // Sanity check: read a few columns back from the assembled Result.
        System.out.println("==============Get Value Start================");
        for (int i = 0; i < 10; i++) {
            String field = "field" + i;
            byte[] bytei = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes(field));
            if (null != bytei) {
                System.out.println("----" + field + " is : " + new String(bytei));
            }
        }
        System.out.println("==============Get Value End================");
        cellAll.clear();
    }
}

5. MR job setup
public class HFileToIndexer extends Configured implements Tool {
private static final Logger LOG = LogManager.getLogger(HFileToIndexer.class);
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new HFileToIndexer(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
try {
Configuration conf = HBaseConfiguration.create();
// create job
Job job = Job.getInstance(conf, "HFileToIndexer: Scan HFiles to Indexer");
job.setJarByClass(HFileToIndexer.class);
job.setInputFormatClass(HFileInputFormat.class);
job.setMapperClass(HFileReadMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(HFileRecord.class);
job.setReducerClass(HFileReadReducer.class);
// The job writes nothing to HDFS; index documents go directly to ES, so use NullOutputFormat.
job.setOutputFormatClass(NullOutputFormat.class);
// Set reduce tasks to 0, i.e. skip the reduce step (not used here, since we aggregate by rowkey)
// job.setNumReduceTasks(0);
// Walk the given HDFS path and add each HFile found as an input path
String hfilePath = args[0];
try {
FileSystem fs = FileSystem.get(conf);
RemoteIterator<LocatedFileStatus> lt = fs.listFiles(new Path(hfilePath), true);
while (lt.hasNext()) {
LocatedFileStatus file = lt.next();
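// HFile data files are named with a 32-character hex string, which is what this length check keys on.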
if(file.isFile() && file.getPath().getName().length() == 32) {
LOG.info("file name is :" + file.getPath().getName());
FileInputFormat.addInputPath(job, new Path(file.getPath().toString()));
}
}
} catch (IOException ioe) {
LOG.error(ioe.getMessage());
}
if (!job.waitForCompletion(true)) {
LOG.error("Failure");
} else {
LOG.info("Success");
return 0;
}
} catch (Exception e) {
e.printStackTrace();
}
return 1;
}
}
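The reducer above only verifies a few columns of the assembled Result; the actual write to ES is out of scope here. As a rough sketch of that hand-off, one way to flatten the Result into a plain field map before passing it to whichever ES client is in use (the helper name toDocument and the "_rowkey" field are made up for illustration; uses java.util.Map/HashMap):

// Illustrative only: flatten a Result into a column -> value map, the shape most indexing clients expect.
public static Map<String, String> toDocument(Result result) {
    Map<String, String> doc = new HashMap<>();
    if (result.listCells() != null) {
        for (Cell cell : result.listCells()) {
            String column = new String(CellUtil.cloneFamily(cell)) + ":"
                    + new String(CellUtil.cloneQualifier(cell));
            doc.put(column, new String(CellUtil.cloneValue(cell)));
        }
    }
    // The rowkey typically becomes the document id on the ES side.
    doc.put("_rowkey", new String(result.getRow()));
    return doc;
}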



