栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot集成MaxCompute

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot集成MaxCompute

1、SDK方式集成

使用odps-sdk-core集成, 官方文档地址MaxCompute Java SDK介绍

1.1、依赖引入odps-sdk-core

    1.8
    
    0.40.8-public



  
  
    com.aliyun.odps
    odps-sdk-core
    ${max-compute-sdk.version}


1.2、编写连接工具类

编写MaxComputeSdkUtil以SDK方式连接MaxCompute

1.2.1、重要类和方法说明
  • 1、连接参数类:
@Data
public class MaxComputeSdkConnParam {
    
    private String aliyunAccessId;
    
    private String aliyunAccessKey;
    
    private String maxComputeEndpoint;
    
    private String projectName;
}
  • 2、查询表元数据信息实体

主要是字段:tableName, comment。还可以自己添加其他字段

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableMetaInfo {
    
    private String tableName;
    
    private String comment;
}

  • 3、公共方法(初始化)
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";

private static final String FULL_SCAN_CONFIG = "odps.sql.allow.fullscan";

private static final String PAGE_SELECt_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";

private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";

private final Odps odps;


private final MaxComputeSdkConnParam connParam;

public MaxComputeSdkUtil(MaxComputeSdkConnParam param){
    this.connParam = param;
    // 构建odps客户端
    this.odps = buildOdps();
}


private Odps buildOdps() {
    // 阿里云账号密码  AccessId 和 AccessKey
    final String aliyunAccessId = connParam.getAliyunAccessId();
    final String aliyunAccessKey = connParam.getAliyunAccessKey();
    // 创建阿里云账户
    final AliyunAccount aliyunAccount = new AliyunAccount(aliyunAccessId, aliyunAccessKey);

    // 使用阿里云账户创建odps客户端
    final Odps odps = new Odps(aliyunAccount);

    // 传入了的话就是用传入的 没有传入使用默认的
    final String endpoint = connParam.getMaxComputeEndpoint();
    try {
        odps.setEndpoint(ObjectUtils.isEmpty(endpoint) ? defaultEndpoint : endpoint);
    } catch (Exception e) {
        // 端点格式不正确
        throw new BizException(ResultCode.MAX_COMPUTE_ENDPOINT_ERR);
    }

    // 设置项目
    odps.setDefaultProject(connParam.getProjectName());
    return odps;
}
  • 4、查询表信息
public List getTableInfos(){
    final Tables tables = odps.tables();
    List resultTables = new ArrayList<>();
    try {
        for (Table table : tables) {
            // tableName
            final String name = table.getName();
            // 描述
            final String comment = table.getComment();
            final TableMetaInfo info = new TableMetaInfo(name, comment);
            resultTables.add(info);
        }
    } catch (Exception e) {
        e.printStackTrace();
        final String errMsg = ObjectUtils.isEmpty(e.getMessage()) ? "" : e.getMessage();
        if (errMsg.contains("ODPS-0410051:Invalid credentials")){
            throw new BizException(ResultCode.MAX_COMPUTE_UNAME_ERR);
        }
        if (errMsg.contains("ODPS-0410042:Invalid signature value")){
            throw new BizException(ResultCode.MAX_COMPUTE_PWD_ERR);
        }
        if (errMsg.contains("ODPS-0420095: Access Denied")){
            throw new BizException(ResultCode.MAX_COMPUTE_PROJECT_ERR);
        }
    }
    return resultTables;
}

  • 5、执行SQL封装
public List> queryData(String querySql, boolean fullScan){
    try {
        // 配置全表扫描吗
        configFullScan(fullScan);
        // 使用任务执行SQL
        final Instance instance = SQLTask.run(odps, querySql);
        // 等待执行成功
        instance.waitForSuccess();
        // 封装返回结果
        List records = SQLTask.getResult(instance);
        // 结果转换为Map
        return buildMapByRecords(records);
    } catch (OdpsException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    }
}


private void configFullScan(boolean fullScan) {
        if (fullScan){
        // 开启全表扫描配置
        Map config = new HashMap<>();
        log.info("===>>开启全表扫描, 查询多个分区数据");
        config.put(FULL_SCAN_CONFIG, "true");
        odps.setGlobalSettings(config);
        }else {
        // 移除全表扫描配置
        odps.getGlobalSettings().remove(FULL_SCAN_CONFIG);
        }
    }


private List> buildMapByRecords(List records) {
        List> listMap = new ArrayList<>();
        for (Record record : records) {
        Column[] columns = record.getColumns();
        Map map = new LinkedHashMap<>();
        for (Column column : columns) {
        String name = column.getName();
        Object value = record.get(name);
        // maxCompute里面的空返回的是使用n
        if ("\N".equalsIgnoreCase(String.valueOf(value))) {
        map.put(name, "");
        } else {
        map.put(name, value);
        }
        }
        listMap.add(map);
        }
        return listMap;
    }
  • 6、分页查询分装
