Skip to content

Commit eb3c6a0

Browse files
rohanmukeshartembilan
authored andcommitted
INT-4566: Introduce R2DBC Inbound Channel Adapter
JIRA: https://jira.spring.io/browse/INT-4566 * Some code clean up
1 parent 7a09358 commit eb3c6a0

File tree

6 files changed

+512
-121
lines changed

6 files changed

+512
-121
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.r2dbc.inbound;
18+
19+
20+
import java.util.Map;
21+
22+
import org.reactivestreams.Publisher;
23+
24+
import org.springframework.data.r2dbc.core.DatabaseClient;
25+
import org.springframework.data.r2dbc.core.FetchSpec;
26+
import org.springframework.data.r2dbc.core.R2dbcEntityOperations;
27+
import org.springframework.data.relational.core.query.Query;
28+
import org.springframework.expression.Expression;
29+
import org.springframework.expression.TypeLocator;
30+
import org.springframework.expression.common.LiteralExpression;
31+
import org.springframework.expression.spel.support.StandardEvaluationContext;
32+
import org.springframework.expression.spel.support.StandardTypeLocator;
33+
import org.springframework.integration.endpoint.AbstractMessageSource;
34+
import org.springframework.integration.expression.ExpressionUtils;
35+
import org.springframework.util.Assert;
36+
37+
import reactor.core.publisher.Mono;
38+
39+
/**
40+
* An instance of {@link org.springframework.integration.core.MessageSource} which returns
41+
* a {@link org.springframework.messaging.Message} with a payload which is the result of
42+
* execution of a {@link Query}. When {@code expectSingleResult} is false (default), the R2dbc
43+
* {@link Query} is executed using {@link R2dbcEntityOperations#select(Query, Class)} method which
44+
* returns a {@link reactor.core.publisher.Flux}.
45+
* The returned {@link reactor.core.publisher.Flux} will be used as the payload of the
46+
* {@link org.springframework.messaging.Message} returned by the {@link #receive()}
47+
* method.
48+
* <p>
49+
* When {@code expectSingleResult} is true, the {@link R2dbcEntityOperations#selectOne(Query, Class)} is
50+
* used instead, and the message payload will be a {@link reactor.core.publisher.Mono}
51+
* for the single object returned from the query.
52+
*
53+
* @author Rohan Mukesh
54+
* @author Artem Bilan
55+
*
56+
* @since 5.4
57+
*/
58+
public class R2dbcMessageSource extends AbstractMessageSource<Publisher<?>> {
59+
60+
private final DatabaseClient databaseClient;
61+
62+
private final Expression queryExpression;
63+
64+
private Class<?> payloadType = Map.class;
65+
66+
private boolean expectSingleResult = false;
67+
68+
private StandardEvaluationContext evaluationContext;
69+
70+
private volatile boolean initialized = false;
71+
72+
/**
73+
* Create an instance with the provided {@link DatabaseClient} and SpEL expression
74+
* which should resolve to a Relational 'query' string.
75+
* It assumes that the {@link DatabaseClient} is fully initialized and ready to be used.
76+
* The 'query' will be evaluated on every call to the {@link #receive()} method.
77+
* @param databaseClient The reactive database client for performing database calls.
78+
* @param query The query String.
79+
*/
80+
public R2dbcMessageSource(DatabaseClient databaseClient, String query) {
81+
this(databaseClient, new LiteralExpression(query));
82+
}
83+
84+
/**
85+
* Create an instance with the provided {@link DatabaseClient} and SpEL expression
86+
* which should resolve to a Relational 'query' string.
87+
* It assumes that the {@link DatabaseClient} is fully initialized and ready to be used.
88+
* The 'queryExpression' will be evaluated on every call to the {@link #receive()} method.
89+
* @param databaseClient The reactive for performing database calls.
90+
* @param queryExpression The query expression.
91+
*/
92+
public R2dbcMessageSource(DatabaseClient databaseClient, Expression queryExpression) {
93+
Assert.notNull(databaseClient, "'databaseClient' must not be null");
94+
Assert.notNull(queryExpression, "'queryExpression' must not be null");
95+
this.databaseClient = databaseClient;
96+
this.queryExpression = queryExpression;
97+
}
98+
99+
/**
100+
* Provide a way to set the type of the entityClass that will be passed to the
101+
* {@link org.springframework.data.r2dbc.core.DatabaseClient#execute(String)}
102+
* method.
103+
* @param payloadType The t class.
104+
*/
105+
public void setPayloadType(Class<?> payloadType) {
106+
Assert.notNull(payloadType, "'payloadType' must not be null");
107+
this.payloadType = payloadType;
108+
}
109+
110+
/**
111+
* Provide a way to manage which find* method to invoke on {@link R2dbcEntityOperations}.
112+
* Default is 'false', which means the {@link #receive()} method will use
113+
* the {@link org.springframework.data.r2dbc.core.DatabaseClient#execute(String)} method and will fetch all. If set
114+
* to 'true'{@link #receive()} will use {@link org.springframework.data.r2dbc.core.DatabaseClient#execute(String)}
115+
* and will fetch one and the payload of the returned {@link org.springframework.messaging.Message}
116+
* will be the returned target Object of type
117+
* identified by {@link #payloadType} instead of a List.
118+
* @param expectSingleResult true if a single result is expected.
119+
*/
120+
public void setExpectSingleResult(boolean expectSingleResult) {
121+
this.expectSingleResult = expectSingleResult;
122+
}
123+
124+
@Override
125+
public String getComponentType() {
126+
return "r2dbc:inbound-channel-adapter";
127+
}
128+
129+
@Override
130+
protected void onInit() {
131+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
132+
TypeLocator typeLocator = this.evaluationContext.getTypeLocator();
133+
if (typeLocator instanceof StandardTypeLocator) {
134+
/*
135+
* Register the R2dbc Query DSL package so they don't need a FQCN for QueryBuilder, for example.
136+
*/
137+
((StandardTypeLocator) typeLocator).registerImport("org.springframework.data.relational.core.query");
138+
}
139+
this.initialized = true;
140+
}
141+
142+
/**
143+
* Execute a {@link Query} returning its results as the Message payload.
144+
* The payload can be either {@link reactor.core.publisher.Flux} or
145+
* {@link reactor.core.publisher.Mono} of objects of type identified by {@link #payloadType},
146+
* or a single element of type identified by {@link #payloadType}
147+
* based on the value of {@link #expectSingleResult} attribute which defaults to 'false' resulting
148+
* {@link org.springframework.messaging.Message} with payload of type
149+
* {@link reactor.core.publisher.Flux}. The collection name used in the
150+
*/
151+
@Override
152+
protected Object doReceive() {
153+
Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method");
154+
Mono<FetchSpec<?>> queryMono =
155+
Mono.fromSupplier(() -> this.queryExpression.getValue(this.evaluationContext))
156+
.map(this::prepareFetch);
157+
if (this.expectSingleResult) {
158+
return queryMono.flatMap(FetchSpec::one);
159+
}
160+
return queryMono.flatMapMany(FetchSpec::all);
161+
}
162+
163+
private FetchSpec<?> prepareFetch(Object queryObject) {
164+
String queryString = evaluateQueryObject(queryObject);
165+
return this.databaseClient
166+
.execute(queryString)
167+
.as(this.payloadType)
168+
.fetch();
169+
}
170+
171+
private String evaluateQueryObject(Object queryObject) {
172+
if (queryObject instanceof String) {
173+
return (String) queryObject;
174+
}
175+
throw new IllegalStateException("'queryExpression' must evaluate to String " +
176+
"or org.springframework.data.relational.core.query.Query, but not: " + queryObject);
177+
}
178+
179+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.r2dbc.config;
18+
19+
20+
import org.springframework.context.annotation.Bean;
21+
import org.springframework.context.annotation.Configuration;
22+
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
23+
import org.springframework.data.r2dbc.core.DatabaseClient;
24+
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
25+
26+
import io.r2dbc.h2.H2ConnectionConfiguration;
27+
import io.r2dbc.h2.H2ConnectionFactory;
28+
import io.r2dbc.spi.ConnectionFactory;
29+
30+
/**
31+
* @author Rohan Mukesh
32+
*
33+
* @since 5.4
34+
*/
35+
@Configuration
36+
@EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
37+
public class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {
38+
39+
@Bean
40+
@Override
41+
public ConnectionFactory connectionFactory() {
42+
return createConnectionFactory();
43+
}
44+
45+
public static ConnectionFactory createConnectionFactory() {
46+
47+
return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
48+
.inMemory("r2dbc")
49+
.username("sa")
50+
.password("")
51+
.option("DB_CLOSE_DELAY=-1").build());
52+
}
53+
54+
@Bean
55+
public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
56+
return DatabaseClient.create(connectionFactory);
57+
}
58+
59+
}

spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/outbound/Person.java renamed to spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/entity/Person.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.integration.r2dbc.outbound;
17+
package org.springframework.integration.r2dbc.entity;
1818

1919
import org.springframework.data.annotation.Id;
2020
import org.springframework.data.relational.core.mapping.Table;
@@ -25,14 +25,14 @@
2525
* @since 5.4
2626
*/
2727
@Table
28-
class Person {
28+
public class Person {
2929

3030
@Id
31-
Integer id;
31+
private Integer id;
3232

33-
String name;
33+
private String name;
3434

35-
Integer age;
35+
private Integer age;
3636

3737
public void setId(Integer id) {
3838
this.id = id;
@@ -46,7 +46,7 @@ public void setAge(Integer age) {
4646
this.age = age;
4747
}
4848

49-
Person(String name, Integer age) {
49+
public Person(String name, Integer age) {
5050
this.name = name;
5151
this.age = age;
5252
}

0 commit comments

Comments
 (0)