栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ElasticJob‐Lite:Script & HTTP作业

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ElasticJob‐Lite:Script & HTTP作业

ElasticJob‐Lite:script & HTTP作业

在上一篇博客中,博主介绍了Simple和Dataflow作业:

  • ElasticJob‐Lite:Simple & Dataflow作业

ElasticJob的作业分类基于class和type两种类型。基于class的作业需要开发者自行通过实现接口的方式织入业务逻辑;基于type的作业则无需编码,只需要提供相应配置即可。基于class的作业接口的方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount()、getShardingItem()等方法分别获取分片总数和运行在本作业服务器的分片序列号等。

ElasticJob目前提供Simple、Dataflow这两种基于class的作业类型,并提供script、HTTP这两种基于type的作业类型,用户可通过实现SPI接口自行扩展作业类型。

通过实现SPI接口自行扩展作业类型以后再进行介绍,本篇博客介绍script和HTTP作业。

添加依赖(3.0.1是目前最新的Releases版本):

        
            org.apache.shardingsphere.elasticjob
            elasticjob-lite-core
            3.0.1
        
script作业

ElasticJob支持shell、python以及perl等所有类型脚本。可通过属性script.command.line配置待执行脚本,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;


public class Application {
    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), "script",
                createJobConfiguration()).schedule();
    }
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }
    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MyscriptJob", 3)
                .description("脚本作业")
                .cron("30 * * * * ?")
                .setProperty("script.command.line", "python F:\workspace\IDEA\my\job\src\main\java\com\kaven\job\script.py")
                .overwrite(true)
                .failover(true)
                .build();
    }
}

elasticJobType参数需要全部大写(比如script和HTTP)。

        new ScheduleJobBootstrap(createRegistryCenter(), "script",
                createJobConfiguration()).schedule();

定义脚本作业只需要一行(这里脚本路径是绝对路径)。

                .setProperty("script.command.line", "python F:\workspace\IDEA\my\job\src\main\java\com\kaven\job\script.py")

script.py:

import sys
import time

print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), ':', sys.argv[-1])

输出如下图所示:


脚本路径也可以使用相对路径(相对于项目的路径F:\workspace\IDEA\my\job)。

                .setProperty("script.command.line",
                        "python .\src\main\java\com\kaven\job\script.py")

参数

给脚本传参数,就跟手动执行脚本一样,在命令后面添加参数即可。

    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MyscriptJob4", 3)
                .description("脚本作业")
                .cron("30 * * * * ?")
                .setProperty("script.command.line",
                        "python F:\workspace\IDEA\my\job\src\main\java\com\kaven\job\script.py kaven")
                .overwrite(true)
                .failover(true)
                .build();
    }

修改脚本:

import sys
import time

print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), ':', sys.argv[:])

输出如下图所示:

HTTP作业

先搭建一个Spring Boot项目,在项目中定义一个接口。

pom.xml:



    4.0.0
    
    com.kaven
    server
    pom
    1.0-SNAPSHOT

    
        org.springframework.boot
        spring-boot-starter-parent
        2.3.2.RELEASE
    

    
        
            org.springframework.boot
            spring-boot-starter-web
        
    

MessageController接口定义:

package com.kaven.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;



@RestController
public class MessageController {
    private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final String MESSAGE_HEAD = "MessageController";

    @GetMapping("/message")
    public String getMessage(String messageBody, @RequestHeader String shardingContext) {
        String message = MESSAGE_HEAD + " " +  messageBody;
        System.out.println(formatter.format(new Date()) + " " + message + " - " + shardingContext);
        return message;
    }
}

Server启动类:

package com.kaven;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;



@SpringBootApplication
public class Server {
    public static void main(String[] args) {
        SpringApplication.run(Server.class);
    }
}

Application类:

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;


public class Application {
    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), "HTTP",
                createJobConfiguration()).schedule();
    }
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }

    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MyHTTPJob", 3)
                .description("HTTP作业")
                .cron("30 * * * * ?")
                .setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message?messageBody=MyHTTPJob")
                .setProperty(HttpJobProperties.METHOD_KEY, "GET")
//                .setProperty(HttpJobProperties.DATA_KEY, "messageBody=MyHTTPJob")
                .overwrite(true)
                .failover(true)
                .build();
    }
}

定义HTTP作业也比较简单,只需要设置HTTP作业的相关参数,比如接口的URI、类型、参数以及连接超时时间等,HttpJobProperties.DATA_KEY用于给POST这类请求设置参数。

                .setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message?messageBody=MyHTTPJob")
                .setProperty(HttpJobProperties.METHOD_KEY, "GET")
