-
Notifications
You must be signed in to change notification settings - Fork 582
basicCancel and basicConsume honor rpc timeout. #278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Also make sure to remove other e2e bindings for autodeleted exchanges
@@ -1267,17 +1275,26 @@ public void basicCancel(final String consumerTag) | |||
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() { | |||
@Override | |||
public Consumer transformReply(AMQCommand replyCommand) { | |||
replyCommand.getMethod(); | |||
((Basic.CancelOk) replyCommand.getMethod()).getConsumerTag(); // just to make sure its the method expected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll be logging another issue related to this. During cluster node failovers we have seen some TimeoutExceptions during connection recovery end up causing ClassCastExceptions because the reply for the timed out rpc request comes in while a 2nd rpc request is waiting. Adding this here to make sure the replyCommand is actually a CancelOk event. I'm not sure there is an easy fix for the timed out rpc replies so I didn't try to tackle it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so you want to trigger the "unsynchronized" RPC responses problem here, right? If so, maybe wrap it in try/catch block and log the exception as a warning, to avoid changing the behavior to much. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about:
public Consumer transformReply(AMQCommand replyCommand) {
if (!(replyCommand.getMethod() instanceof Basic.CancelOk))
LOGGER.warn("Received reply was not of expected method Basic.CancelOk");
_consumers.remove(consumerTag); //may already have been removed
dispatcher.handleCancelOk(originalConsumer, consumerTag);
return originalConsumer;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks!
@@ -805,7 +805,7 @@ void maybeDeleteRecordedAutoDeleteExchange(String exchange) { | |||
// last binding where this exchange is the source is gone, remove recorded exchange | |||
// if it is auto-deleted. See bug 26364. | |||
if((x != null) && x.isAutoDelete()) { | |||
this.recordedExchanges.remove(exchange); | |||
deleteRecordedExchange(exchange); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was possible to have abandoned e2e bindings in the recordedBindings before this change.
Workflow to reproduce:
- create durable topic exchange -> auto-delete headers exchange -> auto-delete queue -> consumer
- cancel consumer
- calls maybeDeleteRecordedAutoDeleteQueue which ends up deleting the recorded queue
- calls maybeDeleteRecordedAutoDeleteExchange and removes the recorded AD headers exchange
In that case the e2e binding from the durable topic exchange to the now deleted headers exchange still existed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't directly related to the issue you fix here. Could you address this one in a separate PR? It's better for versioning, changelog, etc. Thanks :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -805,7 +805,7 @@ void maybeDeleteRecordedAutoDeleteExchange(String exchange) { | |||
// last binding where this exchange is the source is gone, remove recorded exchange | |||
// if it is auto-deleted. See bug 26364. | |||
if((x != null) && x.isAutoDelete()) { | |||
this.recordedExchanges.remove(exchange); | |||
deleteRecordedExchange(exchange); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't directly related to the issue you fix here. Could you address this one in a separate PR? It's better for versioning, changelog, etc. Thanks :)
@@ -1267,17 +1275,26 @@ public void basicCancel(final String consumerTag) | |||
BlockingRpcContinuation<Consumer> k = new BlockingRpcContinuation<Consumer>() { | |||
@Override | |||
public Consumer transformReply(AMQCommand replyCommand) { | |||
replyCommand.getMethod(); | |||
((Basic.CancelOk) replyCommand.getMethod()).getConsumerTag(); // just to make sure its the method expected |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so you want to trigger the "unsynchronized" RPC responses problem here, right? If so, maybe wrap it in try/catch block and log the exception as a warning, to avoid changing the behavior to much. Thanks.
@vikinghawk Thanks! |
Also make sure to remove other e2e bindings for autodeleted exchanges.
See https://groups.google.com/forum/#!topic/rabbitmq-users/pQ46aq6Tf_o for more details related to this PR.