Skip to content

Add Completion on _waitForConfirmationActionBlock #298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,24 @@ private async Task Init()
{
foreach (var id in publishingIds.Span)
{
_config.ConfirmHandler(new Confirmation
try
{
PublishingId = id,
Code = ResponseCode.Ok,
Stream = _config.Stream
});
_config.ConfirmHandler(new Confirmation
{
PublishingId = id,
Code = ResponseCode.Ok,
Stream = _config.Stream
});
}
catch (Exception e)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As written in the comment The call is exposed to the user, so we need to catch any exception there could be an exception in the user code. we need to trap Exception to avoid blocking the iteration.

{
// The call is exposed to the user so we need to catch any exception
// there could be an exception in the user code.
// So here we log the exception and we continue.

_logger.LogError(e, "Error during confirm handler, publishing id: {Id}. " +
"Hint: Check the user ConfirmHandler callback", id);
}
}

_semaphore.Release(publishingIds.Length);
Expand Down
12 changes: 9 additions & 3 deletions RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ private async void OnTimedEvent(object sender, ElapsedEventArgs e)

foreach (var pair in timedOutMessages)
{
await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null).ConfigureAwait(false);
await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null)
.ConfigureAwait(false);
}
}

Expand All @@ -145,8 +146,13 @@ internal void AddUnConfirmedMessage(ulong publishingId, List<Message> messages)
});
}

