Why does Spring Batch execute the writer twice?

Posted by: site user    Posted: 2022-04-22 05:21


2 answers

懂视网    Time: 2022-04-10 17:31

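Spring Batch study notes (3): how to read input data
I. ItemReader overview
1. ItemReader: the interface that supplies data to a step.
2. The interface has a single method, read(). Each call returns one item and advances to the next one. When the input is exhausted, read() must return null; any non-null return value tells the framework that more data remains.
Example: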
OverViewApplication:

package com.dhcc.batch.batchDemo.input.overview;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class OverViewApplication {

 public static void main(String[] args) {
 SpringApplication.run(OverViewApplication.class, args);
 }
}

InputOverViewDemoJobConfiguration:

package com.dhcc.batch.batchDemo.input.overview;

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InputOverViewDemoJobConfiguration {
 @Autowired
 private JobBuilderFactory jobBuilderFactory;
 @Autowired
 private StepBuilderFactory stepBuilderFactory;

 @Bean
 public Job inputOverViewDemoJob() {
 return jobBuilderFactory.get("inputOverViewDemoJob").start(inputOverViewDemoJobStep()).build();

 }

 public Step inputOverViewDemoJobStep() {
 return stepBuilderFactory.get("inputOverViewDemoJobStep").<String, String>chunk(2)
  .reader(inputOverViewDemoReader()).writer(outputOverViewDemoWriter()).build();
 }

 private ItemWriter<? super String> outputOverViewDemoWriter() {
 return new ItemWriter<String>() {

  @Override
  public void write(List<? extends String> items) throws Exception {
  for (String item : items) {
   System.out.println("output writer data: " + item);
  }
  }
 };
 }

 @Bean
 public InputOverVierDemoItemReader inputOverViewDemoReader() {
 List<String> data = Arrays.asList("dazhonghua", "xiaoriben", "meilijian", "falanxi", "deyizhi", "aierlan",
  "fandigang", "bajisitan", "baieluosi");
 return new InputOverVierDemoItemReader(data);
 }
}

InputOverVierDemoItemReader:

package com.dhcc.batch.batchDemo.input.overview;

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class InputOverVierDemoItemReader implements ItemReader<String> {
 private final Iterator<String> iterator;

 public InputOverVierDemoItemReader(List<String> data) {
 this.iterator = data.iterator();
 }

 @Override
 public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
 if (iterator.hasNext()) {
  return this.iterator.next();
 } else {
  return null;
 }
 }

}

Run result:
[screenshot]
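
The step above is configured with chunk(2) and its reader supplies nine items, so the writer is invoked once per chunk rather than once per job. The plain-Java sketch below imitates that loop without Spring. `ChunkLoopSketch`, `runChunks`, and the use of `Iterator` and `Consumer` are illustrative stand-ins chosen for this sketch, not Spring Batch API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a chunk-oriented step; not Spring Batch code.
class ChunkLoopSketch {

    // Reads items one at a time, hands them to the writer in groups of
    // chunkSize, and returns how many times the writer was called.
    static <T> int runChunks(Iterator<T> reader, int chunkSize, Consumer<List<T>> writer) {
        int writerCalls = 0;
        List<T> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == chunkSize) {
                writer.accept(chunk);
                writerCalls++;
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) { // final, partially filled chunk
            writer.accept(chunk);
            writerCalls++;
        }
        return writerCalls;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
        int calls = runChunks(data.iterator(), 2,
                chunk -> System.out.println("writer received: " + chunk));
        // Nine items with a chunk size of 2 give four full chunks plus one partial chunk.
        System.out.println("writer calls: " + calls);
    }
}
```

Under this model, seeing the writer's output more than once in a single run is expected whenever the input spans more than one chunk.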
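II. Reading data from a database
1. In real applications we usually read from a database and do so page by page. Spring Batch provides the JdbcPagingItemReader class for this.
2. Example: before running it, we created a person_buf table in the database and inserted 100001 rows:

[screenshot]
The following classes read the data from that table: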
InputItemReaderJDBCApplication:

package com.dhcc.batch.batchDemo.input.db.jdbc;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class InputItemReaderJDBCApplication {
 public static void main(String[] args) {
 SpringApplication.run(InputItemReaderJDBCApplication.class, args);

 }
}

InputDBJdbcItemReaderConfigruation:

package com.dhcc.batch.batchDemo.input.db.jdbc;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class InputDBJdbcItemReaderConfigruation {

 @Autowired
 private JobBuilderFactory jobBuilderFactory;

 @Autowired
 private StepBuilderFactory stepBuilderFactory;

 @Autowired
 @Qualifier("DBJdbcWriterDemo")
 private ItemWriter<? super Person> DBJbbcWriterDemo;

 @Autowired
 private DataSource dataSource;

 @Bean
 public Job DBJdbcItemReaderJob() {
 return jobBuilderFactory.get("DBJdbcItemReaderJob4")
  .start(DBJdbcItemReaderJobStep())
  .build();

 }

 @Bean
 public Step DBJdbcItemReaderJobStep() {
 return stepBuilderFactory.get("DBJdbcItemReaderJobStep4")
  .<Person, Person>chunk(100)
  .reader(DBJbbcReaderDemo())
  .writer(DBJbbcWriterDemo)
  .build();
 }

 @Bean
 @StepScope
 public JdbcPagingItemReader<Person> DBJbbcReaderDemo() {
 JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
 reader.setDataSource(this.dataSource); // data source to read from
 reader.setFetchSize(100); // JDBC fetch-size hint for the driver; the page size itself is set with setPageSize
 reader.setRowMapper(new PersonRowMapper()); // map each row to a Person object
 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
 queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // columns to select
 queryProvider.setFromClause("from person_buf"); // table to query
 Map<String, Order> sortKeys = new HashMap<String, Order>(); // sort keys used for paging
 sortKeys.put("id", Order.ASCENDING); // sort by id, ascending
 queryProvider.setSortKeys(sortKeys);
 reader.setQueryProvider(queryProvider); // attach the query provider (select, from, and sort keys)
 return reader;
 }

}

DBJdbcWriterDemo:

package com.dhcc.batch.batchDemo.input.db.jdbc;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
@Component("DBJdbcWriterDemo")
public class DBJdbcWriterDemo implements ItemWriter<Person>{

 @Override
 public void write(List<? extends Person> items) throws Exception {
 for(Person person:items) {
  System.out.println(person);
 }
 }

}

Person

package com.dhcc.batch.batchDemo.input.db.jdbc;

import java.util.Date;

public class Person {
 private Integer id;
 private String name;
 private String perDesc;
 private Date createTime;
 private Date updateTime;
 private String sex;
 private Float score;
 private Double price;

 public Person() {
 super();
 }

 public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,
  Double price) {
 super();
 this.id = id;
 this.name = name;
 this.perDesc = perDesc;
 this.createTime = createTime;
 this.updateTime = updateTime;
 this.sex = sex;
 this.score = score;
 this.price = price;
 }

 public Integer getId() {
 return id;
 }

 public void setId(Integer id) {
 this.id = id;
 }

 public String getName() {
 return name;
 }

 public void setName(String name) {
 this.name = name;
 }

 public Date getCreateTime() {
 return createTime;
 }

 public String getPerDesc() {
 return perDesc;
 }

 public void setPerDesc(String perDesc) {
 this.perDesc = perDesc;
 }

 public void setCreateTime(Date createTime) {
 this.createTime = createTime;
 }

 public Date getUpdateTime() {
 return updateTime;
 }

 public void setUpdateTime(Date updateTime) {
 this.updateTime = updateTime;
 }

 public String getSex() {
 return sex;
 }

 public void setSex(String sex) {
 this.sex = sex;
 }

 public Float getScore() {
 return score;
 }

 public void setScore(Float score) {
 this.score = score;
 }

 public Double getPrice() {
 return price;
 }

 public void setPrice(Double price) {
 this.price = price;
 }

 @Override
 public String toString() {
 return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="
  + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";
 }

}

PersonRowMapper:

package com.dhcc.batch.batchDemo.input.db.jdbc;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
/**
 * Maps each database row to a Person object
 * @author Administrator
 *
 */
public class PersonRowMapper implements RowMapper<Person> {

 /**
 * rs is the result set positioned at the current row; rowNum is the index of that row
 */
 @Override
 public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
 return new Person(rs.getInt("id")
  ,rs.getString("name")
  ,rs.getString("per_desc")
  ,rs.getDate("create_time")
  ,rs.getDate("update_time")
  ,rs.getString("sex")
  ,rs.getFloat("score")
  ,rs.getDouble("price"));
 }

}

Run result:
[screenshot]
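
To make the paging idea more concrete, the sketch below imitates it over an in-memory list of ids: each page fetches at most pageSize rows whose sort key is greater than the last key already seen. This is a simplified model under my own assumptions. The class `KeysetPagingSketch` and its methods are hypothetical, and the code is not the SQL that MySqlPagingQueryProvider generates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sort-key paging; no database or Spring Batch involved.
class KeysetPagingSketch {

    // Returns up to pageSize ids greater than lastId, assuming sortedIds is ascending.
    // Conceptually similar to "WHERE id > ? ORDER BY id LIMIT pageSize".
    static List<Integer> fetchPage(List<Integer> sortedIds, int lastId, int pageSize) {
        List<Integer> page = new ArrayList<>();
        for (int id : sortedIds) {
            if (id > lastId) {
                page.add(id);
                if (page.size() == pageSize) {
                    break;
                }
            }
        }
        return page;
    }

    // Reads every page in turn and returns the number of non-empty pages.
    static int countPages(List<Integer> sortedIds, int pageSize) {
        int pages = 0;
        int lastId = Integer.MIN_VALUE;
        while (true) {
            List<Integer> page = fetchPage(sortedIds, lastId, pageSize);
            if (page.isEmpty()) {
                return pages;
            }
            pages++;
            lastId = page.get(page.size() - 1); // remember the last sort key
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 100001; i++) {
            ids.add(i);
        }
        // 100001 rows at 100 rows per page: 1000 full pages plus one page holding a single row.
        System.out.println("pages read: " + countPages(ids, 100));
    }
}
```

Paging on a sort key is why the reader configuration requires one: without a stable order, consecutive pages could skip or repeat rows.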

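III. Reading data from a flat file
1. FlatFileItemReader:
(1) setLinesToSkip: the number of lines to skip at the start of the file
(2) setResource: the location of the source file
(3) setLineMapper: a line mapper that converts each line into a result object
2. Example: the project's resources directory contains two CSV files; here we read springbatchtest1.csv:
(1) Location of the file:
[screenshot]

(2) Part of the file's contents:

[screenshot]

(3) The code: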
AlipayTranDo :

package com.dhcc.batch.batchDemo.input.flatFile;

public class AlipayTranDo {
 private String tranId;
 private String channel;
 private String tranType;
 private String counterparty;
 private String goods;
 private String amount;
 private String isDebitCredit;
 private String state;

 public AlipayTranDo(String tranId, String channel, String tranType, String counterparty, String goods,
  String amount, String isDebitCredit, String state) {
  super();
  this.tranId = tranId;
  this.channel = channel;
  this.tranType = tranType;
  this.counterparty = counterparty;
  this.goods = goods;
  this.amount = amount;
  this.isDebitCredit = isDebitCredit;
  this.state = state;
 }

 public String getTranId() {
  return tranId;
 }

 public void setTranId(String tranId) {
  this.tranId = tranId;
 }

 public String getChannel() {
  return channel;
 }

 public void setChannel(String channel) {
  this.channel = channel;
 }

 public String getTranType() {
  return tranType;
 }

 public void setTranType(String tranType) {
  this.tranType = tranType;
 }

 public String getCounterparty() {
  return counterparty;
 }

 public void setCounterparty(String counterparty) {
  this.counterparty = counterparty;
 }

 public String getGoods() {
  return goods;
 }

 public void setGoods(String goods) {
  this.goods = goods;
 }

 public String getAmount() {
  return amount;
 }

 public void setAmount(String amount) {
  this.amount = amount;
 }

 public String getIsDebitCredit() {
  return isDebitCredit;
 }

 public void setIsDebitCredit(String isDebitCredit) {
  this.isDebitCredit = isDebitCredit;
 }

 public String getState() {
  return state;
 }

 public void setState(String state) {
  this.state = state;
 }

 @Override
 public String toString() {
  return "AlipayTranDO{" +
   "tranId='" + tranId + '\'' +
   ", channel='" + channel + '\'' +
   ", tranType='" + tranType + '\'' +
   ", counterparty='" + counterparty + '\'' +
   ", goods='" + goods + '\'' +
   ", amount='" + amount + '\'' +
   ", isDebitCredit='" + isDebitCredit + '\'' +
   ", state='" + state + '\'' +
   '}';
 }
 }

AlipayTranDoFileMapper:

package com.dhcc.batch.batchDemo.input.flatFile;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class AlipayTranDoFileMapper implements FieldSetMapper<AlipayTranDo> {

 @Override
 public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException {
 return new AlipayTranDo(fieldSet.readString("tranId")
  , fieldSet.readString("channel")
  ,fieldSet.readString("tranType")
  , fieldSet.readString("counterparty")
  , fieldSet.readString("goods")
  ,fieldSet.readString("amount")
  , fieldSet.readString("isDebitCredit")
  , fieldSet.readString("state")
  );
 }

}

FlatFileWriterDemo:

package com.dhcc.batch.batchDemo.input.flatFile;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
@Component("FlatFileWriterDemo")
public class FlatFileWriterDemo implements ItemWriter<AlipayTranDo>{

 @Override
 public void write(List<? extends AlipayTranDo> items) throws Exception {
 for(AlipayTranDo alipayTranDo:items) {
  System.out.println(alipayTranDo);
 }
 }

}

InputFaltFileItemReaderConfigruation:

package com.dhcc.batch.batchDemo.input.flatFile;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class InputFaltFileItemReaderConfigruation {

 @Autowired
 private JobBuilderFactory jobBuilderFactory;

 @Autowired
 private StepBuilderFactory stepBuilderFactory;

 @Autowired
 @Qualifier("FlatFileWriterDemo")
 private ItemWriter<? super AlipayTranDo> FlatFileWriterDemo;

 @Bean
 public Job FaltFileItemReaderJob() {
 return jobBuilderFactory.get("FaltFileItemReaderJob")
  .start(FaltFileItemReaderJobStep())
  .build();

 }

 @Bean
 public Step FaltFileItemReaderJobStep() {
 return stepBuilderFactory.get("FaltFileItemReaderJobStep")
  .<AlipayTranDo, AlipayTranDo>chunk(100)
  .reader(FaltFileReaderDemo())
  .writer(FlatFileWriterDemo)
  .build();
 }

 @Bean
 @StepScope
 public FlatFileItemReader<AlipayTranDo> FaltFileReaderDemo() {
 FlatFileItemReader<AlipayTranDo> reader=new FlatFileItemReader<AlipayTranDo>();
 reader.setResource(new ClassPathResource("/data/init/springbatchtest1.csv"));
 reader.setLinesToSkip(5);

 DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
 tokenizer.setNames(new String[] 
  {"tranId","channel","tranType","counterparty","goods","amount","isDebitCredit","state"}
 );
 DefaultLineMapper<AlipayTranDo> lineMapper=new DefaultLineMapper<AlipayTranDo>();
 lineMapper.setLineTokenizer(tokenizer);
 lineMapper.setFieldSetMapper(new AlipayTranDoFileMapper());
 lineMapper.afterPropertiesSet();
 reader.setLineMapper(lineMapper);
 return reader;
 }

}

InputItemReaderFileApplication:

package com.dhcc.batch.batchDemo.input.flatFile;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class InputItemReaderFileApplication {
 public static void main(String[] args) {
 SpringApplication.run(InputItemReaderFileApplication.class, args);

 }
}

Run result:
[screenshot]
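
The three settings used by the flat-file reader (lines to skip, a tokenizer with column names, and a mapper) can be pictured with a plain-Java sketch. `FlatFileSketch` and `parse` are hypothetical names, and the sample rows are made up because the real springbatchtest1.csv is not reproduced here. The sketch uses a simple comma split and does not handle quoted fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "skip lines, tokenize, map by name"; not Spring Batch code.
class FlatFileSketch {

    // Skips the first linesToSkip lines, splits the rest on commas,
    // and pairs each value with the column name at the same position.
    static List<Map<String, String>> parse(List<String> lines, int linesToSkip, String[] names) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            String[] values = lines.get(i).split(",", -1); // -1 keeps trailing empty fields
            if (values.length != names.length) {
                throw new IllegalArgumentException(
                        "line " + (i + 1) + ": expected " + names.length
                                + " fields, got " + values.length);
            }
            Map<String, String> row = new LinkedHashMap<>();
            for (int col = 0; col < names.length; col++) {
                row.put(names[col], values[col].trim());
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample content with two header lines.
        List<String> lines = Arrays.asList(
                "header line 1",
                "header line 2",
                "T001,alipay,payment,shop A,book,12.50,debit,success",
                "T002,alipay,refund,shop B,pen,3.00,credit,success");
        String[] names = {"tranId", "channel", "tranType", "counterparty",
                "goods", "amount", "isDebitCredit", "state"};
        for (Map<String, String> row : parse(lines, 2, names)) {
            System.out.println(row);
        }
    }
}
```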

IV. Reading data from an XML file with ItemReader
1. Use StaxEventItemReader<T> to read XML data.
2. Example: add a weather.xml file to the project and read it.
The structure of the XML file:

<datas>
<data>
<date>2018-10-08</date>
<icon>d07|n02</icon>
<weather>小雨转阴</weather>
<temperature>19/9℃</temperature>
<winddirect>西南风</winddirect>
</data>
<data>
<date>2018-10-09</date>
<icon>d01|n00</icon>
<weather>多云转晴</weather>
<temperature>21/7℃</temperature>
<winddirect>东北风</winddirect>
</data>
<data>
<date>2018-10-10</date>
<icon>d00|n01</icon>
<weather>晴转多云</weather>
<temperature>20/8℃</temperature>
<winddirect>东北风3-4级转</winddirect>
</data>
<data>
<date>2018-10-11</date>
<icon>d01|n01</icon>
<weather>多云</weather>
<temperature>18/9℃</temperature>
<winddirect>东北风</winddirect>
</data>
<data>
<date>2018-10-12</date>
<icon>d00|n02</icon>
<weather>晴转阴</weather>
<temperature>18/12℃</temperature>
<winddirect>东北风</winddirect>
</data>
<data>
<date>2018-10-13</date>
<icon>d02|n02</icon>
<weather>阴</weather>
<temperature>18/13℃</temperature>
<winddirect>北风转南风</winddirect>
</data>
<data>
<date>2018-10-14</date>
<icon>d01|n02</icon>
<weather>多云转阴</weather>
<temperature>17/11℃</temperature>
<winddirect>东北风3-4级转</winddirect>
</data>
</datas>
The Spring Batch code that reads the XML file is as follows:
WeatherData:

package com.dhcc.batch.batchDemo.input.xmlFile;

public class WeatherData {
 private String date;
 private String icon;
 private String weather;
 private String temperature;
 private String winddirect;

 public WeatherData() {
 super();
 }

 public WeatherData(String date, String icon, String weather, String temperature, String winddirect) {
 super();
 this.date = date;
 this.icon = icon;
 this.weather = weather;
 this.temperature = temperature;
 this.winddirect = winddirect;
 }

 public String getDate() {
 return date;
 }

 public void setDate(String date) {
 this.date = date;
 }

 public String getIcon() {
 return icon;
 }

 public void setIcon(String icon) {
 this.icon = icon;
 }

 public String getWeather() {
 return weather;
 }

 public void setWeather(String weather) {
 this.weather = weather;
 }

 public String getTemperature() {
 return temperature;
 }

 public void setTemperature(String temperature) {
 this.temperature = temperature;
 }

 public String getWinddirect() {
 return winddirect;
 }

 public void setWinddirect(String winddirect) {
 this.winddirect = winddirect;
 }

 @Override
 public String toString() {
 return "WeatherData [date=" + date + ", icon=" + icon + ", weather=" + weather + ", temperature=" + temperature
  + ", winddirect=" + winddirect + "]";
 }

}

XMLFileDemoJobConfiguration :

package com.dhcc.batch.batchDemo.input.xmlFile;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

@Configuration
public class XMLFileDemoJobConfiguration {
 @Autowired
 private JobBuilderFactory jobBuilderFactory;

 @Autowired
 private StepBuilderFactory stepBuilderFactory;

 @Autowired
 @Qualifier("XMLFileWriterDemo")
 private ItemWriter<? super WeatherData> XMLFileWriterDemo;

 @Bean
 public Job XMLFileDemoReaderJobTwo() {
 return jobBuilderFactory.get("XMLFileDemoReaderJobTwo")
  .start(XMLFileDemoReaderStepTwo())
  .build();

 }

 @Bean
 public Step XMLFileDemoReaderStepTwo() {
 return stepBuilderFactory.get("XMLFileDemoReaderStepTwo")
  .<WeatherData,WeatherData>chunk(2)
  .reader(XMLFileItemReader())
  .writer(XMLFileWriterDemo)
  .build();
 }

 @Bean
 @StepScope
 public StaxEventItemReader<WeatherData> XMLFileItemReader() {
 StaxEventItemReader<WeatherData> reader=new StaxEventItemReader<WeatherData>();
 reader.setResource(new ClassPathResource("/xdata/init/weather.xml"));
 reader.setFragmentRootElementName("data");
 XStreamMarshaller unmarshaller=new XStreamMarshaller();
 Map<String, Class<?>> map = new HashMap<>();
 map.put("data", WeatherData.class);
 unmarshaller.setAliases(map);
 reader.setUnmarshaller(unmarshaller); // unmarshals each <data> fragment into a WeatherData object
 return reader;

 }

}

XMLFileWriterDemo :

package com.dhcc.batch.batchDemo.input.xmlFile;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
@Component("XMLFileWriterDemo")
public class XMLFileWriterDemo implements ItemWriter<WeatherData>{

 @Override
 public void write(List<? extends WeatherData> items) throws Exception {
 for(WeatherData weatherInfo:items) {
  System.out.println(weatherInfo);
 }
 }

}

XmlFileReaderApplication :

package com.dhcc.batch.batchDemo.input.xmlFile;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class XmlFileReaderApplication {

 public static void main(String[] args) {
 SpringApplication.run(XmlFileReaderApplication.class, args);
 }
}

Run result:
[screenshot]

The output shows that the XML file was read successfully.
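
Reading an XML file fragment by fragment can also be sketched with the JDK's own StAX API. The code below is an illustration under my own assumptions: `StaxFragmentSketch` and `readFragments` are hypothetical names, each fragment becomes a `Map` instead of a WeatherData object, and this is not how StaxEventItemReader is implemented internally.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.XMLEvent;

// Hypothetical sketch that uses only the JDK's StAX API.
class StaxFragmentSketch {

    // Collects one map of child-element name -> text for every <fragmentName> element.
    static List<Map<String, String>> readFragments(String xml, String fragmentName) {
        List<Map<String, String>> fragments = new ArrayList<>();
        try {
            XMLEventReader events = XMLInputFactory.newInstance()
                    .createXMLEventReader(new StringReader(xml));
            Map<String, String> current = null; // non-null while inside a fragment
            while (events.hasNext()) {
                XMLEvent event = events.nextEvent();
                if (event.isStartElement()) {
                    String name = event.asStartElement().getName().getLocalPart();
                    if (name.equals(fragmentName)) {
                        current = new LinkedHashMap<>();
                    } else if (current != null) {
                        // Reads the text content of a text-only child element.
                        current.put(name, events.getElementText());
                    }
                } else if (event.isEndElement() && current != null
                        && event.asEndElement().getName().getLocalPart().equals(fragmentName)) {
                    fragments.add(current);
                    current = null;
                }
            }
            events.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("could not parse XML", e);
        }
        return fragments;
    }

    public static void main(String[] args) {
        String xml = "<datas>"
                + "<data><date>2018-10-08</date><icon>d07|n02</icon></data>"
                + "<data><date>2018-10-09</date><icon>d01|n00</icon></data>"
                + "</datas>";
        for (Map<String, String> fragment : readFragments(xml, "data")) {
            System.out.println(fragment);
        }
    }
}
```

The fragment name plays the same role as the value passed to setFragmentRootElementName in the configuration above: it marks where one item starts and ends.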
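V. Reading data from multiple files
1. Reading several files from a given directory in one run is very common.
2. MultiResourceItemReader lets us register a set of input files and assign a delegate ItemReader that processes each of them in turn.
Example: the project contains two CSV files, as shown here:
[screenshot]

We read both files in this example.
Code:
The entity class is the same AlipayTranDo used in the previous example, so it is not shown again.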
AlipayTranDoMutipleMapper :

package com.dhcc.batch.batchDemo.input.mutiple;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class AlipayTranDoMutipleMapper implements FieldSetMapper<AlipayTranDo> {

 @Override
 public AlipayTranDo mapFieldSet(FieldSet fieldSet) throws BindException {
 return new AlipayTranDo(fieldSet.readString("tranId")
  , fieldSet.readString("channel")
  ,fieldSet.readString("tranType")
  , fieldSet.readString("counterparty")
  , fieldSet.readString("goods")
  ,fieldSet.readString("amount")
  , fieldSet.readString("isDebitCredit")
  , fieldSet.readString("state")
  );
 }

}

MutipleFileDemoReaderApplication :

package com.dhcc.batch.batchDemo.input.mutiple;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class MutipleFileDemoReaderApplication {

 public static void main(String[] args) {
 SpringApplication.run(MutipleFileDemoReaderApplication.class, args);
 }
}

MutipleFileDemoReaderConfiguration :

package com.dhcc.batch.batchDemo.input.mutiple;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

@Configuration
public class MutipleFileDemoReaderConfiguration {
 @Autowired
 private JobBuilderFactory jobBuilderFactory;
 @Autowired
 private StepBuilderFactory stepBuilderFactory;

 @Autowired
 @Qualifier("MutipleFileItemWriterDemo")
 private ItemWriter<? super AlipayTranDo> MutipleFileItemWriterDemo;

 @Value("classpath*:/data/init/springbatchtest*.csv")
 private Resource[] resources;

 @Bean
 public Job MutipleFileDemoReaderJob3() {
 return jobBuilderFactory.get("MutipleFileDemoReaderJob3")
  .start(MutipleFileDemoItemReaderJobStep3())
  .build();

 }

 @Bean
 public Step MutipleFileDemoItemReaderJobStep3() {
 return stepBuilderFactory.get("MutipleFileDemoItemReaderJobStep3")
  .<AlipayTranDo, AlipayTranDo>chunk(100)
  .reader(MutipleResourceItemReaderDemo())
  .writer(MutipleFileItemWriterDemo)
  .build();
 }

 @Bean
 @StepScope
 public MultiResourceItemReader<AlipayTranDo> MutipleResourceItemReaderDemo() {
 MultiResourceItemReader<AlipayTranDo> reader=new MultiResourceItemReader<AlipayTranDo>();
 reader.setDelegate(FaltFileReaderDemo());
 reader.setResources(resources);
 return reader;
 }

 @Bean
 @StepScope
 public FlatFileItemReader<AlipayTranDo> FaltFileReaderDemo() {
 FlatFileItemReader<AlipayTranDo> reader=new FlatFileItemReader<AlipayTranDo>();
// reader.setResource(new ClassPathResource("alipayTranDo"));
 reader.setLinesToSkip(5);

 DelimitedLineTokenizer tokenizer=new DelimitedLineTokenizer();
 tokenizer.setNames(new String[] 
  {"tranId","channel","tranType","counterparty","goods","amount","isDebitCredit","state"}
 );
 DefaultLineMapper<AlipayTranDo> lineMapper=new DefaultLineMapper<AlipayTranDo>();
 lineMapper.setLineTokenizer(tokenizer);
 lineMapper.setFieldSetMapper(new AlipayTranDoMutipleMapper());
 lineMapper.afterPropertiesSet();
 reader.setLineMapper(lineMapper);
 return reader;
 }

}

MutipleFileItemWriterDemo :

package com.dhcc.batch.batchDemo.input.mutiple;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
@Component("MutipleFileItemWriterDemo")
public class MutipleFileItemWriterDemo implements ItemWriter<AlipayTranDo>{

 @Override
 public void write(List<? extends AlipayTranDo> items) throws Exception {
 for(AlipayTranDo alipayTranDo:items) {
  System.out.println(alipayTranDo);
 }
 }

}
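
The delegate arrangement in this section can be modeled in a few lines of plain Java: the same reading rules, including the header lines to skip, are applied to each resource in turn and the items are returned as one sequence. `MultiResourceSketch` and `readAll` are hypothetical names, and the "files" are made-up in-memory lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "one delegate reader applied to several resources in turn".
class MultiResourceSketch {

    // Reads every resource in order, skipping linesToSkip header lines in each one,
    // which mirrors reusing the same delegate configuration for every file.
    static List<String> readAll(List<List<String>> resources, int linesToSkip) {
        List<String> items = new ArrayList<>();
        for (List<String> resource : resources) {
            for (int i = linesToSkip; i < resource.size(); i++) {
                items.add(resource.get(i));
            }
        }
        return items;
    }

    public static void main(String[] args) {
        // Two made-up "files", each with one header line.
        List<String> file1 = Arrays.asList("header", "T001", "T002");
        List<String> file2 = Arrays.asList("header", "T003");
        System.out.println(readAll(Arrays.asList(file1, file2), 1));
    }
}
```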

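VI. ItemReader error handling and restart
1. For chunk-oriented steps, Spring Batch provides a mechanism for managing the step's state.
2. State is managed through the ItemStream interface.
3. The ItemStream interface:
(1) open(): called once at the start of each step execution
(2) update(): called once for every chunk
(3) close(): called after all chunks have been processed
4. Diagram:
[screenshot]

No Spring Batch example is shown for this part.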
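
As a rough illustration only, the open/update/close lifecycle can be modeled in plain Java. In this sketch a `Map` stands in for Spring Batch's ExecutionContext, and `RestartableReaderSketch` with its methods is a hypothetical class written for this note, not framework code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reader that mimics the open/update/close lifecycle.
class RestartableReaderSketch {
    private static final String POSITION_KEY = "position";

    private final List<String> data;
    private int position;

    RestartableReaderSketch(List<String> data) {
        this.data = data;
    }

    // Called once when the step starts: resume from a saved position if there is one.
    void open(Map<String, Integer> context) {
        position = context.getOrDefault(POSITION_KEY, 0);
    }

    // Returns the next item, or null when the data is exhausted.
    String read() {
        return position < data.size() ? data.get(position++) : null;
    }

    // Called at each chunk boundary: record how far reading has progressed.
    void update(Map<String, Integer> context) {
        context.put(POSITION_KEY, position);
    }

    // Called once when the step ends; nothing to release in this sketch.
    void close() {
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e");
        Map<String, Integer> context = new HashMap<>();

        // First run: one chunk of two items is committed, then the run stops.
        RestartableReaderSketch first = new RestartableReaderSketch(data);
        first.open(context);
        first.read();
        first.read();
        first.update(context);
        first.close();

        // Restart: a new reader picks up after the last committed chunk.
        RestartableReaderSketch second = new RestartableReaderSketch(data);
        second.open(context);
        System.out.println("first item after restart: " + second.read());
        second.close();
    }
}
```

Saving the position only at chunk boundaries means a restart resumes from the last committed chunk, not from the exact item where the failure happened.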

Finally, here is the project's pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.dhcc.batch</groupId>
 <artifactId>batchDemo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>batchDemo</name>
 <description>Demo project for Spring Boot</description>

 <parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.0.4.RELEASE</version>
 <relativePath /> <!-- lookup parent from repository -->
 </parent>

 <properties>
 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 <java.version>1.8</java.version>
 </properties>

 <dependencies>
 <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
 </dependency>

 <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
 </dependency>

 <dependency>
  <groupId>org.springframework.batch</groupId>
  <artifactId>spring-batch-test</artifactId>
  <scope>test</scope>
 </dependency>

 <!-- https://mvnrepository.com/artifact/org.springframework/spring-oxm -->
 <!-- Used to unmarshal XML files -->
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-oxm</artifactId>
  <version>5.0.8.RELEASE</version>
 </dependency>

 <!-- https://mvnrepository.com/artifact/com.thoughtworks.xstream/xstream -->
 <dependency>
  <groupId>com.thoughtworks.xstream</groupId>
  <artifactId>xstream</artifactId>
  <version>1.4.10</version>
 </dependency>

 <!-- In-memory H2 database: <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> 
  </dependency> -->

 <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId>
 </dependency>
 <dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <scope>runtime</scope>
 </dependency>
 </dependencies>

 <build>
 <plugins>
  <plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
  </plugin>
 </plugins>
 </build>

</project>

Spring Batch study notes (3): reading data with ItemReader from flat files, databases, XML, and multiple files


Helpful user    Time: 2022-04-10 14:39

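A batch Job is an ordered collection of Steps that are executed as part of a predefined flow.
A Step represents a self-contained unit of work and is the main building block of a Job. A chunk-oriented Step is made up of three parts: ItemReader, ItemProcessor, and ItemWriter. All three are applied to every record being processed: the ItemReader reads each record, passes it to the ItemProcessor, and the result is finally handed to the ItemWriter for persistence. The ItemProcessor is optional, so a Step may contain only an ItemReader and an ItemWriter. If you do not need to read or write any data, a Step can instead contain just a Tasklet, which runs a single piece of custom logic in place of the reader, processor, and writer.
Some of the classes and interfaces that make up Spring Batch:
org.springframework.batch.core.Job: represents a Job and also provides the ability to execute it;
org.springframework.batch.core.Step: represents a Step and also provides the ability to execute it;
org.springframework.batch.item.ItemReader<T>: provides the ability to read data;
org.springframework.batch.item.ItemProcessor<I, O>: lets us apply business logic to each item being processed;
org.springframework.batch.item.ItemWriter<T>: provides the ability to write data.
The advantage of building a Job this way is that each Step is decoupled into its own processing unit; each Step is responsible for ...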
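
The read, process, write sequence described in this answer can be pictured with a small plain-Java loop. The names `ReadProcessWriteSketch` and `run` are hypothetical, and `Iterator` and `Function` merely stand in for ItemReader and ItemProcessor. The sketch also shows one Spring Batch convention: a processor that returns null filters the item out, so it never reaches the writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical read-process-write loop; not the real Spring Batch types.
class ReadProcessWriteSketch {

    // Processes each item, drops items the processor maps to null,
    // and collects what would be handed to the writer.
    static <I, O> List<O> run(Iterator<I> reader, Function<I, O> processor) {
        List<O> written = new ArrayList<>();
        while (reader.hasNext()) {
            O result = processor.apply(reader.next());
            if (result != null) {
                written.add(result);
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("alice", "", "bob");
        // Upper-case each name and filter out empty strings by returning null.
        List<String> out = run(names.iterator(), s -> s.isEmpty() ? null : s.toUpperCase());
        System.out.println(out);
    }
}
```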
