Skip to content

Commit 0c6bdb1

Browse files
fmbenhassinemminella
authored andcommitted
BATCH-2401: fix CPU intensive loop while polling partitions completion
Before this commit, the JsrPartitionHandler was polling partitions completion continuously. This causes a high CPU usage. This commit makes the polling thread sleep for a configurable amount of time in order to decrease CPU usage during the polling period. Resolves BATCH-2401
1 parent 9d1114c commit 0c6bdb1

File tree

2 files changed

+43
-2
lines changed

2 files changed

+43
-2
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/jsr/partition/JsrPartitionHandler.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2014 the original author or authors.
2+
* Copyright 2013-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,10 +56,13 @@
5656
* cumulative result.
5757
*
5858
* @author Michael Minella
59+
* @author Mahmoud Ben Hassine
5960
* @since 3.0
6061
*/
6162
public class JsrPartitionHandler implements PartitionHandler, InitializingBean {
6263

64+
private static final int DEFAULT_POLLING_INTERVAL = 500;
65+
6366
// TODO: Replace with proper Channel and Messages once minimum support level for Spring is 4
6467
private Queue<Serializable> partitionDataQueue;
6568
private ReentrantLock lock;
@@ -72,6 +75,7 @@ public class JsrPartitionHandler implements PartitionHandler, InitializingBean {
7275
private JobRepository jobRepository;
7376
private boolean allowStartIfComplete = false;
7477
private Set<String> partitionStepNames = new HashSet<String>();
78+
private int pollingInterval = DEFAULT_POLLING_INTERVAL;
7579

7680
/**
7781
* @return the step that will be executed by each partition
@@ -156,6 +160,14 @@ public void setJobRepository(JobRepository jobRepository) {
156160
this.jobRepository = jobRepository;
157161
}
158162

163+
/**
164+
* @param pollingInterval the duration of partitions completion polling interval
165+
* (in milliseconds). The default value is 500ms.
166+
*/
167+
public void setPollingInterval(int pollingInterval) {
168+
this.pollingInterval = pollingInterval;
169+
}
170+
159171
/* (non-Javadoc)
160172
* @see org.springframework.batch.core.partition.PartitionHandler#handle(org.springframework.batch.core.partition.StepExecutionSplitter, org.springframework.batch.core.StepExecution)
161173
*/
@@ -224,6 +236,7 @@ private void processPartitionResults(
224236
final List<Future<StepExecution>> tasks,
225237
final Set<StepExecution> result) throws Exception {
226238
while(true) {
239+
Thread.sleep(pollingInterval);
227240
try {
228241
lock.lock();
229242
while(!partitionDataQueue.isEmpty()) {
@@ -382,6 +395,7 @@ public void afterPropertiesSet() throws Exception {
382395
Assert.notNull(propertyContext, "A BatchPropertyContext is required");
383396
Assert.isTrue(mapper != null || (threads > 0 || partitions > 0), "Either a mapper implementation or the number of partitions/threads is required");
384397
Assert.notNull(jobRepository, "A JobRepository is required");
398+
Assert.isTrue(pollingInterval >= 0, "The polling interval must be positive");
385399

386400
if(partitionDataQueue == null) {
387401
partitionDataQueue = new LinkedBlockingQueue<Serializable>();

spring-batch-core/src/test/java/org/springframework/batch/core/jsr/partition/JsrPartitionHandlerTests.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2014 the original author or authors.
2+
* Copyright 2013-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
3030
import org.springframework.batch.core.step.JobRepositorySupport;
3131
import org.springframework.batch.core.step.StepSupport;
32+
import org.springframework.util.StopWatch;
3233

3334
import javax.batch.api.BatchProperty;
3435
import javax.batch.api.partition.PartitionAnalyzer;
@@ -46,6 +47,7 @@
4647
import java.util.concurrent.ConcurrentLinkedQueue;
4748

4849
import static org.junit.Assert.assertEquals;
50+
import static org.junit.Assert.assertTrue;
4951
import static org.junit.Assert.fail;
5052

5153
public class JsrPartitionHandlerTests extends AbstractJsrTestCase {
@@ -114,6 +116,14 @@ public void testAfterPropertiesSet() throws Exception {
114116

115117
handler.setJobRepository(repository);
116118
handler.afterPropertiesSet();
119+
120+
handler.setPollingInterval(-1);
121+
try {
122+
handler.afterPropertiesSet();
123+
fail("Polling interval was not checked for");
124+
} catch(IllegalArgumentException iae) {
125+
assertEquals("The polling interval must be positive", iae.getMessage());
126+
}
117127
}
118128

119129
@Test
@@ -128,6 +138,23 @@ public void testHardcodedNumberOfPartitions() throws Exception {
128138
assertEquals(3, count);
129139
}
130140

141+
@Test
142+
public void testPollingPartitionsCompletion() throws Exception {
143+
handler.setThreads(3);
144+
handler.setPartitions(3);
145+
handler.setPollingInterval(1000);
146+
handler.afterPropertiesSet();
147+
148+
StopWatch stopWatch = new StopWatch();
149+
stopWatch.start();
150+
Collection<StepExecution> executions = handler.handle(stepSplitter, stepExecution);
151+
stopWatch.stop();
152+
153+
assertEquals(3, executions.size());
154+
assertEquals(3, count);
155+
assertTrue(stopWatch.getLastTaskTimeMillis() >= 1000);
156+
}
157+
131158
@Test
132159
public void testMapperProvidesPartitions() throws Exception {
133160
handler.setPartitionMapper(new PartitionMapper() {

0 commit comments

Comments
 (0)