Skip to content

Commit a39cab3

Browse files
feat: port internal event stream changes (#288)
Co-authored-by: Bret Ambrose <[email protected]>
1 parent 14fa606 commit a39cab3

File tree

7 files changed

+257
-137
lines changed

7 files changed

+257
-137
lines changed

sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package software.amazon.awssdk.eventstreamrpc;
22

3-
import software.amazon.awssdk.crt.eventstream.*;
3+
import software.amazon.awssdk.crt.eventstream.ClientConnectionContinuation;
4+
import software.amazon.awssdk.crt.eventstream.ClientConnectionContinuationHandler;
5+
import software.amazon.awssdk.crt.eventstream.Header;
6+
import software.amazon.awssdk.crt.eventstream.HeaderType;
7+
import software.amazon.awssdk.crt.eventstream.MessageFlags;
8+
import software.amazon.awssdk.crt.eventstream.MessageType;
49
import software.amazon.awssdk.eventstreamrpc.model.EventStreamJsonMessage;
510
import software.amazon.awssdk.eventstreamrpc.model.EventStreamOperationError;
611

sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCConnection.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
package software.amazon.awssdk.eventstreamrpc;
22

3+
import software.amazon.awssdk.crt.CRT;
4+
import software.amazon.awssdk.crt.CrtRuntimeException;
5+
import software.amazon.awssdk.crt.eventstream.ClientConnection;
6+
import software.amazon.awssdk.crt.eventstream.ClientConnectionContinuation;
7+
import software.amazon.awssdk.crt.eventstream.ClientConnectionContinuationHandler;
8+
import software.amazon.awssdk.crt.eventstream.ClientConnectionHandler;
9+
import software.amazon.awssdk.crt.eventstream.Header;
10+
import software.amazon.awssdk.crt.eventstream.MessageFlags;
11+
import software.amazon.awssdk.crt.eventstream.MessageType;
12+
import software.amazon.awssdk.eventstreamrpc.model.AccessDeniedException;
13+
import software.amazon.awssdk.eventstreamrpc.model.EventStreamError;
14+
315
import java.util.ArrayList;
416
import java.util.List;
517
import java.util.Optional;
618
import java.util.concurrent.CompletableFuture;
719
import java.util.logging.Logger;
820
import java.util.stream.Collectors;
921

10-
import software.amazon.awssdk.crt.CRT;
11-
import software.amazon.awssdk.crt.CrtRuntimeException;
12-
import software.amazon.awssdk.crt.eventstream.*;
13-
import software.amazon.awssdk.eventstreamrpc.model.AccessDeniedException;
14-
import software.amazon.awssdk.eventstreamrpc.model.EventStreamError;
15-
1622
public class EventStreamRPCConnection implements AutoCloseable {
1723
protected static class ConnectionState {
1824
enum Phase {
@@ -82,7 +88,7 @@ protected void onConnectionSetup(final ClientConnection clientConnection, int er
8288
connectionState.connectionPhase = ConnectionState.Phase.DISCONNECTED;
8389
initialConnectFuture.completeExceptionally(new CrtRuntimeException(errorCode, CRT.awsErrorName(errorCode)));
8490
} else if (connectionState.connectionPhase == ConnectionState.Phase.CLOSING) {
85-
connectionState.closeReason = new software.amazon.awssdk.eventstreamrpc.EventStreamClosedException("Event stream closed by client");
91+
connectionState.closeReason = new EventStreamClosedException("Event stream closed by client");
8692
disconnect();
8793
} else {
8894
connectionState.connectionPhase = ConnectionState.Phase.WAITING_CONNACK;
@@ -94,7 +100,7 @@ protected void onConnectionSetup(final ClientConnection clientConnection, int er
94100
return;
95101
}
96102
if (connectionState.connectionPhase == ConnectionState.Phase.CLOSING) {
97-
connectionState.closeReason = new software.amazon.awssdk.eventstreamrpc.EventStreamClosedException("Event stream closed by client");
103+
connectionState.closeReason = new EventStreamClosedException("Event stream closed by client");
98104
} else {
99105
try {
100106
final List<Header> headers = new ArrayList<>(messageAmendInfo.getHeaders().size() + 1);
@@ -205,7 +211,7 @@ public ClientConnectionContinuation newStream(ClientConnectionContinuationHandle
205211
if (connectionState.connectionPhase == ConnectionState.Phase.CONNECTED) {
206212
return connectionState.connection.newStream(continuationHandler);
207213
} else {
208-
throw new software.amazon.awssdk.eventstreamrpc.EventStreamClosedException("EventStream connection is not open!");
214+
throw new EventStreamClosedException("EventStream connection is not open!");
209215
}
210216
}
211217
}
@@ -220,7 +226,7 @@ public void disconnect() {
220226
connectionState.connection.closeConnection(0);
221227
}
222228
if (connectionState.closeReason == null) {
223-
connectionState.closeReason = new software.amazon.awssdk.eventstreamrpc.EventStreamClosedException("Event stream closed by client");
229+
connectionState.closeReason = new EventStreamClosedException("Event stream closed by client");
224230
}
225231
}
226232
}
@@ -270,7 +276,7 @@ public CompletableFuture<Void> sendPing(Optional<MessageAmendInfo> pingData) {
270276
ClientConnection connection;
271277
synchronized (connectionState) {
272278
if (connectionState.connectionPhase != ConnectionState.Phase.CONNECTED) {
273-
throw new software.amazon.awssdk.eventstreamrpc.EventStreamClosedException("EventStream connection not established");
279+
throw new EventStreamClosedException("EventStream connection not established");
274280
}
275281
connection = connectionState.connection;
276282
if (pingData.isPresent()) {
@@ -292,7 +298,7 @@ public CompletableFuture<Void> sendPingResponse(Optional<MessageAmendInfo> pingR
292298
ClientConnection connection;
293299
synchronized (connectionState) {
294300
if (connectionState.connectionPhase != ConnectionState.Phase.CONNECTED) {
295-
throw new software.amazon.awssdk.eventstreamrpc.EventStreamClosedException("EventStream connection not established");
301+
throw new EventStreamClosedException("EventStream connection not established");
296302
}
297303
connection = connectionState.connection;
298304
if (pingResponseData.isPresent()) {
@@ -338,7 +344,7 @@ public interface LifecycleHandler {
338344
* result in closing the connection. AccessDeniedException is such an example
339345
*
340346
* @param t Exception
341-
* @return true if the connection should be terminated as a result of handling the error
347+
* @returns true if the connection should be terminated as a result of handling the error
342348
*/
343349
boolean onError(Throwable t);
344350

sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCConnectionConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package software.amazon.awssdk.eventstreamrpc;
22

3-
import software.amazon.awssdk.crt.io.*;
4-
import software.amazon.awssdk.eventstreamrpc.MessageAmendInfo;
3+
import software.amazon.awssdk.crt.io.ClientBootstrap;
4+
import software.amazon.awssdk.crt.io.ClientTlsContext;
5+
import software.amazon.awssdk.crt.io.EventLoopGroup;
6+
import software.amazon.awssdk.crt.io.SocketOptions;
57

68
import java.util.concurrent.CompletableFuture;
79
import java.util.function.Supplier;

sdk/greengrass/event-stream-rpc-client/src/main/java/software/amazon/awssdk/eventstreamrpc/StreamResponse.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package software.amazon.awssdk.eventstreamrpc;
22

3-
import software.amazon.awssdk.eventstreamrpc.StreamEventPublisher;
43
import software.amazon.awssdk.eventstreamrpc.model.EventStreamJsonMessage;
54

65
import java.util.concurrent.CompletableFuture;

sdk/greengrass/event-stream-rpc-model/src/main/java/software/amazon/awssdk/eventstreamrpc/EventStreamRPCServiceModel.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,50 @@ public abstract class EventStreamRPCServiceModel {
5050
GsonBuilder builder = new GsonBuilder();
5151
builder.registerTypeAdapterFactory(new ForceNullsForMapTypeAdapterFactory());
5252
builder.registerTypeAdapterFactory(OptionalTypeAdapter.FACTORY);
53+
builder.registerTypeAdapterFactory(EventStreamPostFromJsonTypeAdapter.FACTORY);
5354
builder.registerTypeAdapter(byte[].class, new Base64BlobSerializerDeserializer());
5455
builder.registerTypeAdapter(Instant.class, new InstantSerializerDeserializer());
5556
builder.excludeFieldsWithoutExposeAnnotation();
5657
GSON = builder.create();
5758
}
5859

60+
// Type adapter to automatically call "postFromJson" on all instances of EventStreamJsonMessage we construct
61+
private static class EventStreamPostFromJsonTypeAdapter<E extends EventStreamJsonMessage> extends TypeAdapter<E> {
62+
public static final TypeAdapterFactory FACTORY = new TypeAdapterFactory() {
63+
@Override
64+
public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
65+
if (EventStreamJsonMessage.class.isAssignableFrom(type.getRawType())) {
66+
final TypeAdapter<?> delegate = gson.getDelegateAdapter(this, type);
67+
return new EventStreamPostFromJsonTypeAdapter(delegate);
68+
}
69+
70+
return null;
71+
}
72+
};
73+
74+
private final TypeAdapter<E> adapter;
75+
76+
public EventStreamPostFromJsonTypeAdapter(TypeAdapter<E> adapter) {
77+
this.adapter = adapter;
78+
}
79+
80+
@Override
81+
public void write(JsonWriter out, E value) throws IOException {
82+
adapter.write(out, value);
83+
}
84+
85+
@Override
86+
public E read(JsonReader in) throws IOException {
87+
E obj = adapter.read(in);
88+
if (obj != null) {
89+
// Call postFromJson to finalize the deserialization. Especially important for unions to have their
90+
// member get set correctly.
91+
obj.postFromJson();
92+
}
93+
return obj;
94+
}
95+
}
96+
5997
private static class ForceNullsForMapTypeAdapterFactory implements TypeAdapterFactory {
6098

6199
public final <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
@@ -273,16 +311,12 @@ public EventStreamJsonMessage fromJson(final String applicationModelType, byte[]
273311
if (!clazz.isPresent()) {
274312
throw new UnmappedDataException(applicationModelType);
275313
}
276-
final EventStreamJsonMessage msg = fromJson(clazz.get(), payload);
277-
msg.postFromJson();
278-
return msg;
314+
return fromJson(clazz.get(), payload);
279315
}
280316

281317
public <T extends EventStreamJsonMessage> T fromJson(final Class<T> clazz, byte[] payload) {
282318
try {
283-
final T obj = getGson().fromJson(new String(payload, StandardCharsets.UTF_8), clazz);
284-
obj.postFromJson();
285-
return obj;
319+
return getGson().fromJson(new String(payload, StandardCharsets.UTF_8), clazz);
286320
} catch (Exception e) {
287321
throw new DeserializationException(payload, e);
288322
}

sdk/greengrass/event-stream-rpc-model/src/test/java/software/amazon/awssdk/eventstreamrpc/ObjectModelTests.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ void testBlobAndDeserializeEquivalence() {
4646
Assertions.assertTrue(requestObject.equals(deserialized));
4747
}
4848

49+
@Test
50+
void testEnumGetter() {
51+
MessageData data = new MessageData();
52+
for(FruitEnum value:FruitEnum.values()) {
53+
data.setEnumMessage(value);
54+
FruitEnum enumGet = data.getEnumMessage();
55+
}
56+
}
57+
4958
@Test
5059
void testEmptyObjectIsNotNullAndIsEmpty() {
5160
final EchoMessageRequest requestObject = new EchoMessageRequest();
@@ -93,13 +102,13 @@ void testSetRequiredFieldToNull() {
93102
pair.setKey(null);
94103
pair.setValue(null);
95104
}
96-
105+
97106
@Test
98107
void testInstantSerialization() {
99108
final MessageData data = new MessageData();
100109
final Instant someInstant = Instant.ofEpochSecond(1606173648);
101110
data.setTimeMessage(someInstant);
102-
111+
103112
final JSONObject jsonObject = new JSONObject(EchoTestRPCServiceModel.getInstance().toJsonString(data));
104113
final MessageData dataDeserialized = EchoTestRPCServiceModel.getInstance().fromJson(MessageData.class,
105114
jsonObject.toString().getBytes(StandardCharsets.UTF_8));
@@ -145,7 +154,7 @@ void testEnumSerializeDeserialize() {
145154
.fromJson(MessageData.class, obj.toString().getBytes(StandardCharsets.UTF_8));
146155
Assertions.assertTrue(data.equals(deserialized));
147156
}
148-
157+
149158
@Test
150159
void testDocumentNullSerialize() {
151160
final MessageData data = new MessageData();
@@ -157,17 +166,34 @@ void testDocumentNullSerialize() {
157166
docPart.put("null", null);
158167
docPart.put("nullStringValueLiteral", "null");
159168
data.setDocumentMessage(docPart);
160-
169+
161170
final JSONObject obj = new JSONObject(EchoTestRPCServiceModel.getInstance().toJsonString(data));
162171
final MessageData deserialized = EchoTestRPCServiceModel.getInstance()
163172
.fromJson(MessageData.class, obj.toString().getBytes(StandardCharsets.UTF_8));
164-
173+
165174
Assertions.assertTrue(data.equals(deserialized));
166175
//verifies that the null deserialized back
167176
Assertions.assertTrue(deserialized.getDocumentMessage().containsKey("null"));
168177
Assertions.assertFalse(deserialized.getDocumentMessage().containsKey("nullNotPresent"));
169178
}
170179

180+
@Test
181+
void testDocumentNullDeserialize() {
182+
final EchoMessageRequest data = new EchoMessageRequest();
183+
Map<String, Product> sTV = new HashMap<String, Product>();
184+
Product p = new Product();
185+
p.setPrice(1);
186+
// leaving product's name as null for previously found issue
187+
sTV.put("A", p);
188+
MessageData m = new MessageData();
189+
data.setMessage(m);
190+
m.setStringToValue(sTV);
191+
192+
final JSONObject obj = new JSONObject(EchoTestRPCServiceModel.getInstance().toJsonString(data));
193+
final EchoMessageRequest deserialized = EchoTestRPCServiceModel.getInstance()
194+
.fromJson(EchoMessageRequest.class, obj.toString().getBytes(StandardCharsets.UTF_8));
195+
}
196+
171197
@Test
172198
void testBadJsonDeserialize() {
173199
List<Header> headers = new ArrayList<>();

0 commit comments

Comments
 (0)