ItemWriter
ItemReader读取数据时以item为单位逐条循环读取,而ItemWriter写入数据时则以chunk为单位,一块一块地进行写入
先写一个Job 和 ItemReader作为例子
/**
 * Declares the demo job and its single chunk-oriented step.
 *
 * <p>The reader and writer are not created here; they are injected by
 * qualifier name and simply plugged into the step.
 */
@Configuration
public class DbOutputDemoJobConfiguration {

    /** Chunk size handed to the step builder. */
    private static final int CHUNK_SIZE = 10;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbOutputDemoJobFlatFileReader")
    public ItemReader<Customer> dbOutputDemoJobFlatFileReader;

    @Autowired
    @Qualifier("dbOutputDemoJobFlatFileWriter")
    public ItemWriter<Customer> dbOutputDemoJobFlatFileWriter;

    /**
     * Builds the job named {@code dbOutputDemoJob}, which starts with
     * {@link #dbOutputDemoStep()}.
     *
     * @return the configured job
     */
    @Bean
    public Job dbOutputDemoJob() {
        return jobBuilderFactory
                .get("dbOutputDemoJob")
                .start(dbOutputDemoStep())
                .build();
    }

    /**
     * Builds the step named {@code dbOutputDemoStep} from the injected
     * reader and writer.
     *
     * @return the configured step
     */
    @Bean
    public Step dbOutputDemoStep() {
        return stepBuilderFactory
                .get("dbOutputDemoStep")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(dbOutputDemoJobFlatFileReader)
                .writer(dbOutputDemoJobFlatFileWriter)
                .build();
    }
}
/**
 * Provides the flat-file reader that turns lines of {@code customerInit.csv}
 * into {@code Customer} objects.
 */
@Configuration
public class DbOutputDemoJobReaderConfiguration {

    /**
     * Creates a reader over the classpath resource {@code customerInit.csv}.
     *
     * <p>Each line is tokenized into the columns {@code id}, {@code firstName},
     * {@code lastName} and {@code birthdate}, which are then copied into a
     * {@code Customer} through its builder.
     *
     * @return the configured reader
     */
    @Bean
    public FlatFileItemReader<Customer> dbOutputDemoJobFlatFileReader() {
        // Column names, in the order they are assigned to tokens of a line.
        String[] columnNames = new String[] {"id", "firstName", "lastName", "birthdate"};
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(columnNames);

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSet -> Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build());
        // Called by hand because the mapper is created here, not as its own bean.
        lineMapper.afterPropertiesSet();

        FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<>();
        itemReader.setResource(new ClassPathResource("customerInit.csv"));
        itemReader.setLineMapper(lineMapper);
        return itemReader;
    }
}
数据写入数据库中
待入库的文本数据
数据库表
JdbcBatchItemWriter
/**
 * Provides the JDBC writer that inserts {@code Customer} items into the
 * {@code customer} table.
 */
@Configuration
public class DbOutputDemoJobWriterConfiguration {

    @Autowired
    public DataSource dataSource;

    /**
     * Creates a writer that runs a named-parameter INSERT against the
     * injected data source.
     *
     * @return the configured writer
     */
    @Bean
    public JdbcBatchItemWriter<Customer> dbOutputDemoJobFlatFileWriter() {
        // Named parameters (:id, :firstName, ...) are resolved per item by the
        // parameter source provider set below.
        final String insertSql =
                "insert into customer(id,firstName,lastName,birthdate) values "
                        + "(:id,:firstName,:lastName,:birthdate)";

        JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
        // Data source the statement is executed against.
        writer.setDataSource(dataSource);
        // SQL statement to execute.
        writer.setSql(insertSql);
        // Supplies the values for the named parameters from each item.
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }
}
执行结果
数据写入.data文件中
FlatFileItemWriter可以将任何一个类型为T的对象数据写入到普通文件中
我们将customerInit.csv中的数据读出并且写入到文件customerInfo.data中
FlatFileItemWriter
/**
 * Provides a flat-file writer that emits one JSON line per {@code Customer}
 * into a freshly created temporary file.
 */