public List> queryData(String querySql, Integer page, Integer size, boolean fullScan){
    // 重写SQl,添加limit offset, limit
    // 1、替换分号
    querySql = querySql.replaceAll(";", "");
    // 2、格式化SQL
    Integer offset = (page - 1 ) * size;
    // 得到执行sql
    final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);
    log.info("=======>>>执行分页sql为:{}", execSql);

    // 调用执行SQL数据
    return queryData(execSql, fullScan);
}



public PageResult> pageQueryMap(String querySql, Integer page, Integer size){
    // 1、替换分号
    querySql = querySql.replaceAll(";", "");
    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);
    log.info("=======>>>执行分页统计总数sql为:{}", countSql);
    // 查询总数
    final List> countMap = queryData(countSql, false);
    if (CollectionUtils.isEmpty(countMap)){
        return new PageResult<>(0L, new ArrayList<>());
    }

    long count = 0L;
    for (Object value : countMap.get(0).values()) {
        count = Long.parseLong(String.valueOf(value));
    }

    if (count == 0){
        return new PageResult<>(0L, new ArrayList<>());
    }

    // 执行分页查询 开启全表扫描
    final List> resultList = queryData(querySql, page, size, true);

    return new PageResult<>(count, resultList);
}



public PageResult pageQuery(String querySql, Integer page, Integer size, Class clazz){
    final PageResult> result = pageQueryMap(querySql, page, size);
    List rows = new ArrayList<>();
    for (Map row : result.getRows()) {
        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
        rows.add(t);
    }
    return new PageResult<>(result.getTotal(), rows);
}
1.2.2 工具类测试

使用测试数据测试工具类

public static void main(String[] args) {
    // 构建连接参数
    final MaxComputeSdkConnParam connParam = new MaxComputeSdkConnParam();
    connParam.setAliyunAccessId("您的阿里云账号accessId");
    connParam.setAliyunAccessKey("您的阿里云账号accessKey");
    connParam.setProjectName("项目名");

    // 实例化工具类
    final MaxComputeSdkUtil sdkUtil = new MaxComputeSdkUtil(connParam);

    // 查询所有表
    final List tableInfos = sdkUtil.getTableInfos();
    for (TableMetaInfo tableInfo : tableInfos) {
        System.out.println(tableInfo.getTableName());
    }

    // 分页查询数据
    final PageResult> page = sdkUtil.pageQueryMap("select * from ods_cust;", 2, 10);
    System.out.println(page.getTotal());
    for (Map map : page.getRows()) {
        System.out.println(JSONObject.toJSONString(map));
    }
}
1.2.3 为什么要开启全表扫描

maxCompute存在使用限制如下:

当使用select语句时,屏显最多只能显示10000行结果。当select语句作为子句时则无此限制,select子句会将全部结果返回给上层查询。
select语句查询分区表时默认禁止全表扫描。
自2018年1月10日20:00:00后,在新创建的项目上执行SQL语句时,默认情况下,针对该项目里的分区表不允许执行全表扫描操作。在查询分区表数据时必须指定分区,由此减少SQL的不必要I/O,从而减少计算资源的浪费以及按量计费模式下不必要的计算费用。

如果您需要对分区表进行全表扫描,可以在全表扫描的SQL语句前加上命令set odps.sql.allow.fullscan=true;,并和SQL语句一起提交执行。假设sale_detail表为分区表,需要同时执行如下语句进行全表查询:

2、JDBC方式集成

使用odps-jdbc集成, 官方文档地址MaxCompute Java JDBC介绍


    1.8
    
    3.0.1



  
  
    com.aliyun.odps
    odps-jdbc
    ${max-compute-jdbc.version}
    jar-with-dependencies
  

2.2、编写连接工具类

编写MaxComputeSdkUtil以JDBC方式连接MaxCompute

2.2.1、重要类和方法说明
  • 1、连接参数类:
@Data
public class MaxComputeJdbcConnParam {
  
  private String aliyunAccessId;
  
  private String aliyunAccessKey;
  
  private String endpoint;
  
  private String projectName;
}
  • 2、公共方法(初始化)
private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";

private static final String SELECT_ALL_TABLE_SQL = "select table_name, table_comment from Information_Schema.TABLES";

private static final String SELECt_FIELD_BY_TABLE_SQL = "select column_name, column_comment from Information_Schema.COLUMNS where table_name = '%s'";

private static final String PAGE_SELECt_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";

private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";

private final Connection conn;


private final MaxComputeJdbcConnParam connParam;

public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {
    this.connParam = connParam;
    this.conn = buildConn();
}


private Connection buildConn() {
    try {
        Class.forName(DRIVER_NAME);
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);
    }

    try {
        // JDBCURL连接模板
        String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";
        // 使用驱动管理器连接获取连接
        return DriverManager.getConnection(
                String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()),
                connParam.getAliyunAccessId(), connParam.getAliyunAccessKey());
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);
    }
}
  • 3、查询表信息
public List getTableInfos(){
    List resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 创建statement 使用SQL直接查询
        statement = conn.createStatement();
        // 执行查询语句
        resultSet = statement.executeQuery(SELECt_ALL_TABLE_SQL);
        while (resultSet.next()){
            final String tableName = resultSet.getString("table_name");
            final String tableComment = resultSet.getString("table_comment");
            final TableMetaInfo info = new TableMetaInfo(tableName, tableComment);
            resultList.add(info);
        }

        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 关闭resultSet
        closeResultSet(resultSet);
        // 关闭statement
        closeStatement(statement);
    }
}
  • 4、执行SQL封装
