栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

从ES中读取数据,并将数据输出到本地(本地执行)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

从ES中读取数据,并将数据输出到本地(本地执行)

从ES中读取数据,并将数据输出到本地(本地执行ES)

文章目录

从ES中读取数据,并将数据输出到本地(本地执行ES)

pom文件mapper程序如下:job执行文件
本程序总共需要两个文件,一个是job执行文件,另一个是对读取的数据进行处理的mapper文件。

因程序是maven程序,需要有一个pom文件,pom文件如下:

pom文件


    4.0.0

    org.example
    ES2hadoop
    1.0-SNAPSHOT


    
        
            huaweicloudsdk
            https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/
            
                true
            
            
                true
            
        
        
        
            alimaven
            aliyun maven
            https://maven.aliyun.com/repository/public
            
                true
            
            
                false
            
        
    

    
        1.18.14

        1.8
        8
        8
        UTF-8
        UTF-8
    

    
        
            org.apache.hadoop
            hadoop-client
            3.1.1-hw-ei-312005
            provided
        


























        
            org.elasticsearch
            elasticsearch-hadoop
            7.16.2

        
        
            org.slf4j
            slf4j-api
            1.7.28
        
        
            org.apache.hadoop
            hadoop-common
            3.1.1-hw-ei-312005
        
        
            org.apache.hadoop
            hadoop-mapreduce-client-core
            3.1.1-hw-ei-312005
        

    
    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                3.2.4
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    com.google.code.findbugs:jsr305
                                    org.slf4j:*
                                    log4j:*
                                    org.apache.hadoop:*

                                
                            
                            
                                
                                    
                                    *:*
                                    
                                        meta-INF
    @Override
    protected void map(Text key, linkedMapWritable value, Context context)
            throws IOException, InterruptedException {
        LOG.info("key {} value {}", key, value);
        context.write(key, value); //数据不做任何处理,直接输出
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

}
job执行文件
package com.es_hadoop_example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.linkedMapWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class E2HJob {
    private static Logger LOG = LoggerFactory.getLogger(E2HJob.class);

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();

            //禁止speculative(推测执行)机制,该机制会启动多个相同task,使数据重复索引
            conf.setBoolean("mapreduce.map.speculative", false);
            conf.setBoolean("mapreduce.reduce.speculative", false);

            conf.set("es.nodes", "127.0.0.1:9200"); //ElasticSearch节点
            conf.set("es.resource", "ecommerce/product"); //ElaticSearch source: Index/Type
//            conf.set("es.resource", "user/_doc");
//            conf.set("es.resource", "kibana_sample_data_ecommerce/_doc"); //无法执行成功???

            
            conf.set("es.nodes.wan.only","true"); // 禁用网络中其他节点的自动发现.强制系统使用“es.nodes”属性,默认情况下会尝试连接到 localhost.

            
            Job job = Job.getInstance(conf, "JOBE2H"); //构建job对象
            job.setJarByClass(E2HJob.class); //指定jar包运行主类
            job.setInputFormatClass(EsInputFormat.class); //指定输入格式的类
            job.setMapperClass(E2HMapper.class); //指定map类
            job.setMapOutputKeyClass(Text.class); //指定map输出的 key的类
            job.setMapOutputValueClass(linkedMapWritable.class); //指定map输出的 value的类

            FileSystem fs = FileSystem.get(conf);
            Path outPath =new Path("D:\test\es_data");
            if(fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
            FileOutputFormat.setOutputPath(job, outPath); //指定输出路径

            System.out.println(job.waitForCompletion(true));//打印执行结果,结果为true表明执行成功
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);

        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/733204.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号