栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

XXL-Job启动源码详解,java入门电子书pdf

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

XXL-Job启动源码详解,java入门电子书pdf

throw new RuntimeException(“xxl-job method-jobhandler return-classtype invalid, for[” + bean.getClass() + “#” + method.getName() + "] , " +

“The correct method format like " public ReturnT execute(String param) " .”);

}

method.setAccessible(true);

// init and destory

Method initMethod = null;

Method destroyMethod = null;

if (xxlJob.init().trim().length() > 0) {

try {

initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());

initMethod.setAccessible(true);

} catch (NoSuchMethodException e) {

throw new RuntimeException(“xxl-job method-jobhandler initMethod invalid, for[” + bean.getClass() + “#” + method.getName() + “] .”);

}

}

if (xxlJob.destroy().trim().length() > 0) {

try {

destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());

destroyMethod.setAccessible(true);

} catch (NoSuchMethodException e) {

throw new RuntimeException(“xxl-job method-jobhandler destroyMethod invalid, for[” + bean.getClass() + “#” + method.getName() + “] .”);

}

}

// registry jobhandler

//将任务存储在jobHandlerRepository对象中,后续下发任务使用

registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));

}

}

}

之后执行 super.start(),即父类 XxlJobExecutor 的 start 方法。该方法依次完成五件事:初始化执行日志路径、启动日志定时清理线程、启动触发器回调线程(通过 RPC 回调调度中心)、初始化调度中心列表,以及启动执行器内嵌服务(端口)。

/**
 * Starts the executor by running its initialisation steps in a fixed order:
 * log path, log-clean thread, trigger-callback thread, admin client list,
 * and finally the embedded executor server.
 *
 * @throws Exception propagated from the initialisation steps
 */
public void start() throws Exception { // init logpath

// initialise the path used for job execution logs

XxlJobFileAppender.initLogPath(logPath);

// scheduled clean-up task for old log files

JobLogFileCleanThread.getInstance().start(logRetentionDays);

// start the trigger-callback thread (calls back the admin API over RPC)

TriggerCallbackThread.getInstance().start();

// initialise the list of admin (scheduling center) clients

initAdminBizList( adminAddresses, accessToken);

// init executor-server: start the executor's embedded server / port

initEmbedServer(address, ip, port, appname, accessToken);

}

XxlJobFileAppender.initLogPath(logPath)和JobLogFileCleanThread.getInstance().start(logRetentionDays)主要对执行日志进行初始化,就不多解释了,直接往下看。

TriggerCallbackThread.getInstance().start();

public void start() {

// 调度中心注册表会否为空

if (XxlJobExecutor.getAdminBizList() == null) {

logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");

return;

}

// callback

triggerCallbackThread = new Thread(new Runnable() {

@Override

public void run() {

// 监听阻塞队列

while(!toStop){

try {

HandleCallbackParam callback = getInstance().callBackQueue.take();

if (callback != null) {

// 组装callback返回的参数

List callbackParamList = new ArrayList();

int drainTonum = getInstance().callBackQueue.drainTo(callbackParamList);

callbackParamList.add(callback);

// 执行回调

if (callbackParamList!=null && callbackParamList.size()>0) {

doCallback(callbackParamList);

}

}

} catch (Exception e) {

if (!toStop) {

logger.error(e.getMessage(), e);

}

}

}

// last callback

try {

List callbackParamList = new ArrayList();

int drainTonum = getInstance().callBackQueue.drainTo(callbackParamList);

if (callbackParamList!=null && callbackParamList.size()>0) {

doCallback(callbackParamList);

}

} catch (Exception e) {

if (!toStop) {

logger.error(e.getMessage(), e);

}

}

logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

}

});

triggerCallbackThread.setDaemon(true);

triggerCallbackThread.setName(“xxl-job, executor TriggerCallbackThread”);

triggerCallbackThread.start();

// retry

triggerRetryCallbackThread = new Thread(new Runnable() {

@Override

public void run() {

while(!toStop){

try {

retryFailCallbackFile();

} catch (Exception e) {

if (!toStop) {

logger.error(e.getMessage(), e);

}

}

try {

TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

} catch (InterruptedException e) {

if (!toStop) {

logger.error(e.getMessage(), e);

}

}

}

logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");

}

});

triggerRetryCallbackThread.setDaemon(true);

triggerRetryCallbackThread.start();

}

