一、XXL-JOB 版本:
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>
每个版本差异比较大,大家可以尝试下载几个版本,阅读一下源码,看别人的写法也是一种享受。
二、源码分析:
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
private static ApplicationContext applicationContext;
public XxlJobSpringExecutor() {
}
public void afterSingletonsInstantiated() {
//代码入口,bean完成初装配后,织入的代码
this.initJobHandlerMethodRepository(applicationContext);
GlueFactory.refreshInstance(1);
try {
super.start();//調用XxlJobExecutor.start()方法
} catch (Exception var2) {
throw new RuntimeException(var2);
}
}
public void destroy() {
super.destroy();
}
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext != null) {
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
String[] var3 = beanDefinitionNames;
int var4 = beanDefinitionNames.length;
label86:
for(int var5 = 0; var5 < var4; ++var5) {
String beanDefinitionName = var3[var5];
Object bean = applicationContext.getBean(beanDefinitionName);
Map annotatedMethods = null;
try {
//查找方法上有@XxlJob注解的方法
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new metadataLookup() {
public XxlJob inspect(Method method) {
return (XxlJob)AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable var19) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", var19);
}
if (annotatedMethods != null && !annotatedMethods.isEmpty()) {
Iterator var9 = annotatedMethods.entrySet().iterator();
while(true) {
Method method;
XxlJob xxlJob;
do {
if (!var9.hasNext()) {
continue label86;
}
Entry methodXxlJobEntry = (Entry)var9.next();
method = (Method)methodXxlJobEntry.getKey();
xxlJob = (XxlJob)methodXxlJobEntry.getValue();
} while(xxlJob == null);
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
//判段是否已經存在相同的處理器
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
if (method.getParameterTypes().length != 1 || !method.getParameterTypes()[0].isAssignableFrom(String.class)) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like " public ReturnT execute(String param) " .");
}
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , The correct method format like " public ReturnT execute(String param) " .");
}
method.setAccessible(true);
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException var18) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException var17) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
注冊到job處理容器中
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
}
}
}
}
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//第一步,容器启动完成,初始化上下文对象
XxlJobSpringExecutor.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}
重點查看XxlJobExecutor的start()方法:
/**
 * Core executor: holds the executor configuration, the registry of job handlers and the
 * registry of running job threads, and starts/stops the embedded server together with the
 * log-clean and trigger-callback threads.
 */
