15
15
16
16
package com .madgag .aws .sdk .async .responsebytes .awssdk .core .internal .async ;
17
17
18
- import static software .amazon .awssdk .utils .FunctionalUtils .invokeSafely ;
19
-
20
- import java .io .ByteArrayOutputStream ;
21
- import java .nio .ByteBuffer ;
22
- import java .util .concurrent .CompletableFuture ;
23
18
import org .reactivestreams .Subscriber ;
24
19
import org .reactivestreams .Subscription ;
25
20
import software .amazon .awssdk .annotations .SdkInternalApi ;
28
23
import software .amazon .awssdk .core .async .SdkPublisher ;
29
24
import software .amazon .awssdk .utils .BinaryUtils ;
30
25
26
+ import java .io .ByteArrayOutputStream ;
27
+ import java .nio .ByteBuffer ;
28
+ import java .util .Optional ;
29
+ import java .util .concurrent .CompletableFuture ;
30
+ import java .util .function .Function ;
31
+
32
+ import static software .amazon .awssdk .utils .FunctionalUtils .invokeSafely ;
33
+
31
34
/**
32
35
* Implementation of {@link AsyncResponseTransformer} that dumps content into a byte array and supports further
33
36
* conversions into types, like strings.
41
44
public final class ByteArrayAsyncResponseTransformerAlternative <ResponseT > implements
42
45
AsyncResponseTransformer <ResponseT , ResponseBytes <ResponseT >> {
43
46
47
+ private final Optional <Function <ResponseT , Integer >> knownSize ;
44
48
private volatile CompletableFuture <byte []> cf ;
45
49
private volatile ResponseT response ;
46
50
51
+ public ByteArrayAsyncResponseTransformerAlternative (Optional <Function <ResponseT , Integer >> knownSize ) {
52
+ this .knownSize = knownSize ;
53
+ }
54
+
47
55
@ Override
48
56
public CompletableFuture <ResponseBytes <ResponseT >> prepare () {
49
57
cf = new CompletableFuture <>();
@@ -57,7 +65,9 @@ public void onResponse(ResponseT response) {
57
65
58
66
@ Override
59
67
public void onStream (SdkPublisher <ByteBuffer > publisher ) {
60
- publisher .subscribe (new BaosSubscriber (cf ));
68
+ ByteArrayOutputStream baos =
69
+ knownSize .map (f -> new ByteArrayOutputStream (f .apply (response ))).orElse (new ByteArrayOutputStream ());
70
+ publisher .subscribe (new BaosSubscriber (cf , baos ));
61
71
}
62
72
63
73
@ Override
@@ -68,12 +78,13 @@ public void exceptionOccurred(Throwable throwable) {
68
78
static class BaosSubscriber implements Subscriber <ByteBuffer > {
69
79
private final CompletableFuture <byte []> resultFuture ;
70
80
71
- private ByteArrayOutputStream baos = new ByteArrayOutputStream () ;
81
+ private ByteArrayOutputStream baos ;
72
82
73
83
private Subscription subscription ;
74
84
75
- BaosSubscriber (CompletableFuture <byte []> resultFuture ) {
85
+ BaosSubscriber (CompletableFuture <byte []> resultFuture , ByteArrayOutputStream baos ) {
76
86
this .resultFuture = resultFuture ;
87
+ this .baos = baos ;
77
88
}
78
89
79
90
@ Override
0 commit comments