public List> queryData(String querySql){
    List> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 创建statement
        statement = conn.createStatement();

        // 执行查询语句
        resultSet = statement.executeQuery(querySql);

        // 构建结果返回
        buildMapByRs(resultList, resultSet);

        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 关闭resultSet
        closeResultSet(resultSet);
        // 关闭statement
        closeStatement(statement);
    }
}


private void buildMapByRs(List> resultList, ResultSet resultSet) throws SQLException {
    // 获取元数据
    ResultSetMetaData metaData = resultSet.getMetaData();
    while (resultSet.next()) {
        // 获取列数
        int columnCount = metaData.getColumnCount();
        Map map = new HashMap<>();
        for (int i = 0; i < columnCount; i++) {
            String columnName = metaData.getColumnName(i + 1);
            Object object = resultSet.getObject(columnName);
            // maxCompute里面的空返回的是使用n
            if ("\N".equalsIgnoreCase(String.valueOf(object))) {
                map.put(columnName, "");
            } else {
                map.put(columnName, object);
            }
        }
        resultList.add(map);
    }
}


private void closeStatement(Statement statement){
    if (statement != null){
        try {
            statement.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

private void closeResultSet(ResultSet resultSet){
    if (resultSet != null){
        try {
            resultSet.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
  • 5、分页查询分装
public List> queryData(String querySql, Integer page, Integer size){
    List> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 1、替换分号
        querySql = querySql.replaceAll(";", "");
        // 创建statement
        statement = conn.createStatement();
        // 2、格式化SQL
        int offset = (page - 1 ) * size;
        final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);
        log.info("=======>>>执行分页sql为:{}", execSql);
        // 执行查询语句
        resultSet = statement.executeQuery(execSql);

        // 构建结果返回
        buildMapByRs(resultList, resultSet);
        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 关闭resultSet
        closeResultSet(resultSet);
        // 关闭statement
        closeStatement(statement);
    }
}



public PageResult> pageQueryMap(String querySql, Integer page, Integer size){
    // 1、替换分号
    querySql = querySql.replaceAll(";", "");
    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);
    log.info("=======>>>执行分页统计总数sql为:{}", countSql);
    // 查询总数
    final List> countMap = queryData(countSql);
    if (CollectionUtils.isEmpty(countMap)){
        return new PageResult<>(0L, new ArrayList<>());
    }

    long count = 0L;
    for (Object value : countMap.get(0).values()) {
        count = Long.parseLong(String.valueOf(value));
    }

    if (count == 0){
        return new PageResult<>(0L, new ArrayList<>());
    }

    // 执行分页查询 开启全表扫描
    final List> resultList = queryData(querySql, page, size);

    return new PageResult<>(count, resultList);
}



public PageResult pageQuery(String querySql, Integer page, Integer size, Class clazz){
    final PageResult> result = pageQueryMap(querySql, page, size);
    List rows = new ArrayList<>();
    for (Map row : result.getRows()) {
        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
        rows.add(t);
    }
    return new PageResult<>(result.getTotal(), rows);
}
2.2.2 工具类测试

使用测试数据测试工具类

public static void main(String[] args) {
    final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();
    connParam.setAliyunAccessId("您的阿里云账号accessId");
    connParam.setAliyunAccessKey("您的阿里云账号accessKey");
    connParam.setProjectName("项目名");
    connParam.setEndpoint("http://service.cn-hangzhou.maxcompute.aliyun.com/api");
    final MaxComputeJdbcUtil jdbcUtil = new MaxComputeJdbcUtil(connParam);

    // 获取表信息
    final List tableInfos = jdbcUtil.getTableInfos();
    for (TableMetaInfo tableInfo : tableInfos) {
        System.out.println(tableInfo);
    }

    // 获取字段信息
    final String tableName = tableInfos.get(new Random().nextInt(tableInfos.size())).getTableName();
    final List fields = jdbcUtil.getFieldByTableName(tableName);
    for (TableColumnMetaInfo field : fields) {
        System.out.println(field.getFieldName() + "-" + field.getComment());
    }

    // 执行查询
    final List> list = jdbcUtil.queryData("select * from ods_cust;");
    for (Map map : list) {
        System.out.println(JSONObject.toJSONString(map));
    }

    // 执行分页查询
    final List> list2 = jdbcUtil.queryData("select * from ods_cust;", 2, 10);
    for (Map map : list2) {
        System.out.println(JSONObject.toJSONString(map));
    }

    // 执行分页查询 并返回count
    final PageResult> list3 = jdbcUtil.pageQueryMap("select * from ods_cust;", 2, 10);
    System.out.println(list3.getTotal());
    for (Map map : list3.getRows()) {
        System.out.println(JSONObject.toJSONString(map));
    }

    jdbcUtil.close();
}
项目地址

springboot集成maxCompute

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1039383.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号