Skip to content

Commit 9f0f908

Browse files
committed
Sync ProtonJ2 with upstream
1 parent dfbeab2 commit 9f0f908

File tree

4 files changed

+68
-16
lines changed

4 files changed

+68
-16
lines changed

src/main/qpid/org/apache/qpid/protonj2/client/TransportOptions.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class TransportOptions implements Cloneable {
4040
public static final boolean DEFAULT_TRACE_BYTES = false;
4141
public static final int DEFAULT_LOCAL_PORT = 0;
4242
public static final boolean DEFAULT_USE_WEBSOCKETS = false;
43+
public static final boolean DEFAULT_WEBSOCKET_COMPRESSION = false;
4344
public static final int DEFAULT_WEBSOCKET_MAX_FRAME_SIZE = 65535;
4445
private static final String[] DEFAULT_NATIVEIO_PREFERENCES_ARRAY = { "EPOLL", "KQUEUE" };
4546
public static final List<String> DEFAULT_NATIVEIO_PREFERENCES =
@@ -62,6 +63,7 @@ public class TransportOptions implements Cloneable {
6263
private boolean useWebSockets = DEFAULT_USE_WEBSOCKETS;
6364
private String webSocketPath;
6465
private int webSocketMaxFrameSize = DEFAULT_WEBSOCKET_MAX_FRAME_SIZE;
66+
private boolean webSocketCompression = DEFAULT_WEBSOCKET_COMPRESSION;
6567

6668
private final Map<String, String> webSocketHeaders = new HashMap<>();
6769

@@ -454,6 +456,31 @@ public TransportOptions webSocketMaxFrameSize(int maxFrameSize) {
454456
return this;
455457
}
456458

459+
/**
460+
* @return the configured value for the WebSocket compression support enabled flag.
461+
*/
462+
public boolean webSocketCompression() {
463+
return webSocketCompression;
464+
}
465+
466+
/**
467+
* Set to true to configure the transport layer as a WebSocket based connection that
468+
* support compression of the WebSocket packets. This option simply allows the client
469+
* to support compression if the server offers support but does not influence the server
470+
* side, if the server does not offer support for compression of WS packets then this
471+
* value has no affect on the WS packets and they remain uncompressed as if not enabled.
472+
* (default is disabled).
473+
*
474+
* @param enabled
475+
* should the transport support WebSocket compression if server offers it.
476+
*
477+
* @return this {@link TransportOptions} instance.
478+
*/
479+
public TransportOptions webSocketCompression(boolean enabled) {
480+
this.webSocketCompression = enabled;
481+
return this;
482+
}
483+
457484
/**
458485
* Copy all configuration into the given {@link TransportOptions} from this instance.
459486
*
@@ -481,6 +508,7 @@ public TransportOptions copyInto(TransportOptions other) {
481508
other.webSocketPath(webSocketPath());
482509
other.webSocketHeaders().putAll(webSocketHeaders);
483510
other.webSocketMaxFrameSize(webSocketMaxFrameSize());
511+
other.webSocketCompression(webSocketCompression());
484512

485513
return other;
486514
}

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientConnection.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ ClientConnection connect() throws ClientException {
559559
openFuture.failed(failureCause);
560560
closeFuture.complete(this);
561561
ioContext.shutdown();
562+
notifications.shutdown();
562563

563564
throw failureCause;
564565
}
@@ -754,7 +755,6 @@ private void handleEngineFailure(Engine engine) {
754755
} else {
755756
FAILURE_CAUSE_UPDATER.compareAndSet(this, null, failureCause);
756757
engine.shutdown();
757-
notifications.shutdownNow();
758758
}
759759
}
760760

@@ -778,23 +778,24 @@ private void handleEngineShutdown(Engine engine) {
778778

779779
ioContext.shutdownAsync();
780780

781-
if (failureCause != null)
782-
{
783-
openFuture.failed(failureCause);
784-
closeFuture.complete(this);
781+
try {
782+
if (failureCause != null) {
783+
openFuture.failed(failureCause);
784+
closeFuture.complete(this);
785785

786-
LOG.warn("Connection {} has failed due to: {}", getId(), failureCause != null ?
787-
failureCause.getClass().getSimpleName() + " -> " + failureCause.getMessage() : "No failure details provided.");
786+
LOG.warn("Connection {} has failed due to: {}", getId(), failureCause != null ?
787+
failureCause.getClass().getSimpleName() + " -> " + failureCause.getMessage() : "No failure details provided.");
788788

789-
submitDisconnectionEvent(options.disconnectedHandler(), transport.getHost(), transport.getPort(), failureCause);
790-
}
791-
else
792-
{
793-
openFuture.complete(this);
794-
closeFuture.complete(this);
795-
}
789+
submitDisconnectionEvent(options.disconnectedHandler(), transport.getHost(), transport.getPort(), failureCause);
790+
} else {
791+
openFuture.complete(this);
792+
closeFuture.complete(this);
793+
}
796794

797-
client.unregisterConnection(this);
795+
client.unregisterConnection(this);
796+
} finally {
797+
submitNotificationShutdownTask();
798+
}
798799
}
799800

800801
private void submitConnectionEvent(BiConsumer<Connection, ConnectionEvent> handler, String host, int port, ClientIOException cause) {
@@ -829,6 +830,22 @@ private void submitDisconnectionEvent(BiConsumer<Connection, DisconnectionEvent>
829830
}
830831
}
831832

833+
private void submitNotificationShutdownTask() {
834+
try {
835+
if (!notifications.isShutdown()) {
836+
notifications.submit(() -> {
837+
try {
838+
notifications.shutdown();
839+
} catch (Exception ex) {
840+
LOG.trace("Shutdown of notification event handler threw an error: ", ex);
841+
}
842+
});
843+
}
844+
} catch (Exception ex) {
845+
LOG.trace("Error thrown while attempting to submit shutdown of notification executor: ", ex);
846+
}
847+
}
848+
832849
private Engine configureEngineSaslSupport() {
833850
if (options.saslOptions().saslEnabled()) {
834851
SaslMechanismSelector mechSelector =

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/WebSocketTransport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
4848
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
4949
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
50+
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
5051
import io.netty.util.concurrent.Future;
5152
import io.netty.util.concurrent.GenericFutureListener;
5253
import io.netty.util.concurrent.ScheduledFuture;
@@ -151,6 +152,9 @@ protected ChannelInboundHandlerAdapter createChannelHandler() {
151152
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
152153
pipeline.addLast(new HttpClientCodec());
153154
pipeline.addLast(new HttpObjectAggregator(8192));
155+
if (options.webSocketCompression()) {
156+
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
157+
}
154158
}
155159

156160
@Override

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.netty5.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
5050
import io.netty5.handler.codec.http.websocketx.WebSocketFrame;
5151
import io.netty5.handler.codec.http.websocketx.WebSocketVersion;
52+
import io.netty5.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
5253
import io.netty5.util.concurrent.Future;
5354
import io.netty5.util.concurrent.FutureListener;
5455

@@ -98,6 +99,9 @@ protected ChannelHandler createChannelHandler() {
9899
protected void addAdditionalHandlers(ChannelPipeline pipeline) {
99100
pipeline.addLast(new HttpClientCodec());
100101
pipeline.addLast(new HttpObjectAggregator<DefaultHttpContent>(8192));
102+
if (options.webSocketCompression()) {
103+
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
104+
}
101105
}
102106

103107
@Override
@@ -169,7 +173,6 @@ public void operationComplete(Future<? extends Void> future) throws Exception {
169173
super.channelActive(context);
170174
}
171175

172-
@SuppressWarnings("resource")
173176
@Override
174177
protected void messageReceived(ChannelHandlerContext ctx, Object message) throws Exception {
175178
LOG.trace("New data read: incoming: {}", message);

0 commit comments

Comments
 (0)