Skip to content

Commit 9d1114c

Browse files
fmbenhassinemminella
authored andcommitted
BATCH-2445: make annotation based chunk listeners available to non-fault tolerant steps
Before this commit, annotation based chunk listeners were not registered when using a non-fault tolerant step builder. This commit moves the code of chunk listener annotations handling to the AbstractTaskletStepBuilder so that other tasklet builders can use it. Resolves BATCH-2445
1 parent 9bd7c1b commit 9d1114c

File tree

3 files changed

+174
-22
lines changed

3 files changed

+174
-22
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/AbstractTaskletStepBuilder.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2013 the original author or authors.
2+
* Copyright 2012-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,12 +15,18 @@
1515
*/
1616
package org.springframework.batch.core.step.builder;
1717

18+
import java.lang.reflect.Method;
19+
import java.util.HashSet;
1820
import java.util.LinkedHashSet;
1921
import java.util.Set;
2022

2123
import org.springframework.batch.core.ChunkListener;
2224
import org.springframework.batch.core.Step;
2325
import org.springframework.batch.core.StepExecutionListener;
26+
import org.springframework.batch.core.annotation.AfterChunk;
27+
import org.springframework.batch.core.annotation.AfterChunkError;
28+
import org.springframework.batch.core.annotation.BeforeChunk;
29+
import org.springframework.batch.core.listener.StepListenerFactoryBean;
2430
import org.springframework.batch.core.step.tasklet.Tasklet;
2531
import org.springframework.batch.core.step.tasklet.TaskletStep;
2632
import org.springframework.batch.item.ItemStream;
@@ -29,6 +35,7 @@
2935
import org.springframework.batch.repeat.exception.ExceptionHandler;
3036
import org.springframework.batch.repeat.support.RepeatTemplate;
3137
import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
38+
import org.springframework.batch.support.ReflectionUtils;
3239
import org.springframework.core.task.SyncTaskExecutor;
3340
import org.springframework.core.task.TaskExecutor;
3441
import org.springframework.transaction.interceptor.TransactionAttribute;
@@ -39,6 +46,7 @@
3946
*
4047
* @author Dave Syer
4148
* @author Michael Minella
49+
* @author Mahmoud Ben Hassine
4250
*
4351
* @since 2.2
4452
*
@@ -136,6 +144,32 @@ public AbstractTaskletStepBuilder<B> listener(ChunkListener listener) {
136144
return this;
137145
}
138146

147+
/**
148+
* Registers objects using the annotation based listener configuration.
149+
*
150+
* @param listener the object that has a method configured with listener annotation
151+
* @return this for fluent chaining
152+
*/
153+
@Override
154+
public B listener(Object listener) {
155+
super.listener(listener);
156+
157+
Set<Method> chunkListenerMethods = new HashSet<Method>();
158+
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeChunk.class));
159+
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunk.class));
160+
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunkError.class));
161+
162+
if(chunkListenerMethods.size() > 0) {
163+
StepListenerFactoryBean factory = new StepListenerFactoryBean();
164+
factory.setDelegate(listener);
165+
this.listener((ChunkListener) factory.getObject());
166+
}
167+
168+
@SuppressWarnings("unchecked")
169+
B result = (B) this;
170+
return result;
171+
}
172+
139173
/**
140174
* Register a stream for callbacks that manage restart data.
141175
*

spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2014 the original author or authors.
2+
* Copyright 2006-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,9 +32,6 @@
3232
import org.springframework.batch.core.SkipListener;
3333
import org.springframework.batch.core.StepExecutionListener;
3434
import org.springframework.batch.core.StepListener;
35-
import org.springframework.batch.core.annotation.AfterChunk;
36-
import org.springframework.batch.core.annotation.AfterChunkError;
37-
import org.springframework.batch.core.annotation.BeforeChunk;
3835
import org.springframework.batch.core.annotation.OnSkipInProcess;
3936
import org.springframework.batch.core.annotation.OnSkipInRead;
4037
import org.springframework.batch.core.annotation.OnSkipInWrite;
@@ -93,6 +90,7 @@
9390
* @author Dave Syer
9491
* @author Chris Schaefer
9592
* @author Michael Minella
93+
* @author Mahmoud Ben Hassine
9694
*
9795
* @since 2.2
9896
*/
@@ -206,23 +204,12 @@ public SimpleStepBuilder listener(Object listener) {
206204
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInProcess.class));
207205
skipListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnSkipInWrite.class));
208206

