Skip to content

Commit 198356e

Browse files
committed
Add the metrics collector abstraction
1 parent e20b284 commit 198356e

File tree

11 files changed

+421
-46
lines changed

11 files changed

+421
-46
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,13 @@
130130
<scope>test</scope>
131131
</dependency>
132132

133+
<dependency>
134+
<groupId>org.mockito</groupId>
135+
<artifactId>mockito-junit-jupiter</artifactId>
136+
<version>${mockito.version}</version>
137+
<scope>test</scope>
138+
</dependency>
139+
133140
<dependency>
134141
<groupId>io.dropwizard.metrics</groupId>
135142
<artifactId>metrics-core</artifactId>
@@ -209,7 +216,7 @@
209216
<excludes>
210217
<exclude>**/*TestSuite.java</exclude>
211218
</excludes>
212-
<argLine>${test-arguments}</argLine>
219+
<argLine>-Xshare:off ${test-arguments}</argLine>
213220
<systemPropertyVariables>
214221
<net.bytebuddy.experimental>true</net.bytebuddy.experimental>
215222
<rabbitmqctl.bin>DOCKER:rabbitmq</rabbitmqctl.bin>

src/main/java/com/rabbitmq/model/amqp/AmqpConnection.java

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.util.Collections.singletonMap;
2424

2525
import com.rabbitmq.model.*;
26+
import com.rabbitmq.model.metrics.MetricsCollector;
2627
import java.time.Duration;
2728
import java.util.ArrayList;
2829
import java.util.List;
@@ -111,19 +112,16 @@ class AmqpConnection extends ResourceBase implements Connection {
111112
} else {
112113
disconnectHandler =
113114
(c, e) -> {
114-
if (this.closed.compareAndSet(false, true)) {
115-
ModelException failureCause = convert(e.failureCause(), "Connection disconnected");
116-
this.state(CLOSING, failureCause);
117-
this.releaseManagementResources();
118-
this.state(CLOSED, failureCause);
119-
}
115+
ModelException failureCause = convert(e.failureCause(), "Connection disconnected");
116+
this.close(failureCause);
120117
};
121118
this.recoveryRequestQueue = null;
122119
this.recoveryLoop = null;
123120
}
124121
this.nativeConnection = connect(this.connectionSettings, builder.name(), disconnectHandler);
125122
this.management = createManagement();
126123
this.state(OPEN);
124+
this.environment.metricsCollector().openConnection();
127125
}
128126

129127
@Override
@@ -166,38 +164,7 @@ public RpcServerBuilder rpcServerBuilder() {
166164

167165
@Override
168166
public void close() {
169-
if (this.closed.compareAndSet(false, true)) {
170-
this.state(CLOSING);
171-
if (this.recoveryLoop != null) {
172-
this.recoveryLoop.cancel(true);
173-
}
174-
if (this.topologyListener instanceof AutoCloseable) {
175-
try {
176-
((AutoCloseable) this.topologyListener).close();
177-
} catch (Exception e) {
178-
LOGGER.info("Error while closing topology listener", e);
179-
}
180-
}
181-
this.closeManagement();
182-
for (RpcClient rpcClient : this.rpcClients) {
183-
rpcClient.close();
184-
}
185-
for (RpcServer rpcServer : this.rpcServers) {
186-
rpcServer.close();
187-
}
188-
for (AmqpPublisher publisher : this.publishers) {
189-
publisher.close();
190-
}
191-
for (AmqpConsumer consumer : this.consumers) {
192-
consumer.close();
193-
}
194-
try {
195-
this.nativeConnection.close();
196-
} catch (Exception e) {
197-
LOGGER.warn("Error while closing native connection", e);
198-
}
199-
this.state(CLOSED);
200-
}
167+
this.close(null);
201168
}
202169

203170
// internal API
@@ -508,6 +475,10 @@ Clock clock() {
508475
return this.environment.clock();
509476
}
510477

478+
MetricsCollector metricsCollector() {
479+
return this.environment.metricsCollector();
480+
}
481+
511482
Publisher createPublisher(AmqpPublisherBuilder builder) {
512483
// TODO copy the builder properties to create the publisher
513484
AmqpPublisher publisher = new AmqpPublisher(builder);
@@ -573,6 +544,42 @@ private String currentConnectionLabel() {
573544
}
574545
}
575546

547+
private void close(Throwable cause) {
548+
if (this.closed.compareAndSet(false, true)) {
549+
this.state(CLOSING, cause);
550+
if (this.recoveryLoop != null) {
551+
this.recoveryLoop.cancel(true);
552+
}
553+
if (this.topologyListener instanceof AutoCloseable) {
554+
try {
555+
((AutoCloseable) this.topologyListener).close();
556+
} catch (Exception e) {
557+
LOGGER.info("Error while closing topology listener", e);
558+
}
559+
}
560+
this.closeManagement();
561+
for (RpcClient rpcClient : this.rpcClients) {
562+
rpcClient.close();
563+
}
564+
for (RpcServer rpcServer : this.rpcServers) {
565+
rpcServer.close();
566+
}
567+
for (AmqpPublisher publisher : this.publishers) {
568+
publisher.close();
569+
}
570+
for (AmqpConsumer consumer : this.consumers) {
571+
consumer.close();
572+
}
573+
try {
574+
this.nativeConnection.close();
575+
} catch (Exception e) {
576+
LOGGER.warn("Error while closing native connection", e);
577+
}
578+
this.state(CLOSED, cause);
579+
this.environment.metricsCollector().closeConnection();
580+
}
581+
}
582+
576583
@Override
577584
public String toString() {
578585
return this.environment.toString() + "-" + this.id;

src/main/java/com/rabbitmq/model/amqp/AmqpConsumer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.rabbitmq.model.Consumer;
2323
import com.rabbitmq.model.ModelException;
24+
import com.rabbitmq.model.metrics.MetricsCollector;
2425
import java.lang.reflect.Field;
2526
import java.lang.reflect.InvocationTargetException;
2627
import java.lang.reflect.Method;
@@ -59,6 +60,7 @@ class AmqpConsumer extends ResourceBase implements Consumer {
5960
private final AmqpConnection connection;
6061
private final AtomicBoolean paused = new AtomicBoolean(false);
6162
private final AtomicReference<CountDownLatch> echoedFlowAfterPauseLatch = new AtomicReference<>();
63+
private final MetricsCollector metricsCollector;
6264
// native receiver internal state, accessed only in the native executor/scheduler
6365
private ProtonReceiver protonReceiver;
6466
private Scheduler protonExecutor;
@@ -76,7 +78,9 @@ class AmqpConsumer extends ResourceBase implements Consumer {
7678
this.initStateFromNativeReceiver(this.nativeReceiver);
7779
this.connection = builder.connection();
7880
this.startReceivingLoop();
81+
this.metricsCollector = this.connection.metricsCollector();
7982
this.state(OPEN);
83+
this.metricsCollector.openConsumer();
8084
}
8185

8286
@Override
@@ -107,6 +111,7 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
107111
while (!Thread.currentThread().isInterrupted()) {
108112
Delivery delivery = receiver.receive(100, TimeUnit.MILLISECONDS);
109113
if (delivery != null) {
114+
this.metricsCollector.consume();
110115
AmqpMessage message = new AmqpMessage(delivery.message());
111116
// TODO make disposition idempotent
112117
Consumer.Context context =
@@ -117,6 +122,8 @@ public void accept() {
117122
try {
118123
protonExecutor.execute(() -> replenishCreditIfNeeded());
119124
delivery.disposition(DeliveryState.accepted(), true);
125+
metricsCollector.consumeDisposition(
126+
MetricsCollector.ConsumeDisposition.ACCEPTED);
120127
} catch (ClientIllegalStateException | ClientIOException e) {
121128
LOGGER.debug("message accept failed: {}", e.getMessage());
122129
} catch (ClientException e) {
@@ -129,6 +136,8 @@ public void discard() {
129136
try {
130137
protonExecutor.execute(() -> replenishCreditIfNeeded());
131138
delivery.disposition(DeliveryState.rejected("", ""), true);
139+
metricsCollector.consumeDisposition(
140+
MetricsCollector.ConsumeDisposition.DISCARDED);
132141
} catch (ClientIllegalStateException | ClientIOException e) {
133142
LOGGER.debug("message discard failed: {}", e.getMessage());
134143
} catch (ClientException e) {
@@ -141,6 +150,8 @@ public void requeue() {
141150
try {
142151
protonExecutor.execute(() -> replenishCreditIfNeeded());
143152
delivery.disposition(DeliveryState.released(), true);
153+
metricsCollector.consumeDisposition(
154+
MetricsCollector.ConsumeDisposition.REQUEUED);
144155
} catch (ClientIllegalStateException | ClientIOException e) {
145156
LOGGER.debug("message requeue failed: {}", e.getMessage());
146157
} catch (ClientException e) {
@@ -196,6 +207,7 @@ private void close(Throwable cause) {
196207
LOGGER.warn("Error while closing receiver", e);
197208
}
198209
this.state(CLOSED, cause);
210+
this.metricsCollector.closeConsumer();
199211
}
200212
}
201213

src/main/java/com/rabbitmq/model/amqp/AmqpEnvironment.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.rabbitmq.model.amqp;
1919

2020
import com.rabbitmq.model.*;
21+
import com.rabbitmq.model.metrics.MetricsCollector;
22+
import com.rabbitmq.model.metrics.NoOpMetricsCollector;
2123
import java.util.ArrayList;
2224
import java.util.Collections;
2325
import java.util.List;
@@ -46,9 +48,12 @@ class AmqpEnvironment implements Environment {
4648
private final ScheduledExecutorService scheduledExecutorService;
4749
private volatile ScheduledFuture<?> clockRefreshFuture;
4850
private final AtomicBoolean clockRefreshSet = new AtomicBoolean(false);
51+
private final MetricsCollector metricsCollector;
4952

5053
AmqpEnvironment(
51-
ExecutorService executorService, DefaultConnectionSettings<?> connectionSettings) {
54+
ExecutorService executorService,
55+
DefaultConnectionSettings<?> connectionSettings,
56+
MetricsCollector metricsCollector) {
5257
this.id = ID_SEQUENCE.getAndIncrement();
5358
connectionSettings.copyTo(this.connectionSettings);
5459
this.connectionSettings.consolidate();
@@ -64,6 +69,8 @@ class AmqpEnvironment implements Environment {
6469
}
6570
this.scheduledExecutorService =
6671
Executors.newScheduledThreadPool(0, Utils.defaultThreadFactory());
72+
this.metricsCollector =
73+
metricsCollector == null ? NoOpMetricsCollector.INSTANCE : metricsCollector;
6774
}
6875

6976
DefaultConnectionSettings<?> connectionSettings() {
@@ -117,6 +124,10 @@ ScheduledExecutorService scheduledExecutorService() {
117124
return this.scheduledExecutorService;
118125
}
119126

127+
MetricsCollector metricsCollector() {
128+
return this.metricsCollector;
129+
}
130+
120131
void addConnection(AmqpConnection connection) {
121132
this.connections.add(connection);
122133
}

src/main/java/com/rabbitmq/model/amqp/AmqpEnvironmentBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
import com.rabbitmq.model.ConnectionSettings;
2121
import com.rabbitmq.model.Environment;
2222
import com.rabbitmq.model.EnvironmentBuilder;
23+
import com.rabbitmq.model.metrics.MetricsCollector;
24+
import com.rabbitmq.model.metrics.NoOpMetricsCollector;
2325
import java.util.concurrent.ExecutorService;
2426

2527
public class AmqpEnvironmentBuilder implements EnvironmentBuilder {
2628

2729
private final DefaultEnvironmentConnectionSettings connectionSettings =
2830
new DefaultEnvironmentConnectionSettings(this);
2931
private ExecutorService executorService;
32+
private MetricsCollector metricsCollector = NoOpMetricsCollector.INSTANCE;
3033

3134
public AmqpEnvironmentBuilder() {}
3235

@@ -35,13 +38,18 @@ public AmqpEnvironmentBuilder executorService(ExecutorService executorService) {
3538
return this;
3639
}
3740

41+
public AmqpEnvironmentBuilder metricsCollector(MetricsCollector metricsCollector) {
42+
this.metricsCollector = metricsCollector;
43+
return this;
44+
}
45+
3846
public EnvironmentConnectionSettings connectionSettings() {
3947
return this.connectionSettings;
4048
}
4149

4250
@Override
4351
public Environment build() {
44-
return new AmqpEnvironment(executorService, connectionSettings);
52+
return new AmqpEnvironment(executorService, connectionSettings, metricsCollector);
4553
}
4654

4755
public interface EnvironmentConnectionSettings

src/main/java/com/rabbitmq/model/amqp/AmqpManagement.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,21 @@ public UnbindSpecification unbind() {
156156
public void close() {
157157
if (this.closed.compareAndSet(false, true) && this.initialized.get()) {
158158
this.releaseResources();
159-
this.receiver.close();
160-
this.sender.close();
161-
this.session.close();
159+
try {
160+
this.receiver.close();
161+
} catch (Exception e) {
162+
LOGGER.debug("Error while closing management receiver: {}", e.getMessage());
163+
}
164+
try {
165+
this.sender.close();
166+
} catch (Exception e) {
167+
LOGGER.debug("Error while closing management sender: {}", e.getMessage());
168+
}
169+
try {
170+
this.session.close();
171+
} catch (Exception e) {
172+
LOGGER.debug("Error while closing management session: {}", e.getMessage());
173+
}
162174
}
163175
}
164176

src/main/java/com/rabbitmq/model/amqp/AmqpPublisher.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.rabbitmq.model.Message;
2323
import com.rabbitmq.model.Publisher;
24+
import com.rabbitmq.model.metrics.MetricsCollector;
2425
import java.util.concurrent.*;
2526
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicLong;
@@ -42,6 +43,7 @@ class AmqpPublisher extends ResourceBase implements Publisher {
4243
private final String address;
4344
private final AmqpConnection connection;
4445
private final AtomicBoolean closed = new AtomicBoolean(false);
46+
private final MetricsCollector metricsCollector;
4547

4648
AmqpPublisher(AmqpPublisherBuilder builder) {
4749
super(builder.listeners());
@@ -50,7 +52,9 @@ class AmqpPublisher extends ResourceBase implements Publisher {
5052
this.address = builder.address();
5153
this.connection = builder.connection();
5254
this.sender = this.createSender(builder.connection().nativeSession(), this.address);
55+
this.metricsCollector = this.connection.metricsCollector();
5356
this.state(OPEN);
57+
this.metricsCollector.openPublisher();
5458
}
5559

5660
@Override
@@ -82,10 +86,14 @@ public void publish(Message message, Callback callback) {
8286
} catch (InterruptedException | ExecutionException e) {
8387
status = Status.FAILED;
8488
}
85-
@SuppressWarnings("rawtypes")
8689
DefaultContext defaultContext = new DefaultContext(message, status);
90+
this.metricsCollector.publishDisposition(
91+
status == Status.ACCEPTED
92+
? MetricsCollector.PublishDisposition.ACCEPTED
93+
: MetricsCollector.PublishDisposition.FAILED);
8794
callback.handle(defaultContext);
8895
});
96+
this.metricsCollector.publish();
8997
} catch (ClientIllegalStateException e) {
9098
// the link is closed
9199
this.close(ExceptionUtils.convert(e));
@@ -129,6 +137,7 @@ private void close(Throwable cause) {
129137
LOGGER.warn("Error while closing sender", e);
130138
}
131139
this.state(State.CLOSED, cause);
140+
this.metricsCollector.closePublisher();
132141
}
133142
}
134143

0 commit comments

Comments
 (0)