Skip to content

feat: merge all devrev features into main #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class DynamoDBSourceConnector extends SourceConnector {
private TablesProvider tablesProvider;

private List<String> consumableTables;
private String tableVersion;

private volatile Timer timer;

Expand Down Expand Up @@ -76,7 +77,7 @@ public void start(Map<String, String> properties) {
config.getSrcDynamoDBEnvTagValue());
}
}

tableVersion = config.getTableVersion();
startBackgroundReconfigurationTasks(this.context, config.getRediscoveryPeriod());
}

Expand All @@ -90,10 +91,10 @@ private void startBackgroundReconfigurationTasks(ConnectorContext connectorConte
public void run() {
try {
if (consumableTables != null) {
LOGGER.info("Looking for changed DynamoDB tables");
LOGGER.debug("Looking for changed DynamoDB tables");
List<String> consumableTablesRefreshed = tablesProvider.getConsumableTables();
if (!consumableTables.equals(consumableTablesRefreshed)) {
LOGGER.info("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
LOGGER.debug("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
connectorContext.requestTaskReconfiguration();
}
}
Expand Down Expand Up @@ -131,11 +132,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {

List<Map<String, String>> taskConfigs = new ArrayList<>(consumableTables.size());
for (String table : consumableTables) {
LOGGER.info("Configuring task for table {}", table);
LOGGER.debug("Configuring task for table {}", table);
Map<String, String> taskProps = new HashMap<>(configProperties);

taskProps.put(DynamoDBSourceTaskConfig.TABLE_NAME_CONFIG, table);

if (tableVersion != null && !tableVersion.isEmpty()) {
taskProps.put(DynamoDBSourceTaskConfig.TABLE_VERSION_CONFIG, tableVersion);
}
// In future we might allow having more than one task per table for performance reasons.
// TaskID will be needed for KCL worker identifiers and also to orchestrate init sync.
taskProps.put(DynamoDBSourceTaskConfig.TASK_ID_CONFIG, "task-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay";
public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60;

public static final String SRC_INIT_SYNC_ENABLE_CONFIG = "init.sync.enable";
public static final String SRC_INIT_SYNC_ENABLE_DOC = "Define if INIT_SYNC should be enabled.";
public static final String SRC_INIT_SYNC_ENABLE_DISPLAY = "INIT_SYNC enable";
public static final boolean SRC_INIT_SYNC_ENABLE_DEFAULT = false;

public static final String AWS_REGION_CONFIG = "aws.region";
public static final String AWS_REGION_DOC = "Define AWS region.";
public static final String AWS_REGION_DISPLAY = "Region";
Expand Down Expand Up @@ -52,6 +57,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY = "Tables whitelist";
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT = null;

public static final String SRC_DYNAMODB_TABLE_VERSION_CONFIG = "dynamodb.table.version";
public static final String SRC_DYNAMODB_TABLE_VERSION_DOC = "Define version of the table. This is used to create a unique table connector name for the table.";
public static final String SRC_DYNAMODB_TABLE_VERSION_DISPLAY = "Table version";
public static final String SRC_DYNAMODB_TABLE_VERSION_DEFAULT = "";

public static final String SRC_KCL_TABLE_BILLING_MODE_CONFIG = "kcl.table.billing.mode";
public static final String SRC_KCL_TABLE_BILLING_MODE_DOC = "Define billing mode for internal table created by the KCL library. Default is provisioned.";
public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode";
Expand Down Expand Up @@ -176,6 +186,7 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Width.MEDIUM,
SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY)


.define(SRC_KCL_TABLE_BILLING_MODE_CONFIG,
ConfigDef.Type.STRING,
SRC_KCL_TABLE_BILLING_MODE_DEFAULT,
Expand All @@ -185,6 +196,15 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Width.MEDIUM,
SRC_KCL_TABLE_BILLING_MODE_DISPLAY)

.define(SRC_DYNAMODB_TABLE_VERSION_CONFIG,
ConfigDef.Type.STRING,
SRC_DYNAMODB_TABLE_VERSION_DEFAULT,
ConfigDef.Importance.LOW,
SRC_DYNAMODB_TABLE_VERSION_DOC,
AWS_GROUP, 10,
ConfigDef.Width.MEDIUM,
SRC_DYNAMODB_TABLE_VERSION_DISPLAY)

.define(DST_TOPIC_PREFIX_CONFIG,
ConfigDef.Type.STRING,
DST_TOPIC_PREFIX_DEFAULT,
Expand All @@ -208,16 +228,25 @@ public static ConfigDef baseConfigDef() {
SRC_INIT_SYNC_DELAY_DEFAULT,
ConfigDef.Importance.LOW,
SRC_INIT_SYNC_DELAY_DOC,
CONNECTOR_GROUP, 2,
CONNECTOR_GROUP, 4,
ConfigDef.Width.MEDIUM,
SRC_INIT_SYNC_DELAY_DISPLAY)

.define(SRC_INIT_SYNC_ENABLE_CONFIG,
ConfigDef.Type.BOOLEAN,
SRC_INIT_SYNC_ENABLE_DEFAULT,
ConfigDef.Importance.LOW,
SRC_INIT_SYNC_ENABLE_DOC,
CONNECTOR_GROUP, 3,
ConfigDef.Width.MEDIUM,
SRC_INIT_SYNC_ENABLE_DISPLAY)

.define(REDISCOVERY_PERIOD_CONFIG,
ConfigDef.Type.LONG,
REDISCOVERY_PERIOD_DEFAULT,
ConfigDef.Importance.LOW,
REDISCOVERY_PERIOD_DOC,
CONNECTOR_GROUP, 4,
CONNECTOR_GROUP, 5,
ConfigDef.Width.MEDIUM,
REDISCOVERY_PERIOD_DISPLAY)
;
Expand Down Expand Up @@ -274,6 +303,10 @@ public int getInitSyncDelay() {
return (int)get(SRC_INIT_SYNC_DELAY_CONFIG);
}

/**
 * Whether INIT_SYNC is enabled for this connector.
 *
 * @return the configured {@code init.sync.enable} flag
 */
public boolean getInitSyncEnable() {
    final boolean initSyncEnabled = getBoolean(SRC_INIT_SYNC_ENABLE_CONFIG);
    return initSyncEnabled;
}

/**
 * Returns the configured DynamoDB service endpoint.
 *
 * @return the value of the {@code AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG} setting
 */
public String getDynamoDBServiceEndpoint() {
    final String endpoint = getString(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG);
    return endpoint;
}
Expand All @@ -286,6 +319,10 @@ public List<String> getWhitelistTables() {
return getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) != null ? getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) : null;
}

/**
 * Returns the configured table version ({@code dynamodb.table.version}).
 *
 * @return the table version string; empty by default
 */
public String getTableVersion() {
    final String tableVersion = getString(SRC_DYNAMODB_TABLE_VERSION_CONFIG);
    return tableVersion;
}

/**
 * Returns the billing mode to use for the internal KCL checkpoint table.
 *
 * @return the {@link BillingMode} parsed from the {@code kcl.table.billing.mode} setting
 */
public BillingMode getKCLTableBillingMode() {
    final String configuredMode = getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG);
    return BillingMode.fromValue(configuredMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask {
private SourceInfo sourceInfo;
private TableDescription tableDesc;
private int initSyncDelay;
private boolean initSyncEnabled = false;

@SuppressWarnings("unused")
//Used by Confluent platform to initialize connector
Expand All @@ -110,7 +111,7 @@ public String version() {
public void start(Map<String, String> configProperties) {

DynamoDBSourceTaskConfig config = new DynamoDBSourceTaskConfig(configProperties);
LOGGER.info("Starting task for table: {}", config.getTableName());
LOGGER.debug("Starting task for table: {}", config.getTableName());

LOGGER.debug("Getting DynamoDB description for table: {}", config.getTableName());
if (client == null) {
Expand All @@ -123,10 +124,11 @@ public void start(Map<String, String> configProperties) {
tableDesc = client.describeTable(config.getTableName()).getTable();

initSyncDelay = config.getInitSyncDelay();
initSyncEnabled = config.getInitSyncEnable();

LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName());
setStateFromOffset();
LOGGER.info("Task status: {}", sourceInfo);
LOGGER.debug("Task status: {}", sourceInfo);

LOGGER.debug("Initiating DynamoDB table scanner and record converter.");
if (tableScanner == null) {
Expand All @@ -136,7 +138,7 @@ public void start(Map<String, String> configProperties) {
}
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicMap());

LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName());
LOGGER.debug("Starting background KCL worker thread for table: {}", tableDesc.getTableName());

AmazonDynamoDBStreams dynamoDBStreamsClient = AwsClients.buildDynamoDbStreamsClient(
config.getAwsRegion(),
Expand All @@ -150,7 +152,7 @@ public void start(Map<String, String> configProperties) {
eventsQueue,
shardRegister);
}
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode());
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getTableVersion(), config.getKCLTableBillingMode());

shutdown = false;
}
Expand All @@ -163,7 +165,11 @@ private void setStateFromOffset() {
} else {
LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName());
sourceInfo = new SourceInfo(tableDesc.getTableName(), clock);
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
if (initSyncEnabled) {
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
} else{
sourceInfo.initSyncStatus = InitSyncStatus.FINISHED;
}
}
}

Expand Down Expand Up @@ -225,7 +231,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {
Thread.sleep(initSyncDelay * 1000);
}

LOGGER.info("Continuing INIT_SYNC {}", sourceInfo);
LOGGER.debug("Continuing INIT_SYNC {}", sourceInfo);
ScanResult scanResult = tableScanner.getItems(sourceInfo.exclusiveStartKey);

LinkedList<SourceRecord> result = new LinkedList<>();
Expand Down Expand Up @@ -261,7 +267,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {


if (sourceInfo.initSyncStatus == InitSyncStatus.RUNNING) {
LOGGER.info(
LOGGER.debug(
"INIT_SYNC iteration returned {}. Status: {}", result.size(), sourceInfo);
} else {
LOGGER.info("INIT_SYNC FINISHED: {}", sourceInfo);
Expand Down Expand Up @@ -296,7 +302,7 @@ private List<SourceRecord> sync() throws Exception {
//
// NOTE2: KCL worker reads from multiple shards at the same time in a loop.
// Which means that there can be messages from various time instances (before and after init sync start instance).
if (isPreInitSyncRecord(arrivalTimestamp)) {
if (initSyncEnabled && isPreInitSyncRecord(arrivalTimestamp)) {
LOGGER.debug(
"Dropping old record to prevent another INIT_SYNC. ShardId: {} " +
"ApproximateArrivalTimestamp: {} CurrentTime: {}",
Expand All @@ -315,7 +321,7 @@ private List<SourceRecord> sync() throws Exception {
// * connector was down for some time
// * connector is lagging
// * connector failed to finish init sync in acceptable time frame
if (recordIsInDangerZone(arrivalTimestamp)) {
if (initSyncEnabled && recordIsInDangerZone(arrivalTimestamp)) {
sourceInfo.startInitSync();

LOGGER.info(
Expand All @@ -334,6 +340,7 @@ private List<SourceRecord> sync() throws Exception {
((RecordAdapter) record).getInternalObject();

Envelope.Operation op = getOperation(dynamoDbRecord.getEventName());
String eventId = dynamoDbRecord.getEventID();

Map<String, AttributeValue> attributes;
if (dynamoDbRecord.getDynamodb().getNewImage() != null) {
Expand All @@ -344,7 +351,9 @@ private List<SourceRecord> sync() throws Exception {

SourceRecord sourceRecord = converter.toSourceRecord(sourceInfo,
op,
eventId,
attributes,
dynamoDbRecord.getDynamodb().getOldImage(),
arrivalTimestamp.toInstant(),
dynamoDBRecords.getShardId(),
record.getSequenceNumber());
Expand Down Expand Up @@ -440,4 +449,4 @@ ArrayBlockingQueue<KclRecordsWrapper> getEventsQueue() {
/**
 * Exposes the task's current {@code SourceInfo} state.
 * Package-private — presumably used by tests or diagnostics; TODO confirm callers.
 */
SourceInfo getSourceInfo() {
    return this.sourceInfo;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ public class DynamoDBSourceTaskConfig extends DynamoDBSourceConnectorConfig {
public static final String TABLE_NAME_CONFIG = "table";
private static final String TABLE_NAME_DOC = "table for this task to watch for changes.";

public static final String TABLE_VERSION_CONFIG = "table.version";
private static final String TABLE_VERSION_DOC = "table version for this task to watch for changes.";

public static final String TASK_ID_CONFIG = "task.id";
private static final String TASK_ID_DOC = "Per table id of the task.";

private static final ConfigDef config = baseConfigDef()
.define(TABLE_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TABLE_NAME_DOC)
.define(TASK_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TASK_ID_DOC);
.define(TASK_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TASK_ID_DOC)
.define(TABLE_VERSION_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, TABLE_VERSION_DOC);

public DynamoDBSourceTaskConfig(Map<String, String> props) {
super(config, props);
Expand All @@ -23,6 +27,10 @@ public String getTableName() {
return getString(TABLE_NAME_CONFIG);
}

/**
 * Returns the per-task table version ({@code table.version}).
 *
 * @return the table version string; empty by default
 */
public String getTableVersion() {
    final String version = getString(TABLE_VERSION_CONFIG);
    return version;
}

/**
 * Returns this task's per-table identifier ({@code task.id}).
 *
 * @return the task id string
 */
public String getTaskID() {
    final String taskId = getString(TASK_ID_CONFIG);
    return taskId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,18 @@ public static final class FieldName {
* The {@code document} field is used to store the state of a record after an operation.
*/
public static final String DOCUMENT = "document";
/**
* The {@code old_document} field is used to store the state of a record before the operation.
*/
public static final String OLD_DOCUMENT = "old_document";
/**
* The {@code op} field is used to store the kind of operation on a record.
*/
public static final String OPERATION = "op";
/**
* The {@code event_id} field is used to store a globally unique identifier for the event that was recorded in this stream record.
*/
public static final String EVENT_ID = "event_id";
/**
* The {@code origin} field is used to store the information about the source of a record, including the
* Kafka Connect partition and offset information.
Expand All @@ -73,6 +81,8 @@ public static final class FieldName {
* variations.
*/
public static final String TIMESTAMP = "ts_ms";

public static final String KEY = "key";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.trustpilot.connector.dynamodb.utils.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;

import java.lang.reflect.Type;
import java.time.Clock;
import java.time.Instant;
import java.util.LinkedHashMap;
Expand All @@ -24,13 +25,14 @@
*/
public class SourceInfo {
private static final Gson gson = new Gson();
private static final ObjectMapper objectMapper = new ObjectMapper();
private final Clock clock;

public final String version;
public final String tableName;
public boolean initSync;
public InitSyncStatus initSyncStatus = InitSyncStatus.UNDEFINED;
public Instant lastInitSyncStart;
public Instant lastInitSyncStart = Instant.now();
public Instant lastInitSyncEnd = null;
public long initSyncCount = 0L;

Expand Down Expand Up @@ -112,7 +114,12 @@ public static Map<String, Object> toOffset(SourceInfo sourceInfo) {
offset.put(INIT_SYNC_START, sourceInfo.lastInitSyncStart.toEpochMilli());

if (sourceInfo.exclusiveStartKey != null) {
offset.put(EXCLUSIVE_START_KEY, gson.toJson(sourceInfo.exclusiveStartKey));
try {
offset.put(EXCLUSIVE_START_KEY, objectMapper.writeValueAsString(sourceInfo.exclusiveStartKey));
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize exclusiveStartKey", e);
}
// offset.put(EXCLUSIVE_START_KEY, gson.toJson(sourceInfo.exclusiveStartKey));
}

if (sourceInfo.lastInitSyncEnd != null) {
Expand All @@ -130,9 +137,14 @@ public static SourceInfo fromOffset(Map<String, Object> offset, Clock clock) {
sourceInfo.lastInitSyncStart = Instant.ofEpochMilli((Long) offset.get(INIT_SYNC_START));

if (offset.containsKey(EXCLUSIVE_START_KEY)) {

Type empMapType = new TypeToken<Map<String, AttributeValue>>() {}.getType();
sourceInfo.exclusiveStartKey = gson.fromJson((String)offset.get(EXCLUSIVE_START_KEY), empMapType);
try {
String json = (String) offset.get(EXCLUSIVE_START_KEY);
sourceInfo.exclusiveStartKey = objectMapper.readValue(json, new TypeReference<Map<String, AttributeValue>>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to deserialize exclusiveStartKey", e);
}
// Type empMapType = new TypeToken<Map<String, AttributeValue>>() {}.getType();
// sourceInfo.exclusiveStartKey = gson.fromJson((String)offset.get(EXCLUSIVE_START_KEY), empMapType);
}

if (offset.containsKey(INIT_SYNC_END)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public List<String> getConsumableTables() {
final TableDescription tableDesc = client.describeTable(table).getTable();

if (this.hasValidConfig(tableDesc, table)) {
LOGGER.info("Table to sync: {}", table);
LOGGER.debug("Table to sync: {}", table);
consumableTables.add(table);
}
}
Expand Down
Loading