@Configuration
public class FlatFileDemoJobWriterConfiguration {

    /**
     * Creates the writer and points it at a new temp file named
     * {@code customerInfo*.data}; the absolute path is printed to stdout.
     *
     * @return the configured writer
     * @throws Exception if the temp file cannot be created or the writer's
     *         configuration check fails
     */
    @Bean
    public FlatFileItemWriter<Customer> flatFileDemoFlatFileWriter() throws Exception {
        // Output file location.
        File outputFile = File.createTempFile("customerInfo", ".data");
        String path = outputFile.getAbsolutePath();
        System.out.println(">> file is created in: " + path);

        FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource(path));
        // Converts each Customer into the string written as one line.
        writer.setLineAggregator(new MyCustomerLineAggregator());
        writer.afterPropertiesSet();
        return writer;
    }
}
/**
 * Line aggregator that renders a {@code Customer} as a JSON string.
 */
public class MyCustomerLineAggregator implements LineAggregator<Customer> {

    /** JSON serializer; created once per aggregator and never reassigned. */
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Serializes the given customer to JSON.
     *
     * @param customer the item to serialize
     * @return the JSON text produced by the mapper
     * @throws RuntimeException if serialization fails; the original
     *         {@code JsonProcessingException} is kept as the cause
     */
    @Override
    public String aggregate(Customer customer) {
        try {
            return mapper.writeValueAsString(customer);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Unable to serialize.", e);
        }
    }
}
数据写入XML文件中
将数据写入到xml文件中,需要用到StaxEventItemWriter,同时会用到XStreamMarshaller来把对象序列化为XML
StaxEventItemWriter
/**
 * Provides a StAX-based writer that marshals {@code Customer} items into an
 * XML file under a {@code customers} root element.
 */
@Configuration
public class XMLFileDemoJobWriterConfiguration {

    /**
     * Creates the XML writer and points it at a new temp file named
     * {@code customerInfo*.xml}; the absolute path is printed to stdout.
     *
     * @return the configured writer
     * @throws Exception if the temp file cannot be created or the writer's
     *         configuration check fails
     */
    @Bean
    public StaxEventItemWriter<Customer> xmlFileDemoXMLFileWriter() throws Exception {
        // Object-to-XML conversion: alias the Customer class to the "customer" name.
        // Class<?> instead of the raw Class type avoids an unchecked/rawtypes warning.
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);
        marshaller.setAliases(aliases);

        StaxEventItemWriter<Customer> itemWriter = new StaxEventItemWriter<>();
        // Root tag of the generated document.
        itemWriter.setRootTagName("customers");
        itemWriter.setMarshaller(marshaller);

        // Output XML file location.
        String path = File.createTempFile("customerInfo", ".xml").getAbsolutePath();
        System.out.println(">> xml file is generated: " + path);
        itemWriter.setResource(new FileSystemResource(path));

        itemWriter.afterPropertiesSet();
        return itemWriter;
    }
}
输出如下
数据写入多种文件中
将数据写入多个文件,需要使用CompositeItemWriter或者使用ClassifierCompositeItemWriter
二者差异:
- CompositeItemWriter 是把全量数据分别写入多个文件中;
- ClassifierCompositeItemWriter 是根据指定规则,把满足条件的数据写入指定文件中;
将数据分别写入到xml文件和json文件中,在CompositeItemWriter、ClassifierCompositeItemWriter中实现写入文件
/**
 * Creates a StAX-based writer that marshals {@code Customer} items into a new
 * temp file named {@code multiInfo*.xml} under a {@code customers} root
 * element; the absolute path is printed to stdout.
 *
 * @return the configured XML writer
 * @throws Exception if the temp file cannot be created or the writer's
 *         configuration check fails
 */