internal Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId, string stream)
internal async Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId,
string stream)
{
return _waitForConfirmationActionBlock.SendAsync((confirmationStatus, publishingId, stream));
if (!await _waitForConfirmationActionBlock.SendAsync((confirmationStatus, publishingId, stream))
.ConfigureAwait(false))
{
await _waitForConfirmationActionBlock.Completion.ConfigureAwait(false);
}
}
}
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private async Task<IProducer> SuperStreamProducer()
_ => ConfirmationStatus.UndefinedError
};
_confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId,
stream);
stream).ConfigureAwait(false);
}
}, BaseLogger).ConfigureAwait(false);
}
Expand Down Expand Up @@ -97,7 +97,7 @@ private async Task<IProducer> StandardProducer()
_ => ConfirmationStatus.UndefinedError
};
_confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId,
confirmation.Stream);
confirmation.Stream).ConfigureAwait(false);
}
}, BaseLogger).ConfigureAwait(false);
}
Expand Down
4 changes: 3 additions & 1 deletion kubernetes/stream_cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ metadata:
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: rabbitmq-stream
name: tls
namespace: stream-clients-test
spec:
replicas: 1
image: rabbitmq:3-management
service:
type: LoadBalancer
tls:
secretName: tls-secret
resources:
requests:
cpu: 500m
Expand Down
28 changes: 28 additions & 0 deletions kubernetes/test-server-key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCzfGD7b37pIVtW
O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5
6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp
7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg
rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n
lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI
rCXOOKN1AgMBAAECggEAEXHelDmZPmv3TmmLPb6lmV92RujVtSpbiRX2J7tKCz+o
aeCLb4DE0ulM1iicha+NYBiGj2nN+rkLaVvEty2yNYd0QgRHH5I0GcySFqOvoLSZ
Y0O7jaukDJ6w0KNLNKt41Xc31f146MJMeT62UrGQayBjXidC7QTLNoQq9zyQ6MQK
crbj8f/TqT68V5f0nQeYFRGPYZPiaLcDB6mOCL09B4bfMxcOl0/6SVouV8BrTHxs
kqtO6yxrpc0XL1vfCrBqljlVLnXyGNmkaegSQMOTDfGqM6Mkc6771DYt5MJsmVCH
VgDZeMs/BJA/srV2rXW4cwfO0OOSTkE9cCNzZM6Q6QKBgQDmLk+bLb6cafFceuGl
gktNYi1TbTD62/shKulmFm8C7jNMTdbGi4pEwlnxzU57A+spr0nzody5c8qMIfUF
Cpf9aRWs7xG0WOCuYbUSI+gbICxaJOKe00TyWwf/xn13p6J4wecg4n5C8Wo1QIRi
nFwgaqYATkyzhE9+YOQKpxgeTQKBgQDHnlmMxiTuD3j9BxEkD3aY4E97qTfPNqgM
umrwXoU0paYtgfuxMe7yjkE/qVKn3QP+wzN+XMR6YcBdphQPlSCcEyJrAZZCGrgJ
CSO9anY6CA5bgZ9Mk3pBCldHEQqxWrg27bejkn4KV3PXmDtQwMYrpsgEu3DPCgy/
TUVTnpK9yQKBgQC6PGQaWOOtKCapvZ6OTCJjJPkpU+JaRdwlVNPszl/ZTiLhLOWG
VOZ1hY5Cjutdqqj9XB8IaUDuJ5qM0PiusIiS9xAbkH6RnYuEa/eWCslEET7xXICj
IqrZMAAD2XQweMiCzdgUikzAGxXkqiOyqXH8pG1VOATlBjtPNFOtrs5bzQKBgD08
1cn6609gzcQJy/ddCwwBHEEae3WFFe65rZ7J0GGDQ8SIMLd+Uwh0HY4zGplGkzgv
l/d27AuDO2k/Tr4tCJD4ycE7/mWPHtAezqkIJPbOi+EEleL/By02x+mUT8xywTqQ
mJqEkUgI5g/IssGmMeUoSAozmnrZYWm6gb8SUYAJAoGATjkZuRTAx+85ninZCbMU
fKpxM5KhCOHOBzxEKjmHp/fRUga+XoZMeWtmcqABWa3pD3kgTa+9z1hrQaUCh1cw
01QjqGxY9Z7VdBMPexXiht1JDZULaJ3Cif4YKUrwrDWc737HRqLP6c7EMwXnAod2
RPe55pKjmk0X/uuple2WCAE=
-----END PRIVATE KEY-----
24 changes: 24 additions & 0 deletions kubernetes/test-server.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBATANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH
ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTA5LTEzVDE2OjU0OjQ0LjU2NzY0MjEN
MAsGA1UEBwwEJCQkJDAeFw0yMzA5MTMxNDU0NDRaFw0zMzA5MTAxNDU0NDRaMDYx
IzAhBgNVBAMMGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tMQ8wDQYDVQQKDAZz
ZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCzfGD7b37pIVtW
O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5
6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp
7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg
rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n
lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI
rCXOOKN1AgMBAAGjgfEwge4wCQYDVR0TBAIwADALBgNVHQ8EBAMCBaAwEwYDVR0l
BAwwCgYIKwYBBQUHAwEwTAYDVR0RBEUwQ4IaZ3NhbnRvbWFnZzZMVkRNLnZtd2Fy
ZS5jb22CGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tgglsb2NhbGhvc3QwHQYD
VR0OBBYEFDHW2nif3ILxi7DWd4TNyn63Q7apMB8GA1UdIwQYMBaAFKccKJsr3YYn
MVNXfujEGRCONpu9MDEGA1UdHwQqMCgwJqAkoCKGIGh0dHA6Ly9jcmwtc2VydmVy
OjgwMDAvYmFzaWMuY3JsMA0GCSqGSIb3DQEBCwUAA4IBAQCwa+ksiRPR06JZzKFd
pcD4K5oZ6F5mVpTqn3Kf5jS1cz6Ippi/T8nU8k/xVKmDMqqCWCYGal1U8DmHGPzQ
WOWMk/Ibb72feCS4txIH4GuV/ZO868/5qOy1rmP/UjOY6Kpyju/Eg13AdzcuSnZ3
rZcSncm/gY5BHMmUJdMutTe+Scz32VW7yV8Mi+2ZwsMiqLksZMpBJqPyxroGTksI
p7bklWf1pOgQqh9XJqu3x4rceH0o3xHZ/wana4RnSWL7Q4N6TNinAjLzlLvDByW7
JX9ivpCVpM0n6tIT+E7UbWVX6WoICjCJeDLNwq/jYVEDP80O3yjDYKpYIOqp/e7Q
/BJy
-----END CERTIFICATE-----