doCallback(callbackParamList)如下

private void doCallback(List callbackParamList){

boolean callbackRet = false;

// 向所有的调度中心发送回调信息

for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {

try {

//本质上是调用注册中心的api/callback接口。记录调用结果。

ReturnT callbackResult = adminBiz.callback(callbackParamList);

if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {

callbackLog(callbackParamList, “
----------- xxl-job job callback finish.”);

callbackRet = true;

break;

} else {

callbackLog(callbackParamList, “
----------- xxl-job job callback fail, callbackResult:” + callbackResult);

}

} catch (Exception e) {

callbackLog(callbackParamList, “
----------- xxl-job job callback error, errorMsg:” + e.getMessage());

}

}

if (!callbackRet) {

appendFailCallbackFile(callbackParamList);

}

}

adminBiz.callback(callbackParamList)

调用调度中心的 api/callback 接口

@Override

public ReturnT callback(List callbackParamList) {

return XxlJobRemotingUtil.postBody(addressUrl+“api/callback”, accessToken, timeout, callbackParamList, String.class);

}

initAdminBizList(adminAddresses, accessToken):初始化调度中心列表,用于后续与调度中心交互(例如发送任务回调)。

/**
 * Builds the list of admin clients from a comma-separated address string
 * (the {@code xxl.job.admin.addresses} setting, per the original comment).
 *
 * <p>Each non-blank entry is trimmed and wrapped in an {@code AdminBizClient}.
 * {@code adminBizList} is created lazily, so it stays {@code null} when no usable
 * address is supplied.
 *
 * @param adminAddresses comma-separated admin addresses; may be {@code null} or blank
 * @param accessToken    token passed to every created client
 * @throws Exception declared for callers; nothing in this body throws a checked exception
 */
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses == null || adminAddresses.trim().length() == 0) {
        return;
    }

    for (String address : adminAddresses.trim().split(",")) {
        // split() never yields null elements, so only blank entries need skipping
        String trimmedAddress = address.trim();
        if (trimmedAddress.length() == 0) {
            continue;
        }

        if (adminBizList == null) {
            adminBizList = new ArrayList<AdminBiz>();
        }
        adminBizList.add(new AdminBizClient(trimmedAddress, accessToken));
    }
}

// init executor-server

initEmbedServer(address, ip, port, appname, accessToken);<核心>

//初始化xxljob执行器服务

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

//初始化ip和端口,如果没有ip则自动获取本地ip

port = port>0?port: NetUtil.findAvailablePort(9999);

ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

// generate address

if (address==null || address.trim().length()==0) {

String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null

address = “http://{ip_port}/”.replace("{ip_port}", ip_port_address);

}

// 启动服务

embedServer = new EmbedServer();

embedServer.start(address, port, appname, accessToken);

}

embedServer.start(address, port, appname, accessToken) 本质上是启动一个 Netty 服务,启动流程是标准的 Netty 写法。这里我们重点看 Netty 中处理请求的 handler:EmbedHttpServerHandler。

public void start(final String address, final int port, final String appname, final String accessToken) {

executorBiz = new ExecutorBizImpl();

thread = new Thread(new Runnable() {

@Override

public void run() {

// param

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(

0,

200,

60L,

TimeUnit.SECONDS,

new linkedBlockingQueue(2000),

new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

return new Thread(r, “xxl-rpc, EmbedServer bizThreadPool-” + r.hashCode());

}

},

new RejectedExecutionHandler() {

@Override

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

throw new RuntimeException(“xxl-job, EmbedServer bizThreadPool is EXHAUSTED!”);

}

});

try {

// start server

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel channel) throws Exception {

channel.pipeline()

.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle

.addLast(new HttpServerCodec())

.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL

.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));

}

})

