throw new RuntimeException(“xxl-job method-jobhandler return-classtype invalid, for[” + bean.getClass() + “#” + method.getName() + "] , " +
“The correct method format like " public ReturnT
}
method.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException(“xxl-job method-jobhandler initMethod invalid, for[” + bean.getClass() + “#” + method.getName() + “] .”);
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException(“xxl-job method-jobhandler destroyMethod invalid, for[” + bean.getClass() + “#” + method.getName() + “] .”);
}
}
// registry jobhandler
//将任务存储在jobHandlerRepository对象中,后续下发任务使用
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
}
}
之后执行super.start(),即父类XxlJobExecutor的start方法。该方法依次完成:执行日志路径初始化、日志清理线程启动、触发器回调线程启动(通过RPC回调调度中心)、调度中心列表初始化,以及执行器内嵌服务(端口)的启动。
public void start() throws Exception { // init logpath
|
//初始化任务执行日志路径
XxlJobFileAppender.initLogPath(logPath);
// 日志定时清理任务
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 初始化触发器回调线程(用RPC回调调度中心接口)
TriggerCallbackThread.getInstance().start();
//初始化调度中心列表
initAdminBizList( adminAddresses, accessToken);
// init executor-server 执行器端口启动
initEmbedServer(address, ip, port, appname, accessToken);
}
XxlJobFileAppender.initLogPath(logPath)和JobLogFileCleanThread.getInstance().start(logRetentionDays)主要对执行日志进行初始化,就不多解释了,直接往下看。
TriggerCallbackThread.getInstance().start();
public void start() {
|
// 调度中心注册表会否为空
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
// callback
triggerCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
// 监听阻塞队列
while(!toStop){
try {
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// 组装callback返回的参数
List
int drainTonum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// 执行回调
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// last callback
try {
List
int drainTonum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
}
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName(“xxl-job, executor TriggerCallbackThread”);
triggerCallbackThread.start();
// retry
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
while(!toStop){
try {
retryFailCallbackFile();
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
}
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();
}
doCallback(callbackParamList)如下
public ReturnT return XxlJobRemotingUtil.postBody(addressUrl+“api/callback”, accessToken, timeout, callbackParamList, String.class); } initAdminBizList( adminAddresses, accessToken); 初始化注册中心列表,用于后期和注册中心交互
/**
 * Builds the list of admin clients from a comma-separated address string.
 *
 * <p>Each non-blank entry is trimmed and wrapped in an {@code AdminBizClient};
 * {@code adminBizList} is created lazily on the first valid entry, so it is left
 * untouched when {@code adminAddresses} is null, blank, or has no valid entry.
 *
 * @param adminAddresses comma-separated admin addresses; may be null or blank
 * @param accessToken    token handed to every created {@code AdminBizClient}
 * @throws Exception declared by the original signature
 */
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses == null || adminAddresses.trim().length() == 0) {
        return;
    }
    for (String address : adminAddresses.trim().split(",")) {
        if (address == null || address.trim().length() == 0) {
            continue;
        }
        AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
        if (adminBizList == null) {
            // parameterized instead of the raw ArrayList
            adminBizList = new ArrayList<AdminBiz>();
        }
        adminBizList.add(adminBiz);
    }
}
// init executor-server initEmbedServer(address, ip, port, appname, accessToken);<核心>
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { //初始化ip和端口,如果没有ip则自动获取本地ip port = port>0?port: NetUtil.findAvailablePort(9999); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); // generate address if (address==null || address.trim().length()==0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = “http://{ip_port}/”.replace("{ip_port}", ip_port_address); } // 启动服务 embedServer = new EmbedServer(); embedServer.start(address, port, appname, accessToken); } embedServer.start(address, port, appname, accessToken); 本质上是一个Netty服务,标准的Netty服务启动,我们只看EmbedHttpServerHandler,Netty处理请求的handler
executorBiz = new ExecutorBizImpl(); thread = new Thread(new Runnable() { @Override public void run() { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, 60L, TimeUnit.SECONDS, new linkedBlockingQueue new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, “xxl-rpc, EmbedServer bizThreadPool-” + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException(“xxl-job, EmbedServer bizThreadPool is EXHAUSTED!”); } }); try { // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind ChannelFuture future = bootstrap.bind(port).sync(); logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); //注册到调度中心 startRegistry(appname, address); // wait util stop future.channel().closeFuture().sync(); } catch (InterruptedException e) { if (e instanceof InterruptedException) { logger.info(">>>>>>>>>>> xxl-job remoting server stop."); } else { logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); } } finally { // stop try { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); thread.setDaemon(true); // daemon, service 
jvm, user thread leave >>> daemon leave >>> jvm leave thread.start(); }
private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class); private ExecutorBiz executorBiz; //执行器 private String accessToken; //token private ThreadPoolExecutor bizThreadPool;//执行器线程池 public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) { this.executorBiz = executorBiz; this.accessToken = accessToken; this.bizThreadPool = bizThreadPool; } @Override protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { // request parse //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8);//获取请求数据 String uri = msg.uri(); HttpMethod httpMethod = msg.method(); boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); // invoke bizThreadPool.execute(new Runnable() { @Override public void run() { // 处理请求 Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // 格式化为JSON String responseJson = GsonTool.toJson(responseObj); // 写回客户端 writeResponse(ctx, keepAlive, responseJson); } }); } private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { // valid if (HttpMethod.POST != httpMethod) { return new ReturnT } if (uri==null || uri.trim().length()==0) { return new ReturnT } if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.equals(accessTokenReq)) { return new ReturnT } // services mapping try { //接收注册中心请求接口处理 if ("/beat".equals(uri)) { return executorBiz.beat(); } else if ("/idleBeat".equals(uri)) { IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); } else if ("/run".equals(uri)) { //注册中心执行接口 TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return 
executorBiz.run(triggerParam); } else if ("/kill".equals(uri)) { KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); } else if ("/log".equals(uri)) { LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); } else { return new ReturnT } } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT } } 我们主要看下run方法的执行过程
public ReturnT // 根据jobid加载对应的job执行信息,第一次执行为null JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;//根绝jobThread获取job处理handler String removeOldReason = null; // valid:jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); //获取任务类型 if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());//获取任务的执行器 // 校验新老job是否一致,不一致将老的进行初始化。有可能任务更新。通过jobid获取的是老的 if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread removeOldReason = “change jobhandler or glue type, and terminate the old job thread.”; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = newJobHandler; //将新处理handler赋值给老的 if (jobHandler == null) { return new ReturnT } } } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof GlueJobHandler && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change handler or gluesource updated, need kill old thread removeOldReason = “change job source or glue type, and terminate the old job thread.”; jobThread = null; jobHandler = null; } if (jobHandler == null) { try { IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT } } } else if (glueTypeEnum!=null && glueTypeEnum.isscript()) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof scriptJobHandler && ((scriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change script or gluesource updated, 
need kill old thread removeOldReason = “change job source or glue type, and terminate the old job thread.”; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHa 【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】 浏览器打开:qq.cn.hn/FTf 免费领取 ndler = new scriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); } } else { return new ReturnT } // executor block strategy if (jobThread != null) { ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = “block strategy effect:” + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } } // 如果jobThread 为null,则将任务信息注册到jobThreadRepository对象进行缓存,并启动线程 if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } //将任务推送到自己现成的任务队列中区 ReturnT return pushResult; } 看下registJobThread方法,该方法主要是根据任务信息,创建一个jobThread,之后启动该线程。然后将其缓存到jobThreadRepository中。如果存在老的任务,则将老的任务停掉。
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler);//创建新得jobThread对象 newJobThread.start();//启动线程 logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); //将新的jobThread放入map中,并弹出老的。 JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map’s put method return the old value!!! if (oldJobThread != null) { //将老的停掉 oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; } 该线程执行如下,主要就是获取队列中的任务,然后通过新建FutureTask线程执行任务。之后将执行结果推到TriggerCallbackThread的队列中。通过TriggerCallbackThread推到任务调度中心,进行记录结果信息。
public void run() { // init try { handler.init(); } catch (Throwable e) { logger.error(e.getMessage(), e); } // execute while(!toStop){ running = false; idleTimes++; TriggerParam triggerParam = null; Java相关栏目本月热门文章
热门相关搜索
路由器设置
木托盘
宝塔面板
儿童python教程
心情低落
朋友圈
vim
双一流学科
专升本
我的学校
日记学校
西点培训学校
汽修学校
情书
化妆学校
塔沟武校
异形模板
西南大学排名
最精辟人生短句
6步教你追回被骗的钱
南昌大学排名
清朝十二帝
北京印刷学院排名
北方工业大学排名
北京航空航天大学排名
首都经济贸易大学排名
中国传媒大学排名
首都师范大学排名
中国地质大学(北京)排名
北京信息科技大学排名
中央民族大学排名
北京舞蹈学院排名
北京电影学院排名
中国戏曲学院排名
河北政法职业学院排名
河北经贸大学排名
天津中德应用技术大学排名
天津医学高等专科学校排名
天津美术学院排名
天津音乐学院排名
天津工业大学排名
北京工业大学耿丹学院排名
北京警察学院排名
天津科技大学排名
北京邮电大学(宏福校区)排名
北京网络职业学院排名
北京大学医学部排名
河北科技大学排名
河北地质大学排名
河北体育学院排名
|



