Skip to content

Commit 7226a9b

Browse files
committed
Extract event loop in dedicated class
1 parent 99f7a27 commit 7226a9b

File tree

2 files changed

+110
-69
lines changed

2 files changed

+110
-69
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import com.rabbitmq.client.amqp.AmqpException;
21+
import java.time.Duration;
22+
import java.util.concurrent.*;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
final class EventLoop implements AutoCloseable {
28+
29+
private static final Duration TIMEOUT = Duration.ofSeconds(60);
30+
private static final Logger LOGGER = LoggerFactory.getLogger(EventLoop.class);
31+
32+
private final String label;
33+
private final Future<?> loop;
34+
private final AtomicReference<Thread> loopThread = new AtomicReference<>();
35+
private final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(100);
36+
37+
EventLoop(String label, ExecutorService executorService) {
38+
this.label = label;
39+
CountDownLatch loopThreadSetLatch = new CountDownLatch(1);
40+
41+
this.loop =
42+
executorService.submit(
43+
() -> {
44+
loopThread.set(Thread.currentThread());
45+
loopThreadSetLatch.countDown();
46+
while (!Thread.currentThread().isInterrupted()) {
47+
try {
48+
Runnable task = this.taskQueue.take();
49+
task.run();
50+
} catch (InterruptedException e) {
51+
return;
52+
} catch (Exception e) {
53+
LOGGER.warn("Error during processing of topology recording task", e);
54+
}
55+
}
56+
});
57+
try {
58+
if (!loopThreadSetLatch.await(10, TimeUnit.SECONDS)) {
59+
throw new IllegalStateException("Recording topology loop could not start");
60+
}
61+
} catch (InterruptedException e) {
62+
Thread.currentThread().interrupt();
63+
throw new AmqpException("Error while creating recording topology listener", e);
64+
}
65+
}
66+
67+
void submit(Runnable task) {
68+
if (Thread.currentThread().equals(this.loopThread.get())) {
69+
task.run();
70+
} else {
71+
CountDownLatch latch = new CountDownLatch(1);
72+
try {
73+
boolean added =
74+
this.taskQueue.offer(
75+
() -> {
76+
try {
77+
task.run();
78+
} catch (Exception e) {
79+
LOGGER.info("Error during {} task", this.label, e);
80+
} finally {
81+
latch.countDown();
82+
}
83+
},
84+
TIMEOUT.toMillis(),
85+
TimeUnit.MILLISECONDS);
86+
if (!added) {
87+
throw new AmqpException("Enqueueing of %s task timed out", this.label);
88+
}
89+
} catch (InterruptedException e) {
90+
Thread.currentThread().interrupt();
91+
throw new AmqpException(this.label + " task enqueueing has been interrupted", e);
92+
}
93+
try {
94+
latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
95+
} catch (InterruptedException e) {
96+
Thread.currentThread().interrupt();
97+
throw new AmqpException(this.label + " Topology task processing has been interrupted", e);
98+
}
99+
}
100+
}
101+
102+
@Override
103+
public void close() {
104+
this.loop.cancel(true);
105+
}
106+
}

src/main/java/com/rabbitmq/client/amqp/impl/RecordingTopologyListener.java

Lines changed: 4 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -17,56 +17,22 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
import com.rabbitmq.client.amqp.AmqpException;
21-
import java.time.Duration;
2220
import java.util.*;
2321
import java.util.concurrent.*;
2422
import java.util.concurrent.atomic.AtomicBoolean;
2523
import java.util.concurrent.atomic.AtomicReference;
26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
2824

2925
final class RecordingTopologyListener implements TopologyListener, AutoCloseable {
3026

31-
private static final Duration TIMEOUT = Duration.ofSeconds(60);
32-
33-
private static final Logger LOGGER = LoggerFactory.getLogger(RecordingTopologyListener.class);
34-
27+
private final EventLoop eventLoop;
3528
private final Map<String, ExchangeSpec> exchanges = new LinkedHashMap<>();
3629
private final Map<String, QueueSpec> queues = new LinkedHashMap<>();
3730
private final Set<BindingSpec> bindings = new LinkedHashSet<>();
3831
private final Map<Long, ConsumerSpec> consumers = new LinkedHashMap<>();
39-
private final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(100);
40-
private final Future<?> loop;
41-
private final AtomicReference<Thread> loopThread = new AtomicReference<>();
4232
private final AtomicBoolean closed = new AtomicBoolean(false);
4333

4434
RecordingTopologyListener(ExecutorService executorService) {
45-
CountDownLatch loopThreadSetLatch = new CountDownLatch(1);
46-
this.loop =
47-
executorService.submit(
48-
() -> {
49-
loopThread.set(Thread.currentThread());
50-
loopThreadSetLatch.countDown();
51-
while (!Thread.currentThread().isInterrupted()) {
52-
try {
53-
Runnable task = this.taskQueue.take();
54-
task.run();
55-
} catch (InterruptedException e) {
56-
return;
57-
} catch (Exception e) {
58-
LOGGER.warn("Error during processing of topology recording task", e);
59-
}
60-
}
61-
});
62-
try {
63-
if (!loopThreadSetLatch.await(10, TimeUnit.SECONDS)) {
64-
throw new IllegalStateException("Recording topology loop could not start");
65-
}
66-
} catch (InterruptedException e) {
67-
Thread.currentThread().interrupt();
68-
throw new AmqpException("Error while creating recording topology listener", e);
69-
}
35+
this.eventLoop = new EventLoop("topology", executorService);
7036
}
7137

7238
@Override
@@ -139,44 +105,13 @@ public void consumerDeleted(long id, String queue) {
139105
@Override
140106
public void close() {
141107
if (this.closed.compareAndSet(false, true)) {
142-
this.loop.cancel(true);
108+
this.eventLoop.close();
143109
}
144110
}
145111

146112
private void submit(Runnable task) {
147113
if (!this.closed.get()) {
148-
if (Thread.currentThread().equals(this.loopThread.get())) {
149-
task.run();
150-
} else {
151-
CountDownLatch latch = new CountDownLatch(1);
152-
try {
153-
boolean added =
154-
this.taskQueue.offer(
155-
() -> {
156-
try {
157-
task.run();
158-
} catch (Exception e) {
159-
LOGGER.info("Error during topology recording task", e);
160-
} finally {
161-
latch.countDown();
162-
}
163-
},
164-
TIMEOUT.toMillis(),
165-
TimeUnit.MILLISECONDS);
166-
if (!added) {
167-
throw new AmqpException("Enqueueing of topology task timed out");
168-
}
169-
} catch (InterruptedException e) {
170-
Thread.currentThread().interrupt();
171-
throw new AmqpException("Topology task enqueueing has been interrupted", e);
172-
}
173-
try {
174-
latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
175-
} catch (InterruptedException e) {
176-
Thread.currentThread().interrupt();
177-
throw new AmqpException("Topology task processing has been interrupted", e);
178-
}
179-
}
114+
this.eventLoop.submit(task);
180115
}
181116
}
182117

0 commit comments

Comments
 (0)