Skip to content

Commit b20d276

Browse files
artembilanmicheljung
authored andcommitted
Add gauges for queue channel size (#3349)
* Add gauges for queue channel size The `QueueChannel` provides a current size and remaining capacity metrics * Add Micrometer gauges into `QueueChannel` to expose the current values of the size and remaining capacity **Cherry-pick to 5.3.x, 5.2.x & 5.1.x** * * Revert `@SuppressWarnings("unchecked")` for test * Document new gauges for queue channel * * Fix IntegrationManagementConfigurer for NPE on `metricsCaptor` * Fix wording in meter descriptions Co-authored-by: Michel Jung <[email protected]> Co-authored-by: Michel Jung <[email protected]>
1 parent 84234b2 commit b20d276

File tree

4 files changed

+69
-6
lines changed

4 files changed

+69
-6
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannel.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.concurrent.TimeUnit;
2626

2727
import org.springframework.integration.core.MessageSelector;
28+
import org.springframework.integration.support.management.metrics.GaugeFacade;
29+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
2830
import org.springframework.lang.Nullable;
2931
import org.springframework.messaging.Message;
3032
import org.springframework.util.Assert;
@@ -49,6 +51,12 @@ public class QueueChannel extends AbstractPollableChannel implements QueueChanne
4951

5052
protected final Semaphore queueSemaphore = new Semaphore(0); // NOSONAR final
5153

54+
@Nullable
55+
private GaugeFacade sizeGauge;
56+
57+
@Nullable
58+
private GaugeFacade remainingCapacityGauge;
59+
5260
/**
5361
* Create a channel with the specified queue.
5462
*
@@ -79,6 +87,26 @@ public QueueChannel() {
7987
this(new LinkedBlockingQueue<>());
8088
}
8189

90+
@Override
91+
public void registerMetricsCaptor(MetricsCaptor metricsCaptor) {
92+
super.registerMetricsCaptor(metricsCaptor);
93+
this.sizeGauge =
94+
metricsCaptor.gaugeBuilder("spring.integration.channel.queue.size", this,
95+
(channel) -> getQueueSize())
96+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
97+
.tag("type", "channel")
98+
.description("The size of the queue channel")
99+
.build();
100+
101+
this.remainingCapacityGauge =
102+
metricsCaptor.gaugeBuilder("spring.integration.channel.queue.remaining.capacity", this,
103+
(channel) -> getRemainingCapacity())
104+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
105+
.tag("type", "channel")
106+
.description("The remaining capacity of the queue channel")
107+
.build();
108+
}
109+
82110
@Override
83111
protected boolean doSend(Message<?> message, long timeout) {
84112
Assert.notNull(message, "'message' must not be null");
@@ -207,4 +235,15 @@ public int getRemainingCapacity() {
207235
}
208236
}
209237

238+
@Override
239+
public void destroy() {
240+
super.destroy();
241+
if (this.sizeGauge != null) {
242+
this.sizeGauge.remove();
243+
}
244+
if (this.remainingCapacityGauge != null) {
245+
this.remainingCapacityGauge.remove();
246+
}
247+
}
248+
210249
}

spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfigurer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,8 @@ public void afterSingletonsInstantiated() {
285285
}
286286

287287
private void injectCaptor() {
288-
Map<String, IntegrationManagement> managed = this.applicationContext
289-
.getBeansOfType(IntegrationManagement.class);
288+
Map<String, IntegrationManagement> managed =
289+
this.applicationContext.getBeansOfType(IntegrationManagement.class);
290290
for (Entry<String, IntegrationManagement> entry : managed.entrySet()) {
291291
IntegrationManagement bean = entry.getValue();
292292
if (!getOverrides(bean).loggingConfigured) {
@@ -299,7 +299,7 @@ private void injectCaptor() {
299299
@Override
300300
public Object postProcessAfterInitialization(Object bean, String name) throws BeansException {
301301
if (this.singletonsInstantiated) {
302-
if (bean instanceof IntegrationManagement) {
302+
if (this.metricsCaptor != null && bean instanceof IntegrationManagement) {
303303
((IntegrationManagement) bean).registerMetricsCaptor(this.metricsCaptor);
304304
}
305305
return doConfigureMetrics(bean, name);

spring-integration-core/src/test/java/org/springframework/integration/support/management/micrometer/MicrometerMetricsTests.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -94,7 +94,7 @@ public class MicrometerMetricsTests {
9494

9595
@SuppressWarnings("unchecked")
9696
@Test
97-
public void testSend() {
97+
public void testMicrometerMetrics() {
9898
GenericMessage<String> message = new GenericMessage<>("foo");
9999
this.channel.send(message);
100100
assertThatExceptionOfType(MessagingException.class)
@@ -162,6 +162,16 @@ public void testSend() {
162162
.tag("result", "success")
163163
.counter().count()).isEqualTo(1);
164164

165+
this.queue.send(message);
166+
167+
assertThat(registry.get("spring.integration.channel.queue.size")
168+
.tag("name", "queue")
169+
.gauge().value()).isEqualTo(2d);
170+
171+
assertThat(registry.get("spring.integration.channel.queue.remaining.capacity")
172+
.tag("name", "queue")
173+
.gauge().value()).isEqualTo(8d);
174+
165175
assertThat(registry.get("spring.integration.send")
166176
.tag("name", "nullChannel")
167177
.tag("result", "success")
@@ -260,7 +270,7 @@ protected Object doReceive() {
260270

261271
@Bean
262272
public QueueChannel queue() {
263-
return new QueueChannel();
273+
return new QueueChannel(10);
264274
}
265275

266276
@Bean

src/reference/asciidoc/metrics.adoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,20 @@ It is possible to customize the names and tags of `Meters` created by integratio
165165
The https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/test/java/org/springframework/integration/support/management/micrometer/MicrometerCustomMetricsTests.java[MicrometerCustomMetricsTests] test case shows a simple example of how to do that.
166166
You can also further customize the meters by overloading the `build()` methods on builder subclasses.
167167

168+
Starting with version 5.1.13, the `QueueChannel` exposes Micrometer gauges for queue size and remaining capacity:
169+
170+
* `name`: `spring.integration.channel.queue.size`
171+
* `tag`: `type:channel`
172+
* `tag`: `name:<componentName>`
173+
* `description`: `The size of queue channel`
174+
175+
and
176+
177+
* `name`: `spring.integration.channel.queue.remaining.capacity`
178+
* `tag`: `type:channel`
179+
* `tag`: `name:<componentName>`
180+
* `description`: `The remaining.capacity of queue channel`
181+
168182
[[mgmt-channel-features]]
169183
==== `MessageChannel` Metric Features
170184

0 commit comments

Comments
 (0)