Skip to content

Commit 445ebb8

Browse files
committed
Implement basic.return support.
1 parent 4a6ae00 commit 445ebb8

File tree

11 files changed

+233
-131
lines changed

11 files changed

+233
-131
lines changed

projects/RabbitMQ.Client/Constants.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,11 @@ public static class Constants
9191
/// for setting this value for a particular channel.
9292
/// </summary>
9393
public const ushort DefaultConsumerDispatchConcurrency = 1;
94+
95+
/// <summary>
96+
/// The message header used to track publish sequence numbers, to allow correlation when
97+
/// <c>basic.return</c> is sent via the broker.
98+
/// </summary>
99+
public const string PublishSequenceNumberHeader = "x-dotnet-pub-seq-no";
94100
}
95101
}

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,11 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
202202
/// <param name="basicProperties">The message properties.</param>
203203
/// <param name="body">The message body.</param>
204204
/// <param name="cancellationToken">CancellationToken for this operation.</param>
205+
/// <returns>Returns <c>true</c> if publisher confirmations enabled and the message was ack-ed</returns>
205206
/// <remarks>
206207
/// Routing key must be shorter than 255 bytes.
207208
/// </remarks>
208-
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
209+
ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, string routingKey,
209210
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
210211
CancellationToken cancellationToken = default)
211212
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
@@ -219,10 +220,11 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
219220
/// <param name="basicProperties">The message properties.</param>
220221
/// <param name="body">The message body.</param>
221222
/// <param name="cancellationToken">CancellationToken for this operation.</param>
223+
/// <returns>Returns <c>true</c> if publisher confirmations enabled and the message was ack-ed</returns>
222224
/// <remarks>
223225
/// Routing key must be shorter than 255 bytes.
224226
/// </remarks>
225-
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
227+
ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
226228
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
227229
CancellationToken cancellationToken = default)
228230
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

projects/RabbitMQ.Client/IChannelExtensions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public static Task<string> BasicConsumeAsync(this IChannel channel,
7878
/// <remarks>
7979
/// The publication occurs with mandatory=false.
8080
/// </remarks>
81-
public static ValueTask BasicPublishAsync<T>(this IChannel channel,
81+
public static ValueTask<bool> BasicPublishAsync<T>(this IChannel channel,
8282
PublicationAddress addr,
8383
T basicProperties,
8484
ReadOnlyMemory<byte> body,
@@ -94,7 +94,7 @@ public static ValueTask BasicPublishAsync<T>(this IChannel channel,
9494
/// <remarks>
9595
/// The publication occurs with mandatory=false and empty BasicProperties
9696
/// </remarks>
97-
public static ValueTask BasicPublishAsync(this IChannel channel,
97+
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
9898
string exchange,
9999
string routingKey,
100100
ReadOnlyMemory<byte> body,
@@ -109,7 +109,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
109109
/// <remarks>
110110
/// The publication occurs with mandatory=false and empty BasicProperties
111111
/// </remarks>
112-
public static ValueTask BasicPublishAsync(this IChannel channel,
112+
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
113113
CachedString exchange,
114114
CachedString routingKey,
115115
ReadOnlyMemory<byte> body,
@@ -124,7 +124,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
124124
/// <remarks>
125125
/// The publication occurs with empty BasicProperties
126126
/// </remarks>
127-
public static ValueTask BasicPublishAsync(this IChannel channel,
127+
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
128128
string exchange,
129129
string routingKey,
130130
bool mandatory,
@@ -140,7 +140,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel,
140140
/// <remarks>
141141
/// The publication occurs with empty BasicProperties
142142
/// </remarks>
143-
public static ValueTask BasicPublishAsync(this IChannel channel,
143+
public static ValueTask<bool> BasicPublishAsync(this IChannel channel,
144144
CachedString exchange,
145145
CachedString routingKey,
146146
bool mandatory,

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,15 +311,15 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false)
311311
public Task<BasicGetResult?> BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken)
312312
=> InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken);
313313

314-
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
314+
public ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, string routingKey,
315315
bool mandatory,
316316
TProperties basicProperties,
317317
ReadOnlyMemory<byte> body,
318318
CancellationToken cancellationToken = default)
319319
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
320320
=> InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken);
321321

322-
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
322+
public ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
323323
bool mandatory,
324324
TProperties basicProperties,
325325
ReadOnlyMemory<byte> body,

0 commit comments

Comments
 (0)