README.md (+12 -12)
@@ -2,17 +2,17 @@
 # kafka-connect-dynamodb

-A plug-in for the [Kafka Connect framework](https://kafka.apache.org/documentation.html#connect) which implements a "source connector" for DynamoDB table Streams. This source connector extends [Confluent ecosystem](https://www.confluent.io/hub/) and allows replicating DynamoDB tables into Kafka topics. Once data is in Kafka you can use various Confluent sink connectors to push this data to different destinations systems for e.g. into BigQuery for easy analytics.
+A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which implements a "source connector" for AWS DynamoDB table Streams. This source connector allows replicating DynamoDB tables into Kafka topics. Once data is in Kafka you can use various Kafka sink connectors to push this data to different destination systems, e.g. BigQuery for easy analytics.

-## Additional features
+## Notable features

 * `autodiscovery` - monitors and automatically discovers DynamoDB tables to start/stop syncing from (based on AWS TAGs)
-* `INIT_SYNC` - automatically detects and if needed performs initial(existing) data replication before continuing tracking changes from the Stream
+* `initial sync` - automatically detects and, if needed, performs initial (existing) data replication before tracking changes from the DynamoDB table stream

 ## Alternatives

-We found only one existing alternative implementation by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features and is no longer supported.
+Prior to our development we found only one existing implementation by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features (initial sync, handling shard changes) and is no longer supported.

-Also it tries to manage DynamoDB Stream shards manually by using one Connect task to read from each DynamoDB Streams shard, but since DynamoDB Stream shards are dynamic compared to static ones in Kinesis streams this approach would require rebalancing all Confluent Connect cluster tasks far to often.
+Also it tries to manage DynamoDB Stream shards manually by using one Kafka Connect task to read from each DynamoDB Streams shard, but since DynamoDB Stream shards are dynamic compared to the static ones in Kinesis streams this approach would require rebalancing all Kafka Connect cluster tasks far too often.

 Contrary to that, in our implementation we opted to use the Amazon Kinesis Client with the DynamoDB Streams Kinesis Adapter, which takes care of all shard reading and tracking tasks.
@@ -22,7 +22,7 @@ Contrary in our implementation we opted to use Amazon Kinesis Client with Dynamo
 * Java 8
 * Gradlew 5.3.1
-* Confluent Kafka Connect Framework >= 2.1.1
+* Kafka Connect Framework >= 2.1.1
 * Amazon Kinesis Client 1.9.1
 * DynamoDB Streams Kinesis Adapter 1.4.0
@@ -36,16 +36,16 @@ Contrary in our implementation we opted to use Amazon Kinesis Client with Dynamo
 * KCL (Amazon Kinesis Client) keeps metadata in a separate dedicated DynamoDB table for each DynamoDB Stream it is tracking. This means one additional table is created for each table this connector tracks.

-* Current implementation supports only one Confluent task(= KCL worker) reading from one table at any given time.
-  * This limits maximum throughput possible from one table to about **~2000 records(change events) per second**.
-  * This limitation is imposed by current plugin logic and not by the KCL library or Kafka connect framework. Running multiple tasks would require additional synchronization mechanisms for `INIT SYNC` state tracking and might be implemented in feature.
+* Current implementation supports only one Kafka Connect task (= KCL worker) reading from one table at any given time.
+  * Due to this limitation the maximum throughput we measured from one table is **~2000 records (change events) per second**.
+  * This limitation is imposed by the current connector logic and not by the KCL library or the Kafka Connect framework. Running multiple tasks would require additional synchronization mechanisms for `INIT_SYNC` state tracking and might be implemented in the future.

 * Running multiple tasks for different tables on the same JVM has a negative impact on the overall performance of both tasks.
-  * This is so because Amazon Kinesis Client library has some global locking happening.
+  * This is because the Amazon Kinesis Client library has some global locking happening.
   * This issue has been solved in newer KCL versions, but reading from DynamoDB Streams requires the DynamoDB Streams Kinesis Adapter library, and this library still depends on the older Amazon Kinesis Client 1.9.1.
   * That being said, you will only encounter this issue when running lots of tasks on one machine under really high load.

-* DynamoDB table unit capacity must be large enough to enable `INIT_SYNC` to finished in around 16 hours. Otherwise you risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours.
+* DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` finishes in around 16 hours. Otherwise there is a risk of `INIT_SYNC` being restarted just as soon as it has finished, because DynamoDB Streams store change events only for 24 hours.

 * Required AWS roles:
@@ -126,7 +126,7 @@ We use [SemVer](http://semver.org/) for versioning. For the versions available,
 ## Releases

-Releases are done by creating new release(aka tag) via Github user interface. Once created Travis will pick it up, build and upload final .jar file as asset for the Github release.
+Releases are done by creating a new release (aka tag) via the Github user interface. Once created, Travis CI will pick it up, build it and upload the final .jar file as an asset for the Github release.
docs/data.md (+2 -2)
@@ -2,7 +2,7 @@
 # Topic messages structure

 * `Topic key` - contains all defined DynamoDB table keys.
-* `Topic value` - contains ful DynamoDB document serialised as DynamoDB Json string together with additional metadata.
+* `Topic value` - contains the full DynamoDB document serialised as a DynamoDB Json string together with additional metadata.
@@ -42,7 +42,7 @@
 * `u` - existing record updated
 * `d` - existing record deleted

-`init_sync_start` - is set then `INIT_SYNC` starts and will retain same value not only for `INIT_SYNC` records but for all following events as well. Untill next `INIT_SYNC` events happens.
+`init_sync_start` - is set when `INIT_SYNC` starts and retains the same value not only for `INIT_SYNC` records but for all following events as well, until the next `INIT_SYNC` happens.
-This plugin can sync multiple DynamoDB tables at the same time and it does so without requiring explicit configuration for each one. On start and at regular intervals after that(by default 60s) it queries AWS api for tables which match following criteria and starts Kafka Connect task for each of them:
+This connector can sync multiple DynamoDB tables at the same time and it does so without requiring explicit configuration for each one. On start and at regular time intervals (by default 60s) it queries the AWS API for DynamoDB tables which match the following criteria and starts a Kafka Connect task for each of them:

 * table must have the configured ingestion TAG key set
 * table must have the configured stack (environment) TAG key and value set
 * table must have DynamoDB streams enabled (in `new_image` or `new_and_old_image` mode)

 ### 2. "INIT_SYNC"

-`INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the thirst time. But it can be repeated in case of unexpected issues. For e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (as data is stored only for 24 hours).
+`INIT_SYNC` is a process during which all existing table data is scanned and pushed into the Kafka destination topic. Usually this happens only once after the source task for a specific table is started for the first time, but it can be repeated in case of unexpected issues, e.g. if the source connector was down for a long period of time and may have missed some of the change events from the table stream (as data is stored only for 24 hours).
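As a rough illustration of this phase, here is a minimal sketch of a batched table scan using the AWS SDK for Java v1. The table name and page size are placeholders, and this is not the connector's actual implementation:

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;

import java.util.Map;

public class InitSyncScanSketch {
    public static void main(String[] args) {
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();
        Map<String, AttributeValue> exclusiveStartKey = null;

        do {
            // Scan the source table in batches; table name and page size are illustrative.
            ScanRequest request = new ScanRequest()
                    .withTableName("my-source-table")
                    .withLimit(1000)
                    .withExclusiveStartKey(exclusiveStartKey);
            ScanResult result = dynamoDb.scan(request);

            for (Map<String, AttributeValue> item : result.getItems()) {
                // In the connector each scanned item would be converted into a
                // Kafka Connect record and pushed to the destination topic.
                System.out.println(item);
            }

            // Continue from where the previous page ended until the whole table is read.
            exclusiveStartKey = result.getLastEvaluatedKey();
        } while (exclusiveStartKey != null);
    }
}
```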
 ### 3. "SYNC"

-Once `INIT_SYNC` is finished source task switches into DynamoDB Streams consumer state. There all changes happening to the source table are represented in this stream and copied over to the Kafka's destination topic. Consumers of this topic can recreate full state of the source table at any given time.
+Once `INIT_SYNC` is finished the source task switches into the DynamoDB Streams consumer state. All changes that happen to the source table are represented in this stream and copied over to the Kafka destination topic. Consumers of this topic can recreate the full state of the source table at any given time.

 # How does it work

-This plugin depends on Kafka Connect framework for most tasks related to Kafka and uses Kinesis Client Library(KCL) + DynamoDB Streams Adapter libraries for DynamoDB Streams consumption.
+This connector depends on the Kafka Connect framework for most tasks related to Kafka and uses the Kinesis Client Library (KCL) and DynamoDB Streams Kinesis Adapter libraries for DynamoDB Streams consumption.

 Read the following articles to familiarize yourself with them:

-At it's core this plugin starts one Kafka Connect task for each table it syncs. And each task starts a dedicated KCL(Kinesis Consumer Library) worker to read data from the stream.
+At its core this connector starts one Kafka Connect task for each table it syncs, and each task starts a dedicated KCL (Kinesis Client Library) worker to read data from the stream.
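For context, wiring a KCL worker to a DynamoDB Stream via the Kinesis adapter looks roughly like the sketch below. The application name, stream ARN and record handling are illustrative placeholders, not the connector's actual code:

```java
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class KclWorkerSketch {
    public static void main(String[] args) {
        DefaultAWSCredentialsProviderChain credentials = new DefaultAWSCredentialsProviderChain();

        // The adapter exposes a DynamoDB Stream through the Kinesis interface,
        // so the regular KCL Worker can consume it.
        AmazonDynamoDBStreamsAdapterClient adapterClient =
                new AmazonDynamoDBStreamsAdapterClient(credentials);

        // Placeholder values; the application name also becomes the name of the
        // KCL "state" table created in DynamoDB.
        String streamArn = "arn:aws:dynamodb:eu-west-1:111111111111:table/my-source-table/stream/label";
        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "my-connector-task", streamArn, credentials, "worker-1")
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        IRecordProcessorFactory factory = () -> new IRecordProcessor() {
            @Override
            public void initialize(InitializationInput input) { }

            @Override
            public void processRecords(ProcessRecordsInput input) {
                for (Record record : input.getRecords()) {
                    // The real connector converts each stream record into a Kafka
                    // Connect record here; this sketch only prints the payload size.
                    System.out.println(record.getData().remaining() + " bytes");
                }
            }

            @Override
            public void shutdown(ShutdownInput input) { }
        };

        Worker worker = new Worker.Builder()
                .recordProcessorFactory(factory)
                .config(config)
                .kinesisClient(adapterClient)
                .dynamoDBClient(AmazonDynamoDBClientBuilder.defaultClient())
                .cloudWatchClient(AmazonCloudWatchClientBuilder.defaultClient())
                .build();

        worker.run(); // blocks; each Kafka Connect task would run one such worker
    }
}
```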
 ## State tracking

-Connector tracks it's state at all stages and is able to continue there it stopped after restart. But, state and progress **tracking happens at regular intervals** and not after each processed event. Meaning that there can and **will be event duplicates in destination topic**!
+The connector tracks its state at all stages and is able to continue where it stopped after a restart. However, state and progress **tracking happens at regular intervals** and not after each processed event, meaning that there can and **will be event duplicates in the destination topic**!

 Since we are using two different frameworks/libraries together there are two different ways in which each of them stores state:

-* Kafka connect leverages dedicated `state` topics there connector tasks can push offsets(state) for each partition they are consuming. This plugin has no support for source table "partitions" and only one task is allowed to consume one table at a time therefor it uses table name as partition key and leverage `offsets` dictionary to store tasks state and progress of that state.
-* KCL library uses separate dedicated DynamoDB table for each DynamoDB Stream it tracks to remember it's own progress. It is used only to track which messages have been consumed already. Since we can only say that message has been consumed once it's delivered to Kafka special synchronization logic is implemented in this plugin.
+* Kafka Connect leverages dedicated `state` topics where connector tasks can push offsets (state) for each partition they are consuming. This connector has no support for source table "partitions" and only one task is allowed to consume one table at a time, therefore it uses the table name as the partition key and leverages the `offsets` dictionary to store task state and progress.
+* The KCL library uses a separate dedicated DynamoDB table for each DynamoDB Stream it tracks to remember its own progress. It is used only to track which messages have been consumed already. Since we can only say that a message has been consumed once it has been delivered to Kafka, special synchronization logic is implemented in this connector.

-> NOTE: KCL library separate `state` table in DynamoDB for each stream it tracks! This table is created automatically if it doesn't exist.
+> NOTE: the KCL library uses a separate `state` table in DynamoDB for each stream it tracks! This table is created automatically if it doesn't exist.
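To illustrate the offset handling described above, a source task could emit records roughly like this. The `table_name` and `sequence_number` keys and the topic naming are illustrative assumptions, not necessarily the exact fields this connector uses:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;
import java.util.Map;

public class OffsetTrackingSketch {
    public static SourceRecord toSourceRecord(String tableName, String sequenceNumber, String json) {
        // One "partition" per source table: the table name is the partition key.
        Map<String, String> sourcePartition = Collections.singletonMap("table_name", tableName);

        // The offset map is what Kafka Connect persists to its offsets topic at
        // regular intervals; here it just carries the last seen sequence number.
        Map<String, String> sourceOffset = Collections.singletonMap("sequence_number", sequenceNumber);

        String topic = tableName; // illustrative topic naming
        return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, json);
    }
}
```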
 ### `DISCOVERY` and task configuration

-Plugin uses resource group api to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if stack TAG is set and streams are actually enabled. For each table which meats all requirements separate dedicated Kafka Connect task is started.
+The connector uses the resource groups API to receive a list of DynamoDB tables which have the ingestion TAG defined. Then it iterates over this list and checks whether the stack TAG is set and streams are actually enabled. For each table which meets all requirements a separate dedicated Kafka Connect task is started.

 The same `discovery` phase is executed on start and every 60 seconds after that (default config value). Each started task can be in one of the following states.
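As an illustration of this `DISCOVERY` step, a sketch with the AWS SDK for Java v1 resource groups tagging API might look like the following. The tag keys/values and the ARN parsing are assumptions for the example only:

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPI;
import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPIClientBuilder;
import com.amazonaws.services.resourcegroupstaggingapi.model.GetResourcesRequest;
import com.amazonaws.services.resourcegroupstaggingapi.model.ResourceTagMapping;
import com.amazonaws.services.resourcegroupstaggingapi.model.TagFilter;

public class DiscoverySketch {
    public static void main(String[] args) {
        AWSResourceGroupsTaggingAPI taggingApi = AWSResourceGroupsTaggingAPIClientBuilder.defaultClient();
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

        // Ask the resource groups tagging API for DynamoDB tables that carry the
        // ingestion and stack tags ("datalake-ingest" / "stack" are placeholders).
        GetResourcesRequest request = new GetResourcesRequest()
                .withResourceTypeFilters("dynamodb:table")
                .withTagFilters(
                        new TagFilter().withKey("datalake-ingest"),
                        new TagFilter().withKey("stack").withValues("production"));

        for (ResourceTagMapping mapping : taggingApi.getResources(request).getResourceTagMappingList()) {
            // ARN format: arn:aws:dynamodb:<region>:<account>:table/<table-name> (simplified parsing)
            String arn = mapping.getResourceARN();
            String tableName = arn.substring(arn.indexOf("table/") + "table/".length());

            // Only tables with a stream in NEW_IMAGE or NEW_AND_OLD_IMAGES mode qualify.
            StreamSpecification stream = dynamoDb.describeTable(tableName).getTable().getStreamSpecification();
            boolean eligible = stream != null
                    && Boolean.TRUE.equals(stream.getStreamEnabled())
                    && ("NEW_IMAGE".equals(stream.getStreamViewType())
                        || "NEW_AND_OLD_IMAGES".equals(stream.getStreamViewType()));

            if (eligible) {
                System.out.println("Would start a Kafka Connect task for table: " + tableName);
            }
        }
    }
}
```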
@@ -53,8 +53,8 @@ During `INIT_SYNC` phase all records from source table are scanned in batches. A
 ### `SYNC` state

-After `INIT_SYNC` plugin starts reading messages from DynamoDB Stream. Thirst thing it makes sure is to drop all events which happened before `INIT_SYNC` was started (except for those created during last hour before `INIT_SYNC`). This is done to prevent unnecessary duplicate events(since we already have latest state) and to advance KCL reader into `save zone`.
+After `INIT_SYNC` the connector starts reading messages from the DynamoDB Stream. First it makes sure to drop all events which happened before `INIT_SYNC` was started (except for those created during the last hour before `INIT_SYNC`). This is done to prevent unnecessary duplicate events (since we already have the latest state) and to advance the KCL reader into the `safe zone`.

-Events are considered to be in `save zone` if they there create no earlier then -20 hours before `now`. Otherwise plugin has no way to validate that it hasn't skipped some of the events and it has to initiate `INIT_SYNC`!
+Events are considered to be in the `safe zone` if they were created no earlier than 20 hours before `now`. Otherwise the connector has no way to validate that it hasn't skipped some of the events and it has to initiate `INIT_SYNC`!

 > NOTE: DynamoDB Streams store data for 24 hours only
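A minimal sketch of the `safe zone` check described above; the 20-hour window comes from the text, and the method names are illustrative:

```java
import java.time.Duration;
import java.time.Instant;

public class SafeZoneCheckSketch {
    // Events older than this are too close to the 24h stream retention limit.
    private static final Duration SAFE_ZONE = Duration.ofHours(20);

    /** Returns true if the event is recent enough that no data could have been missed. */
    public static boolean isInSafeZone(Instant eventCreatedAt, Instant now) {
        return !eventCreatedAt.isBefore(now.minus(SAFE_ZONE));
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        System.out.println(isInSafeZone(now.minus(Duration.ofHours(2)), now));  // true  - keep consuming
        System.out.println(isInSafeZone(now.minus(Duration.ofHours(23)), now)); // false - re-run INIT_SYNC
    }
}
```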
docs/options.md (+2 -2)
@@ -32,7 +32,7 @@
     "connect.dynamodb.rediscovery.period": "60000"
 }

-`dynamodb.table.env.tag.key` - tag key used to define environment(stack). Useful if you have `staging` and `production` under same AWS account. Or if you want to use different Confluent Connect clusters to sync different tables.
+`dynamodb.table.env.tag.key` - tag key used to define the environment (stack). Useful if you have `staging` and `production` under the same AWS account, or if you want to use different Kafka Connect clusters to sync different tables.

 `dynamodb.table.env.tag.value` - defines from which environment or stack to ingest tables, e.g. 'staging' or 'production'.

@@ -42,7 +42,7 @@
 `tasks.max` - **MUST** always exceed the number of tables found for tracking. If the max tasks count is lower than the found tables count, no tasks will be started!

-`init.sync.delay.period` - time value in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Confluent Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress).
+`init.sync.delay.period` - time value in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after a rebalance (since multiple task rebalances can happen in quick succession, which would mean more duplicated data because the `INIT_SYNC` process won't have time to mark its progress).

 `connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often the connector should try to find new DynamoDB tables (or detect removed ones). If changes are found, tasks are automatically reconfigured.