//                .setProperty(HttpJobProperties.DATA_KEY, "messageBody=MyHTTPJob")

HttpJobProperties类:

package org.apache.shardingsphere.elasticjob.http.props;


public final class HttpJobProperties {
    public static final String URI_KEY = "http.uri";
    public static final String METHOD_KEY = "http.method";
    public static final String DATA_KEY = "http.data";
    public static final String CONNECT_TIMEOUT_KEY = "http.connect.timeout.milliseconds";
    public static final String READ_TIMEOUT_KEY = "http.read.timeout.milliseconds";
    public static final String CONTENT_TYPE_KEY = "http.content.type";
    public static final String SHARDING_CONTEXT_KEY = "shardingContext";
}

输出如下图所示:

多参数接口

如果GET接口有多个参数,在URI上进行拼接即可。

                .setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message?arg1=1&arg2=2")

而如果是POST接口有多个参数,就不能多次调用setProperty(HttpJobProperties.DATA_KEY, "key=value")来完成多个参数的传值,因为值会被覆盖(即只会取最后一次设置的值,ElasticJob使用Java的Properties来存储参数,而Properties继承Hashtable,当键相同时,值会被覆盖,而参数的键都是HttpJobProperties.DATA_KEY)。

    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MyHTTPJob", 3)
                .description("HTTP作业")
                .cron("30 * * * * ?")
                .setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message")
                .setProperty(HttpJobProperties.METHOD_KEY, "POST")
                .setProperty(HttpJobProperties.DATA_KEY, "arg1=1")
                .setProperty(HttpJobProperties.DATA_KEY, "arg2=2")
                .overwrite(true)
                .failover(true)
                .build();
    }

接口:

package com.kaven.controller;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;



@RestController
public class MessageController {
    @PostMapping("/message")
    public void getMessage(String arg1, String arg2) {
        System.out.println(arg1 + arg2);
    }
}

输出如下图所示:

第一个参数被覆盖了,所以为null,因此给POST接口传多个参数,博主想到的方法是将多个参数组合成一个实例,再将其转换成JSON字符串,接口那边再将这个JSON字符串转换成对应的实例。大家如果有更好的方法可以评论区留言。

添加依赖:

        
            com.google.code.gson
            gson
            2.8.9
        
        
            org.projectlombok
            lombok
            1.18.22
            provided
        

存储多个参数的Data类:

package com.kaven.job.data;

import lombok.*;



@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class Data {
    private String username;
    private String password;
    private School school;

    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    @ToString
    public static class School{
        private String name;
        private String address;
    }
}

Application类:

package com.kaven.job;

import com.google.gson.Gson;
import com.kaven.job.data.Data;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;


public class Application {
    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), "HTTP",
                createJobConfiguration()).schedule();
    }
    private static CoordinatorRegistryCenter createRegistryCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }

    private static JobConfiguration createJobConfiguration() {
        Data data = Data.builder()
                .username("kaven")
                .password("itkaven")
                .school(Data.School.builder().name("xxx").address("中国").build())
                .build();
        String dataStr = (new Gson()).toJson(data);
        System.out.println(dataStr);
        return JobConfiguration.newBuilder("MyHTTPJob", 3)
                .description("HTTP作业")
                .cron("30 * * * * ?")
                .setProperty(HttpJobProperties.URI_KEY, "http://localhost:8080/message")
                .setProperty(HttpJobProperties.METHOD_KEY, "POST")
                .setProperty(HttpJobProperties.DATA_KEY, "args=" + dataStr)
                .overwrite(true)
                .failover(true)
                .build();
    }
}

下面这部分就是将要传的多个参数组合成Data实例,然后再将其转换成JSON字符串。

        Data data = Data.builder()
                .username("kaven")
                .password("itkaven")
                .school(Data.School.builder().name("xxx").address("中国").build())
                .build();
        String dataStr = (new Gson()).toJson(data);

接口(将JSON字符串再转换成想要的包含多个参数信息的Data实例):

package com.kaven.controller;

import com.google.gson.Gson;
import com.kaven.data.Data;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;



@RestController
public class MessageController {
    private static final SimpleDateFormat formatter =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final String MESSAGE_HEAD = "MessageController";

    @PostMapping("/message")
    public void getMessage(String args, @RequestHeader String shardingContext) {
        Gson gson = new Gson();
        Data data = gson.fromJson(args, Data.class);
        System.out.println(formatter.format(new Date()) + " " + data + " " + shardingContext);
    }
}

输出如下图所示:

ElasticJob的script和HTTP作业就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/642890.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号