Skip to content

Commit 48f3040

Browse files
authored
Add topic map config (#1)
1 parent deadfd6 commit 48f3040

File tree

6 files changed

+71
-4
lines changed

6 files changed

+71
-4
lines changed

docs/options.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"resource.tagging.service.endpoint": "",
3333

3434
"kafka.topic.prefix": "dynamodb-",
35+
"kafka.topic.map": "",
3536
"tasks.max": "1",
3637

3738
"init.sync.delay.period": 60,
@@ -44,7 +45,9 @@
4445

4546
`dynamodb.table.ingestion.tag.key` - only tables marked with this tag key will be ingested.
4647

47-
`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}-{dynamodb-table-name}`
48+
`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}{dynamodb-table-name}`
49+
50+
`kafka.topic.map` - A JSON mapping between dynamodb table name and topic name. The topics will be named like `{prefix}{map[table-name]}`. If the map is not specified, the table name will be used.
4851

4952
`tasks.max` - **MUST** always exceed number of tables found for tracking. If max tasks count is lower then found tables count, no tasks will be started!
5053

source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ description = "Kafka Connect Source connector that reads from DynamoDB streams"
33
dependencies {
44
implementation 'com.google.code.gson:gson:2.8.2'
55
implementation 'com.amazonaws:aws-java-sdk-resourcegroupstaggingapi:1.11.551'
6+
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
67

78
compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}"
89
compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1'

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,14 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
5858
public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED";
5959

6060
public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";
61-
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table.";
61+
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table by default.";
6262
public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix";
6363
public static final String DST_TOPIC_PREFIX_DEFAULT = "dynamodb-";
6464

65+
public static final String DST_TOPIC_NAMESPACE_MAP_CONFIG = "kafka.topic.namespace.map";
66+
public static final String DST_TOPIC_NAMESPACE_MAP_DOC = "Define Kafka topic namespace map.";
67+
public static final String DST_TOPIC_NAMESPACE_MAP_DISPLAY = "Topic namespace map";
68+
public static final String DST_TOPIC_NAMESPACE_MAP_DEFAULT = null;
6569

6670
public static final String REDISCOVERY_PERIOD_CONFIG = "connect.dynamodb.rediscovery.period";
6771
public static final String REDISCOVERY_PERIOD_DOC = "Time period in milliseconds to rediscover stream enabled DynamoDB tables";
@@ -189,6 +193,15 @@ public static ConfigDef baseConfigDef() {
189193
CONNECTOR_GROUP, 1,
190194
ConfigDef.Width.MEDIUM,
191195
DST_TOPIC_PREFIX_DISPLAY)
196+
197+
.define(DST_TOPIC_NAMESPACE_MAP_CONFIG,
198+
ConfigDef.Type.STRING,
199+
DST_TOPIC_NAMESPACE_MAP_DEFAULT,
200+
ConfigDef.Importance.HIGH,
201+
DST_TOPIC_NAMESPACE_MAP_DOC,
202+
CONNECTOR_GROUP, 2,
203+
ConfigDef.Width.LONG,
204+
DST_TOPIC_NAMESPACE_MAP_DISPLAY)
192205

193206
.define(SRC_INIT_SYNC_DELAY_CONFIG,
194207
ConfigDef.Type.INT,
@@ -249,6 +262,10 @@ public String getDestinationTopicPrefix() {
249262
return getString(DST_TOPIC_PREFIX_CONFIG);
250263
}
251264

265+
public String getDestinationTopicNamespaceMap() {
266+
return getString(DST_TOPIC_NAMESPACE_MAP_CONFIG);
267+
}
268+
252269
public long getRediscoveryPeriod() {
253270
return getLong(REDISCOVERY_PERIOD_CONFIG);
254271
}

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void start(Map<String, String> configProperties) {
134134
tableDesc.getTableName(),
135135
tableDesc.getProvisionedThroughput().getReadCapacityUnits());
136136
}
137-
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix());
137+
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicNamespaceMap());
138138

139139
LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName());
140140

source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
55
import com.amazonaws.services.dynamodbv2.model.TableDescription;
66
import com.fasterxml.jackson.annotation.JsonInclude;
7+
import com.fasterxml.jackson.core.type.TypeReference;
78
import com.fasterxml.jackson.databind.ObjectMapper;
89
import com.google.common.base.Strings;
910
import com.trustpilot.connector.dynamodb.Envelope;
@@ -22,6 +23,8 @@
2223

2324
import static java.util.stream.Collectors.toList;
2425

26+
import java.io.IOException;
27+
2528
/**
2629
* Takes in KCL event attributes and converts into Kafka Connect Source record.
2730
* With dynamic schema for key(based on actual DynamoDB table keys) and fixed schema for value.
@@ -44,8 +47,12 @@ public class RecordConverter {
4447
private List<String> keys;
4548

4649
public RecordConverter(TableDescription tableDesc, String topicNamePrefix) {
50+
this(tableDesc, topicNamePrefix, null);
51+
}
52+
53+
public RecordConverter(TableDescription tableDesc, String topicNamePrefix, String topicNamespaceMap) {
4754
this.tableDesc = tableDesc;
48-
this.topic_name = topicNamePrefix + tableDesc.getTableName();
55+
this.topic_name = topicNamePrefix + this.getTopicNameSuffix(topicNamespaceMap, tableDesc.getTableName());
4956

5057
valueSchema = SchemaBuilder.struct()
5158
.name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope"))
@@ -115,6 +122,26 @@ public SourceRecord toSourceRecord(
115122
);
116123
}
117124

125+
private String getTopicNameSuffix(String topicNamespaceMap, String tableName) {
126+
if (Strings.isNullOrEmpty(topicNamespaceMap)) {
127+
return tableName;
128+
}
129+
130+
ObjectMapper objectMapper = new ObjectMapper();
131+
try {
132+
LinkedHashMap<String, Object> map = objectMapper.readValue(topicNamespaceMap, new TypeReference<LinkedHashMap<String, Object>>() {});
133+
134+
if (map.containsKey(tableName)) {
135+
return (String) map.get(tableName);
136+
}
137+
138+
} catch (IOException e) {
139+
throw new IllegalArgumentException("Invalid topicNamespaceMap: " + topicNamespaceMap);
140+
}
141+
142+
return tableName;
143+
}
144+
118145
private Schema getKeySchema(List<String> keys) {
119146
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct()
120147
.name(SchemaNameAdjuster.DEFAULT.adjust(topic_name + ".Key"));

source/src/test/java/com/trustpilot/connector/dynamodb/utils/RecordConverterTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,25 @@ public void correctTopicNameIsConstructed() throws Exception {
9191
assertEquals("TestTopicPrefix-TestTable1", record.topic());
9292
}
9393

94+
@Test
95+
public void correctTopicNameIsConstructedWithTopicNamespaceMapExact() throws Exception {
96+
// Arrange
97+
RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-", "{\"TestTable1\":\"TestTopic1\"}");
98+
99+
// Act
100+
SourceRecord record = converter.toSourceRecord(
101+
getSourceInfo(table),
102+
Envelope.Operation.forCode("r"),
103+
getAttributes(),
104+
Instant.parse("2001-01-02T00:00:00.00Z"),
105+
"testShardID1",
106+
"testSequenceNumberID1"
107+
);
108+
109+
// Assert
110+
assertEquals("TestTopicPrefix-TestTopic1", record.topic());
111+
}
112+
94113
@Test
95114
public void sourceInfoIsPutToOffset() throws Exception {
96115
// Arrange

0 commit comments

Comments
 (0)