Skip to content

Commit ea3d032

Browse files
changes to version (#7)
1 parent daa66a4 commit ea3d032

File tree

9 files changed

+44
-8
lines changed

9 files changed

+44
-8
lines changed

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class DynamoDBSourceConnector extends SourceConnector {
3030
private TablesProvider tablesProvider;
3131

3232
private List<String> consumableTables;
33+
private String tableVersion;
3334

3435
private volatile Timer timer;
3536

@@ -76,7 +77,7 @@ public void start(Map<String, String> properties) {
7677
config.getSrcDynamoDBEnvTagValue());
7778
}
7879
}
79-
80+
tableVersion = config.getTableVersion();
8081
startBackgroundReconfigurationTasks(this.context, config.getRediscoveryPeriod());
8182
}
8283

@@ -135,7 +136,9 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
135136
Map<String, String> taskProps = new HashMap<>(configProperties);
136137

137138
taskProps.put(DynamoDBSourceTaskConfig.TABLE_NAME_CONFIG, table);
138-
139+
if (tableVersion != null && !tableVersion.isEmpty()) {
140+
taskProps.put(DynamoDBSourceTaskConfig.TABLE_VERSION_CONFIG, tableVersion);
141+
}
139142
// In feature we might allow having more then one task per table for performance reasons.
140143
// TaskID will be needed for KCL worker identifiers and also to orchestrate init sync.
141144
taskProps.put(DynamoDBSourceTaskConfig.TASK_ID_CONFIG, "task-1");

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
5252
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY = "Tables whitelist";
5353
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT = null;
5454

55+
public static final String SRC_DYNAMODB_TABLE_VERSION_CONFIG = "dynamodb.table.version";
56+
public static final String SRC_DYNAMODB_TABLE_VERSION_DOC = "Define version of the table. This is used to create a unique table connector name for the table.";
57+
public static final String SRC_DYNAMODB_TABLE_VERSION_DISPLAY = "Table version";
58+
public static final String SRC_DYNAMODB_TABLE_VERSION_DEFAULT = "";
59+
5560
public static final String SRC_KCL_TABLE_BILLING_MODE_CONFIG = "kcl.table.billing.mode";
5661
public static final String SRC_KCL_TABLE_BILLING_MODE_DOC = "Define billing mode for internal table created by the KCL library. Default is provisioned.";
5762
public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode";
@@ -176,6 +181,7 @@ public static ConfigDef baseConfigDef() {
176181
ConfigDef.Width.MEDIUM,
177182
SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY)
178183

184+
179185
.define(SRC_KCL_TABLE_BILLING_MODE_CONFIG,
180186
ConfigDef.Type.STRING,
181187
SRC_KCL_TABLE_BILLING_MODE_DEFAULT,
@@ -185,6 +191,15 @@ public static ConfigDef baseConfigDef() {
185191
ConfigDef.Width.MEDIUM,
186192
SRC_KCL_TABLE_BILLING_MODE_DISPLAY)
187193

194+
.define(SRC_DYNAMODB_TABLE_VERSION_CONFIG,
195+
ConfigDef.Type.STRING,
196+
SRC_DYNAMODB_TABLE_VERSION_DEFAULT,
197+
ConfigDef.Importance.LOW,
198+
SRC_DYNAMODB_TABLE_VERSION_DOC,
199+
AWS_GROUP, 10,
200+
ConfigDef.Width.MEDIUM,
201+
SRC_DYNAMODB_TABLE_VERSION_DISPLAY)
202+
188203
.define(DST_TOPIC_PREFIX_CONFIG,
189204
ConfigDef.Type.STRING,
190205
DST_TOPIC_PREFIX_DEFAULT,
@@ -286,6 +301,10 @@ public List<String> getWhitelistTables() {
286301
return getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) != null ? getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) : null;
287302
}
288303

304+
public String getTableVersion() {
305+
return getString(SRC_DYNAMODB_TABLE_VERSION_CONFIG);
306+
}
307+
289308
public BillingMode getKCLTableBillingMode() {
290309
return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG));
291310
}

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public void start(Map<String, String> configProperties) {
150150
eventsQueue,
151151
shardRegister);
152152
}
153-
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode());
153+
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getTableVersion(), config.getKCLTableBillingMode());
154154

