Software Architecture

Pipe-and-Filter Architecture with Spring Batch

数据处理场景

height:300px


管道+过滤器, 实现数据的多步转化

UNIX Command Pipelines

root@linux: cat fall.txt win.txt | sort | gzip | mail fred@byu.edu

height:300px

编译器

LLVM's Implementation of the Three-Phase Design

height:250px


https://www.aosabook.org/en/llvm.html

“管道-过滤器”架构模式

组成:Pipes/Filters/Data Source/Data Sink

  • Filter完成单步数据处理功能
  • Data Source/Data Sink/Filter以Pipe连接
  • Pipe连接相邻元素,前一元素的输出为后一元素的输入

height:150px

过滤器(Filters)

过滤器是流水线的处理单元,负责丰富、提炼或转换他的输入数据。 它以下面的三种方式工作:

  • 后继单元从过滤器中拉出数据(被动式)
  • 前序单元把新的输入数据压入过滤器(被动式)
  • 从前序单元拉出输入数据并且将其输出数据压入后继(主动式)

管道(Pipes)

  • 管道表示过滤器之间的连接;数据源和第一个过滤器之间的连接;以及最后的过滤器和数据宿之间的连接。
  • 如果管道连接两个主动过滤器,那么管道需要进行缓冲和同步。

数据源/数据汇点(Data Source/Sink)

  • 数据源表示系统的输入,它提供一系列相同结构或者类型的数值
    • 数据源可以主动把数据值推入(push)第一个处理阶段,也可以由第一个处理阶段主动获取(pull)数据
  • 数据汇点收集来自流水线终点的结果
    • 主动数据汇点可以获取数据
    • 被动数据汇点允许前面的过滤器把结果推进汇点

Case I

Active Source / Passive Filter / Passive Sink

Case II

Passive Source / Passive Filter / Active Sink

Case III

Passive Source / Active Filter / Passive Sink

Case IV

Passive Source / Multiple Active Filters / Passive Sink

优缺点

  • 优点
    • 过滤器可以重用/重组合/可替换
    • 不需保存中间结果
    • 高效的并行处理 (多active部件)
  • 缺点
    • 数据传输开销较大
    • 数据转换开销较大
    • 错误处理较为复杂

Java IO Streams

Java IO

height:500px

Unix Pipe

root@linux: cat fall.txt win.txt | sort | gzip | mail fred@byu.edu

height:250px

Inter-process Communication
# include "apue.h"
int main(void){
    int n;
    int fd[2];
    pid_t pid;
    char line[MAXLINE];
    if (pipe(fd)<0)
        err_sys("pipe error");
    if ((pid = fork())<0) {
        err_sys("fork error");
    } else if (pid >0) {
        close(fd[0]);
        write(fd[1], "hello world\n", 12);
    } else {
        close(fd[1]);
        n = read(fd[0], line, MAXLINE);
    }
    exit(0);
}

Axis

height:400px


https://axis.apache.org/axis/

Axis Handlers (Filters)

height:450px

Axis Architecture

height:350px


http://axis.apache.org/axis/java/architecture-guide.html

Spring Batch

height:350px

https://docs.spring.io/spring-batch/docs/current/reference/html/index.html

Spring Batch

  • Automated, complex processing of large volumes of information that is most efficiently processed without user interaction, including time-based events (such as month-end calculations, notices, or correspondence).
  • Periodic application of complex business rules processed repetitively across very large data sets (for example, insurance benefit determination or rate adjustments).
  • Integration of information that is received from internal and external systems that typically requires formatting, validation, and processing in a transactional manner into the system of record.

Batch Stereotypes

height:350px


A Job has one to many steps, each of which has exactly one ItemReader, one ItemProcessor, and one ItemWriter. A job needs to be launched (with JobLauncher), and metadata about the currently running process needs to be stored (in JobRepository).

Creating a Batch Service

https://www.bilibili.com/video/BV1gK411j7hv?p=2 5:40

spring-guides/gs-batch-processing

height:300px


https://spring.io/guides/gs/batch-processing/
Configurations
@Bean
public FlatFileItemReader<Person> reader() {
    return new FlatFileItemReaderBuilder<Person>().name("personItemReader")
            .resource(new ClassPathResource("sample-data.csv")).delimited()
            .names(new String[] { "firstName", "lastName" })
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{setTargetType(Person.class);}}).build();
}
@Bean
public PersonItemProcessor processor() {
    return new PersonItemProcessor();
}
@Bean
public PersonItemProcessor <Person> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Person>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource)
            .build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1)
            .end().build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor())
            .writer(writer).build();
}

Fluent interface

In software engineering, a fluent interface is an object-oriented API whose design relies extensively on method chaining. Its goal is to increase code legibility by creating a domain-specific language. The term was coined in 2005 by Eric Evans and Martin Fowler.


https://en.wikipedia.org/wiki/Fluent_interface

Batch Step

height:300px


A Step is a domain object that encapsulates an independent, sequential phase of a batch job and contains all of the information necessary to define and control the actual batch processing. 
https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#configureStep

FlatFileItemReader

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>().name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv")).delimited()
                .names(new String[] { "firstName", "lastName" })
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
                    {
                        setTargetType(Person.class);
                    }
                }).build();
    }
FlatFileItemReaderBuilder -> FlatFileItemReader
https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/file/builder/FlatFileItemReaderBuilder.html

JdbcBatchItemWriter

    @Bean
    public PersonItemProcessor <Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource)
                .build();
    }
