Skip to content

Commit 40b0031

Browse files
authored
GH-3340: IntegrationEvents - add getSourceAsType()
Resolves #3340 - add common super-interface for MQTT components - add `getConnectionInfo()` so users can examine server URIs etc * Reinstate per-adapter URIs - support multiple * Restore single URL per adapter. * Code cleanup for previous commit.
1 parent 5be0ed1 commit 40b0031

File tree

12 files changed

+243
-21
lines changed

12 files changed

+243
-21
lines changed

spring-integration-core/src/main/java/org/springframework/integration/events/IntegrationEvent.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2019 the original author or authors.
2+
* Copyright 2013-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,18 @@ public Throwable getCause() {
4848
return this.cause;
4949
}
5050

51+
/**
52+
* Get the source as a specific type; the receiving variable must be declared with the
53+
* correct type.
54+
* @param <T> the type.
55+
* @return the source.
56+
* @since 5.4
57+
*/
58+
@SuppressWarnings("unchecked")
59+
public <T> T getSourceAsType() {
60+
return (T) getSource();
61+
}
62+
5163
@Override
5264
public String toString() {
5365
return this.getClass().getSimpleName() + " [source=" + this.getSource() +
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.core;
18+
19+
import org.springframework.beans.factory.BeanNameAware;
20+
21+
/**
22+
* A component that interfaces with MQTT.
23+
*
24+
* @param <T> The connection information type.
25+
*
26+
* @author Gary Russell
27+
* @since 2.5
28+
*
29+
*/
30+
public interface MqttComponent<T> extends BeanNameAware {
31+
32+
/**
33+
* Return this component's bean name.
34+
* @return the bean name.
35+
*/
36+
String getBeanName();
37+
38+
/**
39+
* Return information about the connection.
40+
* @return the information.
41+
*/
42+
T getConnectionInfo();
43+
44+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.core;
18+
19+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
20+
21+
/**
22+
* An extension of {@link MqttComponent} for Eclipse Paho components.
23+
*
24+
* @author Gary Russell
25+
* @since 5.4
26+
*
27+
*/
28+
public interface MqttPahoComponent extends MqttComponent<MqttConnectOptions> {
29+
30+
@Override
31+
MqttConnectOptions getConnectionInfo();
32+
33+
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttIntegrationEvent.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,9 @@
1919
import org.springframework.integration.events.IntegrationEvent;
2020

2121
/**
22-
* Base class for Mqtt Events.
22+
* Base class for Mqtt Events. For {@link #getSourceAsType()}, you should use a sub type
23+
* of {@link org.springframework.integration.mqtt.core.MqttComponent} for the receiving
24+
* variable.
2325
* @author Gary Russell
2426
*
2527
* @since 4.1

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/AbstractMqttMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import org.springframework.jmx.export.annotation.ManagedAttribute;
2929
import org.springframework.jmx.export.annotation.ManagedOperation;
3030
import org.springframework.jmx.export.annotation.ManagedResource;
31+
import org.springframework.lang.Nullable;
3132
import org.springframework.messaging.MessagingException;
3233
import org.springframework.util.Assert;
3334

@@ -54,7 +55,7 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessagePro
5455

5556
protected final Lock topicLock = new ReentrantLock(); // NOSONAR
5657

57-
public AbstractMqttMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
58+
public AbstractMqttMessageDrivenChannelAdapter(@Nullable String url, String clientId, String... topic) {
5859
Assert.hasText(clientId, "'clientId' cannot be null or empty");
5960
Assert.notNull(topic, "'topics' cannot be null");
6061
Assert.noNullElements(topic, "'topics' cannot have null elements");
@@ -110,6 +111,7 @@ public int[] getQos() {
110111
}
111112
}
112113

114+
@Nullable
113115
protected String getUrl() {
114116
return this.url;
115117
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,22 @@
3535
import org.springframework.integration.mqtt.core.ConsumerStopAction;
3636
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
3737
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
38+
import org.springframework.integration.mqtt.core.MqttPahoComponent;
3839
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
40+
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
3941
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
42+
import org.springframework.integration.mqtt.support.MqttUtils;
4043
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4144
import org.springframework.messaging.Message;
4245
import org.springframework.messaging.MessagingException;
4346
import org.springframework.util.Assert;
4447

4548
/**
46-
* Eclipse Paho Implementation.
49+
* Eclipse Paho Implementation. When consuming {@link MqttIntegrationEvent}s published by
50+
* this component use {@code MqttPahoComponent adapter = event.getSourceAsType()} to get a
51+
* reference, allowing you to obtain the bean name and {@link MqttConnectOptions}. This
52+
* technique allows consumption of events from both inbound and outbound endpoints in the
53+
* same event listener.
4754
*
4855
* @author Gary Russell
4956
* @author Artem Bilan
@@ -52,7 +59,7 @@
5259
*
5360
*/
5461
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
55-
implements MqttCallback, ApplicationEventPublisherAware {
62+
implements MqttCallback, MqttPahoComponent, ApplicationEventPublisherAware {
5663

5764
/**
5865
* The default completion timeout in milliseconds.
@@ -89,23 +96,25 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv
8996
private ApplicationEventPublisher applicationEventPublisher;
9097

9198
/**
92-
* Use this constructor for a single url (although it may be overridden
93-
* if the server URI(s) are provided by the {@link MqttConnectOptions#getServerURIs()}
94-
* provided by the {@link MqttPahoClientFactory}).
99+
* Use this constructor for a single url (although it may be overridden if the server
100+
* URI(s) are provided by the {@link MqttConnectOptions#getServerURIs()} provided by
101+
* the {@link MqttPahoClientFactory}).
95102
* @param url the URL.
96103
* @param clientId The client id.
97104
* @param clientFactory The client factory.
98105
* @param topic The topic(s).
99106
*/
100107
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory,
101108
String... topic) {
109+
102110
super(url, clientId, topic);
103111
this.clientFactory = clientFactory;
104112
}
105113

106114
/**
107-
* Use this constructor if the server URI(s) are provided by the {@link MqttConnectOptions#getServerURIs()}
108-
* provided by the {@link MqttPahoClientFactory}.
115+
* Use this constructor if the server URI(s) are provided by the
116+
* {@link MqttConnectOptions#getServerURIs()} provided by the
117+
* {@link MqttPahoClientFactory}.
109118
* @param clientId The client id.
110119
* @param clientFactory The client factory.
111120
* @param topic The topic(s).
@@ -117,8 +126,9 @@ public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactor
117126
this.clientFactory = clientFactory;
118127
}
119128

129+
120130
/**
121-
* Use this URL when you don't need additional {@link MqttConnectOptions}.
131+
* Use this constructor when you don't need additional {@link MqttConnectOptions}.
122132
* @param url The URL.
123133
* @param clientId The client id.
124134
* @param topic The topic(s).
@@ -174,6 +184,19 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
174184
this.applicationEventPublisher = applicationEventPublisher; // NOSONAR (inconsistent synchronization)
175185
}
176186

187+
@Override
188+
public MqttConnectOptions getConnectionInfo() {
189+
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
190+
if (options.getServerURIs() == null) {
191+
String url = getUrl();
192+
if (url != null) {
193+
options = MqttUtils.cloneConnectOptions(options);
194+
options.setServerURIs(new String[] { url });
195+
}
196+
}
197+
return options;
198+
}
199+
177200
@Override
178201
protected void doStart() {
179202
Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required");

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
2828
import org.springframework.integration.mqtt.support.MqttHeaders;
2929
import org.springframework.integration.mqtt.support.MqttMessageConverter;
30+
import org.springframework.lang.Nullable;
3031
import org.springframework.messaging.Message;
3132
import org.springframework.messaging.converter.MessageConverter;
3233
import org.springframework.util.Assert;
@@ -67,7 +68,7 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHandler
6768

6869
private int clientInstance;
6970

70-
public AbstractMqttMessageHandler(String url, String clientId) {
71+
public AbstractMqttMessageHandler(@Nullable String url, String clientId) {
7172
Assert.hasText(clientId, "'clientId' cannot be null or empty");
7273
this.url = url;
7374
this.clientId = clientId;
@@ -185,6 +186,7 @@ protected MessageConverter getConverter() {
185186
return this.converter;
186187
}
187188

189+
@Nullable
188190
protected String getUrl() {
189191
return this.url;
190192
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,24 @@
2727
import org.springframework.context.ApplicationEventPublisherAware;
2828
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
2929
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
30+
import org.springframework.integration.mqtt.core.MqttPahoComponent;
3031
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
32+
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
3133
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
3234
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
3335
import org.springframework.integration.mqtt.support.MqttMessageConverter;
36+
import org.springframework.integration.mqtt.support.MqttUtils;
3437
import org.springframework.messaging.Message;
3538
import org.springframework.messaging.MessageHandlingException;
3639
import org.springframework.messaging.MessagingException;
3740
import org.springframework.util.Assert;
3841

3942
/**
40-
* Eclipse Paho implementation.
43+
* Eclipse Paho Implementation. When consuming {@link MqttIntegrationEvent}s published by
44+
* this component use {@code MqttPahoComponent handler = event.getSourceAsType()} to get a
45+
* reference, allowing you to obtain the bean name and {@link MqttConnectOptions}. This
46+
* technique allows consumption of events from both inbound and outbound endpoints in the
47+
* same event listener.
4148
*
4249
* @author Gary Russell
4350
* @author Artem Bilan
@@ -46,7 +53,7 @@
4653
*
4754
*/
4855
public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
49-
implements MqttCallback, ApplicationEventPublisherAware {
56+
implements MqttCallback, MqttPahoComponent, ApplicationEventPublisherAware {
5057

5158
/**
5259
* The default completion timeout in milliseconds.
@@ -73,9 +80,9 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler
7380
private volatile IMqttAsyncClient client;
7481

7582
/**
76-
* Use this constructor for a single url (although it may be overridden
77-
* if the server URI(s) are provided by the {@link MqttConnectOptions#getServerURIs()}
78-
* provided by the {@link MqttPahoClientFactory}).
83+
* Use this constructor for a single url (although it may be overridden if the server
84+
* URI(s) are provided by the {@link MqttConnectOptions#getServerURIs()} provided by
85+
* the {@link MqttPahoClientFactory}).
7986
* @param url the URL.
8087
* @param clientId The client id.
8188
* @param clientFactory The client factory.
@@ -98,7 +105,7 @@ public MqttPahoMessageHandler(String clientId, MqttPahoClientFactory clientFacto
98105
}
99106

100107
/**
101-
* Use this URL when you don't need additional {@link MqttConnectOptions}.
108+
* Use this constructor when you don't need additional {@link MqttConnectOptions}.
102109
* @param url The URL.
103110
* @param clientId The client id.
104111
*/
@@ -154,6 +161,19 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
154161
this.applicationEventPublisher = applicationEventPublisher;
155162
}
156163

164+
@Override
165+
public MqttConnectOptions getConnectionInfo() {
166+
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
167+
if (options.getServerURIs() == null) {
168+
String url = getUrl();
169+
if (url != null) {
170+
options = MqttUtils.cloneConnectOptions(options);
171+
options.setServerURIs(new String[] { url });
172+
}
173+
}
174+
return options;
175+
}
176+
157177
@Override
158178
protected void onInit() {
159179
super.onInit();

0 commit comments

Comments
 (0)