209-
Set<Method> chunkListenerMethods = new HashSet<Method>();
210-
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeChunk.class));
211-
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunk.class));
212-
chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunkError.class));
213-
214207
if(skipListenerMethods.size() > 0) {
215208
StepListenerFactoryBean factory = new StepListenerFactoryBean();
216209
factory.setDelegate(listener);
217210
skipListeners.add((SkipListener) factory.getObject());
218211
}
219212

220-
if(chunkListenerMethods.size() > 0) {
221-
StepListenerFactoryBean factory = new StepListenerFactoryBean();
222-
factory.setDelegate(listener);
223-
super.listener(new TerminateOnExceptionChunkListenerDelegate((ChunkListener) factory.getObject()));
224-
}
225-
226213
@SuppressWarnings("unchecked")
227214
SimpleStepBuilder result = this;
228215
return result;

spring-batch-core/src/test/java/org/springframework/batch/core/step/builder/StepBuilderTests.java

Lines changed: 137 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2014 the original author or authors.
2+
* Copyright 2012-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,10 +15,31 @@
1515
*/
1616
package org.springframework.batch.core.step.builder;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
1821
import org.junit.Test;
19-
import org.springframework.batch.core.*;
20-
import org.springframework.batch.core.annotation.*;
22+
23+
import org.springframework.batch.core.BatchStatus;
24+
import org.springframework.batch.core.ExitStatus;
25+
import org.springframework.batch.core.JobParameters;
26+
import org.springframework.batch.core.StepContribution;
27+
import org.springframework.batch.core.StepExecution;
28+
import org.springframework.batch.core.StepExecutionListener;
29+
import org.springframework.batch.core.annotation.AfterChunk;
30+
import org.springframework.batch.core.annotation.AfterChunkError;
31+
import org.springframework.batch.core.annotation.AfterProcess;
32+
import org.springframework.batch.core.annotation.AfterRead;
33+
import org.springframework.batch.core.annotation.AfterStep;
34+
import org.springframework.batch.core.annotation.AfterWrite;
35+
import org.springframework.batch.core.annotation.BeforeChunk;
36+
import org.springframework.batch.core.annotation.BeforeProcess;
37+
import org.springframework.batch.core.annotation.BeforeRead;
38+
import org.springframework.batch.core.annotation.BeforeStep;
39+
import org.springframework.batch.core.annotation.BeforeWrite;
40+
import org.springframework.batch.core.configuration.xml.DummyItemReader;
2141
import org.springframework.batch.core.configuration.xml.DummyItemWriter;
42+
import org.springframework.batch.core.job.SimpleJob;
2243
import org.springframework.batch.core.repository.JobRepository;
2344
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
2445
import org.springframework.batch.core.scope.context.ChunkContext;
@@ -30,14 +51,12 @@
3051
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
3152
import org.springframework.transaction.PlatformTransactionManager;
3253

33-
import java.util.ArrayList;
34-
import java.util.List;
35-
3654
import static org.junit.Assert.assertEquals;
3755

