20
20
import java .util .List ;
21
21
import java .util .Map ;
22
22
import java .util .stream .Collectors ;
23
+ import java .util .Optional ;
23
24
24
25
import static java .util .stream .Collectors .toList ;
25
26
@@ -41,7 +42,7 @@ public class RecordConverter {
41
42
42
43
private final TableDescription tableDesc ;
43
44
private final String topic_name ;
44
- private Schema keySchema ;
45
+ private final Schema keySchema ;
45
46
private final Schema valueSchema ;
46
47
47
48
private List <String > keys ;
@@ -54,13 +55,18 @@ public RecordConverter(TableDescription tableDesc, String topicNamePrefix, Strin
54
55
this .tableDesc = tableDesc ;
55
56
this .topic_name = topicNamePrefix + this .getTopicNameSuffix (topicNamespaceMap , tableDesc .getTableName ());
56
57
57
- valueSchema = SchemaBuilder .struct ()
58
+ this .keys = tableDesc .getKeySchema ().stream ().map (this ::sanitiseAttributeName ).collect (toList ());
59
+ this .keySchema = getKeySchema (keys );
60
+ this .valueSchema = SchemaBuilder .struct ()
58
61
.name (SchemaNameAdjuster .DEFAULT .adjust ( "com.trustpilot.connector.dynamodb.envelope" ))
59
62
.field (Envelope .FieldName .VERSION , Schema .STRING_SCHEMA )
63
+ .field (Envelope .FieldName .EVENT_ID , Schema .OPTIONAL_STRING_SCHEMA )
60
64
.field (Envelope .FieldName .DOCUMENT , DynamoDbJson .schema ())
65
+ .field (Envelope .FieldName .OLD_DOCUMENT , DynamoDbJson .optionalSchema ())
61
66
.field (Envelope .FieldName .SOURCE , SourceInfo .structSchema ())
62
67
.field (Envelope .FieldName .OPERATION , Schema .STRING_SCHEMA )
63
68
.field (Envelope .FieldName .TIMESTAMP , Schema .INT64_SCHEMA )
69
+ .field (Envelope .FieldName .KEY , this .keySchema )
64
70
.build ();
65
71
}
66
72
@@ -70,6 +76,19 @@ public SourceRecord toSourceRecord(
70
76
Map <String , AttributeValue > attributes ,
71
77
Instant arrivalTimestamp ,
72
78
String shardId ,
79
+ String sequenceNumber ) throws Exception {
80
+
81
+ return toSourceRecord (sourceInfo , op , null , attributes , null , arrivalTimestamp , shardId , sequenceNumber );
82
+ }
83
+
84
+ public SourceRecord toSourceRecord (
85
+ SourceInfo sourceInfo ,
86
+ Envelope .Operation op ,
87
+ String eventId ,
88
+ Map <String , AttributeValue > attributes ,
89
+ Map <String , AttributeValue > oldAttributes ,
90
+ Instant arrivalTimestamp ,
91
+ String shardId ,
73
92
String sequenceNumber ) throws Exception {
74
93
75
94
// Sanitise the incoming attributes to remove any invalid Avro characters
@@ -81,18 +100,13 @@ public SourceRecord toSourceRecord(
81
100
LinkedHashMap ::new
82
101
));
83
102
103
+
84
104
// Leveraging offsets to store shard and sequence number with each item pushed to Kafka.
85
105
// This info will only be used to update `shardRegister` and won't be used to reset state after restart
86
106
Map <String , Object > offsets = SourceInfo .toOffset (sourceInfo );
87
107
offsets .put (SHARD_ID , shardId );
88
108
offsets .put (SHARD_SEQUENCE_NO , sequenceNumber );
89
109
90
- // DynamoDB keys can be changed only by recreating the table
91
- if (keySchema == null ) {
92
- keys = tableDesc .getKeySchema ().stream ().map (this ::sanitiseAttributeName ).collect (toList ());
93
- keySchema = getKeySchema (keys );
94
- }
95
-
96
110
Struct keyData = new Struct (getKeySchema (keys ));
97
111
for (String key : keys ) {
98
112
AttributeValue attributeValue = sanitisedAttributes .get (key );
@@ -111,7 +125,23 @@ public SourceRecord toSourceRecord(
111
125
.put (Envelope .FieldName .DOCUMENT , objectMapper .writeValueAsString (sanitisedAttributes ))
112
126
.put (Envelope .FieldName .SOURCE , SourceInfo .toStruct (sourceInfo ))
113
127
.put (Envelope .FieldName .OPERATION , op .code ())
114
- .put (Envelope .FieldName .TIMESTAMP , arrivalTimestamp .toEpochMilli ());
128
+ .put (Envelope .FieldName .TIMESTAMP , arrivalTimestamp .toEpochMilli ())
129
+ .put (Envelope .FieldName .KEY , keyData );
130
+
131
+
132
+ if (eventId != null ) {
133
+ valueData = valueData .put (Envelope .FieldName .EVENT_ID , eventId );
134
+ }
135
+ if (oldAttributes != null ) {
136
+ Map <String , AttributeValue > sanitisedOldAttributes = oldAttributes .entrySet ().stream ()
137
+ .collect (Collectors .toMap (
138
+ e -> this .sanitiseAttributeName (e .getKey ()),
139
+ Map .Entry ::getValue ,
140
+ (u , v ) -> u ,
141
+ LinkedHashMap ::new
142
+ ));
143
+ valueData = valueData .put (Envelope .FieldName .OLD_DOCUMENT , objectMapper .writeValueAsString (sanitisedOldAttributes ));
144
+ }
115
145
116
146
return new SourceRecord (
117
147
Collections .singletonMap ("table_name" , sourceInfo .tableName ),
0 commit comments