Skip to content

Commit 9ff8f85

Browse files
committed
Make received message disposition idempotent
1 parent a99ca17 commit 9ff8f85

File tree

1 file changed

+34
-28
lines changed

1 file changed

+34
-28
lines changed

src/main/java/com/rabbitmq/model/amqp/AmqpConsumer.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -113,49 +113,55 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
113113
if (delivery != null) {
114114
this.metricsCollector.consume();
115115
AmqpMessage message = new AmqpMessage(delivery.message());
116-
// TODO make disposition idempotent
116+
AtomicBoolean disposed = new AtomicBoolean(false);
117117
Consumer.Context context =
118118
new Consumer.Context() {
119119

120120
@Override
121121
public void accept() {
122-
try {
123-
protonExecutor.execute(() -> replenishCreditIfNeeded());
124-
delivery.disposition(DeliveryState.accepted(), true);
125-
metricsCollector.consumeDisposition(
126-
MetricsCollector.ConsumeDisposition.ACCEPTED);
127-
} catch (ClientIllegalStateException | ClientIOException e) {
128-
LOGGER.debug("message accept failed: {}", e.getMessage());
129-
} catch (ClientException e) {
130-
throw ExceptionUtils.convert(e);
122+
if (disposed.compareAndSet(false, true)) {
123+
try {
124+
protonExecutor.execute(() -> replenishCreditIfNeeded());
125+
delivery.disposition(DeliveryState.accepted(), true);
126+
metricsCollector.consumeDisposition(
127+
MetricsCollector.ConsumeDisposition.ACCEPTED);
128+
} catch (ClientIllegalStateException | ClientIOException e) {
129+
LOGGER.debug("message accept failed: {}", e.getMessage());
130+
} catch (ClientException e) {
131+
throw ExceptionUtils.convert(e);
132+
}
131133
}
132134
}
133135

134136
@Override
135137
public void discard() {
136-
try {
137-
protonExecutor.execute(() -> replenishCreditIfNeeded());
138-
delivery.disposition(DeliveryState.rejected("", ""), true);
139-
metricsCollector.consumeDisposition(
140-
MetricsCollector.ConsumeDisposition.DISCARDED);
141-
} catch (ClientIllegalStateException | ClientIOException e) {
142-
LOGGER.debug("message discard failed: {}", e.getMessage());
143-
} catch (ClientException e) {
144-
throw ExceptionUtils.convert(e);
138+
if (disposed.compareAndSet(false, true)) {
139+
try {
140+
protonExecutor.execute(() -> replenishCreditIfNeeded());
141+
delivery.disposition(DeliveryState.rejected("", ""), true);
142+
metricsCollector.consumeDisposition(
143+
MetricsCollector.ConsumeDisposition.DISCARDED);
144+
} catch (ClientIllegalStateException | ClientIOException e) {
145+
LOGGER.debug("message discard failed: {}", e.getMessage());
146+
} catch (ClientException e) {
147+
throw ExceptionUtils.convert(e);
148+
}
145149
}
146150
}
147151

148152
@Override
149153
public void requeue() {
150-
try {
151-
protonExecutor.execute(() -> replenishCreditIfNeeded());
152-
delivery.disposition(DeliveryState.released(), true);
153-
metricsCollector.consumeDisposition(
154-
MetricsCollector.ConsumeDisposition.REQUEUED);
155-
} catch (ClientIllegalStateException | ClientIOException e) {
156-
LOGGER.debug("message requeue failed: {}", e.getMessage());
157-
} catch (ClientException e) {
158-
throw ExceptionUtils.convert(e);
154+
if (disposed.compareAndSet(false, true)) {
155+
try {
156+
protonExecutor.execute(() -> replenishCreditIfNeeded());
157+
delivery.disposition(DeliveryState.released(), true);
158+
metricsCollector.consumeDisposition(
159+
MetricsCollector.ConsumeDisposition.REQUEUED);
160+
} catch (ClientIllegalStateException | ClientIOException e) {
161+
LOGGER.debug("message requeue failed: {}", e.getMessage());
162+
} catch (ClientException e) {
163+
throw ExceptionUtils.convert(e);
164+
}
159165
}
160166
}
161167
};

0 commit comments

Comments
 (0)