@Bean
public StaxEventItemWriter<Customer> xmlFileWriter() throws Exception {
    // Object-to-XML conversion: alias the Customer class to the "customer" name.
    // Class<?> instead of the raw Class type avoids an unchecked/rawtypes warning.
    XStreamMarshaller marshaller = new XStreamMarshaller();
    Map<String, Class<?>> aliases = new HashMap<>();
    aliases.put("customer", Customer.class);
    marshaller.setAliases(aliases);

    StaxEventItemWriter<Customer> itemWriter = new StaxEventItemWriter<>();
    // Root tag of the generated document.
    itemWriter.setRootTagName("customers");
    itemWriter.setMarshaller(marshaller);

    // Output location.
    String path = File.createTempFile("multiInfo", ".xml").getAbsolutePath();
    System.out.println(">> xml file is created in: " + path);
    itemWriter.setResource(new FileSystemResource(path));

    itemWriter.afterPropertiesSet();
    return itemWriter;
}
/**
 * Creates a flat-file writer that emits one JSON line per {@code Customer}
 * into a new temp file named {@code multiInfo*.json}; the absolute path is
 * printed to stdout.
 *
 * @return the configured JSON writer
 * @throws Exception if the temp file cannot be created or the writer's
 *         configuration check fails
 */
@Bean
public FlatFileItemWriter<Customer> jsonFileWriter() throws Exception {
    // Output location.
    File outputFile = File.createTempFile("multiInfo", ".json");
    String path = outputFile.getAbsolutePath();
    System.out.println(">> json file is created in: " + path);

    FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(path));
    // Converts each Customer into the string written as one line.
    writer.setLineAggregator(new MyCustomerLineAggregator());
    writer.afterPropertiesSet();
    return writer;
}
CompositeItemWriter
使用CompositeItemWriter输出数据到多个文件
```
@Bean
public CompositeItemWriter customerCompositeItemWriter() throws Exception {
    CompositeItemWriter itemWriter = new CompositeItemWriter<>();
    // 指定多个输出对象
    itemWriter.setDelegates(Arrays.asList(xmlFileWriter(),jsonFileWriter()));
    itemWriter.afterPropertiesSet();
    return itemWriter;
}
```
输出结果
ClassifierCompositeItemWriter
使用ClassifierCompositeItemWriter根据规则输出数据到文件
/**
 * Creates a writer that routes each {@code Customer} to either the XML or the
 * JSON writer, as decided by a {@code MyCustomerClassifier}.
 *
 * <p>NOTE(review): in Spring Batch the classifier composite writer is
 * presumably not an item stream itself, so the two delegates likely need to
 * be registered as streams on the step — confirm against the step
 * configuration, which is not shown here.
 *
 * @return the configured classifying writer
 * @throws Exception if creating either delegate writer fails
 */
@Bean
public ClassifierCompositeItemWriter<Customer> customerCompositeItemWriter() throws Exception {
    // The classifier picks the delegate writer for each item.
    MyCustomerClassifier classifier =
            new MyCustomerClassifier(xmlFileWriter(), jsonFileWriter());

    ClassifierCompositeItemWriter<Customer> routingWriter =
            new ClassifierCompositeItemWriter<>();
    routingWriter.setClassifier(classifier);
    return routingWriter;
}
构造指定数据划分规则,按照customer的id进行分类 ``` public class MyCustomerClassifier implements Classifier
}
<br/>
输出结果
![file](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9ncmFwaC5iYWlkdS5jb20vcmVzb3VyY2UvMjIyMDMwYmUzN2U5OWM2MzAwNTgzMDE1ODMzMzMyMzIucG5n?x-oss-process=image/format,png)
<br/>
![file](https://imgconvert.csdnimg.cn/aHR0cHM6Ly9ncmFwaC5iYWlkdS5jb20vcmVzb3VyY2UvMjIyZjAwNWRiNjVmNjcwNmVkMWEwMDE1ODMzMzMzMTcucG5n?x-oss-process=image/format,png)
<br/>
<br/>
参考:
https://blog.csdn.net/wuzhiwei549/article/details/88593942
https://blog.51cto.com/13501268/2298822
转载:https://blog.csdn.net/weixin_38004638/article/details/104765005
查看评论