Skip to content

Commit c10be96

Browse files
committed
GH-8592: Add ReactiveRedisStreamMessageHandler.setAddOptionsFunction
Fixes: #8592 The `XADD` command for the record can be customized with a `RedisStreamCommands.XAddOptions`. * Expose `ReactiveRedisStreamMessageHandler.setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction)` to allow to configure those option against the specific request message.
1 parent 2e830f9 commit c10be96

File tree

3 files changed

+30
-1
lines changed

3 files changed

+30
-1
lines changed

spring-integration-redis/src/main/java/org/springframework/integration/redis/outbound/ReactiveRedisStreamMessageHandler.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.integration.redis.outbound;
1818

19+
import java.util.function.Function;
20+
1921
import reactor.core.publisher.Mono;
2022

2123
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
24+
import org.springframework.data.redis.connection.RedisStreamCommands;
2225
import org.springframework.data.redis.connection.stream.Record;
2326
import org.springframework.data.redis.connection.stream.StreamRecords;
2427
import org.springframework.data.redis.core.ReactiveRedisTemplate;
@@ -60,6 +63,9 @@ public class ReactiveRedisStreamMessageHandler extends AbstractReactiveMessageHa
6063
@Nullable
6164
private HashMapper<String, ?, ?> hashMapper;
6265

66+
@Nullable
67+
private Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction;
68+
6369
/**
6470
* Create an instance based on provided {@link ReactiveRedisConnectionFactory} and key for stream.
6571
* @param connectionFactory the {@link ReactiveRedisConnectionFactory} to use
@@ -106,6 +112,16 @@ public void setExtractPayload(boolean extractPayload) {
106112
this.extractPayload = extractPayload;
107113
}
108114

115+
/**
116+
* Set a function to create a {@link RedisStreamCommands.XAddOptions} based on the request message.
117+
* Cannot be null and cannot return null.
118+
* @param addOptionsFunction the function to provide a {@link RedisStreamCommands.XAddOptions}.
119+
* @since 6.5
120+
*/
121+
public void setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) {
122+
this.addOptionsFunction = addOptionsFunction;
123+
}
124+
109125
@Override
110126
public String getComponentType() {
111127
return "redis:stream-outbound-channel-adapter";
@@ -145,7 +161,12 @@ protected Mono<Void> handleMessageInternal(Message<?> message) {
145161
StreamRecords.objectBacked(value)
146162
.withStreamKey(streamKey);
147163

148-
return this.reactiveStreamOperations.add(record);
164+
if (this.addOptionsFunction == null) {
165+
return this.reactiveStreamOperations.add(record);
166+
}
167+
else {
168+
return this.reactiveStreamOperations.add(record, this.addOptionsFunction.apply(message));
169+
}
149170
})
150171
.then();
151172
}

src/reference/antora/modules/ROOT/pages/redis.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,8 @@ Another constructor variant is based on a SpEL expression to evaluate a stream k
761761
Or use the whole message as a value.
762762
It defaults to `true`.
763763

764+
Starting with version 6.5, the `ReactiveRedisStreamMessageHandler` provides a `setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction)` to build `RedisStreamCommands.XAddOptions` based on the request message for the internal `ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions)` call.
765+
764766
[[redis-stream-inbound]]
765767
== Redis Stream Inbound Channel Adapter
766768

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,9 @@ The `BeanPropertySqlParameterSourceFactory` uses now internally the `MapSqlParam
8383
Also, `JdbcMessageHandler` exposes a `usePayloadAsParameterSource` flag to allow to deal with parameter source only against message payload.
8484
That's where the mentioned `MapSqlParameterSource` comes useful for request messages with map payloads.
8585
See xref:jdbc.adoc[JDBC Support] for more information.
86+
87+
[[x6.5-redis-changes]]
88+
== Redis Stream Support
89+
90+
The `ReactiveRedisStreamMessageHandler` now exposes a `Function<Message<?>, RedisStreamCommands.XAddOptions>` to provide additional `XADD` option via convenient `RedisStreamCommands.XAddOptions` API.
91+
See xref:redis.adoc#redis-stream-outbound[Redis Support] for more information.

0 commit comments

Comments
 (0)