JdbcBatchItemWriterBuilder -> JdbcBatchItemWriter
https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/database/builder/JdbcBatchItemWriterBuilder.html

Chunk-oriented Processing

public interface ItemWriter<T>

void write(java.util.List<? extends T> items)
    throws java.lang.Exception

Process the supplied data element. 
Will not be called with any null 
items in normal operation.

Parameters:
items - items to be written
Throws:
java.lang.Exception - if there are errors.
Chaining ItemProcessors
public class Foo {}
public class Bar {
    public Bar(Foo foo) {}
}
public class Foobar {
    public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        return new Bar(foo); //Perform simple transformation, convert a Foo to a Bar
    }
}
public class BarProcessor implements ItemProcessor<Bar,Foobar>{
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}
public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}
@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJob")
				.start(step1())
				.end()
				.build();
}
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());
	CompositeItemProcessor processor = new CompositeItemProcessor();
	processor.setDelegates(delegates);
	return processor;
}
https://docs.spring.io/spring-batch/docs/current/reference/html/readersAndWriters.html#chainingItemProcessors

Tasklet Step

public class FileDeletingTasklet implements Tasklet, InitializingBean {
    private Resource directory;
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException(…);
            }
        }
        return RepeatStatus.FINISHED;
    }
    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}
@Bean
public Job taskletJob() {
    return this.jobBuilderFactory.get("taskletJob")
                .start(deleteFilesInDir())
                .build();
}

@Bean
public Step deleteFilesInDir() {
    return this.stepBuilderFactory.get("deleteFilesInDir")
                .tasklet(fileDeletingTasklet())
                .build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
    FileDeletingTasklet tasklet = new FileDeletingTasklet();

    tasklet.setDirectoryResource(new FileSystemResource(“…/test-dir”));

    return tasklet;
}
https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#taskletStep

Sequential Flow

@Bean
public Job job() {
    return this.jobBuilderFactory.get("job")
                .start(stepA())
                .next(stepB())
                .next(stepC())
                .build();
}

Conditional Flow

@Bean
public Job job() {
    return this.jobBuilderFactory.get("job")
                .start(stepA())
                .on("*").to(stepB())
                .from(stepA()).on("FAILED").to(stepC())
                .end()
                .build();
}

Split Flow

    @Bean
    public Job parallelStepsJob() {
        Flow masterFlow = new FlowBuilder<Flow>("masterFlow").start(taskletStep("step1"))
                                .build();
        Flow flowJob2 = new FlowBuilder<Flow>("flow2").start(taskletStep("step2"))
                                .build();
        Flow flowJob3 = new FlowBuilder<Flow>("flow3").start(taskletStep("step3"))
                                .build();
        Flow flowJob4 = new FlowBuilder<Flow>("flow4").start(taskletStep("step4"))
                                .build();
        Flow slaveFlow = new FlowBuilder<Flow>("splitflow").split(new SimpleAsyncTaskExecutor())
                                .add(flowJob2, flowJob3, flowJob4).build();
        return (jobBuilderFactory.get("splitFlowJob").incrementer(new RunIdIncrementer())
                                    .start(masterFlow)
                .next(slaveFlow).build()).build();
    }

https://www.bilibili.com/video/BV1gK411j7hv?p=3 7:25

sa-spring/spring-batch-split

Scaling and Parallel Processing

  • Multi-threaded Step (single process)
  • Parallel Steps (single process)
  • Remote Chunking of Step (multi process)
  • Partitioning a Step (single or multi process)

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html

https://www.bilibili.com/video/BV1gK411j7hv?p=4 1:10

Multi-threaded Step

public class SimpleAsyncTaskExecutor
TaskExecutor implementation that fires up a new Thread for each task, executing it asynchronously. Supports limiting concurrent threads through the "concurrencyLimit" bean property. By default, the number of concurrent threads is unlimited.
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1").<Person, Person>chunk(10).reader(reader()).processor(processor())
                .writer(writer).taskExecutor(taskExecutor()).build();
    }

ThreadPoolTaskExecutor

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setMaxPoolSize(4);
    executor.setQueueCapacity(20);
    return executor;
}
As requests come in, threads will be created up to 2 and then tasks will be added to the queue until it reaches 20. When the queue is full new threads will be created up to maxPoolSize, i.e. 4. Once all the threads are in use and the queue is full tasks will be rejected. As the queue reduces, so does the number of active threads.

Remote Chunking

height:380px


https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking
https://docs.spring.io/spring-batch/docs/current/reference/html/spring-batch-integration.html#remote-chunking

Partitioning

height:380px


https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning

Partitioning

    @Bean
    public Job partitioningJob() throws Exception {
        return jobBuilderFactory.get("partitioningJob").incrementer(new RunIdIncrementer()).flow(masterStep()).end()
                .build();
    }
    @Bean
    public Step masterStep() throws Exception {
        return stepBuilderFactory.get("masterStep").partitioner(slaveStep()).partitioner("partition", partitioner())
                .gridSize(10).taskExecutor(new SimpleAsyncTaskExecutor()).build();
    }
    @Bean
    public Partitioner partitioner() throws Exception {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        partitioner.setResources(resolver.getResources("file://persona*"));
        return partitioner;
    }
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep").<Map<String, String>, Map<String, String>>chunk(1).reader(reader(null)).writer(writer()).build();
    }

https://www.bilibili.com/video/BV1gK411j7hv?p=4 13:00

sa-spring/spring-batch-partition