栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

多线程分批次查询数据

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

多线程分批次查询数据

多线程分批次查询数据
  • 在springboot+mybaties系统中结合多线程实现分批次查询数据

在springboot+mybaties系统中结合多线程实现分批次查询数据

需求: 在系统开发和对接过程中,常常出现大数据量获取的情况,比如你是A系统,去获取B系统的数据,B系统只是给你接口。并且返回只能返回10w条数据,但是实际上你查出来的量到达100w甚至上千万,那么怎么办呢?其实核心思想大家都应该猜的到那就是多线程分批次查询,比如100w分十个线程,然后是一个线程查询10w数据。那么接下来我就用最简单的具体代码例子给大家实践一下吧,话不多说,上Code!
最核心的代码如下,实际应用时把这个多线程放到你要的接口里面即可

package cc.mrbird.febs.system.controller;

import cc.mrbird.febs.common.annotation.Log;
import cc.mrbird.febs.common.controller.baseController;
import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.common.exception.FebsException;
import cc.mrbird.febs.common.utils.MD5Util;
import cc.mrbird.febs.common.utils.ThredQuery;
import cc.mrbird.febs.system.dao.UserMapper;
import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.domain.UserConfig;
import cc.mrbird.febs.system.service.UserConfigService;
import cc.mrbird.febs.system.service.UserService;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Maps;
import com.wuwenze.poi.ExcelKit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

@Slf4j
@Validated
@RestController
@RequestMapping("user")
public class UserController extends baseController {

    private String message;

