栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

JAVA 千万级数据多线程保存入库

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

JAVA 千万级数据多线程保存入库

创建一个抽象类,以便继承次接口

package com.mm.fa.data.process.bigt.utils;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.linkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Data
public abstract class BigDataInsert {

    @Resource
    JdbcTemplate jdbcTemplate;

    int groupCount = 2000;

    int threadPoolCount = 5;

    // 创建一个固定大小的线程池
    private ExecutorService service = null;

    public void insertBigData(List list, String sql) {
        // 创建线程池对象
        service = Executors.newFixedThreadPool(threadPoolCount);
        // 将需保存集合分组
        List> listList = new ArrayList<>();
        if (list.size() > groupCount) {
            listList = fixedGrouping(list, groupCount);
        } else {
            listList.add(list);
        }

        //循环10次,每次十万数据,一共100万
        for (int i = 0; i < listList.size(); i++) {
            int finalI = i;
            List> finalListList = listList;
            // 多线程保存
            service.execute(() -> {

                StringBuilder sb = new StringBuilder();
                sb.append(sql);
                List finalList = new linkedList<>();
                StringBuilder spliceS = new StringBuilder();
                finalListList.get(finalI).stream().forEach(item -> {
                    spliceS.append("(");
                    List resultList = pstmToSetValue(item);
                    String join = String.join(",", resultList);
                    spliceS.append(join).append(")");
                    finalList.add(spliceS.toString());
                    spliceS.setLength(0);
                });
                String valueSql = String.join(",", finalList);
                sb.append(valueSql);
                try {
                    this.jdbcTemplate.execute(sb.toString());
                    log.info("insert into data successfully...");
                } catch (Exception e) {
                    log.error("insert into data error...{}", e.getMessage());
                }
            });
        }

        service.shutdown();
    }

    
    public static  List> fixedGrouping(List source, int n) {

        if (null == source || source.size() == 0 || n <= 0) {
            return null;
        }
        List> result = new ArrayList>();
        int remainder = source.size() % n;
        int size = (source.size() / n);
        for (int i = 0; i < size; i++) {
            List subset = null;
            subset = source.subList(i * n, (i + 1) * n);
            result.add(subset);
        }
        if (remainder > 0) {
            List subset = null;
            subset = source.subList(size * n, size * n + remainder);
            result.add(subset);
        }
        return result;
    }

    public abstract List pstmToSetValue(T dto);

}

Servcice接口 继承 抽象类

package com.mm.fa.data.api.service;

import cn.hutool.core.collection.CollectionUtil;
import com.mm.fa.common.Result;
import com.mm.fa.data.process.bigt.utils.BigDataInsert;
import com.mm.fa.ext.supplyChain.dto.ForecastSalesDTO;
import com.mm.fa.ext.supplyChain.mapper.Supplier360DataMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.linkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Service
@Slf4j
public class SupplierSalesDataService extends BigDataInsert> {


    @Resource
    private Supplier360DataMapper mapper;

    String SQL = "INSERT INTO supply_chain.t_supplier_sales_data(`manufacturer` , `product_code` , `product_name`, `" +
            "quantity`, `unit_price`) VALUES ";

    @Override
    public List pstmToSetValue(Map dto) {
        List list = new linkedList<>();
        list.add("'" + dto.get("manufacturer") + "'");
        list.add("'" + dto.get("productCode") + "'");
        list.add("'" + dto.get("productName") + "'");
        list.add("'" + dto.get("quantity") + "'");
        list.add("'" + dto.get("unitPrice") + "'");
        return list;
    }

    public Result saveSupplierSalesData() {

        List> resultList = new ArrayList<>();
        List strList = mapper.getSupplierNameList();
        if (CollectionUtil.isNotEmpty(strList)) {
            List>>> tasks = new ArrayList<>();//添加任务
            for (String str : strList) {
                Callable>> qfe = () -> {
                    try {
                        List> newDataList = mapper.selectSupplierSalesData(str);
                        return newDataList;
                    } catch (Exception e) {
                        log.error("供应商销售情况中间表多线程查询调用失败:" + e);
                        throw new Exception("供应商销售情况中间表多线程查询调用异常!");
                    }
                };
                tasks.add(qfe);
            }

            try {
                //定义固定长度的线程池  防止线程过多
                ExecutorService execService = Executors.newFixedThreadPool(100);

                List>>> futures = execService.invokeAll(tasks);
                // 处理线程返回结果
                if (futures != null && futures.size() > 0) {
                    for (Future>> future : futures) {
                        if (CollectionUtil.isNotEmpty(future.get())) {
                            resultList.addAll(future.get());
                        }
                    }
                }
                execService.shutdown();
            } catch (Exception e) {
                log.error("供应商销售情况中间表获取数据失败:" + e);
            }
        }

        if(CollectionUtil.isNotEmpty(resultList)){
            log.info("供应商销售情况中间表获取数据{}", resultList.size());

            //删除库里数据
            mapper.deleteAll("t_supplier_sales_data");
            log.info("数据删除成功!");

            //保存数据
            log.info(" 供应商销售情况中间表 开启多线程数据保存!");
            insertBigData(resultList, SQL);
        }

         return Result.create();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/759793.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号