栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spring Batch读取txt文件并写入数据库的方法教程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spring Batch读取txt文件并写入数据库的方法教程

项目需求

近日需要实现用户推荐相关的功能,也就是说向用户推荐他可能喜欢的东西。

我们的数据分析工程师会将用户以及用户可能喜欢的东西整理成文档给我,我只需要将数据从文档中读取出来,然后对数据进行进一步的清洗(例如去掉特殊符号,长度如果太长则截取)。然后将处理后的数据存入数据库(Mysql)。

所以分为三步:

  • 读取文档获得数据
  • 对获得的数据进行处理
  • 更新数据库(新增或更新)

考虑到这个数据量以后会越来越大,这里没有使用 poi 来读取数据,而直接使用了 SpringBatch。

实现步骤

本文假设读者已经能够使用 SpringBoot 连接处理 Mysql,所以这部分文中会省略。

1、创建 Maven 项目,并在 pom.xml 中添加依赖


 org.springframework.boot
 spring-boot-starter-parent
 1.5.2.RELEASE


 1.8


 
  org.springframework.boot
  spring-boot-starter-batch
 
 
  org.springframework.boot
  spring-boot-starter-data-jpa
 
 
  org.springframework.boot
  spring-boot-starter-test
  test
 
 
  org.mybatis.spring.boot
  mybatis-spring-boot-starter
  1.2.0
 
 
 
  org.projectlombok
  lombok
  1.12.6
 
 
  org.apache.commons
  commons-lang3
  3.4
 
 
 
  mysql
  mysql-connector-java
  runtime
 
 
  com.alibaba
  druid
  1.0.26
 
 
  org.springframework.boot
  spring-boot-starter-web
 

这里是这个小项目中用到的所有依赖,包括连接数据库的依赖以及工具类等。

2、编写 Model 类

我们要从文档中读取的有效列就是 uid,tag,type,就是用户 ID,用户可能包含的标签(用于推送),用户类别(用户用户之间互相推荐)。

UserMap.java 中的 @Entity,@Column 注解,是为了利用 JPA 生成数据表而写的,可要可不要。

UserMap.java

@Data
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
//@Entity(name = "user_map")
public class UserMap extends baseModel {
 @Column(name = "uid", unique = true, nullable = false)
 private Long uid;
 @Column(name = "tag")
 private String tag;
 @Column(name = "type")
 private Integer type;
}

3、实现批处理配置类

BatchConfiguration.java

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
 @Autowired
 public JobBuilderFactory jobBuilderFactory;
 @Autowired
 public StepBuilderFactory stepBuilderFactory;
 @Autowired
 @Qualifier("prodDataSource")
 DataSource prodDataSource;
 @Bean
 public FlatFileItemReader reader() {
  FlatFileItemReader reader = new FlatFileItemReader<>();
  reader.setResource(new ClassPathResource("c152.txt"));
  reader.setLineMapper(new DefaultLineMapper() {{
   setLineTokenizer(new DelimitedLineTokenizer("|") {{
    setNames(new String[]{"uid", "tag", "type"});
   }});
   setFieldSetMapper(new BeanWrapperFieldSetMapper() {{
    setTargetType(UserMap.class);
   }});
  }});
  return reader;
 }
 @Bean
 public JdbcBatchItemWriter importWriter() {
  JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>();
  writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
  writer.setSql("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)");
  writer.setDataSource(prodDataSource);
  return writer;
 }
 @Bean
 public JdbcBatchItemWriter updateWriter() {
  JdbcBatchItemWriter writer = new JdbcBatchItemWriter<>();
  writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
  writer.setSql("UPDATE user_map SET type = (:type),tag = (:tag) WHERe uid = (:uid)");
  writer.setDataSource(prodDataSource);
  return writer;
 }
 @Bean
 public UserMapItemProcessor processor(UserMapItemProcessor.ProcessStatus processStatus) {
  return new UserMapItemProcessor(processStatus);
 }
 @Bean
 public Job importUserJob(JobCompletionNotificationListener listener) {
  return jobBuilderFactory.get("importUserJob")
    .incrementer(new RunIdIncrementer())
    .listener(listener)
    .flow(importStep())
    .end()
    .build();
 }
 @Bean
 public Step importStep() {
  return stepBuilderFactory.get("importStep")
    .chunk(100)
    .reader(reader())
    .processor(processor(import))
    .writer(importWriter())
    .build();
 }
 @Bean
 public Job updateUserJob(JobCompletionNotificationListener listener) {
  return jobBuilderFactory.get("updateUserJob")
    .incrementer(new RunIdIncrementer())
    .listener(listener)
    .flow(updateStep())
    .end()
    .build();
 }
 @Bean
 public Step updateStep() {
  return stepBuilderFactory.get("updateStep")
    .chunk(100)
    .reader(reader())
    .processor(processor(UPDATE))
    .writer(updateWriter())
    .build();
 }
}