155155
shutdown = false;
156156
}

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ public class DynamoDBSourceTaskConfig extends DynamoDBSourceConnectorConfig {
88
public static final String TABLE_NAME_CONFIG = "table";
99
private static final String TABLE_NAME_DOC = "table for this task to watch for changes.";
1010

11+
public static final String TABLE_VERSION_CONFIG = "table.version";
12+
private static final String TABLE_VERSION_DOC = "table version for this task to watch for changes.";
13+
1114
public static final String TASK_ID_CONFIG = "task.id";
1215
private static final String TASK_ID_DOC = "Per table id of the task.";
1316

1417
private static final ConfigDef config = baseConfigDef()
1518
.define(TABLE_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TABLE_NAME_DOC)
16-
.define(TASK_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TASK_ID_DOC);
19+
.define(TASK_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TASK_ID_DOC)
20+
.define(TABLE_VERSION_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, TABLE_VERSION_DOC);
1721

1822
public DynamoDBSourceTaskConfig(Map<String, String> props) {
1923
super(config, props);
@@ -23,6 +27,10 @@ public String getTableName() {
2327
return getString(TABLE_NAME_CONFIG);
2428
}
2529

30+
public String getTableVersion() {
31+
return getString(TABLE_VERSION_CONFIG);
32+
}
33+
2634
public String getTaskID() {
2735
return getString(TASK_ID_CONFIG);
2836
}

source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,13 +114,11 @@ public static Map<String, Object> toOffset(SourceInfo sourceInfo) {
114114
offset.put(INIT_SYNC_START, sourceInfo.lastInitSyncStart.toEpochMilli());
115115

116116
if (sourceInfo.exclusiveStartKey != null) {
117-
if (sourceInfo.exclusiveStartKey != null) {
118117
try {
119118
offset.put(EXCLUSIVE_START_KEY, objectMapper.writeValueAsString(sourceInfo.exclusiveStartKey));
120119
} catch (JsonProcessingException e) {
121120
throw new RuntimeException("Failed to serialize exclusiveStartKey", e);
122121
}
123-
}
124122
// offset.put(EXCLUSIVE_START_KEY, gson.toJson(sourceInfo.exclusiveStartKey));
125123
}
126124

source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ void start(AmazonDynamoDB dynamoDBClient,
1010
String tableName,
1111
String taskid,
1212
String endpoint,
13+
String tableVersion,
1314
BillingMode kclTablebillingMode);
1415

1516
void stop();

source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,14 @@ public void start(AmazonDynamoDB dynamoDBClient,
4949
String tableName,
5050
String taskid,
5151
String endpoint,
52+
String tableVersion,
5253
BillingMode kclTableBillingMode) {
5354
IRecordProcessorFactory recordProcessorFactory = new KclRecordProcessorFactory(tableName, eventsQueue,
5455
recordProcessorsRegister);
5556

5657
KinesisClientLibConfiguration clientLibConfiguration = getClientLibConfiguration(tableName,
5758
taskid,
58-
dynamoDBClient, endpoint, kclTableBillingMode);
59+
dynamoDBClient, endpoint, tableVersion, kclTableBillingMode);
5960

6061
AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient);
6162

@@ -123,12 +124,17 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName,
123124
String taskid,
124125
AmazonDynamoDB dynamoDBClient,
125126
String endpoint,
127+
String tableVersion,
126128
BillingMode kclTableBillingMode) {
127129

128130
String streamArn = dynamoDBClient.describeTable(
129131
new DescribeTableRequest()
130132
.withTableName(tableName)).getTable().getLatestStreamArn();
131133

134+
if (tableVersion != null && !tableVersion.isEmpty()) {
135+
tableName = tableName + "-" + tableVersion;
136+
}
137+
132138
return new KinesisClientLibConfiguration(
133139
Constants.KCL_WORKER_APPLICATION_NAME_PREFIX + tableName,
134140
streamArn,

source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ public void kclWorkerIsStartedOnStart() throws InterruptedException {
208208
eq(tableName),
209209
eq("testTask1"),
210210
eq(null),
211+
eq(""),
211212
eq(BillingMode.PROVISIONED)
212213
);
213214
}

source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ void initializationRegistersNewShardToRegistry() {
4343
when(dynamoDBClient.describeTable(ArgumentMatchers.<DescribeTableRequest>any())).thenReturn(result);
4444

4545
// Act
46-
KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, kclTableBillingMode);
46+
KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, "", kclTableBillingMode);
4747

4848
// Assert
4949
assertEquals("datalake-KCL-testTableName1", clientLibConfiguration.getApplicationName());

0 commit comments

Comments
 (0)