Skip to content

Commit f3b2148

Browse files
committed
Add Micrometer metrics collector implementation
1 parent 198356e commit f3b2148

File tree

3 files changed

+270
-7
lines changed

3 files changed

+270
-7
lines changed

pom.xml

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@
9090
<systemPath>${project.basedir}/protonj2-1.0.0-M21-SNAPSHOT.jar</systemPath>
9191
</dependency>
9292

93+
<dependency>
94+
<groupId>io.micrometer</groupId>
95+
<artifactId>micrometer-core</artifactId>
96+
<version>${micrometer.version}</version>
97+
<optional>true</optional>
98+
</dependency>
9399

94100
<dependency>
95101
<groupId>org.junit.jupiter</groupId>
@@ -144,17 +150,11 @@
144150
<scope>test</scope>
145151
</dependency>
146152

147-
<dependency>
148-
<groupId>io.micrometer</groupId>
149-
<artifactId>micrometer-core</artifactId>
150-
<version>${micrometer.version}</version>
151-
<scope>test</scope>
152-
</dependency>
153-
154153
<dependency>
155154
<groupId>eu.rekawek.toxiproxy</groupId>
156155
<artifactId>toxiproxy-java</artifactId>
157156
<version>2.1.7</version>
157+
<scope>test</scope>
158158
</dependency>
159159

160160
<!-- add explicitly to update automatically with dependabot -->
@@ -165,6 +165,14 @@
165165
<scope>test</scope>
166166
</dependency>
167167

168+
<dependency>
169+
<groupId>io.micrometer</groupId>
170+
<artifactId>micrometer-registry-prometheus</artifactId>
171+
<version>${micrometer.version}</version>
172+
<scope>test</scope>
173+
</dependency>
174+
175+
168176
</dependencies>
169177