public class XxlJobExecutor {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);

    private String adminAddresses;
    private String accessToken;
    private String appname;
    private String address;
    private String ip;
    private int port;
    private String logPath;
    private int logRetentionDays;

    // Created on first use by initAdminBizList(); stays null when no admin address is configured.
    private static List<AdminBiz> adminBizList;
    private EmbedServer embedServer = null;
    // Handler name -> job handler.
    private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    // Job id -> thread currently serving that job.
    private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

    public XxlJobExecutor() {
    }

    public void setAdminAddresses(String adminAddresses) {
        this.adminAddresses = adminAddresses;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public void setAppname(String appname) {
        this.appname = appname;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public void setLogRetentionDays(int logRetentionDays) {
        this.logRetentionDays = logRetentionDays;
    }

    /**
     * Starts the executor: initialises the log path, creates the admin clients, starts the
     * log-clean and trigger-callback threads and finally starts the embedded server.
     *
     * @throws Exception if any of the start-up steps fails
     */
    public void start() throws Exception {
        XxlJobFileAppender.initLogPath(this.logPath);
        this.initAdminBizList(this.adminAddresses, this.accessToken);
        JobLogFileCleanThread.getInstance().start((long) this.logRetentionDays);
        TriggerCallbackThread.getInstance().start();
        // Start the embedded server that listens for requests sent to this executor.
        this.initEmbedServer(this.address, this.ip, this.port, this.appname, this.accessToken);
    }

    /**
     * Stops the embedded server, stops and joins every registered job thread, clears both
     * repositories and stops the helper threads.
     */
    public void destroy() {
        this.stopEmbedServer();

        if (jobThreadRepository.size() > 0) {
            for (Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
                JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
                if (oldJobThread != null) {
                    try {
                        // Wait for the job thread to finish before moving on.
                        oldJobThread.join();
                    } catch (InterruptedException e) {
                        logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
                    }
                }
            }
            jobThreadRepository.clear();
        }
        jobHandlerRepository.clear();

        JobLogFileCleanThread.getInstance().toStop();
        TriggerCallbackThread.getInstance().toStop();
    }

    /**
     * Creates one {@code AdminBizClient} per non-blank entry of the comma-separated address
     * list and appends it to the shared admin list.
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses == null || adminAddresses.trim().length() == 0) {
            return;
        }
        for (String address : adminAddresses.trim().split(",")) {
            if (address == null || address.trim().length() == 0) {
                continue;
            }
            AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
            if (adminBizList == null) {
                adminBizList = new ArrayList<AdminBiz>();
            }
            adminBizList.add(adminBiz);
        }
    }

    /** @return the admin clients, or {@code null} when none has been configured */
    public static List<AdminBiz> getAdminBizList() {
        return adminBizList;
    }

    /**
     * Fills in defaults for port, ip and registry address and starts the embedded server.
     * A non-positive port is replaced by {@code NetUtil.findAvailablePort(9999)}, a blank ip
     * by {@code IpUtil.getIp()}, and a blank address by {@code http://{ip}:{port}/}.
     */
    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
        port = port > 0 ? port : NetUtil.findAvailablePort(9999);
        ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();

        if (address == null || address.trim().length() == 0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        this.embedServer = new EmbedServer();
        this.embedServer.start(address, port, appname, accessToken);
    }

    /** Stops the embedded server if one was created; failures are logged, not rethrown. */
    private void stopEmbedServer() {
        if (this.embedServer == null) {
            // start() was never called or failed before the server was created: nothing to stop.
            return;
        }
        try {
            this.embedServer.stop();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Registers a job handler under the given name.
     *
     * @return the handler previously registered under that name, or {@code null}
     */
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }

    /** @return the handler registered under {@code name}, or {@code null} if there is none */
    public static IJobHandler loadJobHandler(String name) {
        return jobHandlerRepository.get(name);
    }

    /**
     * Creates and starts a new thread for the job and stops the one it replaces, if any.
     *
     * @param removeOldReason reason passed to the replaced thread's {@code toStop}
     * @return the newly started job thread
     */
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", jobId, handler);

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }
        return newJobThread;
    }

    /**
     * Removes the thread registered for the job and asks it to stop.
     *
     * @return the removed thread, or {@code null} when no thread was registered for the job
     */
    public static JobThread removeJobThread(int jobId, String removeOldReason) {
        JobThread oldJobThread = jobThreadRepository.remove(jobId);
        if (oldJobThread == null) {
            return null;
        }
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
        return oldJobThread;
    }

    /** @return the thread registered for the job, or {@code null} if there is none */
    public static JobThread loadJobThread(int jobId) {
        return jobThreadRepository.get(jobId);
    }
}
public class EmbedServer {
private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
private ExecutorBiz executorBiz;
private Thread thread;
public EmbedServer() {
}
//启用netty服务
public void start(final String address, final int port, final String appname, final String accessToken) {
this.executorBiz = new ExecutorBizImpl();
this.thread = new Thread(new Runnable() {
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0, 200, 60L, TimeUnit.SECONDS, new linkedBlockingQueue(2000), new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
}
}, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
ServerBootstrap bootstrap = new ServerBootstrap();
((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer() {
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new EmbedServer.EmbedHttpServerHandler(EmbedServer.this.executorBiz, accessToken, bizThreadPool)});
}
}).childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
EmbedServer.logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
EmbedServer.this.startRegistry(appname, address);
future.channel().closeFuture().sync();
} catch (InterruptedException var14) {
if (var14 instanceof InterruptedException) {
EmbedServer.logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
EmbedServer.logger.error(">>>>>>>>>>> xxl-job remoting server error.", var14);
}
} finally {
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception var13) {
EmbedServer.logger.error(var13.getMessage(), var13);
}
}
}
});
this.thread.setDaemon(true);
this.thread.start();
}
public void stop() throws Exception {
if (this.thread != null && this.thread.isAlive()) {
this.thread.interrupt();
}
this.stopRegistry();
logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");
}
public void startRegistry(String appname, String address) {
ExecutorRegistryThread.getInstance().start(appname, address);
}
public void stopRegistry() {
ExecutorRegistryThread.getInstance().toStop();
}
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler {
private static final Logger logger = LoggerFactory.getLogger(EmbedServer.EmbedHttpServerHandler.class);
private ExecutorBiz executorBiz;
private String accessToken;
private ThreadPoolExecutor bizThreadPool;
public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
this.executorBiz = executorBiz;
this.accessToken = accessToken;
this.bizThreadPool = bizThreadPool;
}
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
final String requestData = msg.content().toString(CharsetUtil.UTF_8);
final String uri = msg.uri();
final HttpMethod httpMethod = msg.method();
final boolean keepAlive = HttpUtil.isKeepAlive(msg);
final String accessTokenReq = msg.headers().get("XXL-JOB-ACCESS-TOKEN");
this.bizThreadPool.execute(new Runnable() {
public void run() {
Object responseObj = EmbedHttpServerHandler.this.process(httpMethod, uri, requestData, accessTokenReq);
String responseJson = GsonTool.toJson(responseObj);
EmbedHttpServerHandler.this.writeResponse(ctx, keepAlive, responseJson);
}
});
}
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
if (HttpMethod.POST != httpMethod) {
return new ReturnT(500, "invalid request, HttpMethod not support.");
} else if (uri != null && uri.trim().length() != 0) {
if (this.accessToken != null && this.accessToken.trim().length() > 0 && !this.accessToken.equals(accessTokenReq)) {
return new ReturnT(500, "The access token is wrong.");
} else {
try {
if ("/beat".equals(uri)) {
return this.executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = (IdleBeatParam)GsonTool.fromJson(requestData, IdleBeatParam.class);
return this.executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = (TriggerParam)GsonTool.fromJson(requestData, TriggerParam.class);
return this.executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = (KillParam)GsonTool.fromJson(requestData, KillParam.class);
return this.executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = (LogParam)GsonTool.fromJson(requestData, LogParam.class);
return this.executorBiz.log(logParam);
} else {
return new ReturnT(500, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception var6) {
logger.error(var6.getMessage(), var6);
return new ReturnT(500, "request error:" + ThrowableUtil.toString(var6));
}
}
} else {
return new ReturnT(500, "invalid request, uri-mapping empty.");
}
}
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.writeAndFlush(response);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
ctx.close();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.channel().close();
logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
} else {
super.userEventTriggered(ctx, evt);
}
}
}
}
配置文件:
# 调度中心部署根地址:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"
xxl.job.admin.addresses=http://192.168.115.128:8080/xxl-job-admin
# 执行器"AppName"和地址信息配置:AppName执行器心跳注册分组依据;地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999,执行器IP默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯使用。单机部署多个执行器时,注意要配置不同执行器端口
xxl.job.executor.appname=xxl-job-spring
xxl.job.executor.ip=192.168.0.107
xxl.job.executor.port=9999
# 执行器通讯TOKEN,非空时启用
xxl.job.accessToken=
# 执行器运行日志文件存储的磁盘位置,需要对该路径拥有读写权限
xxl.job.executor.logpath=D:
# 执行器Log文件定期清理功能,指定日志保存天数,日志文件过期自动删除。限制至少保持3天,否则功能不生效;
xxl.job.executor.logretentiondays=-1
代码示范:
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
//import com.xxl.job.core.handler.annotation.JobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import org.springframework.stereotype.Component;
/**
 * Minimal demo job. The {@code @XxlJob} value {@code "testJobHandler"} is the handler name
 * under which {@link #execute(String)} is registered with the executor.
 */
@Component
public class DemoJobHandler extends IJobHandler {

    /**
     * Writes one line to the XXL-JOB log and one to standard output.
     *
     * @param param the job parameter; not used by this demo
     * @return the inherited {@code SUCCESS} result
     */
    @Override
    @XxlJob(value = "testJobHandler")
    public ReturnT<String> execute(String param) {
        XxlJobLogger.log("XXL-JOB, testJobHandler.");
        System.out.println("XXL-JOB测试");
        return SUCCESS;
    }
}



