@@ -77,7 +77,9 @@ class S3RetryingInputStream extends InputStream {
77
77
this .failures = new ArrayList <>(MAX_SUPPRESSED_EXCEPTIONS );
78
78
this .start = start ;
79
79
this .end = end ;
80
+ final int initialAttempt = attempt ;
80
81
openStreamWithRetry ();
82
+ maybeLogForSuccessAfterRetries (initialAttempt , "opened" );
81
83
}
82
84
83
85
private void openStreamWithRetry () throws IOException {
@@ -130,14 +132,16 @@ private long getStreamLength(final S3Object object) {
130
132
@ Override
131
133
public int read () throws IOException {
132
134
ensureOpen ();
135
+ final int initialAttempt = attempt ;
133
136
while (true ) {
134
137
try {
135
138
final int result = currentStream .read ();
136
139
if (result == -1 ) {
137
140
eof = true ;
138
- return -1 ;
141
+ } else {
142
+ currentOffset += 1 ;
139
143
}
140
- currentOffset += 1 ;
144
+ maybeLogForSuccessAfterRetries ( initialAttempt , "read" ) ;
141
145
return result ;
142
146
} catch (IOException e ) {
143
147
reopenStreamOrFail (e );
@@ -148,14 +152,16 @@ public int read() throws IOException {
148
152
@ Override
149
153
public int read (byte [] b , int off , int len ) throws IOException {
150
154
ensureOpen ();
155
+ final int initialAttempt = attempt ;
151
156
while (true ) {
152
157
try {
153
158
final int bytesRead = currentStream .read (b , off , len );
154
159
if (bytesRead == -1 ) {
155
160
eof = true ;
156
- return -1 ;
161
+ } else {
162
+ currentOffset += bytesRead ;
157
163
}
158
- currentOffset += bytesRead ;
164
+ maybeLogForSuccessAfterRetries ( initialAttempt , "read" ) ;
159
165
return bytesRead ;
160
166
} catch (IOException e ) {
161
167
reopenStreamOrFail (e );
@@ -192,8 +198,8 @@ private <T extends Exception> long maybeLogAndComputeRetryDelay(String action, T
192
198
throw finalException ;
193
199
}
194
200
195
- // Log at info level for the 1st retry and every ~5 minutes afterward
196
- logForRetry ((attempt == 1 || attempt % 30 == 0 ) ? Level .INFO : Level .DEBUG , action , e );
201
+ // Log at info level for the 1st retry and then exponentially less
202
+ logForRetry (Integer . bitCount (attempt ) == 1 ? Level .INFO : Level .DEBUG , action , e );
197
203
if (failures .size () < MAX_SUPPRESSED_EXCEPTIONS ) {
198
204
failures .add (e );
199
205
}
@@ -239,6 +245,19 @@ private void logForRetry(Level level, String action, Exception e) {
239
245
);
240
246
}
241
247
248
+ private void maybeLogForSuccessAfterRetries (int initialAttempt , String action ) {
249
+ if (attempt > initialAttempt ) {
250
+ logger .info (
251
+ "successfully {} input stream for [{}/{}] with purpose [{}] after [{}] retries" ,
252
+ action ,
253
+ blobStore .bucket (),
254
+ blobKey ,
255
+ purpose ,
256
+ attempt - initialAttempt
257
+ );
258
+ }
259
+ }
260
+
242
261
private long currentStreamProgress () {
243
262
return Math .subtractExact (Math .addExact (start , currentOffset ), currentStreamFirstOffset );
244
263
}
0 commit comments