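Introduction to Spring Batch
Batch processing is the automated, complex processing of large volumes of information that is handled most efficiently without user interaction. These operations typically include time-based events (such as month-end calculations, notices, or correspondence). They also include the periodic, repetitive application of complex business rules across very large data sets (for example, insurance benefit determination or rate adjustments), and the integration of information received from internal and external systems, which usually has to be formatted, validated, and processed transactionally into the system of record. Enterprises use batch processing to handle billions of transactions every day.
Spring Batch is a lightweight, comprehensive batch framework designed to support the development of robust batch applications that are vital to the daily operation of enterprise systems. It provides reusable functions that are essential for processing large volumes of records. It suits simple use cases (for example, reading a file into a database or running a stored procedure) as well as complex, high-volume ones (for example, moving large amounts of data between databases and transforming it along the way). High-volume batch jobs can use the framework in a highly scalable way to process large amounts of information.
[Figure: core flow diagram]
In short: a powerful tool for high-volume, complex, pipeline-style data processing!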
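Requirements
1. Fetch the raw alert data through a remote interface.
2. Clean and aggregate the data, then handle it locally, for example by sending it to MQ or saving it to the local database.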
Core dependencies
Model version: 4.0.0
Parent: org.springframework.boot:spring-boot-starter-parent:2.5.1
Project: ***:***:0.0.1-SNAPSHOT (name ***)
Properties: Java 17, spring-cloud.version 2020.0.3, encoding UTF-8
Dependency management (type pom, scope import): org.springframework.cloud:spring-cloud-dependencies:${spring-cloud.version}
Dependencies:
org.springframework.boot:spring-boot-starter-batch
org.springframework.boot:spring-boot-starter-actuator
org.springframework.boot:spring-boot-starter-web
org.springframework.cloud:spring-cloud-starter-config
org.springframework.cloud:spring-cloud-starter-bus-amqp
org.projectlombok:lombok
org.springframework.boot:spring-boot-starter-data-jdbc
mysql:mysql-connector-java (scope runtime)
org.apache.httpcomponents:httpclient
com.aliyun.oss:aliyun-sdk-oss:3.10.2
org.codehaus.jettison:jettison
com.baomidou:mybatis-plus-boot-starter:3.4.3.1
org.apache.commons:commons-lang3:3.11
Spring configuration
spring:
  application:
    name: ***
  batch:
    job:
      # By default (true) the defined Jobs run automatically at startup; set to false to launch them yourself with jobLauncher.run
      enabled: false
    jdbc:
      # Spring Batch creates its metadata tables in the database; with MySQL, any value other than always leaves them uncreated, and the job fails because the tables do not exist (unless you created them yourself)
      initialize-schema: always
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: ***
    username: ***
    password: ***
    hikari:
      connection-test-query: SELECT 1
  rabbitmq:
    listener:
      simple:
        missing-queues-fatal: false
    addresses: ***
    port: ***
    username: ***
    password: ***
    virtual-host: ***
# -------------------- MyBatis configuration --------------------
mybatis-plus:
  mapper-locations:
    - classpath*:mapper
@Data
public class RunningOverVO implements Serializable {
private static final long serialVersionUID = 1L;
private Annotations annotations;
private Date endsAt;
private String fingerprint;
private List receivers;
private Date startsAt;
private Status status;
private Date updatedAt;
private String generatorURL;
private Labels labels;
@Data
public class Labels {
private String acpus;
private String alertname;
private String comment;
private String dc;
private String to;
private String instance;
private String job;
private String jobid;
private String jobname;
private String lc;
private String lnodes_s;
private String nnodes;
private String priority;
private String queue;
private String service;
private String severity;
private String ss;
private Date start;
private String state;
private Date submit;
private String user;
}
@Data
static class Receivers {
private String name;
}
@Data
class Annotations {
private String description;
private String summary;
}
@Data
class Status {
private List inhibitedBy;
private List silencedBy;
private String state;
}
}
@Data
public class WarningInfoItem implements Serializable {
private static final long serialVersionUID = 1L;
private Integer id;
private Integer typeId;
private String clusterId;
private String clusterUser;
private String jobId;
private Date startTime;
private String job;
private Date createAt;
}
@Data
public class WarningInfo implements java.io.Serializable {
private static final long serialVersionUID = 1L;
@TableId(type = IdType.AUTO)
private BigInteger id;
private String userId;
private Integer typeId;
private String msg;
private Date datetime;
private Date createAt;
private Date ackAt;
private String isOss;
private String jobs;
private String remark;
private String desc = "详细内容进入系统查看";
private Boolean isDetailInfo;
@Override
public String toString() {
    // Hand-built JSON-style representation (values are not escaped)
    return "{" +
            "\"id\":" + id +
            ", \"userId\":\"" + userId + '"' +
            ", \"typeId\":" + typeId +
            ", \"msg\":\"" + msg + '"' +
            ", \"datetime\":\"" + datetime + '"' +
            ", \"createAt\":\"" + createAt + '"' +
            ", \"ackAt\":\"" + ackAt + '"' +
            ", \"jobs\":\"" + jobs + '"' +
            '}';
}
}
Batch configuration
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public JobRepository batchJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDatabaseType("mysql");
jobRepositoryFactoryBean.setTransactionManager(transactionManager);
jobRepositoryFactoryBean.setDataSource(dataSource);
return jobRepositoryFactoryBean.getObject();
}
@Bean
public SimpleJobLauncher batchJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
// Set the JobRepository
jobLauncher.setJobRepository(batchJobRepository(dataSource, transactionManager));
return jobLauncher;
}
}
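Step definitions
Step1
Reader
Calls the remote interface and converts the raw data into raw alert beans. JsonItemReader is an implementation of the ItemReader interface; its type parameter is the entity bean type that the raw data is converted into.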
@Bean("runningOverFirstReader")
@StepScope
public JsonItemReader<RunningOverVO> runningOverReaderFirst() {
    // Accept only 2xx responses and return the body as a string
    ResponseHandler<String> responseHandler = response -> {
        int status = response.getStatusLine().getStatusCode();
        if (status >= 200 && status < 300) {
            HttpEntity entity = response.getEntity();
            return entity != null ? EntityUtils.toString(entity, StandardCharsets.UTF_8) : null;
        }
        throw new ClientProtocolException("Unexpected response status: " + status);
    };
    HttpGet httpGet = new HttpGet(warningRunningOverUrl);
    httpGet.addHeader("Accept", "application/json;charset=UTF-8");
    String response;
    // try-with-resources closes the HTTP client once the call has completed
    try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
        response = httpclient.execute(httpGet, responseHandler);
    } catch (IOException e) {
        // Returning null from a @Bean method leaves the step without a usable reader, so fail explicitly
        throw new IllegalStateException("Failed to fetch alerts from " + warningRunningOverUrl, e);
    }
    if (!StringUtils.hasText(response)) {
        // An empty body is treated as an empty JSON array, so the step simply reads no items
        response = "[]";
    }
    ByteArrayResource byteArrayResource = new ByteArrayResource(response.getBytes(StandardCharsets.UTF_8));
    JacksonJsonObjectReader<RunningOverVO> jsonObjectReader = new JacksonJsonObjectReader<>(RunningOverVO.class);
    jsonObjectReader.setMapper(new JsonMapper());
    JsonItemReader<RunningOverVO> jsonItemReader = new JsonItemReader<>(byteArrayResource, jsonObjectReader);
    jsonItemReader.setName("runningOverReaderFirst");
    return jsonItemReader;
}
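Process
Cleans the raw data into the alert object for the intermediate table. Note the type parameters of ItemProcessor: the input type is the item type produced by the reader, and the output type is the type handed to the writer.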
@Bean("runningOverFirstProcess")
public ItemProcessor<RunningOverVO, WarningInfoItem> runningOverProcessFirst() {
    ItemProcessor<RunningOverVO, WarningInfoItem> itemProcessor = ehrOrg -> {
        RunningOverVO.Labels labels = ehrOrg.getLabels();
        WarningInfoItem warningInfoItem = new WarningInfoItem();
        warningInfoItem.setClusterId(labels.getDc());
        warningInfoItem.setClusterUser(labels.getUser());
        warningInfoItem.setJobId(labels.getJobid());
        warningInfoItem.setTypeId(2);
        warningInfoItem.setStartTime(labels.getStart());
        warningInfoItem.setCreateAt(Calendar.getInstance().getTime());
        // Assemble the job JSON string by hand (label values are not escaped)
        StringBuilder sb = new StringBuilder();
        sb.append("{");
        sb.append("\"job_id\":\"").append(labels.getJobid()).append("\",");
        sb.append("\"job_name\":\"").append(labels.getJobname()).append("\",");
        sb.append("\"partition\":\"").append(labels.getQueue()).append("\",");
        sb.append("\"node_num\":\"").append(labels.getNnodes()).append("\",");
        sb.append("\"cores_num\":\"").append(labels.getAcpus()).append("\",");
        sb.append("\"run_time\":\"").append(getRunningDay(labels.getStart())).append("\"");
        sb.append("}");
        warningInfoItem.setJob(sb.toString());
        return warningInfoItem;
    };
    return itemProcessor;
}
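The processor above assembles the job JSON by plain string concatenation, so a job name containing a double quote or a backslash would produce invalid JSON. As a minimal sketch (JsonQuote and its quote method are hypothetical helpers, not part of the original project), values could be escaped before they are concatenated. In practice a JSON library, such as the Jackson ObjectMapper used later in this article, is the more usual choice.

```java
public class JsonQuote {
    /** Wraps s in double quotes and escapes characters a JSON string may not contain raw. */
    public static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \ uXXXX form
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // A job name with embedded quotes stays valid JSON after escaping
        System.out.println("{\"job_name\":" + quote("nightly \"etl\"") + "}");
    }
}
```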
private static long getRunningDay(Date sTime) throws ParseException {
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date nowTime=sdf.parse(sdf.format(Calendar.getInstance().getTime()));
Date startTime =sdf.parse(sdf.format(sTime));
Calendar cal = Calendar.getInstance();
cal.setTime(nowTime);
long time1 = cal.getTimeInMillis();
cal.setTime(startTime);
long time2 = cal.getTimeInMillis();
long between_days=(time1-time2)/(1000*3600*24);
return between_days;
}
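getRunningDay formats and re-parses both dates only to drop the milliseconds before dividing. A shorter sketch with java.time, assuming whole elapsed days are all that is needed (RunningDays and runningDays are hypothetical names, and the current instant is passed in so the result is reproducible):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Date;

public class RunningDays {
    /** Whole days elapsed between start and now; partial days are dropped. */
    public static long runningDays(Date start, Instant now) {
        return Duration.between(start.toInstant(), now).toDays();
    }

    public static void main(String[] args) {
        Date start = Date.from(Instant.parse("2022-01-15T07:00:00Z"));
        Instant now = Instant.parse("2022-01-18T06:00:00Z");
        // 2 days and 23 hours have elapsed, so this prints 2
        System.out.println(runningDays(start, now));
    }
}
```

In the processor the call would pass Instant.now() as the second argument.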
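Writer
A custom writer that implements the ItemWriter interface and batch-inserts the intermediate-table rows. In the implemented method public void write(List list) throws Exception, the List is the batch (chunk) of items to write; its size is configured later, when the Step is built. You can also use one of the many ready-made ItemWriter implementations, which cover most targets you are likely to need.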
@Component
public class RunningOverFirstWriter implements ItemWriter {
@Autowired
private JdbcTemplate jdbcTemplate;
public RunningOverFirstWriter(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void write(List list) throws Exception {
String batchSQL = "insert IGNORE into warning_info_item(type_id,cluster_id,cluster_user,job_id,start_time,job,create_at) values (?,?,?,?,?,?,?)";
List
Step2
Reader
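Aggregates the intermediate-table data written by Step1 according to business rules. This step depends entirely on your own requirements, so there is no need to read through the long SQL.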
@Bean("runningOverSecondReader")
@StepScope
public ListItemReader
Process
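Cleans the data a second time according to business rules. The Process stage is optional; it is included here mainly to show the complete pipeline.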
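@Bean("runningOverSecondProcess")
public ItemProcessor<Map<String, Object>, Map<String, Object>> runningOverProcessSecond() {
    Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    ObjectMapper mapper = new ObjectMapper();
    ItemProcessor<Map<String, Object>, Map<String, Object>> itemProcessor = ehrOrg -> {
        // Convert the LocalDateTime columns to formatted strings (empty string when null)
        ehrOrg.put("date_time", ehrOrg.get("date_time") != null ? format.format(Date.from(((LocalDateTime) ehrOrg.get("date_time")).toInstant(ZoneOffset.of("+8")))) : "");
        ehrOrg.put("createAt", ehrOrg.get("createAt") != null ? format.format(Date.from(((LocalDateTime) ehrOrg.get("createAt")).toInstant(ZoneOffset.of("+8")))) : "");
        // Build the alert description from the first job in the list
        String jobStr = (String) ehrOrg.get("jobs");
        if (StringUtils.hasText(jobStr)) {
            List<Map<String, Object>> list = mapper.readValue(jobStr, new TypeReference<List<Map<String, Object>>>() {});
            if (!list.isEmpty()) {
                // User-facing message (kept in Chinese): "Your job <id> and others have been running for more than 1 day;
                // please check them to avoid wasting machine time. Contact online support or your account manager with any questions."
                String msg = "您的作业" + list.get(0).get("job_id") + "等已经运行超过1 天,请注意检查运行情况,避免机时浪费。 如有问题,可咨询在线支持工程师或您的客户经理,感谢您的支持";
                ehrOrg.put("msg", msg);
            }
        }
        return ehrOrg;
    };
    return itemProcessor;
}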
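The two put calls convert a LocalDateTime to a Date through a fixed +8 offset and then format it with SimpleDateFormat, which uses the JVM default time zone, so the text only matches the database value when the JVM itself runs in UTC+8. A sketch of formatting the LocalDateTime directly, which avoids the time zone round trip (TimestampText and format are hypothetical names):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampText {
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats the value as-is; returns an empty string for null, as the processor does. */
    public static String format(LocalDateTime value) {
        return value != null ? value.format(FORMATTER) : "";
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2022, 1, 18, 14, 25, 54))); // 2022-01-18 14:25:54
        System.out.println(format(null).isEmpty());                            // true
    }
}
```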
Writer
A custom Writer can carry out several business operations at once.
@Component
@Slf4j
public class RunningOverSecondWriter implements ItemWriter {
private static String QUEUE_CONSOLE_NOTICE = "***";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private WarningMapper warningMapper;
@Value("${console.aliyun.ak}")
private String ossAK;
@Value("${console.aliyun.sk}")
private String ossSK;
@Value("${console.aliyun.endpoint}")
private String endpoint;
@Value("${console.warning.bucket.name}")
private String bucketName;
public RunningOverSecondWriter(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void write(List list) throws Exception {
    OSS ossClient = new OSSClientBuilder().build(endpoint, ossAK, ossSK);
    ObjectMapper mapper = new ObjectMapper();
    List<WarningInfo> lstWarning = new ArrayList<>();
    DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Column headers (user-facing, kept in Chinese): job ID, job name, queue, node count, cores used, days running
    String[] headers = {"作业号", "作业名", "队列", "节点数目", "占用核数", "运行天数"};
    String[] keys = {"job_id", "job_name", "partition", "node_num", "cores_num", "run_time"};
    try {
        for (Object o : list) {
            Map m = (Map) o;
            String jobs = (String) m.get("jobs");
            String isOss = "0";
            StringBuilder remark = new StringBuilder();
            // Generate the alert id from System.nanoTime(); note that this is not wall-clock time and is not guaranteed to be unique
            long id = System.nanoTime();
            m.put("id", id);
            if (StringUtils.isNotEmpty(jobs)) {
                List<Map<String, Object>> lstMap = mapper.readValue(jobs, List.class);
                if (!CollectionUtils.isEmpty(lstMap) && lstMap.size() > 5) {
                    // More than 5 jobs: store the JSON as a file in OSS and set the flag, which is needed later
                    isOss = "1";
                    // Upload to OSS and keep the signed file URL in place of the inline JSON
                    jobs = uploadJobs2Oss(ossClient, jobs, id).toString();
                }
                // Assumption: the markup tags are missing from the source listing;
                // <tr>/<th>/<td> is a best guess at the table row layout
                remark.append("<tr>\n");
                for (String header : headers) {
                    remark.append("<th>").append(header).append("</th>\n");
                }
                remark.append("</tr>\n");
                for (Map<String, Object> map : lstMap) {
                    remark.append("<tr>\n");
                    for (String key : keys) {
                        remark.append("<td>").append(map.get(key)).append("</td>\n");
                    }
                    remark.append("</tr>\n");
                }
                // Detail table shown with the alert
                m.put("remark", remark.toString());
            }
            WarningInfo warningInfo = settingWarningInfo(format, m, jobs, isOss);
            // Send to MQ
            rabbitTemplate.convertAndSend(QUEUE_CONSOLE_NOTICE, mapper.writeValueAsString(warningInfo));
            lstWarning.add(warningInfo);
        }
    } finally {
        // Close the OSS client connection even if processing fails
        ossClient.shutdown();
    }
    // Batch insert into warning_info
    warningMapper.batchInsert(lstWarning);
    // Delete the raw rows from the intermediate table
    jdbcTemplate.execute("delete from warning_info_item");
}
private WarningInfo settingWarningInfo(DateFormat format, Map m, String jobs, String isOss) throws ParseException {
WarningInfo warningInfo = new WarningInfo();
warningInfo.setId(BigInteger.valueOf((Long) m.get("id")));
warningInfo.setUserId((String) m.get("user_id"));
warningInfo.setTypeId((Integer) m.get("type_id"));
warningInfo.setMsg((String) m.get("msg"));
warningInfo.setIsOss(isOss);
warningInfo.setJobs(jobs);
warningInfo.setIsDetailInfo(true);
warningInfo.setRemark((String) m.get("remark"));
warningInfo.setDatetime(format.parse((String) m.get("date_time")));
warningInfo.setCreateAt(format.parse((String) m.get("createAt")));
return warningInfo;
}
private URL uploadJobs2Oss(OSS ossClient, String content, long fileName) {
    String objectKey = "warning/" + fileName + ".json";
    // Upload the string content
    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectKey, new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
    ossClient.putObject(putObjectRequest);
    // Get a signed URL for the file so that it can be saved in the warning_info table.
    // Expire the signed URL 30 days from now. Use long arithmetic: the int expression
    // 3600 * 1000 * 24 * 30 overflows to a negative value, which is why subtracting it seemed to work.
    Date expiration = new Date(System.currentTimeMillis() + 30L * 24 * 3600 * 1000);
    // Generate a signed URL for GET access; visitors can open it directly in a browser
    URL url = ossClient.generatePresignedUrl(bucketName, objectKey, expiration);
    return url;
}
}
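A 30-day offset written as the int expression 3600 * 1000 * 24 * 30 does not evaluate to 2,592,000,000: that value exceeds Integer.MAX_VALUE, so the product wraps around to a negative number, and subtracting it from the current time lands roughly 19.7 days in the future. The short demo below shows the wrap-around and a long-based alternative (ExpirationDemo and its method names are hypothetical, for illustration only):

```java
import java.time.Duration;
import java.util.Date;

public class ExpirationDemo {
    /** The offset evaluated in int arithmetic; the product overflows. */
    public static int overflowedMillis() {
        return 3600 * 1000 * 24 * 30;
    }

    /** The intended offset, computed in long arithmetic. */
    public static long thirtyDaysMillis() {
        return Duration.ofDays(30).toMillis();
    }

    /** Expiration 30 days after the given epoch-millisecond timestamp. */
    public static Date expiresAt(long nowMillis) {
        return new Date(nowMillis + thirtyDaysMillis());
    }

    public static void main(String[] args) {
        System.out.println(overflowedMillis());  // -1702967296
        System.out.println(thirtyDaysMillis());  // 2592000000
        System.out.println(expiresAt(System.currentTimeMillis()));
    }
}
```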
Step
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("runningOverFirstReader")
private JsonItemReader runningOverFirstReader;
@Autowired
@Qualifier("runningOverFirstProcess")
private ItemProcessor runningOverFirstProcess;
@Autowired
private RunningOverFirstWriter runningOverFirstWriter;
@Autowired
@Qualifier("runningOverSecondReader")
private ListItemReader runningOverSecondReader;
@Autowired
@Qualifier("runningOverSecondProcess")
private ItemProcessor runningOverSecondProcess;
@Autowired
private RunningOverSecondWriter runningOverSecondWriter;
public Step runningOverFirstStep() {
    return stepBuilderFactory.get("runningOverFirstStep")
            // Chunk size: the number of items read and processed before they are written and the transaction is committed.
            // Larger chunks mean fewer commits, but more memory per chunk and more work to redo if a chunk rolls back.
            .<RunningOverVO, WarningInfoItem>chunk(100)
            .reader(runningOverFirstReader)
            .processor(runningOverFirstProcess)
            .writer(runningOverFirstWriter)
            .build();
}
public Step runningOverSecondStep() {
    return stepBuilderFactory.get("runningOverSecondStep")
            .<Map<String, Object>, Map<String, Object>>chunk(50)
            .reader(runningOverSecondReader)
            .processor(runningOverSecondProcess)
            .writer(runningOverSecondWriter)
            .build();
}
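chunk(100) and chunk(50) set how many items are read and processed before they are handed to the writer in one call and the transaction is committed. The loop below is a deliberately simplified, plain-Java sketch of that read, process, write cycle, not Spring Batch's actual implementation; ChunkSketch and runStep are hypothetical names, and transactions, restart, and skip handling are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkSketch {
    /** Runs a read-process-write loop and returns the number of chunks completed. */
    public static <I, O> int runStep(Iterator<I> reader, Function<I, O> processor,
                                     Consumer<List<O>> writer, int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // 1. Read up to chunkSize items
            List<I> items = new ArrayList<>(chunkSize);
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. Process each item; a null result means the item is filtered out
            List<O> outputs = new ArrayList<>(items.size());
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Write the whole chunk in one call (this is where a commit would happen)
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<List<Integer>> written = new ArrayList<>();
        int chunks = ChunkSketch.<Integer, Integer>runStep(
                List.of(1, 2, 3, 4, 5).iterator(), x -> x * 2, written::add, 2);
        System.out.println(written); // [[2, 4], [6, 8], [10]]
        System.out.println(chunks);  // 3
    }
}
```

With five items and a chunk size of 2, the writer is called three times, the last time with a partial chunk.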
Flow
A Flow closely resembles a workflow; I would even say it can be used as one!
@Bean
public Flow runningOverFlow() {
    // The type parameter is required so that build() returns a Flow
    return new FlowBuilder<Flow>("runningOverFlow")
            .start(runningOverFirstStep())
            .next(runningOverSecondStep())
            .build();
}
Job
The Job is the smallest unit that can be launched; here it wraps the flow defined above.
@Bean
public Job runningOverJob() {
return jobBuilderFactory.get("runningOverJob")
.start(runningOverFlow())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
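Test run
Launch the Job through the JobLauncher
// The timestamp parameter is here mainly so that the execution time of each Step can be reported for every run.
// An empty JobParameters object also works, but a job that has already completed with identical parameters cannot be launched again.
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
        .toJobParameters();
try {
    jobLauncher.run(runningOverJob, jobParameters);
} catch (Exception e) {
    e.printStackTrace();
}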



