Skip to content

chore: Add topic map config #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"resource.tagging.service.endpoint": "",

"kafka.topic.prefix": "dynamodb-",
"kafka.topic.map": "",
"tasks.max": "1",

"init.sync.delay.period": 60,
Expand All @@ -44,7 +45,9 @@

`dynamodb.table.ingestion.tag.key` - only tables marked with this tag key will be ingested.

`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}-{dynamodb-table-name}`
`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}{dynamodb-table-name}`

`kafka.topic.namespace.map` - A JSON mapping from DynamoDB table name to topic name. Topics will be named following the pattern `{prefix}{map[table-name]}`. If no mapping is specified (or a table is absent from it), the table name itself is used.

`tasks.max` - **MUST** always exceed the number of tables found for tracking. If the max task count is lower than the found table count, no tasks will be started!

Expand Down
1 change: 1 addition & 0 deletions source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ description = "Kafka Connect Source connector that reads from DynamoDB streams"
dependencies {
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.amazonaws:aws-java-sdk-resourcegroupstaggingapi:1.11.551'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'

compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}"
compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED";

public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table.";
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table by default.";
public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix";
public static final String DST_TOPIC_PREFIX_DEFAULT = "dynamodb-";

public static final String DST_TOPIC_NAMESPACE_MAP_CONFIG = "kafka.topic.namespace.map";
public static final String DST_TOPIC_NAMESPACE_MAP_DOC = "Define Kafka topic namespace map.";
public static final String DST_TOPIC_NAMESPACE_MAP_DISPLAY = "Topic namespace map";
public static final String DST_TOPIC_NAMESPACE_MAP_DEFAULT = null;

public static final String REDISCOVERY_PERIOD_CONFIG = "connect.dynamodb.rediscovery.period";
public static final String REDISCOVERY_PERIOD_DOC = "Time period in milliseconds to rediscover stream enabled DynamoDB tables";
Expand Down Expand Up @@ -189,6 +193,15 @@ public static ConfigDef baseConfigDef() {
CONNECTOR_GROUP, 1,
ConfigDef.Width.MEDIUM,
DST_TOPIC_PREFIX_DISPLAY)

.define(DST_TOPIC_NAMESPACE_MAP_CONFIG,
ConfigDef.Type.STRING,
DST_TOPIC_NAMESPACE_MAP_DEFAULT,
ConfigDef.Importance.HIGH,
DST_TOPIC_NAMESPACE_MAP_DOC,
CONNECTOR_GROUP, 2,
ConfigDef.Width.LONG,
DST_TOPIC_NAMESPACE_MAP_DISPLAY)

.define(SRC_INIT_SYNC_DELAY_CONFIG,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -249,6 +262,10 @@ public String getDestinationTopicPrefix() {
return getString(DST_TOPIC_PREFIX_CONFIG);
}

/**
 * Returns the raw JSON string that maps DynamoDB table names to Kafka topic names,
 * or {@code null} when no mapping has been configured.
 */
public String getDestinationTopicNamespaceMap() {
    final String namespaceMapJson = getString(DST_TOPIC_NAMESPACE_MAP_CONFIG);
    return namespaceMapJson;
}

/**
 * Returns the configured rediscovery period in milliseconds, i.e. how often the
 * connector re-scans for stream-enabled DynamoDB tables.
 */
public long getRediscoveryPeriod() {
    final long rediscoveryMillis = getLong(REDISCOVERY_PERIOD_CONFIG);
    return rediscoveryMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void start(Map<String, String> configProperties) {
tableDesc.getTableName(),
tableDesc.getProvisionedThroughput().getReadCapacityUnits());
}
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix());
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicNamespaceMap());

LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.trustpilot.connector.dynamodb.Envelope;
Expand All @@ -22,6 +23,8 @@

import static java.util.stream.Collectors.toList;

import java.io.IOException;

/**
* Takes in KCL event attributes and converts into Kafka Connect Source record.
* With dynamic schema for key(based on actual DynamoDB table keys) and fixed schema for value.
Expand All @@ -44,8 +47,12 @@ public class RecordConverter {
private List<String> keys;

/**
 * Creates a converter without a table-to-topic namespace map; the topic name is
 * derived from the prefix and the table name alone (namespace map is {@code null}).
 */
public RecordConverter(TableDescription tableDesc, String topicNamePrefix) {
this(tableDesc, topicNamePrefix, null);
}

public RecordConverter(TableDescription tableDesc, String topicNamePrefix, String topicNamespaceMap) {
this.tableDesc = tableDesc;
this.topic_name = topicNamePrefix + tableDesc.getTableName();
this.topic_name = topicNamePrefix + this.getTopicNameSuffix(topicNamespaceMap, tableDesc.getTableName());

valueSchema = SchemaBuilder.struct()
.name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope"))
Expand Down Expand Up @@ -115,6 +122,26 @@ public SourceRecord toSourceRecord(
);
}

/**
 * Resolves the topic-name suffix for the given table.
 *
 * @param topicNamespaceMap JSON object mapping DynamoDB table names to topic names;
 *                          may be {@code null} or empty, in which case the table
 *                          name is used as-is
 * @param tableName         name of the source DynamoDB table
 * @return the mapped topic name when the map contains an entry for the table,
 *         otherwise the table name itself
 * @throws IllegalArgumentException if {@code topicNamespaceMap} is not valid JSON
 */
private String getTopicNameSuffix(String topicNamespaceMap, String tableName) {
    if (Strings.isNullOrEmpty(topicNamespaceMap)) {
        return tableName;
    }

    ObjectMapper objectMapper = new ObjectMapper();
    try {
        LinkedHashMap<String, Object> map = objectMapper.readValue(topicNamespaceMap, new TypeReference<LinkedHashMap<String, Object>>() {});

        if (map.containsKey(tableName)) {
            return (String) map.get(tableName);
        }

    } catch (IOException e) {
        // Preserve the underlying parse failure as the cause so the root error is not lost.
        throw new IllegalArgumentException("Invalid topicNamespaceMap: " + topicNamespaceMap, e);
    }

    return tableName;
}

private Schema getKeySchema(List<String> keys) {
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct()
.name(SchemaNameAdjuster.DEFAULT.adjust(topic_name + ".Key"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,25 @@ public void correctTopicNameIsConstructed() throws Exception {
assertEquals("TestTopicPrefix-TestTable1", record.topic());
}

@Test
public void correctTopicNameIsConstructedWithTopicNamespaceMapExact() throws Exception {
    // Arrange: map the source table name to an explicit topic name via the namespace map.
    final String topicPrefix = "TestTopicPrefix-";
    final String namespaceMap = "{\"TestTable1\":\"TestTopic1\"}";
    RecordConverter converter = new RecordConverter(getTableDescription(null), topicPrefix, namespaceMap);

    // Act
    SourceRecord record = converter.toSourceRecord(
            getSourceInfo(table),
            Envelope.Operation.forCode("r"),
            getAttributes(),
            Instant.parse("2001-01-02T00:00:00.00Z"),
            "testShardID1",
            "testSequenceNumberID1"
    );

    // Assert: the mapped topic name (not the raw table name) follows the prefix.
    assertEquals("TestTopicPrefix-TestTopic1", record.topic());
}

@Test
public void sourceInfoIsPutToOffset() throws Exception {
// Arrange
Expand Down