Skip to content

Commit 941da8b

Browse files
authored
fix: Content-Encoding: gzip along with Transfer-Encoding: chunked sometimes terminates early (googleapis#1608)
#### The issue When `GZIPInputStream` completes processing an individual member it will call `InputStream#available()` to determine if there is more stream to try and process. If the call to `available()` returns 0 `GZIPInputStream` will determine it has processed the entirety of the underlying stream. This is spurious, as `InputStream#available()` is allowed to return 0 if it would require blocking in order for more bytes to be available. When `GZIPInputStream` is reading from a `Transfer-Encoding: chunked` response, if the chunk boundary happens to align closely enough to the member boundary `GZIPInputStream` won't consume the whole response. #### The fix Add new `OptimisticAvailabilityInputStream`, which provides an optimistic "estimate" of the number of `available()` bytes in the underlying stream. When instantiating a `GZIPInputStream` for a response, automatically decorate the provided `InputStream` with an `OptimisticAvailabilityInputStream`. #### Verification This scenario isn't unique to processing of chunked responses, and can be replicated reliably using a `java.io.SequenceInputStream` with two underlying `java.io.ByteArrayInputStream`. See GzipSupportTest.java for a reproduction. The need for this class has been verified for the following JVMs: * ``` openjdk version "1.8.0_292" OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10) OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode) ``` * ``` openjdk version "11.0.14.1" 2022-02-08 OpenJDK Runtime Environment Temurin-11.0.14.1+1 (build 11.0.14.1+1) OpenJDK 64-Bit Server VM Temurin-11.0.14.1+1 (build 11.0.14.1+1, mixed mode) ``` * ``` openjdk version "17" 2021-09-14 OpenJDK Runtime Environment Temurin-17+35 (build 17+35) OpenJDK 64-Bit Server VM Temurin-17+35 (build 17+35, mixed mode, sharing) ```
1 parent 7e91e67 commit 941da8b

File tree

3 files changed

+192
-2
lines changed

3 files changed

+192
-2
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package com.google.api.client.http;
2+
3+
import java.io.FilterInputStream;
4+
import java.io.IOException;
5+
import java.io.InputStream;
6+
import java.util.zip.GZIPInputStream;
7+
8+
final class GzipSupport {
9+
10+
private GzipSupport() {}
11+
12+
static GZIPInputStream newGzipInputStream(InputStream in) throws IOException {
13+
return new GZIPInputStream(new OptimisticAvailabilityInputStream(in));
14+
}
15+
16+
/**
17+
* When {@link GZIPInputStream} completes processing an individual member it will call {@link
18+
* InputStream#available()} to determine if there is more stream to try and process. If the call
19+
* to {@code available()} returns 0 {@code GZIPInputStream} will determine it has processed the
20+
* entirety of the underlying stream. This is spurious, as {@link InputStream#available()} is
21+
* allowed to return 0 if it would require blocking in order for more bytes to be available. When
22+
* {@code GZIPInputStream} is reading from a {@code Transfer-Encoding: chunked} response, if the
23+
* chunk boundary happens to align closely enough to the member boundary {@code GZIPInputStream}
24+
* won't consume the whole response.
25+
*
26+
* <p>This class, provides an optimistic "estimate" (in actuality, a lie) of the number of {@code
27+
* available()} bytes in the underlying stream. It does this by tracking the last number of bytes
28+
* read. If the last number of bytes read is grater than -1, we return {@link Integer#MAX_VALUE}
29+
* to any call of {@link #available()}.
30+
*
31+
* <p>We're breaking the contract of available() in that we're lying about how much data we have
32+
* accessible without blocking, however in the case where we're weaving {@link GZIPInputStream}
33+
* into response processing we already know there are going to be blocking calls to read before
34+
* the stream is exhausted.
35+
*
36+
* <p>This scenario isn't unique to processing of chunked responses, and can be replicated
37+
* reliably using a {@link java.io.SequenceInputStream} with two underlying {@link
38+
* java.io.ByteArrayInputStream}. See the corresponding test class for a reproduction.
39+
*
40+
* <p>The need for this class has been verified for the following JVMs:
41+
*
42+
* <ol>
43+
* <li>
44+
* <pre>
45+
* openjdk version "1.8.0_292"
46+
* OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_292-b10)
47+
* OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.292-b10, mixed mode)
48+
* </pre>
49+
* <li>
50+
* <pre>
51+
* openjdk version "11.0.14.1" 2022-02-08
52+
* OpenJDK Runtime Environment Temurin-11.0.14.1+1 (build 11.0.14.1+1)
53+
* OpenJDK 64-Bit Server VM Temurin-11.0.14.1+1 (build 11.0.14.1+1, mixed mode)
54+
* </pre>
55+
* <li>
56+
* <pre>
57+
* openjdk version "17" 2021-09-14
58+
* OpenJDK Runtime Environment Temurin-17+35 (build 17+35)
59+
* OpenJDK 64-Bit Server VM Temurin-17+35 (build 17+35, mixed mode, sharing)
60+
* </pre>
61+
* </ol>
62+
*/
63+
private static final class OptimisticAvailabilityInputStream extends FilterInputStream {
64+
private int lastRead = 0;
65+
66+
OptimisticAvailabilityInputStream(InputStream delegate) {
67+
super(delegate);
68+
}
69+
70+
@Override
71+
public int available() throws IOException {
72+
return lastRead > -1 ? Integer.MAX_VALUE : 0;
73+
}
74+
75+
@Override
76+
public int read() throws IOException {
77+
return lastRead = super.read();
78+
}
79+
80+
@Override
81+
public int read(byte[] b) throws IOException {
82+
return lastRead = super.read(b);
83+
}
84+
85+
@Override
86+
public int read(byte[] b, int off, int len) throws IOException {
87+
return lastRead = super.read(b, off, len);
88+
}
89+
}
90+
}

google-http-client/src/main/java/com/google/api/client/http/HttpResponse.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Locale;
3131
import java.util.logging.Level;
3232
import java.util.logging.Logger;
33-
import java.util.zip.GZIPInputStream;
3433

3534
/**
3635
* HTTP response.
@@ -362,7 +361,7 @@ public InputStream getContent() throws IOException {
362361
// GZIPInputStream.close() --> ConsumingInputStream.close() -->
363362
// exhaust(ConsumingInputStream)
364363
lowLevelResponseContent =
365-
new GZIPInputStream(new ConsumingInputStream(lowLevelResponseContent));
364+
GzipSupport.newGzipInputStream(new ConsumingInputStream(lowLevelResponseContent));
366365
}
367366
}
368367
// logging (wrap content with LoggingInputStream)
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.api.client.http;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
21+
import com.google.common.io.ByteStreams;
22+
import com.google.common.io.CountingInputStream;
23+
import java.io.ByteArrayInputStream;
24+
import java.io.ByteArrayOutputStream;
25+
import java.io.IOException;
26+
import java.io.InputStream;
27+
import java.io.SequenceInputStream;
28+
import java.util.zip.GZIPInputStream;
29+
import org.junit.Test;
30+
31+
public final class GzipSupportTest {
32+
33+
@SuppressWarnings("UnstableApiUsage") // CountingInputStream is @Beta
34+
@Test
35+
public void gzipInputStreamConsumesAllBytes() throws IOException {
36+
byte[] data = new byte[] {(byte) 'a', (byte) 'b'};
37+
// `echo -n a > a.txt && gzip -n9 a.txt`
38+
byte[] member0 =
39+
new byte[] {
40+
0x1f,
41+
(byte) 0x8b,
42+
0x08,
43+
0x00,
44+
0x00,
45+
0x00,
46+
0x00,
47+
0x00,
48+
0x02,
49+
0x03,
50+
0x4b,
51+
0x04,
52+
0x00,
53+
(byte) 0x43,
54+
(byte) 0xbe,
55+
(byte) 0xb7,
56+
(byte) 0xe8,
57+
0x01,
58+
0x00,
59+
0x00,
60+
0x00
61+
};
62+
// `echo -n b > b.txt && gzip -n9 b.txt`
63+
byte[] member1 =
64+
new byte[] {
65+
0x1f,
66+
(byte) 0x8b,
67+
0x08,
68+
0x00,
69+
0x00,
70+
0x00,
71+
0x00,
72+
0x00,
73+
0x02,
74+
0x03,
75+
0x4b,
76+
0x02,
77+
0x00,
78+
(byte) 0xf9,
79+
(byte) 0xef,
80+
(byte) 0xbe,
81+
(byte) 0x71,
82+
0x01,
83+
0x00,
84+
0x00,
85+
0x00
86+
};
87+
int totalZippedBytes = member0.length + member1.length;
88+
try (InputStream s =
89+
new SequenceInputStream(
90+
new ByteArrayInputStream(member0), new ByteArrayInputStream(member1));
91+
CountingInputStream countS = new CountingInputStream(s);
92+
GZIPInputStream g = GzipSupport.newGzipInputStream(countS);
93+
CountingInputStream countG = new CountingInputStream(g)) {
94+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
95+
ByteStreams.copy(countG, baos);
96+
assertThat(baos.toByteArray()).isEqualTo(data);
97+
assertThat(countG.getCount()).isEqualTo(data.length);
98+
assertThat(countS.getCount()).isEqualTo(totalZippedBytes);
99+
}
100+
}
101+
}

0 commit comments

Comments
 (0)