|
25 | 25 | - [Publish Messages](#publish-messages)
|
26 | 26 | - [Deduplication](#Deduplication)
|
27 | 27 | - [Consume Messages](#consume-messages)
|
| 28 | + - [Offset Types](#offset-types) |
28 | 29 | - [Track Offset](#track-offset)
|
29 | 30 | - [Handle Close](#handle-close)
|
30 | 31 | - [Handle Metadata Update](#handle-metadata-update)
|
@@ -296,6 +297,45 @@ var consumer = await system.CreateConsumer(
|
296 | 297 | }
|
297 | 298 | });
|
298 | 299 | ```
|
| 300 | +### Offset Types |
| 301 | +There are five types of Offset and they can be set by the `ConsumerConfig.OffsetSpec` property that must be passed to the Consumer constructor, in the example we use `OffsetTypeFirst`: |
| 302 | +```csharp |
| 303 | +var consumerOffsetTypeFirst = await system.CreateConsumer( |
| 304 | + new ConsumerConfig |
| 305 | + { |
| 306 | + Reference = "my_consumer_offset_first", |
| 307 | + Stream = stream, |
| 308 | + OffsetSpec = new OffsetTypeFirst(), |
| 309 | + MessageHandler = async (consumer, ctx, message) => |
| 310 | + { |
| 311 | + |
| 312 | + await Task.CompletedTask; |
| 313 | + } |
| 314 | + }); |
| 315 | +``` |
| 316 | +The five types are: |
| 317 | +- First: it takes messages from the first message of the stream. |
| 318 | +```csharp |
| 319 | +var offsetTypeFirst = new OffsetTypeFirst(); |
| 320 | +``` |
| 321 | +- Last: it takes messages from the last chunk of the stream, i.e. it doesn’t start from the last message, but the last “group” of messages. |
| 322 | +```csharp |
| 323 | +var offsetTypeLast = new OffsetTypeLast(); |
| 324 | +``` |
| 325 | +- Next: it takes messages published after the consumer connection. |
| 326 | +```csharp |
| 327 | +var offsetTypeNext = new OffsetTypeNext() |
| 328 | +``` |
| 329 | +- Offset: it takes messages starting from the message with id equal to the passed value. If the value is less than the first message of the stream, it starts from the first (i.e. if you pass 0, but the stream starts from 10, it starts from 10). If the message with the id hasn’t yet been published it waits until this publishingId is reached. |
| 330 | +```csharp |
| 331 | +ulong iWantToStartFromPubId = 10; |
| 332 | +var offsetTypeOffset = new OffsetTypeOffset(iWantToStartFromPubId); |
| 333 | +``` |
| 334 | +- Timestamp: it takes messages starting from the first message with timestamp bigger than the one passed |
| 335 | +```csharp |
| 336 | +var anHourAgo = (long)DateTime.UtcNow.AddHours(-1).Subtract(new DateTime(1970, 1, 1)).TotalSeconds; |
| 337 | +var offsetTypeTimestamp = new OffsetTypeTimestamp(anHourAgo); |
| 338 | +``` |
299 | 339 | ### Track Offset
|
300 | 340 |
|
301 | 341 | The server can store the current delivered offset given a consumer with `StoreOffset` in this way:
|
@@ -329,6 +369,8 @@ var consumer = await system.CreateConsumer(
|
329 | 369 | OffsetSpec = new OffsetTypeOffset(trackedOffset),
|
330 | 370 | ```
|
331 | 371 |
|
| 372 | +OBS. if don't have stored an offset for the consumer's reference on the stream you get an OffsetNotFoundException exception. |
| 373 | + |
332 | 374 | ### Handle Close
|
333 | 375 | Producers/Consumers raise and event when the client is disconnected:
|
334 | 376 |
|
|
0 commit comments