栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

线程池使用,kafka工作原理

线程池使用,kafka工作原理

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.initialize();

return executor;

}

@Override

public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {// 异步任务中异常处理

return new AsyncUncaughtExceptionHandler() {

@Override

public void handleUncaughtException(Throwable arg0, Method arg1, Object… arg2) {

log.error("====" + arg0.getMessage() + "=", arg0);

log.error(“exception method:” + arg1.getName());

}

};

}

}

2.2、TaskExecutePool.java

package com.sgcc.dlsc.demoa.config.threadpool;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration

@Slf4j

@EnableAsync

public class TaskExecutePool {

public final static String TaskExecutor = “TaskExecutor”;

@Autowired

private TaskThreadPoolConfig config;

@Bean(name = TaskExecutePool.TaskExecutor)

public Executor gatewayTaskExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

log.debug(config.toString());

executor.setCorePoolSize(config.getCorePoolSize());

executor.setMaxPoolSize(config.getMaxPoolSize());

executor.setQueueCapacity(config.getQueueCapacity());

executor.setKeepAliveSeconds(config.getKeepAliveSeconds());

executor.setThreadNamePrefix(config.getNamePrefix());

// rejection-policy:当pool已经达到max size的时候,如何处理新任务

// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//执行初始化

executor.initialize();

return executor;

}

}

2.3、TaskThreadPoolConfig.java

package com.sgcc.dlsc.demoa.config.threadpool;

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Configuration;

@Data

@Configuration

@ConfigurationProperties(prefix = “spring.threadpool”) // 该注解的locations已经被启用,现在只要是在环境中,都会优先加载

public class TaskThreadPoolConfig {

private int corePoolSize;

private int maxPoolSize;

private int keepAliveSeconds;

private int queueCapacity;

private String namePrefix;

}

3、使用示范


3.1、编写service接口

package com.sgcc.dlsc.demoa.service;

public interface ThreadPoolService {

void testThreadPool();

}

3.2、编写实现类

package com.sgcc.dlsc.demoa.service.impl;

import com.sgcc.dlsc.demoa.config.threadpool.TaskExecutePool;

import com.sgcc.dlsc.demoa.service.ThreadPoolService;

import lombok.extern.slf4j.Slf4j;

import org.springframework.scheduling.annotation.Async;

import org.springframework.stereotype.Service;

@Slf4j

@Service

public class ThreadPoolServiceImpl implements ThreadPoolService {

@Async(TaskExecutePool.TaskExecutor)

@Override

public void testThreadPool() {

log.info(“我正在处理任务中”);

try {

Thread.sleep(15000);

} catch (InterruptedException e) {

e.printStackTrace();

}

log.info(“任务处理完成”);

}

}

4、调用测试


package com.sgcc.dlsc.demoa.controller;

import com.sgcc.comm.util.ResultUtil;

import com.sgcc.dlsc.demoa.service.ThreadPoolService;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestMapping;

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673484.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号