    @Autowired
    private UserServiceImpl userService;
    @Autowired
    UserMapper userMapper;
    @Autowired
    private UserConfigService userConfigService;
    //创建线程池
    //创建线程 7个参数
    //private static ExecutorService executorService = Executors.newFixedThreadPool(5);
    //1.corePoolSize 线程池种的的线程数量
    //2.maxmumPoolSize 线程池种最大的线程数量
    //3.keepAliveTime 线程池的数量大于线程数量时,多余的线程会在多长时间内销毁 一般设置0
    //4.TimeUnit keepAlive的时间单位 一般设置分钟  TimeUnit.MILLISECONDS
    //5.workQueue:任务队列,被提交但是未被执行的任务 一般设置10
    //6.threadFactory:线程工厂, 一般设置默认值  Executors.defaultThreadFactory(),
    //7.handler:拒绝略,任务太多来不及处理,如何拒绝  也设置 ThreadPoolExecutor.DiscardPolicy()
    //四种拒绝策略分别是:
    //          7.1 AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 (默认)
    //          7.2 DiscardPolicy:也是丢弃任务,但是不抛出异常。
    //          7.3 DiscardOldestPolicy:忽略最早的任务(把最早添加任务到队列的任务忽略掉,然后执行当前的任务)
    //          7.4 CallerRunsPolicy:把超出的任务交给当前线程执行
    ExecutorService executorService=new ThreadPoolExecutor(
            5,
            5, 0l,
            TimeUnit.MILLISECONDS,
            new linkedBlockingDeque(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardPolicy());

    
    @PostMapping("queryUserByThread")
    public Map queryUserByThread(QueryRequest queryRequest, User user) throws FebsException {
        //Page page=new Page();
        //page.setSize(10);
        //page.setCurrent(1);
        //userMapper.findUserDetail(page, user);
        try {
                log.error("start....");
                long start = System.currentTimeMillis();
                List result = new ArrayList<>();//返回结果
                //查询数据库总数量
                int count =userMapper.countUser();
                int num = 10;//一次查询多少条
                //需要查询的次数
                int times = count / num;
                if (count % num != 0) {
                    times = times + 1;
                }
                int bindex = 1;
                //Callable用于产生结果
                List>> tasks = new ArrayList>>();
                for (int i = 0; i < times; i++) {
                    Callable> qfe = new ThredQuery(userService,user,bindex,num);
                    tasks.add(qfe);
                    bindex += bindex;
                }
                //定义固定长度的线程池  防止线程过多
                ExecutorService executorService=new ThreadPoolExecutor(
                        15,
                        30, 0l,
                        TimeUnit.MILLISECONDS,
                        new linkedBlockingDeque(10),
                        Executors.defaultThreadFactory(),
                        new ThreadPoolExecutor.DiscardPolicy());
                //Future用于获取结果
                List>> futures = executorService.invokeAll(tasks);
                //处理线程返回结果
                if (futures != null && futures.size() > 0) {
                    for (Future> future : futures) {
                        result.addAll(future.get());
                    }
                }
                executorService.shutdown();//关闭线程池
                long end = System.currentTimeMillis();
                System.out.println("线程查询数据用时:" + (end - start) + "ms");

            System.out.println("查询完成!");
            HashMap map= Maps.newHashMap();
            map.put("list",result);
            return map;
        } catch (Exception e) {
            message = "查询失败";
            log.error(message, e);
            throw new FebsException(message);
        }
    }
}

首先我们需要创建一个多线程的查询的对象, ThredQuery.java.。其中需要用到通过ApplicationContext获取bean,

package cc.mrbird.febs.common.utils;


import cc.mrbird.febs.system.domain.User;
import cc.mrbird.febs.system.service.impl.UserServiceImpl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.context.ApplicationContext;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class ThredQuery implements Callable> {
    
    private ApplicationContext ac = SpringContextUtil.getApplicationContext();
    private UserServiceImpl userServiceimpl;//需要通过够方法把对应的业务service传进来 实际用的时候把类型变为对应的类型
    private User user;//查询条件 根据条件来定义该类的属性
    private int bindex;//分页 页面
    private int num;//数量


    
    public ThredQuery(UserServiceImpl userServiceimpl, User user, int bindex, int num) {
        this.userServiceimpl = userServiceimpl;
        this.user = user;
        this.bindex = bindex;
        this.num = num;
    }

    @Override
    public List call() throws Exception {
        //通过service查询得到对应结果
        List list = new ArrayList<>();
        //需要调用的业务方法(方法名称,参数类型)
        //获取业务接口bean
        userServiceimpl = (UserServiceImpl) ac.getBean("userService");
        Method idMethod = userServiceimpl.getClass().getMethod("findUserThread", Page.class);
        Page page=new Page();
        page.setSize(num);
        page.setCurrent(bindex);
        //调用业务方法查询数据,返回集合
        Object invoke = idMethod.invoke(userServiceimpl, page);
        IPage userPageInof=(IPage)invoke;
        List records = userPageInof.getRecords();
        //上面的反射本质实现这行代码分页查询  : userService.findUserThread(page)
        //至于为什么不能直接写 userService.findUserThread(page)可能和 实现 Callable接口或者多线程相关,希望有哪位大佬能为我解答
        return records;
    }
}

SpringContextUtil.java.的代码如下:

package cc.mrbird.febs.common.utils;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;


@SuppressWarnings({"unused"})
@Component
public class SpringContextUtil implements ApplicationContextAware {

  public static final Logger logger = LoggerFactory.getLogger(SpringContextUtil.class);

  private static ApplicationContext applicationContext;

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      this.applicationContext = applicationContext;
      logger.info("applicationContext:" + SpringContextUtil.applicationContext);
  }

  public static ApplicationContext getApplicationContext() {
      return applicationContext;
  }

  public static Object getBean(String name) {
      return getApplicationContext().getBean(name);
  }

  public static  T getBean(Class clazz) {
      return getApplicationContext().getBean(clazz);
  }

  public static  T getBean(String name, Class clazz) {
      return getApplicationContext().getBean(name, clazz);
  }

}

然后需要编写调用的查询接口 UserService.java 和实现类 UserServiceImpl.java。还有查询的对应表的对象User.java 和对应的 xml文件如下

package cc.mrbird.febs.system.service;

import cc.mrbird.febs.common.domain.QueryRequest;
import cc.mrbird.febs.system.domain.User;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.List;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;


public interface UserService extends IService {

    
    List queryUserByThread(User user);
    
    
    IPage findUserDetail(User user, QueryRequest queryRequest);
}
package cc.mrbird.febs.system.domain;

import cc.mrbird.febs.common.converter.TimeConverter;
import cc.mrbird.febs.common.domain.RegexpConstant;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.wuwenze.poi.annotation.Excel;
import com.wuwenze.poi.annotation.ExcelField;
import lombok.Data;
import lombok.ToString;

