Skip to content

Commit 6cc718a

Browse files
committed
Add composite item reader implementation
Resolves #757
1 parent ea378b9 commit 6cc718a

File tree

9 files changed

+381
-0
lines changed

9 files changed

+381
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import java.util.Iterator;
19+
import java.util.List;
20+
21+
import org.springframework.batch.item.ExecutionContext;
22+
import org.springframework.batch.item.ItemStreamException;
23+
import org.springframework.batch.item.ItemStreamReader;
24+
25+
/**
26+
* Composite reader that delegates reading to a list of {@link ItemStreamReader}s. This
27+
* implementation is not thread-safe.
28+
*
29+
* @author Mahmoud Ben Hassine
30+
* @param <T> type of objects to read
31+
* @since 5.2
32+
*/
33+
public class CompositeItemReader<T> implements ItemStreamReader<T> {
34+
35+
private final List<ItemStreamReader<T>> delegates;
36+
37+
private final Iterator<ItemStreamReader<T>> delegatesIterator;
38+
39+
private ItemStreamReader<T> currentDelegate;
40+
41+
/**
42+
* Create a new {@link CompositeItemReader}.
43+
* @param delegates the delegate readers to read data
44+
*/
45+
public CompositeItemReader(List<ItemStreamReader<T>> delegates) {
46+
this.delegates = delegates;
47+
this.delegatesIterator = this.delegates.iterator();
48+
this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
49+
}
50+
51+
// TODO: check if we need to open/close delegates on the fly in read() to avoid
52+
// opening resources early for a long time
53+
@Override
54+
public void open(ExecutionContext executionContext) throws ItemStreamException {
55+
for (ItemStreamReader<T> delegate : delegates) {
56+
delegate.open(executionContext);
57+
}
58+
}
59+
60+
@Override
61+
public T read() throws Exception {
62+
if (this.currentDelegate == null) {
63+
return null;
64+
}
65+
T item = currentDelegate.read();
66+
if (item == null) {
67+
currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null;
68+
return read();
69+
}
70+
return item;
71+
}
72+
73+
@Override
74+
public void update(ExecutionContext executionContext) throws ItemStreamException {
75+
if (this.currentDelegate != null) {
76+
this.currentDelegate.update(executionContext);
77+
}
78+
}
79+
80+
@Override
81+
public void close() throws ItemStreamException {
82+
for (ItemStreamReader<T> delegate : delegates) {
83+
delegate.close();
84+
}
85+
}
86+
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.support;
17+
18+
import java.util.Arrays;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import org.springframework.batch.item.ExecutionContext;
23+
import org.springframework.batch.item.ItemStreamReader;
24+
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.times;
27+
import static org.mockito.Mockito.verify;
28+
import static org.mockito.Mockito.verifyNoInteractions;
29+
import static org.mockito.Mockito.when;
30+
31+
/**
32+
* Test class for {@link CompositeItemReader}.
33+
*
34+
* @author Mahmoud Ben Hassine
35+
*/
36+
public class CompositeItemReaderTests {
37+
38+
@Test
39+
void testCompositeItemReaderOpen() {
40+
// given
41+
ItemStreamReader<String> reader1 = mock();
42+
ItemStreamReader<String> reader2 = mock();
43+
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
44+
ExecutionContext executionContext = new ExecutionContext();
45+
46+
// when
47+
compositeItemReader.open(executionContext);
48+
49+
// then
50+
verify(reader1).open(executionContext);
51+
verify(reader2).open(executionContext);
52+
}
53+
54+
@Test
55+
void testCompositeItemReaderRead() throws Exception {
56+
// given
57+
ItemStreamReader<String> reader1 = mock();
58+
ItemStreamReader<String> reader2 = mock();
59+
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
60+
when(reader1.read()).thenReturn("foo1", "foo2", null);
61+
when(reader2.read()).thenReturn("bar1", "bar2", null);
62+
63+
// when & then
64+
compositeItemReader.read();
65+
verify(reader1, times(1)).read();
66+
compositeItemReader.read();
67+
verify(reader1, times(2)).read();
68+
compositeItemReader.read();
69+
verify(reader1, times(3)).read();
70+
71+
compositeItemReader.read();
72+
verify(reader2, times(2)).read();
73+
compositeItemReader.read();
74+
verify(reader2, times(3)).read();
75+
compositeItemReader.read();
76+
verify(reader2, times(3)).read();
77+
}
78+
79+
@Test
80+
void testCompositeItemReaderUpdate() {
81+
// given
82+
ItemStreamReader<String> reader1 = mock();
83+
ItemStreamReader<String> reader2 = mock();
84+
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
85+
ExecutionContext executionContext = new ExecutionContext();
86+
87+
// when
88+
compositeItemReader.update(executionContext);
89+
90+
// then
91+
verify(reader1).update(executionContext);
92+
verifyNoInteractions(reader2); // reader1 is the current delegate in this setup
93+
}
94+
95+
@Test
96+
void testCompositeItemReaderClose() {
97+
// given
98+
ItemStreamReader<String> reader1 = mock();
99+
ItemStreamReader<String> reader2 = mock();
100+
CompositeItemReader<String> compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2));
101+
102+
// when
103+
compositeItemReader.close();
104+
105+
// then
106+
verify(reader1).close();
107+
verify(reader2).close();
108+
}
109+
110+
}

spring-batch-samples/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Here is a list of samples with checks to indicate which features each one demons
2626
| [Hello world Job Sample](#hello-world-job-sample) | | | | | | | | | | X | |
2727
| [Amqp Job Sample](#amqp-job-sample) | | | | | | | | | | X | |
2828
| [BeanWrapperMapper Sample](#beanwrappermapper-sample) | | | | X | | | | | | | |
29+
| [Composite ItemReader Sample](#composite-itemreader-sample) | | | | | | | X | | | | |
2930
| [Composite ItemWriter Sample](#composite-itemwriter-sample) | | | | | | | X | | | | |
3031
| [Customer Filter Sample](#customer-filter-sample) | | | | | | | | | | | X |
3132
| [Reader Writer Adapter Sample](#reader-writer-adapter-sample) | | | | | | | X | | | | |
@@ -121,6 +122,16 @@ prototype according to the field names in the file.
121122

122123
[BeanWrapperMapper Sample](src/main/java/org/springframework/batch/samples/beanwrapper/README.md)
123124

125+
### Composite ItemReader Sample
126+
127+
This sample shows how to use a composite item reader to read data with
128+
the same format from different data sources.
129+
130+
In this sample, data items of type `Person` are read from two flat files
131+
and a relational database table.
132+
133+
[Composite reader Sample](src/main/java/org/springframework/batch/samples/compositereader/README.md)
134+
124135
### Composite ItemWriter Sample
125136

126137
This shows a common use case using a composite pattern, composing
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
## Composite ItemReader Sample
2+
3+
### About
4+
5+
This sample shows how to use a composite item reader to read data with
6+
the same format from different data sources.
7+
8+
In this sample, data items of type `Person` are read from two flat files
9+
and a relational database table.
10+
11+
### Run the sample
12+
13+
You can run the sample from the command line as following:
14+
15+
```
16+
$>cd spring-batch-samples
17+
$>../mvnw -Dtest=CompositeItemReaderSampleFunctionalTests#testJobLaunch test
18+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
1,foo1
2+
2,foo2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
3,bar1
2+
4,bar2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
insert into person_source values (5, 'baz1');
2+
insert into person_source values (6, 'baz2');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
create table person_source (id int primary key, name varchar(20));
2+
create table person_target (id int primary key, name varchar(20));

0 commit comments

Comments
 (0)