16
16
package software .amazon .awssdk .http .crt .internal ;
17
17
18
18
import static org .assertj .core .api .Assertions .assertThatThrownBy ;
19
+ import static org .mockito .ArgumentMatchers .any ;
19
20
import static org .mockito .ArgumentMatchers .anyInt ;
20
21
import static org .mockito .Mockito .never ;
21
22
import static org .mockito .Mockito .verify ;
23
+ import static org .mockito .Mockito .when ;
22
24
25
+ import java .nio .ByteBuffer ;
23
26
import java .nio .charset .StandardCharsets ;
24
27
import java .util .concurrent .CompletableFuture ;
28
+ import java .util .concurrent .atomic .AtomicBoolean ;
29
+ import java .util .concurrent .atomic .AtomicInteger ;
30
+ import org .apache .commons .lang3 .RandomStringUtils ;
25
31
import org .junit .jupiter .api .BeforeEach ;
26
32
import org .junit .jupiter .api .Test ;
27
33
import org .junit .jupiter .api .extension .ExtendWith ;
28
34
import org .junit .jupiter .params .ParameterizedTest ;
29
35
import org .junit .jupiter .params .provider .ValueSource ;
30
36
import org .mockito .Mock ;
37
+ import org .mockito .Mockito ;
31
38
import org .mockito .junit .jupiter .MockitoExtension ;
32
39
import software .amazon .awssdk .crt .http .HttpClientConnection ;
33
40
import software .amazon .awssdk .crt .http .HttpException ;
34
41
import software .amazon .awssdk .crt .http .HttpHeader ;
35
42
import software .amazon .awssdk .crt .http .HttpHeaderBlock ;
36
43
import software .amazon .awssdk .crt .http .HttpStream ;
37
44
import software .amazon .awssdk .crt .http .HttpStreamResponseHandler ;
45
+ import software .amazon .awssdk .http .crt .internal .response .InputStreamAdaptingHttpStreamResponseHandler ;
46
+ import software .amazon .awssdk .utils .async .SimplePublisher ;
38
47
39
48
@ ExtendWith (MockitoExtension .class )
40
49
public abstract class BaseHttpStreamResponseHandlerTest {
@@ -44,10 +53,15 @@ public abstract class BaseHttpStreamResponseHandlerTest {
44
53
@ Mock
45
54
HttpStream httpStream ;
46
55
56
+ @ Mock
57
+ SimplePublisher <ByteBuffer > simplePublisher ;
58
+
47
59
HttpStreamResponseHandler responseHandler ;
48
60
49
61
abstract HttpStreamResponseHandler responseHandler ();
50
62
63
+ abstract HttpStreamResponseHandler responseHandlerWithMockedPublisher (SimplePublisher <ByteBuffer > simplePublisher );
64
+
51
65
@ BeforeEach
52
66
public void setUp () {
53
67
requestFuture = new CompletableFuture <>();
@@ -113,6 +127,101 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {
113
127
verify (httpStream , never ()).incrementWindow (anyInt ());
114
128
}
115
129
130
+ @ Test
131
+ void publisherWritesFutureFails_shouldShutdownConnection () {
132
+ SimplePublisher <ByteBuffer > simplePublisher = Mockito .mock (SimplePublisher .class );
133
+ CompletableFuture <Void > future = new CompletableFuture <>();
134
+ when (simplePublisher .send (any (ByteBuffer .class ))).thenReturn (future );
135
+
136
+ HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher (simplePublisher );
137
+ HttpHeader [] httpHeaders = getHttpHeaders ();
138
+
139
+ handler .onResponseHeaders (httpStream , 200 , HttpHeaderBlock .MAIN .getValue (),
140
+ httpHeaders );
141
+ handler .onResponseHeadersDone (httpStream , 0 );
142
+ handler .onResponseBody (httpStream ,
143
+ RandomStringUtils .random (1 * 1024 * 1024 ).getBytes (StandardCharsets .UTF_8 ));
144
+ RuntimeException runtimeException = new RuntimeException ();
145
+ future .completeExceptionally (runtimeException );
146
+
147
+ try {
148
+ requestFuture .join ();
149
+ } catch (Exception e ) {
150
+ // we don't verify here because it behaves differently in async and sync
151
+ }
152
+
153
+ verify (crtConn ).shutdown ();
154
+ verify (crtConn ).close ();
155
+ verify (httpStream ).close ();
156
+ verify (httpStream , never ()).incrementWindow (anyInt ());
157
+ }
158
+
159
+ @ Test
160
+ void publisherWritesFutureCompletesAfterConnectionClosed_shouldNotInvokeIncrementWindow () {
161
+ CompletableFuture <Void > future = new CompletableFuture <>();
162
+ when (simplePublisher .send (any (ByteBuffer .class ))).thenReturn (future );
163
+ when (simplePublisher .complete ()).thenReturn (future );
164
+
165
+ HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher (simplePublisher );
166
+
167
+
168
+ HttpHeader [] httpHeaders = getHttpHeaders ();
169
+
170
+ handler .onResponseHeaders (httpStream , 200 , HttpHeaderBlock .MAIN .getValue (),
171
+ httpHeaders );
172
+ handler .onResponseHeadersDone (httpStream , 0 );
173
+ handler .onResponseBody (httpStream ,
174
+ RandomStringUtils .random (1 * 1024 * 1024 ).getBytes (StandardCharsets .UTF_8 ));
175
+ handler .onResponseComplete (httpStream , 0 );
176
+ future .complete (null );
177
+
178
+ requestFuture .join ();
179
+ verify (crtConn , never ()).shutdown ();
180
+ verify (crtConn ).close ();
181
+ verify (httpStream ).close ();
182
+ verify (httpStream , never ()).incrementWindow (anyInt ());
183
+ }
184
+
185
+ @ Test
186
+ void publisherWritesFutureCompletesWhenConnectionClosed_shouldNotInvokeIncrementWindow () {
187
+ CompletableFuture <Void > future = new CompletableFuture <>();
188
+ when (simplePublisher .send (any (ByteBuffer .class ))).thenReturn (future );
189
+ when (simplePublisher .complete ()).thenReturn (future );
190
+
191
+ HttpStreamResponseHandler handler = responseHandlerWithMockedPublisher (simplePublisher );
192
+
193
+
194
+ HttpHeader [] httpHeaders = getHttpHeaders ();
195
+
196
+ handler .onResponseHeaders (httpStream , 200 , HttpHeaderBlock .MAIN .getValue (),
197
+ httpHeaders );
198
+ handler .onResponseHeadersDone (httpStream , 0 );
199
+ handler .onResponseBody (httpStream ,
200
+ RandomStringUtils .random (1 * 1024 * 1024 ).getBytes (StandardCharsets .UTF_8 ));
201
+
202
+ // This tracker tracks which of the two operation completes first
203
+ AtomicInteger whenCompleteTracker = new AtomicInteger (0 );
204
+ CompletableFuture .runAsync (() -> handler .onResponseComplete (httpStream , 0 ))
205
+ .whenComplete ((r , t ) -> whenCompleteTracker .compareAndSet (0 , 1 ));
206
+
207
+ CompletableFuture .runAsync (() -> future .complete (null ))
208
+ .whenComplete ((r , t ) -> whenCompleteTracker .compareAndSet (0 , 2 ));
209
+
210
+
211
+ requestFuture .join ();
212
+
213
+ if (whenCompleteTracker .get () == 1 ) {
214
+ // onResponseComplete finishes first
215
+ verify (httpStream , never ()).incrementWindow (anyInt ());
216
+ } else {
217
+ verify (httpStream ).incrementWindow (anyInt ());
218
+ }
219
+
220
+ verify (crtConn , never ()).shutdown ();
221
+ verify (crtConn ).close ();
222
+ verify (httpStream ).close ();
223
+ }
224
+
116
225
static HttpHeader [] getHttpHeaders () {
117
226
HttpHeader [] httpHeaders = new HttpHeader [1 ];
118
227
httpHeaders [0 ] = new HttpHeader ("Content-Length" , "1" );
0 commit comments