import javax.validation.constraints.Email;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Pattern;
import javax.validation.constraints.Size;
import java.io.Serializable;
import java.util.Date;

@Data
@TableName("t_user")
@Excel("用户信息表")
public class User implements Serializable {

    private static final long serialVersionUID = -4852732617765810959L;
    
    public static final String STATUS_VALID = "1";

    public static final String STATUS_LOCK = "0";

    public static final String DEFAULT_AVATAR = "default.jpg";

    
    public static final String SEX_MALE = "0";

    public static final String SEX_FEMALE = "1";

    public static final String SEX_UNKNOW = "2";

    // 默认密码
    public static final String DEFAULT_PASSWORD = "1234qwer";

    @TableId(value = "USER_ID", type = IdType.AUTO)
    private Long userId;

    @Size(min = 4, max = 10, message = "{range}")
    @ExcelField(value = "用户名")
    private String username;

    private String password;

    private Long deptId;

    @ExcelField(value = "部门")
    private transient String deptName;

    @Size(max = 50, message = "{noMoreThan}")
    @Email(message = "{email}")
    @ExcelField(value = "邮箱")
    private String email;

    @Pattern(regexp = RegexpConstant.MOBILE_REG, message = "{mobile}")
    @ExcelField(value = "手机号")
    private String mobile;

    @NotBlank(message = "{required}")
    @ExcelField(value = "状态", writeConverterExp = "0=锁定,1=有效")
    private String status;

    @ExcelField(value = "创建时间", writeConverter = TimeConverter.class)
    private Date createTime;

    private Date modifyTime;

    @ExcelField(value = "最后登录时间", writeConverter = TimeConverter.class)
    private Date lastLoginTime;

    @NotBlank(message = "{required}")
    @ExcelField(value = "性别", writeConverterExp = "0=男,1=女,2=保密")
    private String ssex;

    @Size(max = 100, message = "{noMoreThan}")
    @ExcelField(value = "个人描述")
    private String description;

    private String avatar;

    @NotBlank(message = "{required}")
    private transient String roleId;
    @ExcelField(value = "角色")
    private transient String roleName;

    // 排序字段
    private transient String sortField;

    // 排序规则 ascend 升序 descend 降序
    private transient String sortOrder;

    private transient String createTimeFrom;
    private transient String createTimeTo;

    private transient String id;

    
    public Long getAuthCacheKey() {
        return userId;
    }
}

```javascript
package cc.mrbird.febs.system.dao;

import cc.mrbird.febs.system.domain.User;
import com.baomidou.mybatisplus.core.mapper.baseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;

import java.util.List;

public interface UserMapper extends baseMapper {

    IPage findUserDetail(Page page, @Param("user") User user);

    
    User findDetail(String username);

    List findByPage(int pageNum, int threadSize);

    int countUser();
}



    
       SELECT * FROM t_user  WHERe id>=
       (SELECt id FROM t_user LIMIT #{offset},1)  LIMIT #{pageSize};
    


    

最后留给大家一个问题就说在写多线程查询对象实现callable接口重写里面的call方法时,必须这样按照反射去写实现查询功能,为什么不能直接写 userService.findUserThread(page)?个人猜测和多线程相关知识有关,希望哪位大佬可以详细的解答一下,欢迎各位同学和同僚在评论区中评论!

  @Override
    public List call() throws Exception {
        //通过service查询得到对应结果
        List list = new ArrayList<>();
        //需要调用的业务方法(方法名称,参数类型)
        //获取业务接口bean
        userServiceimpl = (UserServiceImpl) ac.getBean("userService");
        Method idMethod = userServiceimpl.getClass().getMethod("findUserThread", Page.class);
        Page page=new Page();
        page.setSize(num);
        page.setCurrent(bindex);
        //调用业务方法查询数据,返回集合
        Object invoke = idMethod.invoke(userServiceimpl, page);
        IPage userPageInof=(IPage)invoke;
        List records = userPageInof.getRecords();
        //上面的反射本质实现这行代码分页查询  : userService.findUserThread(page)
        //至于为什么不能直接写 userService.findUserThread(page)可能和 实现 Callable接口或者多线程相关,希望有哪位大佬能处理解答
        return records;
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582082.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号