从ES中读取数据,并将数据输出到本地(本地执行ES)
本程序由三部分组成:pom 构建文件、对读取数据进行处理的 mapper 文件,以及 job 执行文件,内容依次如下。
本程序总共需要两个文件,一个是job执行文件,另一个是对读取的数据进行处理的mapper文件。
因程序是maven程序,需要有一个pom文件,pom文件如下:
pom 文件(注:以下内容由被压扁的原文重新排版还原,请与原始 pom.xml 核对):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>ES2hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>huaweicloudsdk</id>
            <url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
    </repositories>

    <properties>
        <lombok.version>1.18.14</lombok.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1-hw-ei-312005</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>7.16.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1-hw-ei-312005</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1-hw-ei-312005</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes><exclude>META-INF/*</exclude></excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

mapper 程序片段(类的结尾部分;注意值类型的正确类名是 `LinkedMapWritable`,原文中的 `linkedMapWritable` 无法编译):

```java
    @Override
    protected void map(Text key, LinkedMapWritable value, Context context)
            throws IOException, InterruptedException {
        LOG.info("key {} value {}", key, value);
        context.write(key, value); // 数据不做任何处理,直接输出
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
```
package com.es_hadoop_example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class E2HJob {
private static Logger LOG = LoggerFactory.getLogger(E2HJob.class);
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
//禁止speculative(推测执行)机制,该机制会启动多个相同task,使数据重复索引
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
conf.set("es.nodes", "127.0.0.1:9200"); //ElasticSearch节点
conf.set("es.resource", "ecommerce/product"); //ElaticSearch source: Index/Type
// conf.set("es.resource", "user/_doc");
// conf.set("es.resource", "kibana_sample_data_ecommerce/_doc"); //无法执行成功???
conf.set("es.nodes.wan.only","true"); // 禁用网络中其他节点的自动发现.强制系统使用“es.nodes”属性,默认情况下会尝试连接到 localhost.
Job job = Job.getInstance(conf, "JOBE2H"); //构建job对象
job.setJarByClass(E2HJob.class); //指定jar包运行主类
job.setInputFormatClass(EsInputFormat.class); //指定输入格式的类
job.setMapperClass(E2HMapper.class); //指定map类
job.setMapOutputKeyClass(Text.class); //指定map输出的 key的类
job.setMapOutputValueClass(linkedMapWritable.class); //指定map输出的 value的类
FileSystem fs = FileSystem.get(conf);
Path outPath =new Path("D:\test\es_data");
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath); //指定输出路径
System.out.println(job.waitForCompletion(true));//打印执行结果,结果为true表明执行成功
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
}



