Skip to content

Commit 11a0d79

Browse files
Add flag to disable init sync (#8)
1 parent ea3d032 commit 11a0d79

File tree

2 files changed

+29
-5
lines changed

2 files changed

+29
-5
lines changed

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
1717
public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay";
1818
public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60;
1919

20+
public static final String SRC_INIT_SYNC_ENABLE_CONFIG = "init.sync.enable";
21+
public static final String SRC_INIT_SYNC_ENABLE_DOC = "Define if INIT_SYNC should be enabled.";
22+
public static final String SRC_INIT_SYNC_ENABLE_DISPLAY = "INIT_SYNC enable";
23+
public static final boolean SRC_INIT_SYNC_ENABLE_DEFAULT = false;
24+
2025
public static final String AWS_REGION_CONFIG = "aws.region";
2126
public static final String AWS_REGION_DOC = "Define AWS region.";
2227
public static final String AWS_REGION_DISPLAY = "Region";
@@ -223,16 +228,25 @@ public static ConfigDef baseConfigDef() {
223228
SRC_INIT_SYNC_DELAY_DEFAULT,
224229
ConfigDef.Importance.LOW,
225230
SRC_INIT_SYNC_DELAY_DOC,
226-
CONNECTOR_GROUP, 2,
231+
CONNECTOR_GROUP, 4,
227232
ConfigDef.Width.MEDIUM,
228233
SRC_INIT_SYNC_DELAY_DISPLAY)
229234

235+
.define(SRC_INIT_SYNC_ENABLE_CONFIG,
236+
ConfigDef.Type.BOOLEAN,
237+
SRC_INIT_SYNC_ENABLE_DEFAULT,
238+
ConfigDef.Importance.LOW,
239+
SRC_INIT_SYNC_ENABLE_DOC,
240+
CONNECTOR_GROUP, 3,
241+
ConfigDef.Width.MEDIUM,
242+
SRC_INIT_SYNC_ENABLE_DISPLAY)
243+
230244
.define(REDISCOVERY_PERIOD_CONFIG,
231245
ConfigDef.Type.LONG,
232246
REDISCOVERY_PERIOD_DEFAULT,
233247
ConfigDef.Importance.LOW,
234248
REDISCOVERY_PERIOD_DOC,
235-
CONNECTOR_GROUP, 4,
249+
CONNECTOR_GROUP, 5,
236250
ConfigDef.Width.MEDIUM,
237251
REDISCOVERY_PERIOD_DISPLAY)
238252
;
@@ -289,6 +303,10 @@ public int getInitSyncDelay() {
289303
return (int)get(SRC_INIT_SYNC_DELAY_CONFIG);
290304
}
291305

306+
public boolean getInitSyncEnable() {
307+
return getBoolean(SRC_INIT_SYNC_ENABLE_CONFIG);
308+
}
309+
292310
public String getDynamoDBServiceEndpoint() {
293311
return getString(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG);
294312
}

source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask {
8989
private SourceInfo sourceInfo;
9090
private TableDescription tableDesc;
9191
private int initSyncDelay;
92+
private boolean initSyncEnabled = false;
9293

9394
@SuppressWarnings("unused")
9495
//Used by Confluent platform to initialize connector
@@ -123,6 +124,7 @@ public void start(Map<String, String> configProperties) {
123124
tableDesc = client.describeTable(config.getTableName()).getTable();
124125

125126
initSyncDelay = config.getInitSyncDelay();
127+
initSyncEnabled = config.getInitSyncEnable();
126128

127129
LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName());
128130
setStateFromOffset();
@@ -163,7 +165,11 @@ private void setStateFromOffset() {
163165
} else {
164166
LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName());
165167
sourceInfo = new SourceInfo(tableDesc.getTableName(), clock);
166-
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
168+
if (initSyncEnabled) {
169+
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
170+
} else{
171+
sourceInfo.initSyncStatus = InitSyncStatus.FINISHED;
172+
}
167173
}
168174
}
169175

@@ -296,7 +302,7 @@ private List<SourceRecord> sync() throws Exception {
296302
//
297303
// NOTE2: KCL worker reads from multiple shards at the same time in a loop.
298304
// Which means that there can be messages from various time instances (before and after init sync start instance).
299-
if (isPreInitSyncRecord(arrivalTimestamp)) {
305+
if (initSyncEnabled && isPreInitSyncRecord(arrivalTimestamp)) {
300306
LOGGER.debug(
301307
"Dropping old record to prevent another INIT_SYNC. ShardId: {} " +
302308
"ApproximateArrivalTimestamp: {} CurrentTime: {}",
@@ -315,7 +321,7 @@ private List<SourceRecord> sync() throws Exception {
315321
// * connector was down for some time
316322
// * connector is lagging
317323
// * connector failed to finish init sync in acceptable time frame
318-
if (recordIsInDangerZone(arrivalTimestamp)) {
324+
if (initSyncEnabled && recordIsInDangerZone(arrivalTimestamp)) {
319325
sourceInfo.startInitSync();
320326

321327
LOGGER.info(

0 commit comments

Comments
 (0)