prodDataSource 是假设用户已经设置好的,如果不知道怎么配置,也可以参考之前的文章进行配置:Springboot 集成 Mybatis。

reader(),这方法从文件中读取数据,并且设置了一些必要的参数。紧接着是写操作 importWriter() 和 updateWriter() ,读者看其中一个就好,因为我这里是需要更新或者修改的,所以分为两个。

processor(ProcessStatus status) ,该方法是对我们处理数据的类进行实例化,这里我根据 status 是 import 还是 UPDATE 来获取不同的处理结果。

其他的看代码就可以看懂了,哈哈,不详细说了。

4、将获得的数据进行清洗

UserMapItemProcessor.java

public class UserMapItemProcessor implements ItemProcessor {
 private static final int MAX_TAG_LENGTH = 200;
 
 private ProcessStatus processStatus;
 public UserMapItemProcessor(ProcessStatus processStatus) {
  this.processStatus = processStatus;
 }
 @Autowired
 IUserMapService userMapService;
 private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9\u4E00-\u9FA5_-]+$";
 public static final Pattern TAG_PATTERN = Pattern.compile(TAG_PATTERN_STR);
 private static final Logger LOG = LoggerFactory.getLogger(UserMapItemProcessor.class);
 @Override
 public UserMap process(UserMap userMap) throws Exception {
  Long uid = userMap.getUid();
  String tag = cleanTag(userMap.getTag());
  Integer label = userMap.getType() == null ? Integer.valueOf(0) : userMap.getType();
  if (StringUtils.isNotBlank(tag)) {
   Map params = new HashMap<>();
   params.put("uid", uid);
   UserMap userMapFromDB = userMapService.selectOne(params);
   if (userMapFromDB == null) {
    if (this.processStatus == ProcessStatus.import) {
     return new UserMap(uid, tag, label);
    }
   } else {
    if (this.processStatus == ProcessStatus.UPDATe) {
     if (!tag.equals(userMapFromDB.getTag()) && !label.equals(userMapFromDB.getType())) {
      userMapFromDB.setType(label);
      userMapFromDB.setTag(tag);
      return userMapFromDB;
     }
    }
   }
  }
  return null;
 }
 
 private static String cleanTag(String tag) {
  if (StringUtils.isNotBlank(tag)) {
   try {
    tag = tag.substring(tag.indexOf("{") + 1, tag.lastIndexOf("}"));
    String[] tagArray = tag.split(",");
    Optional reduce = Arrays.stream(tagArray).parallel()
      .map(str -> str.split(":")[0])
      .map(str -> str.replaceAll("'", ""))
      .map(str -> str.replaceAll(" ", ""))
      .filter(str -> TAG_PATTERN.matcher(str).matches())
      .reduce((x, y) -> x + "," + y);
    Function str = (s -> s.length() > MAX_TAG_LENGTH ? s.substring(0, MAX_TAG_LENGTH) : s);
    return str.apply(reduce.get());
   } catch (Exception e) {
    LOG.error(e.getMessage(), e);
   }
  }
  return null;
 }
 protected enum ProcessStatus {
  import,
  UPDATE;
 }
 public static void main(String[] args) {
  String distinctTag = cleanTag("Counter({'《重新定义》': 3, '轻想上的轻小说': 3, '小说': 2, 'Fate': 2, '同人小说': 2, '雪狼八组': 1, " +
    "'社会': 1, '人文': 1, '短篇': 1, '重新定义': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六组》': 1, '战争': 1, '《灰羽联盟》': 1, " +
    "'谁说轻想没人写小说': 1})");
  System.out.println(distinctTag);
 }
}

读取到的数据格式如 main() 方法所示,清理之后的结果如:

轻想上的轻小说,小说,Fate,同人小说,雪狼八组,社会,人文,短篇,重新定义,AMV,战争,谁说轻想没人写小说 。

去掉了特殊符号以及数字等。使用了 Java8 的 Lambda 表达式。

并且这里在处理的时候,判断如果该数据用户已经存在,则进行更新,如果不存在,则新增。

5、Job 执行结束回调类

JobCompletionNotificationListener.java

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
 private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
 private final JdbcTemplate jdbcTemplate;
 @Autowired
 public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
  this.jdbcTemplate = jdbcTemplate;
 }
 @Override
 public void afterJob(JobExecution jobExecution) {
  System.out.println("end .....");
 }
}

具体的逻辑可自行实现。

完成以上几个步骤,运行项目,就可以读取并写入数据到数据库了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对考高分网的支持。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/146461.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号