Skip to content

Commit 255dca5

Browse files
dariogavranovicRabi Shanker GuhaNandy-006hardik-devrevdivanshu-aggarwal
authored
feat: merge all devrev features into main (#11)
* Add the oldDocument in field value when present * Add EventID and Key inside value * Make logs debug to reduce noise (#3) * move some info logs to debug * make info * use jackson instead of gson (#5) * delay log warning to 60 minutes (#6) * changes to version (#7) * Add flag to disable init sync (#8) * add initial value (#9) --------- Co-authored-by: Rabi Shanker Guha <[email protected]> Co-authored-by: Nandan H R <[email protected]> Co-authored-by: hardik-devrev <[email protected]> Co-authored-by: Divanshu Aggarwal <[email protected]>
1 parent b0a363b commit 255dca5

15 files changed

+169
-44
lines changed

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class DynamoDBSourceConnector extends SourceConnector {
3030
private TablesProvider tablesProvider;
3131

3232
private List<String> consumableTables;
33+
private String tableVersion;
3334

3435
private volatile Timer timer;
3536

@@ -76,7 +77,7 @@ public void start(Map<String, String> properties) {
7677
config.getSrcDynamoDBEnvTagValue());
7778
}
7879
}
79-
80+
tableVersion = config.getTableVersion();
8081
startBackgroundReconfigurationTasks(this.context, config.getRediscoveryPeriod());
8182
}
8283

