Skip to content

Rename BuildAsync to BuildAndStartAsync #60

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface IConsumerBuilder

IStreamOptions Stream();

Task<IConsumer> BuildAsync(CancellationToken cancellationToken = default);
Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default);

public interface IStreamOptions
{
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public IConsumerBuilder.IStreamOptions Stream()
return new DefaultStreamOptions(this, _filters);
}

public async Task<IConsumer> BuildAsync(CancellationToken cancellationToken = default)
public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationToken = default)
{
if (_handler is null)
{
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ RabbitMQ.AMQP.Client.IConsumer.Pause() -> void
RabbitMQ.AMQP.Client.IConsumer.Unpause() -> void
RabbitMQ.AMQP.Client.IConsumer.UnsettledMessageCount.get -> long
RabbitMQ.AMQP.Client.IConsumerBuilder
RabbitMQ.AMQP.Client.IConsumerBuilder.BuildAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
RabbitMQ.AMQP.Client.IConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
RabbitMQ.AMQP.Client.IConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
Expand Down Expand Up @@ -230,7 +230,7 @@ RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Unpause() -> void
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.UnsettledMessageCount.get -> long
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.AmqpConsumerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.BuildAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IConsumer!>!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
Expand Down
6 changes: 3 additions & 3 deletions Tests/AmqpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async Task MessageHandler(IContext ctx, IMessage msg)
}

IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder();
IConsumer consumer = await consumerBuilder.Queue(_queueName).MessageHandler(MessageHandler).BuildAsync();
IConsumer consumer = await consumerBuilder.Queue(_queueName).MessageHandler(MessageHandler).BuildAndStartAsync();

await WhenTaskCompletes(allMessagesReceivedTcs.Task);
Assert.Equal(messageCount, messageIds.Count);
Expand Down Expand Up @@ -218,7 +218,7 @@ async Task MessageHandler(IContext ctx, IMessage msg)
}

IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder();
IConsumer consumer = await consumerBuilder.Queue(_queueName).MessageHandler(MessageHandler).BuildAsync();
IConsumer consumer = await consumerBuilder.Queue(_queueName).MessageHandler(MessageHandler).BuildAndStartAsync();

await WhenTaskCompletes(allMessagesReceivedTcs.Task);

Expand Down Expand Up @@ -270,7 +270,7 @@ async Task MessageHandler(IContext ctx, IMessage msg)
}

IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder();
IConsumer consumer = await consumerBuilder.Queue(queueSpecification).MessageHandler(MessageHandler).BuildAsync();
IConsumer consumer = await consumerBuilder.Queue(queueSpecification).MessageHandler(MessageHandler).BuildAndStartAsync();

IPublisherBuilder publisherBuilder = _connection.PublisherBuilder();
IPublisher publisher = await publisherBuilder.Queue(queueSpecification).BuildAsync();
Expand Down
24 changes: 12 additions & 12 deletions Tests/Consumer/BasicConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public async Task SimpleConsumeMessage()
await context.AcceptAsync();
tcs.SetResult(message);
}
).BuildAsync();
).BuildAndStartAsync();

await WhenTcsCompletes(tcs);
IMessage receivedMessage = await tcs.Task;
Expand Down Expand Up @@ -81,7 +81,7 @@ public async Task ConsumerReQueueMessage()
break;
}
}
).BuildAsync();
).BuildAndStartAsync();

await WhenTcsCompletes(tcs);

Expand Down Expand Up @@ -136,7 +136,7 @@ async Task MessageHandler(IContext cxt, IMessage msg)
consumer = await _connection.ConsumerBuilder()
.Queue(queueSpec)
.InitialCredits(initialCredits)
.MessageHandler(MessageHandler).BuildAsync();
.MessageHandler(MessageHandler).BuildAndStartAsync();

await WhenTcsCompletes(tcs);

Expand Down Expand Up @@ -195,7 +195,7 @@ public async Task ConsumerForStreamQueueWithOffset(StreamOffsetSpecification off
.Stream()
.Offset(offset)
.Builder()
.BuildAsync();
.BuildAndStartAsync();

// wait for the consumer to consume all messages
// we can't use the TaskCompletionSource here because we don't know how many messages will be consumed
Expand Down Expand Up @@ -267,7 +267,7 @@ public async Task ConsumerWithStreamFilterShouldReceiveOnlyPartOfTheMessages(str
.FilterValues(filters)
.FilterMatchUnfiltered(false)
.Offset(StreamOffsetSpecification.First).Builder()
.BuildAsync();
.BuildAndStartAsync();

int receivedWithoutFilters = 0;
IConsumer consumerWithoutFilters = await _connection.ConsumerBuilder()
Expand All @@ -280,7 +280,7 @@ public async Task ConsumerWithStreamFilterShouldReceiveOnlyPartOfTheMessages(str
})
.Stream()
.Offset(StreamOffsetSpecification.First).Builder()
.BuildAsync();
.BuildAndStartAsync();

// wait for the consumer to consume all messages
await Task.Delay(500); // TODO yuck
Expand Down Expand Up @@ -326,7 +326,7 @@ public async Task ConsumerForStreamQueueWithOffsetValue(int offsetStart, int num
.Stream()
.Offset(offsetStart)
.Builder()
.BuildAsync();
.BuildAndStartAsync();

// wait for the consumer to consume all messages
// we can't use the TaskCompletionSource here because we don't know how many messages will be consumed
Expand Down Expand Up @@ -357,7 +357,7 @@ public async Task ConsumerShouldThrowWhenQueueDoesNotExist()
// TODO these are timeout exceptions under the hood, compare
// with the Java client
ConsumerException ex = await Assert.ThrowsAsync<ConsumerException>(
() => consumerBuilder.BuildAsync());
() => consumerBuilder.BuildAndStartAsync());
Assert.Contains(doesNotExist, ex.Message);
}

