栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

通过Map/Reduce将HDFS数据写入ES,ES数据写入HDFS

通过Map/Reduce将HDFS数据写入ES,ES数据写入HDFS

环境准备

系统 centos 7

java 1.8

hadoop 2.7

ES 7.15.2 (ES单机版本安装可以参考:https://blog.csdn.net/weixin_36340771/article/details/121389741)

准备hadoop本地运行环境 获得Hadoop文件

链接:https://pan.baidu.com/s/1MGriraZ8ekvzsJyWdPssrw
提取码:u4uc

配置HADOOP_HOME

解压上述文件,然后配置HADOOP_HOME,注意修改地址。

获得工程代码

https://github.com/BaoPiao/elasticsearch_demo

读取HDFS写入ES

​ 通过FileInputFormat读取HDFS数据,然后通过Mapper将数据转换为MapWritable数据类型,最后通过EsOutputFormat将数据写入ES。

参数说明 基本参数 网络参数

es.nodes连接的es集群信息

es.port连接es集群的端口号,默认是9200

es.nodes.path.prefix连接的前缀信息,默认是空(例如每次都想写入到es.node:9200/custom/path/prefix,那么这里就可以写为/custom/path/prefix)

写入参数

es.resource指定es索引

es.resource = twitter/tweet #索引叫twitter 类型是tweet

es.resource.write默认取值es.resource

​ 支持根据数据,写入不同的type中(media_type是字段)

es.resource.write = my-collection/{media_type}

​ 根据数据,写入不同的索引中,此外还支持日期类型用于滚动生成日志索引,具体请见参考资料2。

es.resource.write = my-collection_{media_type}
文档相关参数

es.input.json默认是false,输入文件格式为JSON

es.write.operation默认是index

​ index(default)如果已经存在会进行替换

​ create如果已经存在会报异常

​ update更新已经存在的,如果不存在报异常

​ upsert如果不存在就是插入,如果存在就是更新

es.output.json默认是false,输出文件格式是否为JSON

es.ingest.pipeline默认是none,指定处理管道

es.mapping.id指定文档id,这里填写的是字段名称,默认为空

es.mapping.parent指定父文档,这里填写的是字段名称或填写一个常量

es.mapping.version指定版本号,这里填写的是字段名称或填写一个常量

es.mapping.include指定那些字段写入ES

es.mapping.exclude指定哪些字段不写入ES

代码参考

这里只贴出driver代码,其余部分请参考github:https://github.com/BaoPiao/elasticsearch_demo

public class ElasticsearchHadoop {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "192.168.10.108:9200");
        conf.set("es.resource", "/artists_2");
        conf.set("es.write.operation", "upsert");
        conf.set("es.mapping.include", "id");
        conf.set("es.mapping.id", "id");
        Job job = Job.getInstance(conf);
        job.setJarByClass(ElasticsearchHadoop.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapperClass(EsMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(MapWritable.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.10.108:9000/test"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}
读取ES写入HDFS

​ 通过EsInputFormat读取ES数据,返回数据为,其中Text为key,MapWritable包含字段和字段值信息,最后编写自己的Reduce写到HDFS。

参数说明 元数据信息

es.read.metadata默认是flase,是否返回原数据信息,例如文档id、版本号等

es.read.metadata.field默认是_metadata字段,当且仅当es.read.metadata设置为true,返回元数据信息,数据由map封装

es.read.metadata.version默认是false,当且仅当es.read.metadata设置为true时,该值设置为true才生效返回版本号。

查询参数

es.resource指定es索引

es.resource = twitter/tweet #索引叫twitter 类型是tweet

es.query默认为空

  1. uri 查询方式
  2. dsl 查询方式(推荐使用)
  3. 读取文件方式
# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.resource.read默认值是es.resource

es.resource.read默认值是es.resource

代码参考

这里只贴出driver部分,具体请参考github:https://github.com/BaoPiao/elasticsearch_demo

public class ReadElasticsearch {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("es.nodes", "192.168.10.108:9200");
        conf.set("es.resource", "artists_2");
        conf.set("es.read.metadata", "true");
        conf.set("es.read.metadata.field", "_metadata");
        conf.set("es.query", "{n" +
                "    "query":{n" +
                "        "match_all":{}n" +
                "    }n" +
                "}");
        org.apache.hadoop.mapreduce.Job job = Job.getInstance(conf);
        job.setInputFormatClass(EsInputFormat.class);

        job.setReducerClass(EsReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);

        FileOutputFormat.setOutputPath(job, new Path("D:\hadoop\outputES"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

最后

还有很多参数没详细列出来,例如写入批次大小、写入重试次数、超时时间等;读取scroll设置等,具体参考资料

参考资料

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#_essential_settings

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629458.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号