@@ -90,10 +91,10 @@ private void startBackgroundReconfigurationTasks(ConnectorContext connectorConte
9091
public void run() {
9192
try {
9293
if (consumableTables != null) {
93-
LOGGER.info("Looking for changed DynamoDB tables");
94+
LOGGER.debug("Looking for changed DynamoDB tables");
9495
List<String> consumableTablesRefreshed = tablesProvider.getConsumableTables();
9596
if (!consumableTables.equals(consumableTablesRefreshed)) {
96-
LOGGER.info("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
97+
LOGGER.debug("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
9798
connectorContext.requestTaskReconfiguration();
9899
}
99100
}
@@ -131,11 +132,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
131132

132133
List<Map<String, String>> taskConfigs = new ArrayList<>(consumableTables.size());
133134
for (String table : consumableTables) {
134-
LOGGER.info("Configuring task for table {}", table);
135+
LOGGER.debug("Configuring task for table {}", table);
135136
Map<String, String> taskProps = new HashMap<>(configProperties);
136137

137138
taskProps.put(DynamoDBSourceTaskConfig.TABLE_NAME_CONFIG, table);
138-
139+
if (tableVersion != null && !tableVersion.isEmpty()) {
140+
taskProps.put(DynamoDBSourceTaskConfig.TABLE_VERSION_CONFIG, tableVersion);
141+
}
139142
// In feature we might allow having more then one task per table for performance reasons.
140143
// TaskID will be needed for KCL worker identifiers and also to orchestrate init sync.
141144
taskProps.put(DynamoDBSourceTaskConfig.TASK_ID_CONFIG, "task-1");

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
1717
public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay";
1818
public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60;
1919

20+
public static final String SRC_INIT_SYNC_ENABLE_CONFIG = "init.sync.enable";
21+
public static final String SRC_INIT_SYNC_ENABLE_DOC = "Define if INIT_SYNC should be enabled.";
22+
public static final String SRC_INIT_SYNC_ENABLE_DISPLAY = "INIT_SYNC enable";
23+
public static final boolean SRC_INIT_SYNC_ENABLE_DEFAULT = false;
24+
2025
public static final String AWS_REGION_CONFIG = "aws.region";
2126
public static final String AWS_REGION_DOC = "Define AWS region.";
2227
public static final String AWS_REGION_DISPLAY = "Region";
@@ -52,6 +57,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
5257
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY = "Tables whitelist";
5358
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT = null;
5459

60+
public static final String SRC_DYNAMODB_TABLE_VERSION_CONFIG = "dynamodb.table.version";
61+
public static final String SRC_DYNAMODB_TABLE_VERSION_DOC = "Define version of the table. This is used to create a unique table connector name for the table.";
62+
public static final String SRC_DYNAMODB_TABLE_VERSION_DISPLAY = "Table version";
63+
public static final String SRC_DYNAMODB_TABLE_VERSION_DEFAULT = "";
64+
5565
public static final String SRC_KCL_TABLE_BILLING_MODE_CONFIG = "kcl.table.billing.mode";
5666
public static final String SRC_KCL_TABLE_BILLING_MODE_DOC = "Define billing mode for internal table created by the KCL library. Default is provisioned.";
5767
public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode";
@@ -176,6 +186,7 @@ public static ConfigDef baseConfigDef() {
176186
ConfigDef.Width.MEDIUM,
177187
SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY)
178188

189+
179190
.define(SRC_KCL_TABLE_BILLING_MODE_CONFIG,
180191
ConfigDef.Type.STRING,
181192
SRC_KCL_TABLE_BILLING_MODE_DEFAULT,
@@ -185,6 +196,15 @@ public static ConfigDef baseConfigDef() {
185196
ConfigDef.Width.MEDIUM,
186197
SRC_KCL_TABLE_BILLING_MODE_DISPLAY)
187198

199+
.define(SRC_DYNAMODB_TABLE_VERSION_CONFIG,
200+
ConfigDef.Type.STRING,
201+
SRC_DYNAMODB_TABLE_VERSION_DEFAULT,
202+
ConfigDef.Importance.LOW,
203+
SRC_DYNAMODB_TABLE_VERSION_DOC,
204+
AWS_GROUP, 10,
205+
ConfigDef.Width.MEDIUM,
206+
SRC_DYNAMODB_TABLE_VERSION_DISPLAY)
207+
188208
.define(DST_TOPIC_PREFIX_CONFIG,
189209
ConfigDef.Type.STRING,
190210
DST_TOPIC_PREFIX_DEFAULT,
@@ -208,16 +228,25 @@ public static ConfigDef baseConfigDef() {
208228
SRC_INIT_SYNC_DELAY_DEFAULT,
209229
ConfigDef.Importance.LOW,
210230
SRC_INIT_SYNC_DELAY_DOC,
211-
CONNECTOR_GROUP, 2,
231+
CONNECTOR_GROUP, 4,
212232
ConfigDef.Width.MEDIUM,
213233
SRC_INIT_SYNC_DELAY_DISPLAY)
214234

235+
.define(SRC_INIT_SYNC_ENABLE_CONFIG,
236+
ConfigDef.Type.BOOLEAN,
237+
SRC_INIT_SYNC_ENABLE_DEFAULT,
238+
ConfigDef.Importance.LOW,
239+
SRC_INIT_SYNC_ENABLE_DOC,
240+
CONNECTOR_GROUP, 3,
241+
ConfigDef.Width.MEDIUM,
242+
SRC_INIT_SYNC_ENABLE_DISPLAY)
243+
215244
.define(REDISCOVERY_PERIOD_CONFIG,
216245
ConfigDef.Type.LONG,
217246
REDISCOVERY_PERIOD_DEFAULT,
218247
ConfigDef.Importance.LOW,
219248
REDISCOVERY_PERIOD_DOC,
220-
CONNECTOR_GROUP, 4,
249+
CONNECTOR_GROUP, 5,
221250
ConfigDef.Width.MEDIUM,
222251
REDISCOVERY_PERIOD_DISPLAY)
223252
;
@@ -274,6 +303,10 @@ public int getInitSyncDelay() {
274303
return (int)get(SRC_INIT_SYNC_DELAY_CONFIG);
275304
}
276305

306+
public boolean getInitSyncEnable() {
307+
return getBoolean(SRC_INIT_SYNC_ENABLE_CONFIG);
308+
}
309+
277310
public String getDynamoDBServiceEndpoint() {
278311
return getString(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG);
279312
}
@@ -286,6 +319,10 @@ public List<String> getWhitelistTables() {
286319
return getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) != null ? getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) : null;
287320
}
288321

322+
public String getTableVersion() {
323+
return getString(SRC_DYNAMODB_TABLE_VERSION_CONFIG);
324+
}
325+
289326
public BillingMode getKCLTableBillingMode() {
290327
return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG));
291328
}

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask {
8989
private SourceInfo sourceInfo;
9090
private TableDescription tableDesc;
9191
private int initSyncDelay;
92+
private boolean initSyncEnabled = false;
9293

9394
@SuppressWarnings("unused")
9495
//Used by Confluent platform to initialize connector
@@ -110,7 +111,7 @@ public String version() {
110111
public void start(Map<String, String> configProperties) {
111112

112113
DynamoDBSourceTaskConfig config = new DynamoDBSourceTaskConfig(configProperties);
113-
LOGGER.info("Starting task for table: {}", config.getTableName());
114+
LOGGER.debug("Starting task for table: {}", config.getTableName());
114115

115116
LOGGER.debug("Getting DynamoDB description for table: {}", config.getTableName());
116117
if (client == null) {
@@ -123,10 +124,11 @@ public void start(Map<String, String> configProperties) {
123124
tableDesc = client.describeTable(config.getTableName()).getTable();
124125

125126
initSyncDelay = config.getInitSyncDelay();
127+
initSyncEnabled = config.getInitSyncEnable();
126128

127129
LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName());
128130
setStateFromOffset();
129-
LOGGER.info("Task status: {}", sourceInfo);
131+
LOGGER.debug("Task status: {}", sourceInfo);
130132

131133
LOGGER.debug("Initiating DynamoDB table scanner and record converter.");
132134
if (tableScanner == null) {
@@ -136,7 +138,7 @@ public void start(Map<String, String> configProperties) {
136138
}
137139
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicMap());
138140

139-
LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName());
141+
LOGGER.debug("Starting background KCL worker thread for table: {}", tableDesc.getTableName());
140142

141143
AmazonDynamoDBStreams dynamoDBStreamsClient = AwsClients.buildDynamoDbStreamsClient(
142144
config.getAwsRegion(),
@@ -150,7 +152,7 @@ public void start(Map<String, String> configProperties) {
150152
eventsQueue,
151153
shardRegister);
152154
}
153-
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode());
155+
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getTableVersion(), config.getKCLTableBillingMode());
154156

155157
shutdown = false;
156158
}
@@ -163,7 +165,11 @@ private void setStateFromOffset() {
163165
} else {
164166
LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName());
165167
sourceInfo = new SourceInfo(tableDesc.getTableName(), clock);
166-
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
168+
if (initSyncEnabled) {
169+
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
170+
} else{
171+
sourceInfo.initSyncStatus = InitSyncStatus.FINISHED;
172+
}
167173
}
168174
}
169175

@@ -225,7 +231,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {
225231
Thread.sleep(initSyncDelay * 1000);
226232
}
227233

228-
LOGGER.info("Continuing INIT_SYNC {}", sourceInfo);
234+
LOGGER.debug("Continuing INIT_SYNC {}", sourceInfo);
229235
ScanResult scanResult = tableScanner.getItems(sourceInfo.exclusiveStartKey);
230236

231237
LinkedList<SourceRecord> result = new LinkedList<>();
@@ -261,7 +267,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {
261267

262268

263269
if (sourceInfo.initSyncStatus == InitSyncStatus.RUNNING) {
264-
LOGGER.info(
270+
LOGGER.debug(
265271
"INIT_SYNC iteration returned {}. Status: {}", result.size(), sourceInfo);
266272
} else {
267273
LOGGER.info("INIT_SYNC FINISHED: {}", sourceInfo);
@@ -296,7 +302,7 @@ private List<SourceRecord> sync() throws Exception {
296302
//
297303
// NOTE2: KCL worker reads from multiple shards at the same time in a loop.
298304
// Which means that there can be messages from various time instances (before and after init sync start instance).
299-
if (isPreInitSyncRecord(arrivalTimestamp)) {
305+
if (initSyncEnabled && isPreInitSyncRecord(arrivalTimestamp)) {
300306
LOGGER.debug(
301307
"Dropping old record to prevent another INIT_SYNC. ShardId: {} " +
302308
"ApproximateArrivalTimestamp: {} CurrentTime: {}",
@@ -315,7 +321,7 @@ private List<SourceRecord> sync() throws Exception {
315321
// * connector was down for some time
316322
// * connector is lagging
317323
// * connector failed to finish init sync in acceptable time frame
318-
if (recordIsInDangerZone(arrivalTimestamp)) {
324+
if (initSyncEnabled && recordIsInDangerZone(arrivalTimestamp)) {
319325
sourceInfo.startInitSync();
320326

321327
LOGGER.info(
@@ -334,6 +340,7 @@ private List<SourceRecord> sync() throws Exception {
334340
((RecordAdapter) record).getInternalObject();
335341

336342
Envelope.Operation op = getOperation(dynamoDbRecord.getEventName());
343+
String eventId = dynamoDbRecord.getEventID();
337344

338345
Map<String, AttributeValue> attributes;
339346
if (dynamoDbRecord.getDynamodb().getNewImage() != null) {
@@ -344,7 +351,9 @@ private List<SourceRecord> sync() throws Exception {
344351

345352
SourceRecord sourceRecord = converter.toSourceRecord(sourceInfo,
346353
op,
354+
eventId,
347355
attributes,
356+
dynamoDbRecord.getDynamodb().getOldImage(),
348357
arrivalTimestamp.toInstant(),
349358
dynamoDBRecords.getShardId(),
350359
record.getSequenceNumber());
@@ -440,4 +449,4 @@ ArrayBlockingQueue<KclRecordsWrapper> getEventsQueue() {
440449
SourceInfo getSourceInfo() {
441450
return sourceInfo;
442451
}
443-
}
452+
}

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ public class DynamoDBSourceTaskConfig extends DynamoDBSourceConnectorConfig {
88
public static final String TABLE_NAME_CONFIG = "table";
99
private static final String TABLE_NAME_DOC = "table for this task to watch for changes.";
1010

11+
public static final String TABLE_VERSION_CONFIG = "table.version";
12+
private static final String TABLE_VERSION_DOC = "table version for this task to watch for changes.";
13+
1114
public static final String TASK_ID_CONFIG = "task.id";
1215
private static final String TASK_ID_DOC = "Per table id of the task.";
1316

1417
private static final ConfigDef config = baseConfigDef()
1518
.define(TABLE_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TABLE_NAME_DOC)
16-
.define(TASK_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TASK_ID_DOC);
19+
.define(TASK_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TASK_ID_DOC)
20+
.define(TABLE_VERSION_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, TABLE_VERSION_DOC);
1721

1822
public DynamoDBSourceTaskConfig(Map<String, String> props) {
1923
super(config, props);
@@ -23,6 +27,10 @@ public String getTableName() {
2327
return getString(TABLE_NAME_CONFIG);
2428
}
2529

30+
public String getTableVersion() {
31+
return getString(TABLE_VERSION_CONFIG);
32+
}
33+
2634
public String getTaskID() {
2735
return getString(TASK_ID_CONFIG);
2836
}

source/src/main/java/com/trustpilot/connector/dynamodb/Envelope.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,18 @@ public static final class FieldName {
5656
* The {@code after} field is used to store the state of a record after an operation.
5757
*/
5858
public static final String DOCUMENT = "document";
59+
/**
60+
* The {@code before} field is used to store the state of a record before the operation.
61+
*/
62+
public static final String OLD_DOCUMENT = "old_document";
5963
/**
6064
* The {@code op} field is used to store the kind of operation on a record.
6165
*/
6266
public static final String OPERATION = "op";
67+
/**
68+
* The {@code event_id} field is used to store a globally unique identifier for the event that was recorded in this stream record.
69+
*/
70+
public static final String EVENT_ID = "event_id";
6371
/**
6472
* The {@code origin} field is used to store the information about the source of a record, including the
6573
* Kafka Connect partition and offset information.
@@ -73,6 +81,8 @@ public static final class FieldName {
7381
* variations.
7482
*/
7583
public static final String TIMESTAMP = "ts_ms";
84+
85+
public static final String KEY = "key";
7686
}
7787

7888
}

source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
44
import com.google.gson.Gson;
5-
import com.google.gson.reflect.TypeToken;
65
import com.trustpilot.connector.dynamodb.utils.SchemaNameAdjuster;
76
import org.apache.kafka.connect.data.Schema;
87
import org.apache.kafka.connect.data.SchemaBuilder;
98
import org.apache.kafka.connect.data.Struct;
9+
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import com.fasterxml.jackson.core.JsonProcessingException;
11+
import com.fasterxml.jackson.core.type.TypeReference;
1012

11-
import java.lang.reflect.Type;
1213
import java.time.Clock;
1314
import java.time.Instant;
1415
import java.util.LinkedHashMap;
@@ -24,13 +25,14 @@
2425
*/
2526
public class SourceInfo {
2627
private static final Gson gson = new Gson();
28+
private static final ObjectMapper objectMapper = new ObjectMapper();
2729
private final Clock clock;
2830

2931
public final String version;
3032
public final String tableName;
3133
public boolean initSync;
3234
public InitSyncStatus initSyncStatus = InitSyncStatus.UNDEFINED;
33-
public Instant lastInitSyncStart;
35+
public Instant lastInitSyncStart = Instant.now();
3436
public Instant lastInitSyncEnd = null;
3537
public long initSyncCount = 0L;
3638

@@ -112,7 +114,12 @@ public static Map<String, Object> toOffset(SourceInfo sourceInfo) {
112114
offset.put(INIT_SYNC_START, sourceInfo.lastInitSyncStart.toEpochMilli());
113115

114116
if (sourceInfo.exclusiveStartKey != null) {
115-
offset.put(EXCLUSIVE_START_KEY, gson.toJson(sourceInfo.exclusiveStartKey));
117+
try {
118+
offset.put(EXCLUSIVE_START_KEY, objectMapper.writeValueAsString(sourceInfo.exclusiveStartKey));
119+
} catch (JsonProcessingException e) {
120+
throw new RuntimeException("Failed to serialize exclusiveStartKey", e);
121+
}
122+
// offset.put(EXCLUSIVE_START_KEY, gson.toJson(sourceInfo.exclusiveStartKey));
116123
}
117124

118125
if (sourceInfo.lastInitSyncEnd != null) {
@@ -130,9 +137,14 @@ public static SourceInfo fromOffset(Map<String, Object> offset, Clock clock) {
130137
sourceInfo.lastInitSyncStart = Instant.ofEpochMilli((Long) offset.get(INIT_SYNC_START));
131138

132139
if (offset.containsKey(EXCLUSIVE_START_KEY)) {
133-
134-
Type empMapType = new TypeToken<Map<String, AttributeValue>>() {}.getType();
135-
sourceInfo.exclusiveStartKey = gson.fromJson((String)offset.get(EXCLUSIVE_START_KEY), empMapType);
140+
try {
141+
String json = (String) offset.get(EXCLUSIVE_START_KEY);
142+
sourceInfo.exclusiveStartKey = objectMapper.readValue(json, new TypeReference<Map<String, AttributeValue>>() {});
143+
} catch (JsonProcessingException e) {
144+
throw new RuntimeException("Failed to deserialize exclusiveStartKey", e);
145+
}
146+
// Type empMapType = new TypeToken<Map<String, AttributeValue>>() {}.getType();
147+
// sourceInfo.exclusiveStartKey = gson.fromJson((String)offset.get(EXCLUSIVE_START_KEY), empMapType);
136148
}
137149

138150
if (offset.containsKey(INIT_SYNC_END)) {

source/src/main/java/com/trustpilot/connector/dynamodb/aws/ConfigTablesProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public List<String> getConsumableTables() {
2828
final TableDescription tableDesc = client.describeTable(table).getTable();
2929

3030
if (this.hasValidConfig(tableDesc, table)) {
31-
LOGGER.info("Table to sync: {}", table);
31+
LOGGER.debug("Table to sync: {}", table);
3232
consumableTables.add(table);
3333
}
3434
}

0 commit comments

Comments
 (0)