3856
/**
3957
* @author Dave Syer
4058
* @author Michael Minella
59+
* @author Mahmoud Ben Hassine
4160
*
4261
*/
4362
public class StepBuilderTests {
@@ -85,6 +104,90 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon
85104
assertEquals(1, InterfaceBasedStepExecutionListener.afterStepCount);
86105
assertEquals(1, AnnotationBasedStepExecutionListener.beforeStepCount);
87106
assertEquals(1, AnnotationBasedStepExecutionListener.afterStepCount);
107+
assertEquals(1, AnnotationBasedStepExecutionListener.beforeChunkCount);
108+
assertEquals(1, AnnotationBasedStepExecutionListener.afterChunkCount);
109+
}
110+
111+
@Test
112+
public void testAnnotationBasedChunkListenerForTaskletStep() throws Exception {
113+
JobRepository jobRepository = new MapJobRepositoryFactoryBean().getObject();
114+
StepExecution execution = jobRepository.createJobExecution("foo", new JobParameters()).createStepExecution("step");
115+
jobRepository.add(execution);
116+
PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
117+
TaskletStepBuilder builder = new StepBuilder("step")
118+
.repository(jobRepository)
119+
.transactionManager(transactionManager)
120+
.tasklet(new Tasklet() {
121+
@Override
122+
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
123+
return null;
124+
}
125+
})
126+
.listener(new AnnotationBasedChunkListener());
127+
builder.build().execute(execution);
128+
assertEquals(BatchStatus.COMPLETED, execution.getStatus());
129+
assertEquals(1, AnnotationBasedChunkListener.beforeChunkCount);
130+
assertEquals(1, AnnotationBasedChunkListener.afterChunkCount);
131+
}
132+
133+
@Test
134+
public void testAnnotationBasedChunkListenerForSimpleTaskletStep() throws Exception {
135+
JobRepository jobRepository = new MapJobRepositoryFactoryBean().getObject();
136+
StepExecution execution = jobRepository.createJobExecution("foo", new JobParameters()).createStepExecution("step");
137+
jobRepository.add(execution);
138+
PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
139+
SimpleStepBuilder builder = new StepBuilder("step")
140+
.repository(jobRepository)
141+
.transactionManager(transactionManager)
142+
.chunk(5)
143+
.reader(new DummyItemReader())
144+
.writer(new DummyItemWriter())
145+
.listener(new AnnotationBasedChunkListener());
146+
builder.build().execute(execution);
147+
assertEquals(BatchStatus.COMPLETED, execution.getStatus());
148+
assertEquals(1, AnnotationBasedChunkListener.beforeChunkCount);
149+
assertEquals(1, AnnotationBasedChunkListener.afterChunkCount);
150+
}
151+
152+
@Test
153+
public void testAnnotationBasedChunkListenerForFaultTolerantTaskletStep() throws Exception {
154+
JobRepository jobRepository = new MapJobRepositoryFactoryBean().getObject();
155+
StepExecution execution = jobRepository.createJobExecution("foo", new JobParameters()).createStepExecution("step");
156+
jobRepository.add(execution);
157+
PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
158+
SimpleStepBuilder builder = new StepBuilder("step")
159+
.repository(jobRepository)
160+
.transactionManager(transactionManager)
161+
.chunk(5)
162+
.reader(new DummyItemReader())
163+
.writer(new DummyItemWriter())
164+
.faultTolerant()
165+
.listener(new AnnotationBasedChunkListener()); // TODO should this return FaultTolerantStepBuilder?
166+
builder.build().execute(execution);
167+
assertEquals(BatchStatus.COMPLETED, execution.getStatus());
168+
assertEquals(1, AnnotationBasedChunkListener.beforeChunkCount);
169+
assertEquals(1, AnnotationBasedChunkListener.afterChunkCount);
170+
}
171+
172+
@Test
173+
public void testAnnotationBasedChunkListenerForJobStepBuilder() throws Exception {
174+
JobRepository jobRepository = new MapJobRepositoryFactoryBean().getObject();
175+
StepExecution execution = jobRepository.createJobExecution("foo", new JobParameters()).createStepExecution("step");
176+
jobRepository.add(execution);
177+
PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
178+
SimpleJob job = new SimpleJob("job");
179+
job.setJobRepository(jobRepository);
180+
JobStepBuilder builder = new StepBuilder("step")
181+
.repository(jobRepository)
182+
.transactionManager(transactionManager)
183+
.job(job)
184+
.listener(new AnnotationBasedChunkListener());
185+
builder.build().execute(execution);
186+
assertEquals(BatchStatus.COMPLETED, execution.getStatus());
187+
188+
// it makes no sense to register a ChunkListener on a step which is not of type tasklet, so it should not be invoked
189+
assertEquals(0, AnnotationBasedChunkListener.beforeChunkCount);
190+
assertEquals(0, AnnotationBasedChunkListener.afterChunkCount);
88191
}
89192

90193
@Test
@@ -221,4 +324,32 @@ public void afterChunk() {
221324
afterChunkCount++;
222325
}
223326
}
327+
328+
public static class AnnotationBasedChunkListener {
329+
330+
static int beforeChunkCount = 0;
331+
static int afterChunkCount = 0;
332+
static int afterChunkErrorCount = 0;
333+
334+
public AnnotationBasedChunkListener() {
335+
beforeChunkCount = 0;
336+
afterChunkCount = 0;
337+
afterChunkErrorCount = 0;
338+
}
339+
340+
@BeforeChunk
341+
public void beforeChunk() {
342+
beforeChunkCount++;
343+
}
344+
345+
@AfterChunk
346+
public void afterChunk() {
347+
afterChunkCount++;
348+
}
349+
350+
@AfterChunkError
351+
public void afterChunkError() {
352+
afterChunkErrorCount++;
353+
}
354+
}
224355
}

0 commit comments

Comments
 (0)