Expand All @@ -381,7 +381,7 @@ async Task MessageHandler(IContext cxt, IMessage msg)
IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder()
.Queue(_queueName)
.MessageHandler(MessageHandler);
IConsumer consumer = await consumerBuilder.BuildAsync();
IConsumer consumer = await consumerBuilder.BuildAndStartAsync();

await PublishAsync(queueSpecification, 1);

Expand Down Expand Up @@ -426,7 +426,7 @@ public async Task ConsumerUnsettledMessagesGoBackToQueueAfterClosing()
{
receivedGreaterThanSettledTcs.TrySetResult(true);
}
}).BuildAsync();
}).BuildAndStartAsync();

await WhenTcsCompletes(receivedGreaterThanSettledTcs);

Expand Down Expand Up @@ -472,7 +472,7 @@ public async Task ConsumerWithHigherPriorityShouldGetMessagesFirst()
allMessagesReceivedTcs.SetException(ex);
}
});
IConsumer lowPriorityConsumer = await lowPriorityConsumerBuilder.BuildAsync();
IConsumer lowPriorityConsumer = await lowPriorityConsumerBuilder.BuildAndStartAsync();

IConsumerBuilder highPriorityConsumerBuilder = _connection.ConsumerBuilder()
.Queue(queueSpecification)
Expand All @@ -493,7 +493,7 @@ public async Task ConsumerWithHigherPriorityShouldGetMessagesFirst()
allMessagesReceivedTcs.SetException(ex);
}
});
IConsumer highPriorityConsumer = await highPriorityConsumerBuilder.BuildAsync();
IConsumer highPriorityConsumer = await highPriorityConsumerBuilder.BuildAndStartAsync();

await PublishAsync(queueSpecification, messageCount);

Expand Down
6 changes: 3 additions & 3 deletions Tests/Consumer/ConsumerPauseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public async Task PauseShouldStopMessageArrivalUnpauseShouldResumeIt()
{
messageContexts.Add(ctx);
return Task.CompletedTask;
}).BuildAsync();
}).BuildAndStartAsync();

Task<bool> WaitForMessageContextCountAtLeast(int expectedCount)
{
Expand Down Expand Up @@ -146,7 +146,7 @@ public async Task ConsumerPauseThenClose()
{
unsettledMessages.Add(ctx);
}
}).BuildAsync();
}).BuildAndStartAsync();

DateTime start = DateTime.Now;
for (int i = 0; i < 100; i++)
Expand Down Expand Up @@ -220,7 +220,7 @@ public async Task ConsumerGracefulShutdownExample()
}
await Task.Delay(TimeSpan.FromMilliseconds(r.Next(1, 10)));
await ctx.AcceptAsync();
}).BuildAsync();
}).BuildAndStartAsync();

await WhenTcsCompletes(receivedTwiceInitialCreditsTcs);

Expand Down
2 changes: 1 addition & 1 deletion Tests/Recovery/CustomPublisherConsumerRecoveryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public async Task PublisherAndConsumerShouldNotRestartIfRecoveryIsDisabled()
{
// ignored
}
}).BuildAsync();
}).BuildAndStartAsync();

List<(State, State)> statesConsumer = [];
consumer.ChangeState += (sender, fromState, toState, e) =>
Expand Down
8 changes: 4 additions & 4 deletions Tests/Recovery/PublisherConsumerRecoveryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public async Task ConsumerShouldChangeStatusWhenClosed()
.MessageHandler((context, message) =>
{
return Task.CompletedTask;
}).BuildAsync();
}).BuildAndStartAsync();

List<(State, State)> states = [];
consumer.ChangeState += (_, fromState, toState, _) =>
Expand Down Expand Up @@ -162,7 +162,7 @@ public async Task ConsumerShouldChangeStatusWhenConnectionIsKilled()
.MessageHandler((context, message) =>
{
return Task.CompletedTask;
}).BuildAsync();
}).BuildAndStartAsync();

List<(State, State)> states = [];
consumer.ChangeState += (_, fromState, toState, _) => { states.Add((fromState, toState)); };
Expand Down Expand Up @@ -219,7 +219,7 @@ public async Task PublishShouldRestartPublishConsumerShouldRestartConsumeWhenCon
{
// ignored
}
}).BuildAsync();
}).BuildAndStartAsync();

const int publishBatchCount = 10;

Expand Down Expand Up @@ -294,7 +294,7 @@ public async Task PublisherAndConsumerShouldNotRestartIfRecoveryIsDisabled()
{
// ignored
}
}).BuildAsync();
}).BuildAndStartAsync();

List<(State, State)> statesConsumer = [];
consumer.ChangeState += (_, fromState, toState, _) =>
Expand Down
2 changes: 1 addition & 1 deletion docs/Examples/GettingStarted/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
Trace.WriteLine(TraceLevel.Information, $"[Consumer] Message: {message.Body()} received");
await context.AcceptAsync();
}
).BuildAsync();
).BuildAndStartAsync();
// ------------------------------------------------------------------------------------

const int total = 10;
Expand Down
2 changes: 1 addition & 1 deletion docs/Examples/HAClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
Interlocked.Increment(ref messagesReceived);
return context.AcceptAsync();
}
).BuildAsync();
).BuildAndStartAsync();

consumer.ChangeState += (sender, fromState, toState, e) =>
{
Expand Down
2 changes: 1 addition & 1 deletion docs/Examples/PerformanceTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async Task MessageHandler(IContext context, IMessage message)
.Queue(queueName)
.InitialCredits(1000)
.MessageHandler(MessageHandler)
.BuildAsync();
.BuildAndStartAsync();
}


Expand Down