.childOption(ChannelOption.SO_KEEPALIVE, true);

// bind

ChannelFuture future = bootstrap.bind(port).sync();

logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

//注册到调度中心

startRegistry(appname, address);

// wait util stop

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

if (e instanceof InterruptedException) {

logger.info(">>>>>>>>>>> xxl-job remoting server stop.");

} else {

logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);

}

} finally {

// stop

try {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

}

});

thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave

thread.start();

}

public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler {

private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

private ExecutorBiz executorBiz; //执行器

private String accessToken; //token

private ThreadPoolExecutor bizThreadPool;//执行器线程池

/**
 * Creates a handler bound to the given collaborators.
 *
 * @param executorBiz   executor facade the handler dispatches requests to
 * @param accessToken   token that incoming requests are compared against
 * @param bizThreadPool pool on which request processing is executed
 */
public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
    this.bizThreadPool = bizThreadPool;
    this.accessToken = accessToken;
    this.executorBiz = executorBiz;
}

/**
 * Handles one aggregated HTTP request.
 *
 * <p>Everything needed from {@code msg} (body as UTF-8 text, uri, method, keep-alive
 * flag and the access-token header) is read here, so the pooled task works only on
 * these copies. The task runs {@code process}, serialises the result to JSON and
 * writes it back through {@code writeResponse}.
 *
 * @param ctx channel context used to write the response
 * @param msg the full HTTP request
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

    // request parse
    final HttpMethod method = msg.method();
    final String requestUri = msg.uri();
    final String body = msg.content().toString(CharsetUtil.UTF_8);
    final String tokenFromRequest = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    final boolean keepConnectionAlive = HttpUtil.isKeepAlive(msg);

    // invoke
    Runnable task = new Runnable() {
        @Override
        public void run() {
            // handle the request, render the result as JSON, and answer the client
            Object result = process(method, requestUri, body, tokenFromRequest);
            writeResponse(ctx, keepConnectionAlive, GsonTool.toJson(result));
        }
    };
    bizThreadPool.execute(task);
}

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

// valid

if (HttpMethod.POST != httpMethod) {

return new ReturnT(ReturnT.FAIL_CODE, “invalid request, HttpMethod not support.”);

}

if (uri==null || uri.trim().length()==0) {

return new ReturnT(ReturnT.FAIL_CODE, “invalid request, uri-mapping empty.”);

}

if (accessToken!=null

&& accessToken.trim().length()>0

&& !accessToken.equals(accessTokenReq)) {

return new ReturnT(ReturnT.FAIL_CODE, “The access token is wrong.”);

}

// services mapping

try {

//接收注册中心请求接口处理

if ("/beat".equals(uri)) {

return executorBiz.beat();

} else if ("/idleBeat".equals(uri)) {

IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);

return executorBiz.idleBeat(idleBeatParam);

} else if ("/run".equals(uri)) { //注册中心执行接口

TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);

return executorBiz.run(triggerParam);

} else if ("/kill".equals(uri)) {

KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);

return executorBiz.kill(killParam);

} else if ("/log".equals(uri)) {

LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);

return executorBiz.log(logParam);

} else {

return new ReturnT(ReturnT.FAIL_CODE, “invalid request, uri-mapping(”+ uri +") not found.");

}

} catch (Exception e) {

logger.error(e.getMessage(), e);

return new ReturnT(ReturnT.FAIL_CODE, “request error:” + ThrowableUtil.toString(e));

}

}

我们主要看下run方法的执行过程

@Override

public ReturnT run(TriggerParam triggerParam) {

// 根据jobid加载对应的job执行信息,第一次执行为null

JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());

IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;//根绝jobThread获取job处理handler

String removeOldReason = null;

// valid:jobHandler + jobThread

GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); //获取任务类型

if (GlueTypeEnum.BEAN == glueTypeEnum) {

// new jobhandler

IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());//获取任务的执行器

// 校验新老job是否一致,不一致将老的进行初始化。有可能任务更新。通过jobid获取的是老的

if (jobThread!=null && jobHandler != newJobHandler) {

// change handler, need kill old thread

removeOldReason = “change jobhandler or glue type, and terminate the old job thread.”;

jobThread = null;

jobHandler = null;

}

// valid handler

if (jobHandler == null) {

jobHandler = newJobHandler; //将新处理handler赋值给老的

if (jobHandler == null) {

return new ReturnT(ReturnT.FAIL_CODE, “job handler [” + triggerParam.getExecutorHandler() + “] not found.”);

}

}

} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

// valid old jobThread

if (jobThread != null &&

!(jobThread.getHandler() instanceof GlueJobHandler

&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {

// change handler or gluesource updated, need kill old thread

removeOldReason = “change job source or glue type, and terminate the old job thread.”;

jobThread = null;

jobHandler = null;

}

if (jobHandler == null) {

try {

IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());

jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());

} catch (Exception e) {

logger.error(e.getMessage(), e);

return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());

}

}

} else if (glueTypeEnum!=null && glueTypeEnum.isscript()) {

// valid old jobThread

if (jobThread != null &&

!(jobThread.getHandler() instanceof scriptJobHandler

&& ((scriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {

// change script or gluesource updated, need kill old thread

removeOldReason = “change job source or glue type, and terminate the old job thread.”;

jobThread = null;

jobHandler = null;

}

// valid handler

if (jobHandler == null) {

jobHa

【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】

浏览器打开:qq.cn.hn/FTf 免费领取

ndler = new scriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));

}

} else {

return new ReturnT(ReturnT.FAIL_CODE, “glueType[” + triggerParam.getGlueType() + “] is not valid.”);

}

// executor block strategy

if (jobThread != null) {

ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);

if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {

// discard when running

if (jobThread.isRunningOrHasQueue()) {

return new ReturnT(ReturnT.FAIL_CODE, “block strategy effect:”+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());

}

} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {

// kill running jobThread

if (jobThread.isRunningOrHasQueue()) {

removeOldReason = “block strategy effect:” + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

jobThread = null;

}

} else {

// just queue trigger

}

}

// 如果jobThread 为null,则将任务信息注册到jobThreadRepository对象进行缓存,并启动线程

if (jobThread == null) {

jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);

}

//将任务推送到自己现成的任务队列中区

ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam);

return pushResult;

}

看下registJobThread方法,该方法主要是根据任务信息,创建一个jobThread,之后启动该线程。然后将其缓存到jobThreadRepository中。如果存在老的任务,则将老的任务停掉。

/** Job threads keyed by job id. */
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

/**
 * Creates and starts a new {@code JobThread} for the job and stores it in the
 * repository, replacing any thread previously stored under the same job id.
 * A replaced thread is told to stop with {@code removeOldReason} and then interrupted.
 *
 * @param jobId           id of the job
 * @param handler         handler the new thread is created with
 * @param removeOldReason reason passed to the replaced thread's {@code toStop}
 * @return the newly started job thread
 */
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
    JobThread newJobThread = new JobThread(jobId, handler);
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", jobId, handler);

    // put() returns the previous mapping, which is exactly the thread that must be stopped
    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }

    return newJobThread;
}

该线程执行如下,主要就是获取队列中的任务,然后通过新建FutureTask线程执行任务。之后将执行结果推到TriggerCallbackThread的队列中。通过TriggerCallbackThread推到任务调度中心,进行记录结果信息。

 @Override

public void run() {

// init

try {

handler.init();

} catch (Throwable e) {

logger.error(e.getMessage(), e);

}

// execute

while(!toStop){

running = false;

idleTimes++;

TriggerParam triggerParam = null;

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号