Skip to content

Commit be4a3d8

Browse files
Added observation API
1 parent 1530d04 commit be4a3d8

File tree

13 files changed

+493
-18
lines changed

13 files changed

+493
-18
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
<spring-retry.version>1.3.1</spring-retry.version>
5656
<spring-integration.version>6.0.0-M1</spring-integration.version>
5757
<micrometer.version>2.0.0-SNAPSHOT</micrometer.version>
58+
<micrometer-tracing.version>1.0.0-SNAPSHOT</micrometer-tracing.version>
5859
<jackson.version>2.13.1</jackson.version>
5960

6061
<!-- optional production dependencies -->

spring-batch-core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,12 @@
256256
<version>${jakarta.inject-api.version}</version>
257257
<scope>test</scope>
258258
</dependency>
259+
<dependency>
260+
<groupId>io.micrometer</groupId>
261+
<artifactId>micrometer-test</artifactId>
262+
<version>${micrometer.version}</version>
263+
<scope>test</scope>
264+
</dependency>
259265
</dependencies>
260266

261267
</project>

spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import java.util.Collection;
2020
import java.util.Date;
21+
import java.util.List;
22+
import java.util.stream.Collectors;
2123

2224
import io.micrometer.api.instrument.LongTaskTimer;
2325
import io.micrometer.api.instrument.Tag;
24-
import io.micrometer.api.instrument.Timer;
26+
import io.micrometer.api.instrument.observation.Observation;
2527
import org.apache.commons.logging.Log;
2628
import org.apache.commons.logging.LogFactory;
2729
import org.springframework.batch.core.BatchStatus;
@@ -304,8 +306,9 @@ public final void execute(JobExecution execution) {
304306
LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs",
305307
Tag.of("name", execution.getJobInstance().getJobName()));
306308
LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start();
307-
Timer.Sample timerSample = BatchMetrics.createTimerSample();
308-
try {
309+
Observation observation = BatchMetrics.createObservation(BatchJobObservation.BATCH_JOB_OBSERVATION.getName())
310+
.contextualName(execution.getJobInstance().getJobName()).start();
311+
try (Observation.Scope scope = observation.openScope()) {
309312

310313
jobParametersValidator.validate(execution.getJobParameters());
311314

@@ -361,11 +364,7 @@ public final void execute(JobExecution execution) {
361364
ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job.");
362365
execution.setExitStatus(exitStatus.and(newExitStatus));
363366
}
364-
365-
timerSample.stop(BatchMetrics.createTimer("job", "Job duration",
366-
Tag.of("name", execution.getJobInstance().getJobName()),
367-
Tag.of("status", execution.getExitStatus().getExitCode())
368-
));
367+
stopTaggedObservation(execution, observation);
369368
longTaskTimerSample.stop();
370369
execution.setEndTime(new Date());
371370

@@ -384,6 +383,23 @@ public final void execute(JobExecution execution) {
384383

385384
}
386385

386+
private void stopTaggedObservation(JobExecution execution, Observation observation) {
387+
observation.lowCardinalityTag(BatchJobObservation.JobLowCardinalityTags.JOB_NAME.of(execution.getJobInstance().getJobName()))
388+
.lowCardinalityTag(BatchJobObservation.JobLowCardinalityTags.JOB_STATUS.of(execution.getExitStatus().getExitCode()))
389+
.highCardinalityTag(BatchJobObservation.JobHighCardinalityTags.JOB_INSTANCE_ID.of(String.valueOf(execution.getJobInstance().getInstanceId())))
390+
.highCardinalityTag(BatchJobObservation.JobHighCardinalityTags.JOB_EXECUTION_ID.of(String.valueOf(execution.getId())));
391+
List<Throwable> throwables = execution.getFailureExceptions();
392+
if (!throwables.isEmpty()) {
393+
observation.error(mergedThrowables(throwables));
394+
}
395+
observation.stop();
396+
}
397+
398+
private IllegalStateException mergedThrowables(List<Throwable> throwables) {
399+
return new IllegalStateException(
400+
throwables.stream().map(Throwable::toString).collect(Collectors.joining("\n")));
401+
}
402+
387403
/**
388404
* Convenience method for subclasses to delegate the handling of a specific
389405
* step in the context of the current {@link JobExecution}. Clients of this
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.core.job;
18+
19+
import io.micrometer.api.instrument.docs.DocumentedObservation;
20+
import io.micrometer.api.instrument.docs.TagKey;
21+
22+
enum BatchJobObservation implements DocumentedObservation {
23+
24+
/**
25+
* Observation created around a Job execution.
26+
*/
27+
BATCH_JOB_OBSERVATION {
28+
@Override
29+
public String getName() {
30+
return "spring.batch.job";
31+
}
32+
33+
@Override
34+
public String getContextualName() {
35+
return "%s";
36+
}
37+
38+
@Override
39+
public TagKey[] getLowCardinalityTagKeys() {
40+
return JobLowCardinalityTags.values();
41+
}
42+
43+
@Override
44+
public TagKey[] getHighCardinalityTagKeys() {
45+
return JobHighCardinalityTags.values();
46+
}
47+
48+
@Override
49+
public String getPrefix() {
50+
return "spring.batch";
51+
}
52+
};
53+
54+
enum JobLowCardinalityTags implements TagKey {
55+
56+
/**
57+
* Name of the Spring Batch job.
58+
*/
59+
JOB_NAME {
60+
@Override
61+
public String getKey() {
62+
return "spring.batch.job.name";
63+
}
64+
},
65+
66+
/**
67+
* Job status.
68+
*/
69+
JOB_STATUS {
70+
@Override
71+
public String getKey() {
72+
return "spring.batch.job.status";
73+
}
74+
}
75+
76+
}
77+
78+
enum JobHighCardinalityTags implements TagKey {
79+
80+
/**
81+
* ID of the Spring Batch job instance.
82+
*/
83+
JOB_INSTANCE_ID {
84+
@Override
85+
public String getKey() {
86+
return "spring.batch.job.instanceId";
87+
}
88+
},
89+
90+
/**
91+
* ID of the Spring Batch execution.
92+
*/
93+
JOB_EXECUTION_ID {
94+
@Override
95+
public String getKey() {
96+
return "spring.batch.job.executionId";
97+
}
98+
}
99+
100+
}
101+
}

spring-batch-core/src/main/java/org/springframework/batch/core/metrics/BatchMetrics.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
import java.util.concurrent.TimeUnit;
2222

2323
import io.micrometer.api.instrument.LongTaskTimer;
24+
import io.micrometer.api.instrument.MeterRegistry;
2425
import io.micrometer.api.instrument.Metrics;
2526
import io.micrometer.api.instrument.Tag;
2627
import io.micrometer.api.instrument.Timer;
28+
import io.micrometer.api.instrument.observation.Observation;
29+
import io.micrometer.api.instrument.observation.TimerObservationHandler;
30+
import io.micrometer.api.instrument.simple.SimpleMeterRegistry;
2731

2832
import org.springframework.lang.Nullable;
2933

@@ -52,6 +56,14 @@ public final class BatchMetrics {
5256

5357
private BatchMetrics() {}
5458

59+
private static final MeterRegistry simpleMeterRegistry = new SimpleMeterRegistry().withTimerObservationHandler();
60+
61+
static {
62+
// TODO: This shouldn't be necessary - we need to fix it in Micrometer
63+
Metrics.globalRegistry.observationConfig().observationHandler(new TimerObservationHandler(Metrics.globalRegistry));
64+
Metrics.globalRegistry.add(simpleMeterRegistry);
65+
}
66+
5567
/**
5668
* Create a {@link Timer}.
5769
* @param name of the timer. Will be prefixed with {@link BatchMetrics#METRICS_PREFIX}.
@@ -66,6 +78,15 @@ public static Timer createTimer(String name, String description, Tag... tags) {
6678
.register(Metrics.globalRegistry);
6779
}
6880

81+
/**
82+
* Create a new {@link Observation}. It's not started, you must
83+
* explicitly call {@link Observation#start()} to start it.
84+
* @return a new timer sample instance
85+
*/
86+
public static Observation createObservation(String name) {
87+
return Observation.createNotStarted(name, Metrics.globalRegistry);
88+
}
89+
6990
/**
7091
* Create a new {@link Timer.Sample}.
7192
* @return a new timer sample instance

spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
import java.time.Duration;
1919
import java.util.Date;
20+
import java.util.List;
21+
import java.util.stream.Collectors;
2022

21-
import io.micrometer.api.instrument.Tag;
22-
import io.micrometer.api.instrument.Timer;
23+
import io.micrometer.api.instrument.observation.Observation;
2324
import org.apache.commons.logging.Log;
2425
import org.apache.commons.logging.LogFactory;
2526
import org.springframework.batch.core.BatchStatus;
@@ -192,15 +193,16 @@ public final void execute(StepExecution stepExecution) throws JobInterruptedExce
192193
}
193194
stepExecution.setStartTime(new Date());
194195
stepExecution.setStatus(BatchStatus.STARTED);
195-
Timer.Sample sample = BatchMetrics.createTimerSample();
196+
Observation observation = BatchMetrics.createObservation(BatchStepObservation.BATCH_STEP_OBSERVATION.getName())
197+
.contextualName(stepExecution.getStepName()).start();
196198
getJobRepository().update(stepExecution);
197199

198200
// Start with a default value that will be trumped by anything
199201
ExitStatus exitStatus = ExitStatus.EXECUTING;
200202

201203
doExecutionRegistration(stepExecution);
202204

203-
try {
205+
try (Observation.Scope scope = observation.openScope()) {
204206
getCompositeListener().beforeStep(stepExecution);
205207
open(stepExecution.getExecutionContext());
206208

@@ -260,12 +262,7 @@ public final void execute(StepExecution stepExecution) throws JobInterruptedExce
260262
logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
261263
+ "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
262264
}
263-
264-
sample.stop(BatchMetrics.createTimer("step", "Step duration",
265-
Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()),
266-
Tag.of("name", stepExecution.getStepName()),
267-
Tag.of("status", stepExecution.getExitStatus().getExitCode())
268-
));
265+
stopTaggedObservation(stepExecution, observation);
269266
stepExecution.setEndTime(new Date());
270267
stepExecution.setExitStatus(exitStatus);
271268
Duration stepExecutionDuration = BatchMetrics.calculateDuration(stepExecution.getStartTime(), stepExecution.getEndTime());
@@ -299,6 +296,23 @@ public final void execute(StepExecution stepExecution) throws JobInterruptedExce
299296
}
300297
}
301298

299+
private void stopTaggedObservation(StepExecution stepExecution, Observation observation) {
300+
observation.lowCardinalityTag(BatchStepObservation.StepLowCardinalityTags.STEP_NAME.of(stepExecution.getStepName()))
301+
.lowCardinalityTag(BatchStepObservation.StepLowCardinalityTags.JOB_NAME.of(stepExecution.getJobExecution().getJobInstance().getJobName()))
302+
.lowCardinalityTag(BatchStepObservation.StepLowCardinalityTags.STEP_STATUS.of(stepExecution.getExitStatus().getExitCode()))
303+
.highCardinalityTag(BatchStepObservation.StepHighCardinalityTags.STEP_EXECUTION_ID.of(String.valueOf(stepExecution.getId())));
304+
List<Throwable> throwables = stepExecution.getFailureExceptions();
305+
if (!throwables.isEmpty()) {
306+
observation.error(mergedThrowables(throwables));
307+
}
308+
observation.stop();
309+
}
310+
311+
private IllegalStateException mergedThrowables(List<Throwable> throwables) {
312+
return new IllegalStateException(
313+
throwables.stream().map(Throwable::toString).collect(Collectors.joining("\n")));
314+
}
315+
302316
/**
303317
* Releases the most recent {@link StepExecution}
304318
*/

0 commit comments

Comments
 (0)