170178
<dependencyManagement>
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model.metrics;
19+
20+
import io.micrometer.core.instrument.Counter;
21+
import io.micrometer.core.instrument.MeterRegistry;
22+
import io.micrometer.core.instrument.Tag;
23+
import io.micrometer.core.instrument.Tags;
24+
import java.util.Collections;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
public class MicrometerMetricsCollector implements MetricsCollector {
28+
29+
private final AtomicLong connections;
30+
private final AtomicLong publishers;
31+
private final AtomicLong consumers;
32+
private final Counter publish, publishAccepted, publishFailed;
33+
private final Counter consume, consumeAccepted, consumeRequeued, consumeDiscarded;
34+
35+
public MicrometerMetricsCollector(MeterRegistry registry) {
36+
this(registry, "rabbitmq.amqp");
37+
}
38+
39+
public MicrometerMetricsCollector(final MeterRegistry registry, final String prefix) {
40+
this(registry, prefix, Collections.emptyList());
41+
}
42+
43+
public MicrometerMetricsCollector(
44+
final MeterRegistry registry, final String prefix, final String... tags) {
45+
this(registry, prefix, Tags.of(tags));
46+
}
47+
48+
public MicrometerMetricsCollector(
49+
final MeterRegistry registry, final String prefix, final Iterable<Tag> tags) {
50+
this.connections = registry.gauge(prefix + ".connections", tags, new AtomicLong(0));
51+
this.publishers = registry.gauge(prefix + ".publishers", tags, new AtomicLong(0));
52+
this.consumers = registry.gauge(prefix + ".consumers", tags, new AtomicLong(0));
53+
this.publish = registry.counter(prefix + ".published", tags);
54+
this.publishAccepted = registry.counter(prefix + ".published_accepted", tags);
55+
this.publishFailed = registry.counter(prefix + ".published_failed", tags);
56+
this.consume = registry.counter(prefix + ".consumed", tags);
57+
this.consumeAccepted = registry.counter(prefix + ".consumed_accepted", tags);
58+
this.consumeRequeued = registry.counter(prefix + ".consumed_requeued", tags);
59+
this.consumeDiscarded = registry.counter(prefix + ".consumed_discarded", tags);
60+
}
61+
62+
@Override
63+
public void openConnection() {
64+
this.connections.incrementAndGet();
65+
}
66+
67+
@Override
68+
public void closeConnection() {
69+
this.connections.decrementAndGet();
70+
}
71+
72+
@Override
73+
public void openPublisher() {
74+
this.publishers.incrementAndGet();
75+
}
76+
77+
@Override
78+
public void closePublisher() {
79+
this.publishers.decrementAndGet();
80+
}
81+
82+
@Override
83+
public void openConsumer() {
84+
this.consumers.incrementAndGet();
85+
}
86+
87+
@Override
88+
public void closeConsumer() {
89+
this.consumers.decrementAndGet();
90+
}
91+
92+
@Override
93+
public void publish() {
94+
this.publish.increment();
95+
}
96+
97+
@Override
98+
public void publishDisposition(PublishDisposition disposition) {
99+
switch (disposition) {
100+
case ACCEPTED:
101+
this.publishAccepted.increment();
102+
break;
103+
case FAILED:
104+
this.publishFailed.increment();
105+
break;
106+
default:
107+
break;
108+
}
109+
}
110+
111+
@Override
112+
public void consume() {
113+
this.consume.increment();
114+
}
115+
116+
@Override
117+
public void consumeDisposition(ConsumeDisposition disposition) {
118+
switch (disposition) {
119+
case ACCEPTED:
120+
this.consumeAccepted.increment();
121+
break;
122+
case REQUEUED:
123+
this.consumeRequeued.increment();
124+
break;
125+
case DISCARDED:
126+
this.consumeDiscarded.increment();
127+
break;
128+
default:
129+
break;
130+
}
131+
}
132+
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model.metrics;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
23+
import io.micrometer.prometheusmetrics.PrometheusConfig;
24+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
25+
import org.junit.jupiter.api.Test;
26+
27+
public class MicrometerMetricsCollectorTest {
28+
29+
@Test
30+
void simple() {
31+
SimpleMeterRegistry registry = new SimpleMeterRegistry();
32+
MetricsCollector collector = new MicrometerMetricsCollector(registry);
33+
34+
assertThat(registry.get("rabbitmq.amqp.connections").gauge().value()).isZero();
35+
collector.openConnection();
36+
assertThat(registry.get("rabbitmq.amqp.connections").gauge().value()).isEqualTo(1);
37+
collector.openConnection();
38+
assertThat(registry.get("rabbitmq.amqp.connections").gauge().value()).isEqualTo(2);
39+
collector.closeConnection();
40+
assertThat(registry.get("rabbitmq.amqp.connections").gauge().value()).isEqualTo(1);
41+
42+
assertThat(registry.get("rabbitmq.amqp.publishers").gauge().value()).isZero();
43+
collector.openPublisher();
44+
assertThat(registry.get("rabbitmq.amqp.publishers").gauge().value()).isEqualTo(1);
45+
collector.openPublisher();
46+
assertThat(registry.get("rabbitmq.amqp.publishers").gauge().value()).isEqualTo(2);
47+
collector.closePublisher();
48+
assertThat(registry.get("rabbitmq.amqp.publishers").gauge().value()).isEqualTo(1);
49+
50+
assertThat(registry.get("rabbitmq.amqp.consumers").gauge().value()).isZero();
51+
collector.openConsumer();
52+
assertThat(registry.get("rabbitmq.amqp.consumers").gauge().value()).isEqualTo(1);
53+
collector.openConsumer();
54+
assertThat(registry.get("rabbitmq.amqp.consumers").gauge().value()).isEqualTo(2);
55+
collector.closeConsumer();
56+
assertThat(registry.get("rabbitmq.amqp.consumers").gauge().value()).isEqualTo(1);
57+
58+
assertThat(registry.get("rabbitmq.amqp.published").counter().count()).isZero();
59+
collector.publish();
60+
assertThat(registry.get("rabbitmq.amqp.published").counter().count()).isEqualTo(1.0);
61+
collector.publish();
62+
assertThat(registry.get("rabbitmq.amqp.published").counter().count()).isEqualTo(2.0);
63+
64+
assertThat(registry.get("rabbitmq.amqp.published_accepted").counter().count()).isZero();
65+
collector.publishDisposition(MetricsCollector.PublishDisposition.ACCEPTED);
66+
assertThat(registry.get("rabbitmq.amqp.published_accepted").counter().count()).isEqualTo(1.0);
67+
68+
assertThat(registry.get("rabbitmq.amqp.published_failed").counter().count()).isZero();
69+
collector.publishDisposition(MetricsCollector.PublishDisposition.FAILED);
70+
assertThat(registry.get("rabbitmq.amqp.published_failed").counter().count()).isEqualTo(1.0);
71+
72+
assertThat(registry.get("rabbitmq.amqp.consumed").counter().count()).isZero();
73+
collector.consume();
74+
assertThat(registry.get("rabbitmq.amqp.consumed").counter().count()).isEqualTo(1.0);
75+
collector.consume();
76+
collector.consume();
77+
assertThat(registry.get("rabbitmq.amqp.consumed").counter().count()).isEqualTo(3.0);
78+
79+
assertThat(registry.get("rabbitmq.amqp.consumed_accepted").counter().count()).isZero();
80+
collector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED);
81+
assertThat(registry.get("rabbitmq.amqp.consumed_accepted").counter().count()).isEqualTo(1.0);
82+
83+
assertThat(registry.get("rabbitmq.amqp.consumed_requeued").counter().count()).isZero();
84+
collector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
85+
assertThat(registry.get("rabbitmq.amqp.consumed_requeued").counter().count()).isEqualTo(1.0);
86+
87+
assertThat(registry.get("rabbitmq.amqp.consumed_discarded").counter().count()).isZero();
88+
collector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
89+
assertThat(registry.get("rabbitmq.amqp.consumed_discarded").counter().count()).isEqualTo(1.0);
90+
}
91+
92+
@Test
93+
void prometheus() {
94+
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
95+
MetricsCollector collector = new MicrometerMetricsCollector(registry);
96+
97+
collector.openConnection();
98+
collector.openConnection();
99+
collector.closeConnection();
100+
101+
collector.openPublisher();
102+
collector.openPublisher();
103+
collector.closePublisher();
104+
105+
collector.openConsumer();
106+
collector.openConsumer();
107+
collector.closeConsumer();
108+
109+
collector.publish();
110+
collector.publish();
111+
112+
collector.publishDisposition(MetricsCollector.PublishDisposition.ACCEPTED);
113+
collector.publishDisposition(MetricsCollector.PublishDisposition.FAILED);
114+
115+
collector.consume();
116+
collector.consume();
117+
collector.consume();
118+
119+
collector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED);
120+
collector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
121+
collector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
122+
}
123+
}

0 commit comments

Comments
 (0)