Skip to content

Commit 1763d26

Browse files
authored
Merge pull request #3800 from kubernetes-client/v5
Add support for v5 of the streaming protocol
2 parents c955c2d + 4f606d2 commit 1763d26

File tree

3 files changed

+89
-4
lines changed

3 files changed

+89
-4
lines changed

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737
*/
3838
public class WebSocketStreamHandler implements WebSockets.SocketListener, Closeable {
3939
private static final Logger log = LoggerFactory.getLogger(WebSocketStreamHandler.class);
40+
private static final int CLOSE = 255;
4041

41-
private final Map<Integer, PipedInputStream> input = new HashMap<>();
42+
private final Map<Integer, InputStream> input = new HashMap<>();
4243
private final Map<Integer, PipedOutputStream> pipedOutput = new HashMap<>();
4344
private final Map<Integer, OutputStream> output = new HashMap<>();
4445
private WebSocket socket;
@@ -93,6 +94,12 @@ public void textMessage(Reader in) {
9394

9495
protected void handleMessage(int stream, InputStream inStream) throws IOException {
9596
try {
97+
if (stream == CLOSE) {
98+
stream = inStream.read();
99+
InputStream in = getInputStream(stream);
100+
in.close();
101+
return;
102+
}
96103
OutputStream out = getSocketInputOutputStream(stream);
97104
Streams.copy(inStream, out);
98105
out.flush();
@@ -211,6 +218,10 @@ private synchronized OutputStream getSocketInputOutputStream(int stream) {
211218
return pipedOutput.get(stream);
212219
}
213220

221+
public boolean supportsClose() {
222+
return this.protocol.equals("v5.channel.k8s.io");
223+
}
224+
214225
private class WebSocketOutputStream extends OutputStream {
215226

216227
private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024;
@@ -225,6 +236,20 @@ public WebSocketOutputStream(int stream) {
225236
this.stream = (byte) stream;
226237
}
227238

239+
@Override
240+
public void close() throws IOException {
241+
super.close();
242+
if (WebSocketStreamHandler.this.socket == null || !WebSocketStreamHandler.this.supportsClose()) {
243+
return;
244+
}
245+
byte[] buffer = new byte[2];
246+
buffer[0] = (byte) CLOSE;
247+
buffer[1] = stream;
248+
249+
ByteString byteString = ByteString.of(buffer);
250+
WebSocketStreamHandler.this.socket.send(byteString);
251+
}
252+
228253
@Override
229254
public void flush() throws IOException {
230255
if (state == State.CLOSED) {

util/src/main/java/io/kubernetes/client/util/WebSockets.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@
3636
public class WebSockets {
3737
private static final Logger log = LoggerFactory.getLogger(WebSockets.class);
3838

39-
// Only support v4 stream protocol as it was available since k8s 1.4
40-
public static final String V4_STREAM_PROTOCOL = "v4.channel.k8s.io";
39+
public static final String[] protocols = {
40+
// Only support v5 stream protocol as it was available since k8s 1.30
41+
"v5.channel.k8s.io",
42+
// Only support v4 stream protocol as it was available since k8s 1.4
43+
"v4.channel.k8s.io"
44+
};
45+
public static final String K8S_STREAM_PROTOCOL = String.join(",", protocols);
4146
public static final String STREAM_PROTOCOL_HEADER = "Sec-WebSocket-Protocol";
4247
public static final String SPDY_3_1 = "SPDY/3.1";
4348
public static final String CONNECTION = "Connection";
@@ -91,7 +96,7 @@ public static void stream(
9196
throws ApiException, IOException {
9297

9398
HashMap<String, String> headers = new HashMap<String, String>();
94-
headers.put(STREAM_PROTOCOL_HEADER, V4_STREAM_PROTOCOL);
99+
headers.put(STREAM_PROTOCOL_HEADER, K8S_STREAM_PROTOCOL);
95100
headers.put(WebSockets.CONNECTION, WebSockets.UPGRADE);
96101
headers.put(WebSockets.UPGRADE, SPDY_3_1);
97102
String[] localVarAuthNames = new String[] {"BearerToken"};

util/src/test/java/io/kubernetes/client/WebsocketStreamHandlerTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package io.kubernetes.client;
1414

1515
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1617

1718
import io.kubernetes.client.util.WebSocketStreamHandler;
1819
import java.io.ByteArrayInputStream;
@@ -112,6 +113,60 @@ void handlerSendingLargeData() throws IOException {
112113
assertThat(mockWebSocket.data).containsExactly(output);
113114
}
114115

116+
@Test
117+
void handlerSendingClose() throws IOException {
118+
int testStreamId = 0;
119+
120+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
121+
MockWebSocket mockWebSocket = new MockWebSocket();
122+
123+
handler.open("v5.channel.k8s.io", mockWebSocket);
124+
125+
OutputStream outputStream = handler.getOutputStream(testStreamId);
126+
outputStream.close();
127+
128+
byte[] output = {(byte) 255, (byte) testStreamId};
129+
assertThat(mockWebSocket.data).containsExactly(output);
130+
}
131+
132+
@Test
133+
void handlerNotSendingClose() throws IOException {
134+
int testStreamId = 0;
135+
136+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
137+
MockWebSocket mockWebSocket = new MockWebSocket();
138+
139+
handler.open("v4.channel.k8s.io", mockWebSocket);
140+
141+
OutputStream outputStream = handler.getOutputStream(testStreamId);
142+
outputStream.close();
143+
144+
assertThat(mockWebSocket.data).isNull();
145+
}
146+
147+
@Test
148+
void handlerReceivingClosed() throws IOException {
149+
int testStreamId = 0;
150+
byte[] testDatas =
151+
new byte[] {(byte) 255, (byte) testStreamId };
152+
ByteArrayInputStream testBytesInputStream = new ByteArrayInputStream(testDatas);
153+
154+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
155+
MockWebSocket mockWebSocket = new MockWebSocket();
156+
157+
handler.open(testProtocol, mockWebSocket);
158+
159+
InputStream inputStream = handler.getInputStream(testStreamId);
160+
161+
// handler receiving
162+
handler.bytesMessage(testBytesInputStream);
163+
164+
assertThat(inputStream.available() == 0);
165+
assertThatThrownBy(() -> {
166+
inputStream.read();
167+
}).isInstanceOf(IOException.class);
168+
}
169+
115170
private static class MockWebSocket implements WebSocket {
116171
byte[] data;
117172
private boolean closed = false;

0 commit comments

Comments
 (0)