Skip to content
This repository was archived by the owner on May 28, 2025. It is now read-only.

Dynamodb kafka connector sanitise invalid field names #9

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -62,6 +64,15 @@ public SourceRecord toSourceRecord(
String shardId,
String sequenceNumber) throws Exception {

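// Sanitise the incoming attribute names: every character that is not valid in an Avro name is
// replaced with an underscore. If two names collide after sanitising, the first one encountered is kept.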
final Map<String, AttributeValue> sanitisedAttributes = attributes.entrySet().stream()
.collect(Collectors.toMap(
e -> this.sanitiseAttributeName(e.getKey()),
Map.Entry::getValue,
(u, v) -> u,
LinkedHashMap::new
));

// Leveraging offsets to store shard and sequence number with each item pushed to Kafka.
// This info will only be used to update `shardRegister` and won't be used to reset state after restart
Map<String, Object> offsets = SourceInfo.toOffset(sourceInfo);
Expand All @@ -70,13 +81,13 @@ public SourceRecord toSourceRecord(

// DynamoDB keys can be changed only by recreating the table
if (keySchema == null) {
keys = tableDesc.getKeySchema().stream().map(KeySchemaElement::getAttributeName).collect(toList());
keys = tableDesc.getKeySchema().stream().map(this::sanitiseAttributeName).collect(toList());
keySchema = getKeySchema(keys);
}

Struct keyData = new Struct(getKeySchema(keys));
for (String key : keys) {
AttributeValue attributeValue = attributes.get(key);
AttributeValue attributeValue = sanitisedAttributes.get(key);
if (attributeValue.getS() != null) {
keyData.put(key, attributeValue.getS());
continue;
Expand All @@ -89,7 +100,7 @@ public SourceRecord toSourceRecord(

Struct valueData = new Struct(valueSchema)
.put(Envelope.FieldName.VERSION, sourceInfo.version)
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(attributes))
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(sanitisedAttributes))
.put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo))
.put(Envelope.FieldName.OPERATION, op.code())
.put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli());
Expand All @@ -113,4 +124,11 @@ private Schema getKeySchema(List<String> keys) {
return keySchemaBuilder.build();
}

/**
 * Sanitises the attribute name carried by a key schema element.
 *
 * @param element key schema element whose attribute name is read
 * @return the element's attribute name after sanitising
 */
private String sanitiseAttributeName(KeySchemaElement element) {
    final String rawName = element.getAttributeName();
    return sanitiseAttributeName(rawName);
}

/**
 * Matches a first character that is neither an ASCII letter nor an underscore, or any later
 * character that is not an ASCII letter, digit or underscore. Compiled once because the
 * sanitiser is applied to every attribute name of every converted record.
 */
private static final Pattern INVALID_NAME_CHARACTER =
        Pattern.compile("^[^a-zA-Z_]|(?<!^)[^a-zA-Z0-9_]");

/**
 * Replaces each character of the given name that {@link #INVALID_NAME_CHARACTER} matches
 * with an underscore. The length of the name is preserved: one underscore is written per
 * replaced character.
 *
 * @param attributeName the raw attribute name; must not be {@code null}
 * @return the name with every offending character replaced by {@code _}
 */
private String sanitiseAttributeName(final String attributeName) {
    return INVALID_NAME_CHARACTER.matcher(attributeName).replaceAll("_");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ private Map<String, AttributeValue> getAttributes() {
return attributes;
}

/**
 * Builds an attribute fixture in which three of the four names contain characters outside
 * letters, digits and underscores, or start with a digit.
 *
 * @return a new mutable map of attribute name to attribute value
 */
private Map<String, AttributeValue> getAttributesWithInvalidAvroCharacters() {
    final Map<String, AttributeValue> invalidNamed = new HashMap<>();

    // Hyphen inside the name.
    invalidNamed.put("test-1234", new AttributeValue().withS("testKV1Value"));
    // Leading digit plus hyphens.
    invalidNamed.put("1-starts-with-number", new AttributeValue().withS("2"));
    // Only letters and underscores; the one entry holding a number value.
    invalidNamed.put("_starts_with_underscore", new AttributeValue().withN("1"));
    // Trailing run of symbol characters.
    invalidNamed.put("test!@£$%^", new AttributeValue().withS("testStringValue"));

    return invalidNamed;
}



private SourceInfo getSourceInfo(String table) {
SourceInfo sourceInfo = new SourceInfo(table, Clock.fixed(Instant.parse("2001-01-02T00:00:00Z"), ZoneId.of("UTC")));
sourceInfo.initSyncStatus = InitSyncStatus.RUNNING;
Expand Down Expand Up @@ -191,6 +203,81 @@ public void recordAttributesAreAddedToValueData() throws Exception {
((Struct) record.value()).getString("document"));
}

/**
 * A table keyed on a single attribute whose name contains a hyphen must expose that key
 * under the underscore-substituted name in both the key schema and the key struct.
 */
@Test
public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception {
    // Arrange: one key attribute, "test-1234"
    List<KeySchemaElement> tableKeys = new LinkedList<>();
    tableKeys.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234"));
    RecordConverter recordConverter = new RecordConverter(getTableDescription(tableKeys), "TestTopicPrefix-");

    // Act
    SourceInfo sourceInfo = getSourceInfo(table);
    Envelope.Operation operation = Envelope.Operation.forCode("r");
    Map<String, AttributeValue> item = getAttributesWithInvalidAvroCharacters();
    Instant arrival = Instant.parse("2001-01-02T00:00:00.00Z");
    SourceRecord converted = recordConverter.toSourceRecord(
            sourceInfo, operation, item, arrival, "testShardID1", "testSequenceNumberID1");

    // Assert: schema field name and type, then the value stored under the sanitised name
    assertEquals("test_1234", converted.keySchema().fields().get(0).name());
    assertEquals(SchemaBuilder.string().build(), converted.keySchema().fields().get(0).schema());
    Struct keyStruct = (Struct) converted.key();
    assertEquals("testKV1Value", keyStruct.getString("test_1234"));
}

/**
 * A table keyed on two attributes, one containing a hyphen and one starting with a digit,
 * must expose both keys under their underscore-substituted names, in key-schema order.
 */
@Test
public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception {
    // Arrange: two key attributes, "test-1234" and "1-starts-with-number"
    List<KeySchemaElement> tableKeys = new LinkedList<>();
    tableKeys.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234"));
    tableKeys.add(new KeySchemaElement().withKeyType("N").withAttributeName("1-starts-with-number"));
    RecordConverter recordConverter = new RecordConverter(getTableDescription(tableKeys), "TestTopicPrefix-");

    // Act
    SourceInfo sourceInfo = getSourceInfo(table);
    Envelope.Operation operation = Envelope.Operation.forCode("r");
    Map<String, AttributeValue> item = getAttributesWithInvalidAvroCharacters();
    Instant arrival = Instant.parse("2001-01-02T00:00:00.00Z");
    SourceRecord converted = recordConverter.toSourceRecord(
            sourceInfo, operation, item, arrival, "testShardID1", "testSequenceNumberID1");

    // Assert: first key field
    assertEquals("test_1234", converted.keySchema().fields().get(0).name());
    assertEquals(SchemaBuilder.string().build(), converted.keySchema().fields().get(0).schema());
    Struct keyStruct = (Struct) converted.key();
    assertEquals("testKV1Value", keyStruct.getString("test_1234"));

    // Assert: second key field (leading digit and hyphens all become underscores)
    assertEquals("__starts_with_number", converted.keySchema().fields().get(1).name());
    assertEquals(SchemaBuilder.string().build(), converted.keySchema().fields().get(1).schema());
    assertEquals("2", keyStruct.getString("__starts_with_number"));
}

/**
 * The JSON stored in the record value's "document" field must list every attribute under
 * its underscore-substituted name.
 */
@Test
public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidCharacters() throws Exception {
    // Arrange: table description built without a key schema
    RecordConverter recordConverter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-");

    // Act
    SourceInfo sourceInfo = getSourceInfo(table);
    Envelope.Operation operation = Envelope.Operation.forCode("r");
    Map<String, AttributeValue> item = getAttributesWithInvalidAvroCharacters();
    Instant arrival = Instant.parse("2001-01-02T00:00:00.00Z");
    SourceRecord converted = recordConverter.toSourceRecord(
            sourceInfo, operation, item, arrival, "testShardID1", "testSequenceNumberID1");

    // NOTE(review): the entry order below differs from the order in which the fixture inserts
    // its attributes, so it appears to depend on the iteration order of the fixture's HashMap.
    // Confirm this is stable on every JDK the build targets.
    String expectedDocument = "{\"test_1234\":{\"s\":\"testKV1Value\"},\"_starts_with_underscore\":{\"n\":\"1\"},\"__starts_with_number\":{\"s\":\"2\"},\"test______\":{\"s\":\"testStringValue\"}}";

    // Assert
    Struct valueStruct = (Struct) converted.value();
    assertEquals(expectedDocument, valueStruct.getString("document"));
}

@Test
public void sourceInfoIsAddedToValueData() throws Exception {
// Arrange
Expand Down