Skip to content

Commit 488c0a3

Browse files
committed
Refactor event loop to accept several clients
Used only in the recovery listeners, this allows sharing the event loop between connections and saving some threads.
1 parent 78377c4 commit 488c0a3

File tree

6 files changed

+124
-43
lines changed

6 files changed

+124
-43
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
281281
if (builder.recoveryConfiguration().topology()) {
282282
RecordingTopologyListener rtl =
283283
new RecordingTopologyListener(
284-
"topology-listener-connection-" + this.name(), this.executorService());
284+
"topology-listener-connection-" + this.name(), this.environment.recoveryEventLoop());
285285
this.entityRecovery = new EntityRecovery(this, rtl);
286286
topologyListener = rtl;
287287
} else {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class AmqpEnvironment implements Environment {
4646
private final MetricsCollector metricsCollector;
4747
private final ObservationCollector observationCollector;
4848
private ConnectionUtils.AffinityCache affinityCache = new ConnectionUtils.AffinityCache();
49+
private final EventLoop recoveryEventLoop;
4950

5051
AmqpEnvironment(
5152
ExecutorService executorService,
@@ -71,6 +72,8 @@ class AmqpEnvironment implements Environment {
7172
metricsCollector == null ? NoOpMetricsCollector.INSTANCE : metricsCollector;
7273
this.observationCollector =
7374
observationCollector == null ? Utils.NO_OP_OBSERVATION_COLLECTOR : observationCollector;
75+
// TODO start the event loop on first use
76+
this.recoveryEventLoop = new EventLoop(this.executorService);
7477
}
7578

7679
DefaultConnectionSettings<?> connectionSettings() {
@@ -95,6 +98,7 @@ public void close() {
9598
if (this.closed.compareAndSet(false, true)) {
9699
this.connectionManager.close();
97100
this.client.close();
101+
this.recoveryEventLoop.close();
98102
if (this.internalExecutor) {
99103
this.executorService.shutdownNow();
100104
}
@@ -130,6 +134,10 @@ ConnectionUtils.AffinityCache affinityCache() {
130134
return this.affinityCache;
131135
}
132136

137+
EventLoop recoveryEventLoop() {
138+
return this.recoveryEventLoop;
139+
}
140+
133141
AmqpConnection connection(AmqpConnectionBuilder builder) {
134142
return this.connectionManager.connection(builder);
135143
}

src/main/java/com/rabbitmq/client/amqp/impl/EventLoop.java

Lines changed: 93 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,40 +19,46 @@
1919

2020
import com.rabbitmq.client.amqp.AmqpException;
2121
import java.time.Duration;
22+
import java.util.HashMap;
23+
import java.util.Map;
2224
import java.util.concurrent.*;
2325
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicLong;
2427
import java.util.concurrent.atomic.AtomicReference;
2528
import java.util.function.Consumer;
2629
import java.util.function.Supplier;
30+
import java.util.function.UnaryOperator;
2731
import org.slf4j.Logger;
2832
import org.slf4j.LoggerFactory;
2933

30-
final class EventLoop<S> implements AutoCloseable {
34+
final class EventLoop implements AutoCloseable {
3135

3236
private static final Duration TIMEOUT = Duration.ofSeconds(60);
3337
private static final Logger LOGGER = LoggerFactory.getLogger(EventLoop.class);
3438

3539
private final AtomicBoolean closed = new AtomicBoolean(false);
36-
private final String label;
3740
private final Future<?> loop;
3841
private final AtomicReference<Thread> loopThread = new AtomicReference<>();
39-
private final AtomicReference<S> stateReference = new AtomicReference<>();
40-
private final BlockingQueue<Consumer<S>> taskQueue = new ArrayBlockingQueue<>(100);
42+
private final BlockingQueue<ClientTaskContext<Object>> taskQueue = new ArrayBlockingQueue<>(100);
4143

42-
EventLoop(Supplier<S> stateSupplier, String label, ExecutorService executorService) {
43-
this.label = label;
44+
EventLoop(ExecutorService executorService) {
4445
CountDownLatch loopThreadSetLatch = new CountDownLatch(1);
4546
this.loop =
4647
executorService.submit(
4748
() -> {
48-
S state = stateSupplier.get();
4949
loopThread.set(Thread.currentThread());
50-
stateReference.set(state);
5150
loopThreadSetLatch.countDown();
51+
Map<Long, Object> states = new HashMap<>();
5252
while (!Thread.currentThread().isInterrupted()) {
5353
try {
54-
Consumer<S> task = this.taskQueue.take();
55-
task.accept(state);
54+
ClientTaskContext<Object> context = this.taskQueue.take();
55+
Object state = states.get(context.client.id);
56+
state = context.task.apply(state);
57+
if (state == null) {
58+
states.remove(context.client.id);
59+
} else {
60+
states.put(context.client.id, state);
61+
}
5662
} catch (InterruptedException e) {
5763
return;
5864
} catch (Exception e) {
@@ -70,34 +76,49 @@ final class EventLoop<S> implements AutoCloseable {
7076
}
7177
}
7278

73-
void submit(Consumer<S> task) {
79+
<S> Client<S> register(Supplier<S> stateSupplier) {
80+
Client<S> client = new Client<>(this);
81+
this.submit(
82+
client,
83+
nullState -> {
84+
S state = stateSupplier.get();
85+
client.stateReference.set(state);
86+
return state;
87+
});
88+
return client;
89+
}
90+
91+
private <ST> void submit(Client<ST> client, UnaryOperator<ST> task) {
7492
if (this.closed.get()) {
7593
throw new IllegalStateException("Event loop is closed");
7694
} else {
7795
if (Thread.currentThread().equals(this.loopThread.get())) {
78-
task.accept(this.stateReference.get());
96+
task.apply(client.stateReference.get());
7997
} else {
8098
CountDownLatch latch = new CountDownLatch(1);
8199
try {
82-
boolean added =
83-
this.taskQueue.offer(
100+
ClientTaskContext<ST> context =
101+
new ClientTaskContext<>(
102+
client,
84103
state -> {
85104
try {
86-
task.accept(state);
105+
return task.apply(state);
87106
} catch (Exception e) {
88-
LOGGER.info("Error during {} task", this.label, e);
107+
LOGGER.info("Error during task", e);
89108
} finally {
90109
latch.countDown();
91110
}
92-
},
93-
TIMEOUT.toMillis(),
94-
TimeUnit.MILLISECONDS);
111+
return null;
112+
});
113+
boolean added =
114+
this.taskQueue.offer(
115+
(ClientTaskContext<Object>) context, TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
95116
if (!added) {
96-
throw new AmqpException("Enqueueing of %s task timed out", this.label);
117+
throw new AmqpException("Enqueueing of task timed out");
97118
}
98119
} catch (InterruptedException e) {
99120
Thread.currentThread().interrupt();
100-
throw new AmqpException(this.label + " task enqueueing has been interrupted", e);
121+
throw new AmqpException("Task enqueueing has been interrupted", e);
101122
}
102123
try {
103124
boolean completed = latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
@@ -106,20 +127,66 @@ void submit(Consumer<S> task) {
106127
}
107128
} catch (InterruptedException e) {
108129
Thread.currentThread().interrupt();
109-
throw new AmqpException(this.label + " Topology task processing has been interrupted", e);
130+
throw new AmqpException("Topology task processing has been interrupted", e);
110131
}
111132
}
112133
}
113134
}
114135

115-
S state() {
116-
return this.stateReference.get();
117-
}
118-
119136
@Override
120137
public void close() {
121138
if (this.closed.compareAndSet(false, true)) {
122139
this.loop.cancel(true);
123140
}
124141
}
142+
143+
private static final AtomicLong CLIENT_ID_SEQUENCE = new AtomicLong();
144+
145+
static class Client<S> implements AutoCloseable {
146+
147+
private final long id;
148+
private final AtomicReference<S> stateReference = new AtomicReference<>();
149+
private final EventLoop loop;
150+
private final AtomicBoolean closed = new AtomicBoolean(false);
151+
152+
private Client(EventLoop loop) {
153+
this.id = CLIENT_ID_SEQUENCE.getAndIncrement();
154+
this.loop = loop;
155+
}
156+
157+
void submit(Consumer<S> task) {
158+
if (this.closed.get()) {
159+
throw new IllegalStateException("Event loop is closed");
160+
} else {
161+
this.loop.submit(
162+
this,
163+
s -> {
164+
task.accept(s);
165+
return s;
166+
});
167+
}
168+
}
169+
170+
@Override
171+
public void close() {
172+
if (this.closed.compareAndSet(false, true)) {
173+
this.loop.submit(this, s -> null);
174+
}
175+
}
176+
177+
S state() {
178+
return this.stateReference.get();
179+
}
180+
}
181+
182+
private static class ClientTaskContext<S> {
183+
184+
private final Client<S> client;
185+
private final UnaryOperator<S> task;
186+
187+
private ClientTaskContext(Client<S> client, UnaryOperator<S> task) {
188+
this.client = client;
189+
this.task = task;
190+
}
191+
}
125192
}

src/main/java/com/rabbitmq/client/amqp/impl/RecordingTopologyListener.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import java.util.*;
21-
import java.util.concurrent.*;
2221
import java.util.concurrent.atomic.AtomicBoolean;
2322
import java.util.concurrent.atomic.AtomicReference;
2423
import java.util.function.Consumer;
@@ -30,13 +29,13 @@ final class RecordingTopologyListener implements TopologyListener, AutoCloseable
3029
private static final Logger LOGGER = LoggerFactory.getLogger(RecordingTopologyListener.class);
3130

3231
private final String label;
33-
private final EventLoop<State> eventLoop;
32+
private final EventLoop.Client<State> eventLoopClient;
3433

3534
private final AtomicBoolean closed = new AtomicBoolean(false);
3635

37-
RecordingTopologyListener(String label, ExecutorService executorService) {
36+
RecordingTopologyListener(String label, EventLoop eventLoop) {
3837
this.label = label;
39-
this.eventLoop = new EventLoop<>(State::new, label, executorService);
38+
this.eventLoopClient = eventLoop.register(State::new);
4039
}
4140

4241
private static class State {
@@ -117,13 +116,13 @@ public void consumerDeleted(long id, String queue) {
117116
@Override
118117
public void close() {
119118
if (this.closed.compareAndSet(false, true)) {
120-
this.eventLoop.close();
119+
this.eventLoopClient.close();
121120
}
122121
}
123122

124123
private void submit(Consumer<State> task) {
125124
if (!this.closed.get()) {
126-
this.eventLoop.submit(task);
125+
this.eventLoopClient.submit(task);
127126
}
128127
}
129128

@@ -342,7 +341,7 @@ interface Visitor {
342341
// for test assertions
343342

344343
private State state() {
345-
return this.eventLoop.state();
344+
return this.eventLoopClient.state();
346345
}
347346

348347
Map<String, ExchangeSpec> exchanges() {

src/test/java/com/rabbitmq/client/amqp/impl/EventLoopTest.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
public class EventLoopTest {
2929

3030
static ExecutorService executorService;
31-
EventLoop<State> loop;
31+
EventLoop loop;
32+
EventLoop.Client<State> client;
3233

3334
@BeforeAll
3435
static void beforeAll() {
@@ -37,7 +38,8 @@ static void beforeAll() {
3738

3839
@BeforeEach
3940
void beforeEach() {
40-
loop = new EventLoop<>(State::new, "test", executorService);
41+
loop = new EventLoop(executorService);
42+
client = loop.register(State::new);
4143
}
4244

4345
@AfterEach
@@ -53,22 +55,27 @@ static void afterAll() {
5355
@Test
5456
void submitTasks() {
5557
AtomicInteger buffer = new AtomicInteger();
56-
loop.submit(s -> s.a = 42);
57-
loop.submit(s -> buffer.set(s.a));
58+
client.submit(s -> s.a = 42);
59+
client.submit(
60+
s -> {
61+
buffer.set(s.a);
62+
});
5863
assertThat(buffer).hasValue(42);
5964

60-
loop.submit(
65+
client.submit(
6166
s -> {
6267
s.a = 1;
6368
s.b = 2;
6469
});
65-
loop.submit(s -> buffer.set(s.a));
70+
client.submit(s -> buffer.set(s.a));
6671
assertThat(buffer).hasValue(1);
67-
loop.submit(s -> buffer.set(s.b));
72+
client.submit(s -> buffer.set(s.b));
6873
assertThat(buffer).hasValue(2);
6974

75+
client.close();
76+
assertThatThrownBy(() -> client.submit(s -> {})).isInstanceOf(IllegalStateException.class);
7077
loop.close();
71-
assertThatThrownBy(() -> loop.submit(s -> {})).isInstanceOf(IllegalStateException.class);
78+
assertThatThrownBy(() -> loop.register(State::new));
7279
}
7380

7481
static class State {

src/test/java/com/rabbitmq/client/amqp/impl/RecordingTopologyListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class RecordingTopologyListenerTest {
3636
@BeforeEach
3737
void init() {
3838
executorService = Executors.newSingleThreadExecutor();
39-
recovery = new RecordingTopologyListener("", executorService);
39+
recovery = new RecordingTopologyListener("", new EventLoop(executorService));
4040
}
4141

4242
@AfterEach

0 commit comments

Comments
 (0)