Skip to content

Commit e55215c

Browse files
Merge branch 'master' into terminalSize
2 parents 4b611d7 + 1763d26 commit e55215c

File tree

4 files changed

+90
-5
lines changed

4 files changed

+90
-5
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
<snakeyaml.version>2.3</snakeyaml.version>
5050
<slf4j.version>2.0.16</slf4j.version>
5151
<caffeine.version>2.9.3</caffeine.version>
52-
<protobuf.version>4.29.0</protobuf.version>
52+
<protobuf.version>4.29.1</protobuf.version>
5353
<junit-jupiter.version>5.11.3</junit-jupiter.version>
5454
<bucket4j.version>8.10.1</bucket4j.version>
5555
<bouncycastle.version>1.79</bouncycastle.version>

util/src/main/java/io/kubernetes/client/util/WebSocketStreamHandler.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737
*/
3838
public class WebSocketStreamHandler implements WebSockets.SocketListener, Closeable {
3939
private static final Logger log = LoggerFactory.getLogger(WebSocketStreamHandler.class);
40+
private static final int CLOSE = 255;
4041

41-
private final Map<Integer, PipedInputStream> input = new HashMap<>();
42+
private final Map<Integer, InputStream> input = new HashMap<>();
4243
private final Map<Integer, PipedOutputStream> pipedOutput = new HashMap<>();
4344
private final Map<Integer, OutputStream> output = new HashMap<>();
4445
private WebSocket socket;
@@ -93,6 +94,12 @@ public void textMessage(Reader in) {
9394

9495
protected void handleMessage(int stream, InputStream inStream) throws IOException {
9596
try {
97+
if (stream == CLOSE) {
98+
stream = inStream.read();
99+
InputStream in = getInputStream(stream);
100+
in.close();
101+
return;
102+
}
96103
OutputStream out = getSocketInputOutputStream(stream);
97104
Streams.copy(inStream, out);
98105
out.flush();
@@ -216,6 +223,10 @@ public void injectOutputStream(int streamNum, OutputStream stream) {
216223
output.put(streamNum, stream);
217224
}
218225

226+
public boolean supportsClose() {
227+
return this.protocol.equals("v5.channel.k8s.io");
228+
}
229+
219230
private class WebSocketOutputStream extends OutputStream {
220231

221232
private static final long MAX_QUEUE_SIZE = 16L * 1024 * 1024;
@@ -230,6 +241,20 @@ public WebSocketOutputStream(int stream) {
230241
this.stream = (byte) stream;
231242
}
232243

244+
@Override
245+
public void close() throws IOException {
246+
super.close();
247+
if (WebSocketStreamHandler.this.socket == null || !WebSocketStreamHandler.this.supportsClose()) {
248+
return;
249+
}
250+
byte[] buffer = new byte[2];
251+
buffer[0] = (byte) CLOSE;
252+
buffer[1] = stream;
253+
254+
ByteString byteString = ByteString.of(buffer);
255+
WebSocketStreamHandler.this.socket.send(byteString);
256+
}
257+
233258
@Override
234259
public void flush() throws IOException {
235260
if (state == State.CLOSED) {

util/src/main/java/io/kubernetes/client/util/WebSockets.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@
3636
public class WebSockets {
3737
private static final Logger log = LoggerFactory.getLogger(WebSockets.class);
3838

39-
// Only support v4 stream protocol as it was available since k8s 1.4
40-
public static final String V4_STREAM_PROTOCOL = "v4.channel.k8s.io";
39+
public static final String[] protocols = {
40+
// Only support v5 stream protocol as it was available since k8s 1.30
41+
"v5.channel.k8s.io",
42+
// Only support v4 stream protocol as it was available since k8s 1.4
43+
"v4.channel.k8s.io"
44+
};
45+
public static final String K8S_STREAM_PROTOCOL = String.join(",", protocols);
4146
public static final String STREAM_PROTOCOL_HEADER = "Sec-WebSocket-Protocol";
4247
public static final String SPDY_3_1 = "SPDY/3.1";
4348
public static final String CONNECTION = "Connection";
@@ -91,7 +96,7 @@ public static void stream(
9196
throws ApiException, IOException {
9297

9398
HashMap<String, String> headers = new HashMap<String, String>();
94-
headers.put(STREAM_PROTOCOL_HEADER, V4_STREAM_PROTOCOL);
99+
headers.put(STREAM_PROTOCOL_HEADER, K8S_STREAM_PROTOCOL);
95100
headers.put(WebSockets.CONNECTION, WebSockets.UPGRADE);
96101
headers.put(WebSockets.UPGRADE, SPDY_3_1);
97102
String[] localVarAuthNames = new String[] {"BearerToken"};

util/src/test/java/io/kubernetes/client/WebsocketStreamHandlerTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package io.kubernetes.client;
1414

1515
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1617

1718
import io.kubernetes.client.util.WebSocketStreamHandler;
1819
import java.io.ByteArrayInputStream;
@@ -112,6 +113,60 @@ void handlerSendingLargeData() throws IOException {
112113
assertThat(mockWebSocket.data).containsExactly(output);
113114
}
114115

116+
@Test
117+
void handlerSendingClose() throws IOException {
118+
int testStreamId = 0;
119+
120+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
121+
MockWebSocket mockWebSocket = new MockWebSocket();
122+
123+
handler.open("v5.channel.k8s.io", mockWebSocket);
124+
125+
OutputStream outputStream = handler.getOutputStream(testStreamId);
126+
outputStream.close();
127+
128+
byte[] output = {(byte) 255, (byte) testStreamId};
129+
assertThat(mockWebSocket.data).containsExactly(output);
130+
}
131+
132+
@Test
133+
void handlerNotSendingClose() throws IOException {
134+
int testStreamId = 0;
135+
136+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
137+
MockWebSocket mockWebSocket = new MockWebSocket();
138+
139+
handler.open("v4.channel.k8s.io", mockWebSocket);
140+
141+
OutputStream outputStream = handler.getOutputStream(testStreamId);
142+
outputStream.close();
143+
144+
assertThat(mockWebSocket.data).isNull();
145+
}
146+
147+
@Test
148+
void handlerReceivingClosed() throws IOException {
149+
int testStreamId = 0;
150+
byte[] testDatas =
151+
new byte[] {(byte) 255, (byte) testStreamId };
152+
ByteArrayInputStream testBytesInputStream = new ByteArrayInputStream(testDatas);
153+
154+
WebSocketStreamHandler handler = new WebSocketStreamHandler();
155+
MockWebSocket mockWebSocket = new MockWebSocket();
156+
157+
handler.open(testProtocol, mockWebSocket);
158+
159+
InputStream inputStream = handler.getInputStream(testStreamId);
160+
161+
// handler receiving
162+
handler.bytesMessage(testBytesInputStream);
163+
164+
assertThat(inputStream.available() == 0);
165+
assertThatThrownBy(() -> {
166+
inputStream.read();
167+
}).isInstanceOf(IOException.class);
168+
}
169+
115170
private static class MockWebSocket implements WebSocket {
116171
byte[] data;
117172
private boolean closed = false;

0 commit comments

Comments
 (0)