栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot 整合 sqoop

Springboot 整合 sqoop

        Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

                                                                                                             来自:百度百科

       以上是对sqoop的一个简单说明,具体我就不再多赘述。日常企业开发过程中,我们可能面对增删改查的业务比较多,但是作为一个程序员,我觉得不要局限于此,可能面对业务的场景不同。自然而然的对整个业务技术框架的认知也是有一定的局限性。今天跟大家分享这个Sqoop框架,基于springBoot进行整合。也许能够帮助你在你的简历中锦上添花,希望能够你带来薪资上的变化。

       说起sqoop,我们必须要了解它的用途,主要应用于 RDBMS 与 Hadoop ( HDFS / Hive / Hbase )数据传输迁移。我们主要通过这个工具主要作为归档数据同步使用辅助企业智能推荐及可视化大屏使用。为什么会用到sqoop,应为它解决了关系数据库与Hadoop之间的数据传输问题。基于它底层MR的本质,具有性能高、易用、灵活的特点。

        下面通过实际的案例,对整个整合过程做进一步的详解。

1.首先我们先创建一个简单的springboot工程。具体过程我就不多赘述,可参考以下地址:springboot超详细搭建过程 - 简书

2.引入依赖文件

 
    
            mysql
            mysql-connector-java
        
        
        
            org.apache.sqoop
            sqoop
            1.4.7
            hadoop260
        


        
            org.apache.commons
            commons-lang3
            3.0
        

        
            org.apache.commons
            commons-lang3
            3.0
        
        
        
            org.apache.hadoop
            hadoop-common
            2.8.4
        
        
            org.apache.hadoop
            hadoop-hdfs
            2.8.4
        
        
            org.apache.hadoop
            hadoop-mapreduce-client-core
            2.8.4
        
        
            org.apache.hadoop
            hadoop-mapreduce-client-common
            2.8.4
        
        
            org.apache.hadoop
            hadoop-mapreduce-client-jobclient
            2.8.4
            test
        
        
            org.apache.avro
            avro-mapred
            1.8.1
        
        
            org.apache.hive
            hive-common
            2.3.2
        
        
            org.apache.avro
            avro
            1.8.1
        
    

注意:

 3.创建Crotoller类

package com.ienglish.batch.data.sqoop.controller;



import com.ienglish.batch.data.sqoop.entity.sqoopBean;
import com.ienglish.batch.data.sqoop.service.SqoopService;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/data")
public class SqoopController {
    @Autowired
    private SqoopService sqoopService;

    
    @PostMapping("/mysqlTohdfs")
    @ResponseBody
    public sqoopBean mysqlTohdfs(String connect, String driver, String username, String password, String table, int m, String targetdir, String hdfsAddr) throws Exception {
        return sqoopService.mysqlTohdfs(connect, driver, username, password, table, m, targetdir, hdfsAddr);
    }

    
    @PostMapping("/mysql2hbase")
    @ResponseBody
    public sqoopBean transformMysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception {
        return sqoopService.mysql2Hbase(jdbc, driver, username, password, mysqlTable, hbaseTableName, columnFamily, rowkey, m);
    }


}

3.创建接口类

package com.ienglish.batch.data.sqoop.service;

import com.ienglish.batch.data.sqoop.entity.sqoopBean;




public interface SqoopService {
    
    public sqoopBean mysqlTohdfs(String connect, String driver, String username, String password, String table, int m, String targetdir, String hdfsAddr) throws Exception;

    
    public sqoopBean mysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception;


}

4.创建实现类

package com.ienglish.batch.data.sqoop.service.serviceimpl;



import com.ienglish.batch.data.sqoop.entity.sqoopBean;
import com.ienglish.batch.data.sqoop.service.SqoopService;
import org.apache.hadoop.conf.Configuration;
import org.springframework.stereotype.Service;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;
import org.apache.sqoop.util.OptionsFileUtil;
import java.sql.Timestamp;
import java.util.Date;

@Service
public class SqoopServiceImpl implements SqoopService {

    @Override
    public sqoopBean mysqlTohdfs(String connect, String driver, String username,
                                 String password, String table, int m, String targetdir,
                                 String hdfsAddr) throws Exception {
        String[] args = new String[]{
                "--connect", connect,
                "--driver", driver,
                "-username", username,
                "-password", password,
                "--table", table,
                "-m", String.valueOf(m),
                "--target-dir", targetdir,
        };
        sqoopBean sqoopBean = new sqoopBean();
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        conf.set("fs.default.name", hdfsAddr);
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        sqoopBean.setI(Sqoop.runSqoop(sqoop, expandArguments));
        sqoopBean.setTs(new Timestamp(new Date().getTime()));
        return sqoopBean;


    }

    @Override
    public sqoopBean mysql2Hbase(String jdbc, String driver, String username, String password, String mysqlTable, String hbaseTableName, String columnFamily, String rowkey, int m) throws Exception {
        String[] args = new String[]{
                "--connect", jdbc,
                "--driver", driver,
                "-username", username,
                "-password", password,
                "--table", mysqlTable,
                "--hbase-table", hbaseTableName,
                "--column-family", columnFamily,
                "--hbase-create-table",
                "--hbase-row-key", rowkey,
                "-m", String.valueOf(m),
        };
        sqoopBean sqoopBean = new sqoopBean();
        String[] expandArguments = OptionsFileUtil.expandArguments(args);
        SqoopTool tool = SqoopTool.getTool("import");
        Configuration conf = new Configuration();
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);
        sqoopBean.setI(Sqoop.runSqoop(sqoop, expandArguments));
        sqoopBean.setTs(new Timestamp(new Date().getTime()));
        return sqoopBean;
    }
}


以上就是正常的整合配置过程,这只是一个demo,可以根据实际需求进行封装使用。

具体执行结果如下:

 

文件传输成功。

获取源码扫描下方二维码回复:"sqoop整合"

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600805.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号