This repository was archived by the owner on May 28, 2025. It is now read-only.

Upload jar post update #32

Closed
wants to merge 7 commits
Binary file not shown.
5 changes: 4 additions & 1 deletion docs/options.md
@@ -32,6 +32,7 @@
"resource.tagging.service.endpoint": "",

"kafka.topic.prefix": "dynamodb-",
"kafka.topic.map": "",
"tasks.max": "1",

"init.sync.delay.period": 60,
@@ -44,7 +45,9 @@

`dynamodb.table.ingestion.tag.key` - only tables marked with this tag key will be ingested.

`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name. Following this pattern `{prefix}-{dynamodb-table-name}`
`kafka.topic.prefix` - all topics created by this connector will have this prefix in their name, following the pattern `{prefix}{dynamodb-table-name}`

`kafka.topic.map` - a JSON map from DynamoDB table name to Kafka topic name. Topics are named following the pattern `{prefix}{map[table-name]}`. If the map is not specified, the table name is used.

`tasks.max` - **MUST** always exceed the number of tables found for tracking. If the max tasks count is lower than the found tables count, no tasks will be started!

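For illustration, a minimal worker-config sketch showing how the new option might be set alongside the existing prefix; the table and topic names are made up, and the exact escaping of the JSON value may differ depending on how the connector config is supplied:

```json
{
  "kafka.topic.prefix": "dynamodb-",
  "kafka.topic.map": "{\"Users\": \"users\", \"Orders\": \"orders-v2\"}"
}
```

With this map, records from the `Users` table would land on `dynamodb-users`, while a table absent from the map, say `Invoices`, would keep the default `dynamodb-Invoices`.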
1 change: 1 addition & 0 deletions source/build.gradle
@@ -3,6 +3,7 @@ description = "Kafka Connect Source connector that reads from DynamoDB streams"
dependencies {
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.amazonaws:aws-java-sdk-resourcegroupstaggingapi:1.11.551'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'

compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}"
compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1'
@@ -90,10 +90,10 @@ private void startBackgroundReconfigurationTasks(ConnectorContext connectorConte
public void run() {
try {
if (consumableTables != null) {
LOGGER.info("Looking for changed DynamoDB tables");
LOGGER.debug("Looking for changed DynamoDB tables");
List<String> consumableTablesRefreshed = tablesProvider.getConsumableTables();
if (!consumableTables.equals(consumableTablesRefreshed)) {
LOGGER.info("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
LOGGER.debug("Detected changes in DynamoDB tables. Requesting tasks reconfiguration.");
connectorContext.requestTaskReconfiguration();
}
}
@@ -131,7 +131,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {

List<Map<String, String>> taskConfigs = new ArrayList<>(consumableTables.size());
for (String table : consumableTables) {
LOGGER.info("Configuring task for table {}", table);
LOGGER.debug("Configuring task for table {}", table);
Map<String, String> taskProps = new HashMap<>(configProperties);

taskProps.put(DynamoDBSourceTaskConfig.TABLE_NAME_CONFIG, table);
@@ -58,10 +58,14 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED";

public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table.";
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table by default.";
public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix";
public static final String DST_TOPIC_PREFIX_DEFAULT = "dynamodb-";

public static final String DST_TOPIC_MAP_CONFIG = "kafka.topic.map";
public static final String DST_TOPIC_MAP_DOC = "Define Kafka topic namespace map.";
public static final String DST_TOPIC_MAP_DISPLAY = "Topic namespace map";
public static final String DST_TOPIC_MAP_DEFAULT = null;

public static final String REDISCOVERY_PERIOD_CONFIG = "connect.dynamodb.rediscovery.period";
public static final String REDISCOVERY_PERIOD_DOC = "Time period in milliseconds to rediscover stream enabled DynamoDB tables";
@@ -189,6 +193,15 @@ public static ConfigDef baseConfigDef() {
CONNECTOR_GROUP, 1,
ConfigDef.Width.MEDIUM,
DST_TOPIC_PREFIX_DISPLAY)

.define(DST_TOPIC_MAP_CONFIG,
ConfigDef.Type.STRING,
DST_TOPIC_MAP_DEFAULT,
ConfigDef.Importance.HIGH,
DST_TOPIC_MAP_DOC,
CONNECTOR_GROUP, 2,
ConfigDef.Width.LONG,
DST_TOPIC_MAP_DISPLAY)

.define(SRC_INIT_SYNC_DELAY_CONFIG,
ConfigDef.Type.INT,
@@ -249,6 +262,10 @@ public String getDestinationTopicPrefix() {
return getString(DST_TOPIC_PREFIX_CONFIG);
}

public String getDestinationTopicMap() {
return getString(DST_TOPIC_MAP_CONFIG);
}

public long getRediscoveryPeriod() {
return getLong(REDISCOVERY_PERIOD_CONFIG);
}
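`getDestinationTopicMap()` returns the raw JSON string. A minimal sketch of how it could be parsed into a lookup table with the jackson-databind dependency added by this PR; the helper name `parseTopicMap` and its call site are assumptions, not part of the diff:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: turn the "kafka.topic.map" JSON string into table-name -> topic-name entries.
static Map<String, String> parseTopicMap(String topicMapJson) {
    if (topicMapJson == null || topicMapJson.trim().isEmpty()) {
        return Collections.emptyMap(); // no map configured; callers fall back to the table name
    }
    try {
        return new ObjectMapper().readValue(
                topicMapJson, new TypeReference<Map<String, String>>() {});
    } catch (IOException e) {
        throw new IllegalArgumentException("Invalid JSON in kafka.topic.map: " + topicMapJson, e);
    }
}
```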
@@ -110,7 +110,7 @@ public String version() {
public void start(Map<String, String> configProperties) {

DynamoDBSourceTaskConfig config = new DynamoDBSourceTaskConfig(configProperties);
LOGGER.info("Starting task for table: {}", config.getTableName());
LOGGER.debug("Starting task for table: {}", config.getTableName());

LOGGER.debug("Getting DynamoDB description for table: {}", config.getTableName());
if (client == null) {
@@ -126,17 +126,17 @@ public void start(Map<String, String> configProperties) {

LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName());
setStateFromOffset();
LOGGER.info("Task status: {}", sourceInfo);
LOGGER.debug("Task status: {}", sourceInfo);

LOGGER.debug("Initiating DynamoDB table scanner and record converter.");
if (tableScanner == null) {
tableScanner = new DynamoDBTableScanner(client,
tableDesc.getTableName(),
tableDesc.getProvisionedThroughput().getReadCapacityUnits());
}
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix());
converter = new RecordConverter(tableDesc, config.getDestinationTopicPrefix(), config.getDestinationTopicMap());

LOGGER.info("Starting background KCL worker thread for table: {}", tableDesc.getTableName());
LOGGER.debug("Starting background KCL worker thread for table: {}", tableDesc.getTableName());

AmazonDynamoDBStreams dynamoDBStreamsClient = AwsClients.buildDynamoDbStreamsClient(
config.getAwsRegion(),
@@ -225,7 +225,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {
Thread.sleep(initSyncDelay * 1000);
}

LOGGER.info("Continuing INIT_SYNC {}", sourceInfo);
LOGGER.debug("Continuing INIT_SYNC {}", sourceInfo);
ScanResult scanResult = tableScanner.getItems(sourceInfo.exclusiveStartKey);

LinkedList<SourceRecord> result = new LinkedList<>();
@@ -261,7 +261,7 @@ private LinkedList<SourceRecord> initSync() throws Exception {


if (sourceInfo.initSyncStatus == InitSyncStatus.RUNNING) {
LOGGER.info(
LOGGER.debug(
"INIT_SYNC iteration returned {}. Status: {}", result.size(), sourceInfo);
} else {
LOGGER.info("INIT_SYNC FINISHED: {}", sourceInfo);
@@ -334,6 +334,7 @@ private List<SourceRecord> sync() throws Exception {
((RecordAdapter) record).getInternalObject();

Envelope.Operation op = getOperation(dynamoDbRecord.getEventName());
String eventId = dynamoDbRecord.getEventID();

Map<String, AttributeValue> attributes;
if (dynamoDbRecord.getDynamodb().getNewImage() != null) {
@@ -344,7 +345,9 @@

SourceRecord sourceRecord = converter.toSourceRecord(sourceInfo,
op,
eventId,
attributes,
dynamoDbRecord.getDynamodb().getOldImage(),
arrivalTimestamp.toInstant(),
dynamoDBRecords.getShardId(),
record.getSequenceNumber());
@@ -440,4 +443,4 @@ ArrayBlockingQueue<KclRecordsWrapper> getEventsQueue() {
SourceInfo getSourceInfo() {
return sourceInfo;
}
}
}
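The converter constructor now also receives the topic map, and `toSourceRecord` is given the event id and old image. A plausible sketch of how RecordConverter could resolve the destination topic, consistent with the `{prefix}{map[table-name]}` rule documented in options.md; the method and parameter names are assumptions:

```java
import java.util.Map;

// Hypothetical topic resolution: prefer the mapped name when the table is present
// in the map, otherwise fall back to the table name, and always prepend the prefix.
private static String resolveTopic(String topicPrefix, Map<String, String> topicMap, String tableName) {
    String suffix = (topicMap != null && topicMap.containsKey(tableName))
            ? topicMap.get(tableName)
            : tableName;
    return topicPrefix + suffix;
}
```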
@@ -56,10 +56,18 @@ public static final class FieldName {
* The {@code after} field is used to store the state of a record after an operation.
*/
public static final String DOCUMENT = "document";
/**
* The {@code before} field is used to store the state of a record before the operation.
*/
public static final String OLD_DOCUMENT = "old_document";
/**
* The {@code op} field is used to store the kind of operation on a record.
*/
public static final String OPERATION = "op";
/**
* The {@code event_id} field is used to store a globally unique identifier for the event that was recorded in this stream record.
*/
public static final String EVENT_ID = "event_id";
/**
* The {@code origin} field is used to store the information about the source of a record, including the
* Kafka Connect partition and offset information.
@@ -73,6 +81,8 @@ public static final class FieldName {
* variations.
*/
public static final String TIMESTAMP = "ts_ms";

public static final String KEY = "key";
}

}
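A rough sketch of what the record value schema might look like with the new fields, built from the string values of the constants above; the schema name and the use of plain string types are assumptions, since the actual converter may rely on the connector's own logical schemas:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Hypothetical value schema; optional fields may be absent on some records,
// e.g. there is no old image for an insert event.
final class EnvelopeSchemaSketch {
    static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("dynamodb.envelope.value")                        // placeholder schema name
            .field("op", Schema.STRING_SCHEMA)                      // FieldName.OPERATION
            .field("event_id", Schema.OPTIONAL_STRING_SCHEMA)       // FieldName.EVENT_ID
            .field("document", Schema.OPTIONAL_STRING_SCHEMA)       // FieldName.DOCUMENT
            .field("old_document", Schema.OPTIONAL_STRING_SCHEMA)   // FieldName.OLD_DOCUMENT
            .field("ts_ms", Schema.OPTIONAL_INT64_SCHEMA)           // FieldName.TIMESTAMP
            .build();
}
```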
@@ -28,7 +28,7 @@ public List<String> getConsumableTables() {
final TableDescription tableDesc = client.describeTable(table).getTable();

if (this.hasValidConfig(tableDesc, table)) {
LOGGER.info("Table to sync: {}", table);
LOGGER.debug("Table to sync: {}", table);
consumableTables.add(table);
}
}
@@ -43,7 +43,7 @@ public DynamoDBTablesProvider(AWSResourceGroupsTaggingAPI groupsTaggingAPI,
}

public List<String> getConsumableTables() {
LOGGER.info("Searching for tables with tag.key: {}", ingestionTagKey);
LOGGER.debug("Searching for tables with tag.key: {}", ingestionTagKey);

final List<String> consumableTables = new LinkedList<>();
GetResourcesRequest resourcesRequest = getGetResourcesRequest();
@@ -58,7 +58,7 @@ public List<String> getConsumableTables() {
final TableDescription tableDesc = client.describeTable(tableName).getTable();

if (hasValidConfig(tableDesc, tableName)) {
LOGGER.info("Table to sync: {}", tableName);
LOGGER.debug("Table to sync: {}", tableName);
consumableTables.add(tableName);
}
}
@@ -131,7 +131,7 @@ private void process(ProcessRecordsInput processRecordsInput) {

String firstProcessedSeqNo = records.get(0).getSequenceNumber();
lastProcessedSeqNo = records.get(records.size() - 1).getSequenceNumber();
LOGGER.info("Added {} records to eventsQueue. Table: {} ShardID: {}, FirstSeqNo: {}, LastSeqNo: {}",
LOGGER.debug("Added {} records to eventsQueue. Table: {} ShardID: {}, FirstSeqNo: {}, LastSeqNo: {}",
records.size(),
tableName,
shardId,
@@ -152,7 +152,7 @@ private void checkpoint(IRecordProcessorCheckpointer checkpointer) {

if (!lastCommittedRecordSequenceNumber.equals("")) { // If at least one record was committed to Kafka
try {
LOGGER.info("KCL checkpoint table: {} shardId: {} at sequenceNumber: {}",
LOGGER.debug("KCL checkpoint table: {} shardId: {} at sequenceNumber: {}",
tableName,
shardId,
lastCommittedRecordSequenceNumber);
@@ -233,7 +233,7 @@ private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateExcepti
shardId);
shardRegister.remove(shardId);

LOGGER.info(
LOGGER.debug(
"Shard ended. All data committed. Checkpoint and proceed to next one. Table: {} ShardID: {}",
tableName,
shardId);
@@ -77,7 +77,7 @@ public void start(AmazonDynamoDB dynamoDBClient,
cloudWatchClient);


LOGGER.info("Creating KCL worker for Stream: {} ApplicationName: {} WorkerId: {}",
LOGGER.debug("Creating KCL worker for Stream: {} ApplicationName: {} WorkerId: {}",
clientLibConfiguration.getStreamName(),
clientLibConfiguration.getApplicationName(),
clientLibConfiguration.getWorkerIdentifier()
@@ -39,4 +39,12 @@ public static SchemaBuilder builder() {
public static Schema schema() {
return builder().build();
}

public static Schema optionalSchema() {
return SchemaBuilder.string()
.name(LOGICAL_NAME+".optional")
.version(1)
.optional()
.build();
}
}