Commit b517fc3

Chris Cho authored and schmalliso committed
DOCSP-14077: cdc handler (#81)
* DOCSP-14077: Sink Connector CDC handler
1 parent 568330f commit b517fc3

1 file changed: +87 additions, -22 deletions

source/kafka-sink-cdc.txt

@@ -12,25 +12,89 @@ Kafka Sink Change Data Capture
    :depth: 1
    :class: singlecols
 
-Change Data Capture Mode
-------------------------
+Overview
+--------
 
 Change data capture (CDC) is an architecture that converts changes in
-a database into event streams. The MongoDB Kafka sink connector can
-process event streams using `Debezium <https://debezium.io/>`_ as an event
-producer for the following source databases:
+a source database into event streams. You can capture CDC events with the
+MongoDB Kafka sink connector and perform corresponding insert, update, and
+delete operations on a destination MongoDB cluster.
 
-* `MongoDB <http://debezium.io/docs/connectors/mongodb/>`_
-* `MySQL <http://debezium.io/docs/connectors/mysql/>`_
-* `PostgreSQL <http://debezium.io/docs/connectors/postgresql/>`_
+You can also handle CDC using the following event producers:
 
-CDC Handler Configuration
--------------------------
+- :ref:`Debezium <cdc-debezium>`
+- Qlik Replicate (coming soon)
+
+Change Data Capture Using the MongoDB Sink Connector
+----------------------------------------------------
+
+To configure your MongoDB Kafka sink connector to handle CDC events from
+a Kafka topic, update your configuration to include the following:
+
+.. code-block:: properties
+
+   change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
+
+The ``ChangeStreamHandler`` class instructs the sink connector to process
+change events that are in the :manual:`change stream response document format </reference/change-events/#change-stream-output>`.
+You can use a :doc:`MongoDB Kafka source connector </kafka-source>` to
+configure the change stream data that you want to publish to specific topics.
+
+Remember to specify the topic and the destination in the following
+configuration properties:
+
+- ``topics``
+- ``connection.uri``
+- ``database``
+- ``collection``
+
+For more information on these properties, see our guide on
+:doc:`Kafka Sink Connector Configuration Properties </kafka-sink-properties>`.
+
+ChangeStreamHandler Example
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The following sample JSON payload instantiates a new connector that uses
+the ``ChangeStreamHandler`` with a specified CDC configuration when posted
+to the `Kafka Connect REST endpoint <https://docs.confluent.io/current/connect/references/restapi.html>`__:
+
+.. code-block:: json
+
+   {
+     "name": "mongo-sink-changestreamhandler-cdc",
+     "config": {
+       "connection.uri": "mongodb://<hostname>:27017/kafkaconnect?w=1&journal=true",
+       "topics": "myreplset.kafkaconnect.mongosrc",
+       "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
+       "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
+       "key.converter": "io.confluent.connect.avro.AvroConverter",
+       "key.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
+       "value.converter": "io.confluent.connect.avro.AvroConverter",
+       "value.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
+       "collection": "mongosink"
+     }
+   }
+
+.. include:: /includes/externalize-secrets.rst
+
+.. _cdc-debezium:
+
+Change Data Capture Using Debezium
+----------------------------------
+
+The MongoDB Kafka sink connector can also process event streams using
+`Debezium <https://debezium.io/>`__ as an event producer for the following
+source databases:
+
+* `MongoDB <http://debezium.io/docs/connectors/mongodb/>`__
+* `MySQL <http://debezium.io/docs/connectors/mysql/>`__
+* `PostgreSQL <http://debezium.io/docs/connectors/postgresql/>`__
 
 You can configure the sink connector to process data from a CDC stream
 using one of the included handlers for Debezium or a custom handler that
 extends the abstract class `CdcHandler
-<https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcHandler.java>`_.
+<https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/sink/cdc/CdcHandler.java>`__.
 
 To create a CDC handler, specify the following configuration information:

@@ -40,31 +104,32 @@ To create a CDC handler, specify the following configuration information:
 - Topics on which the connector should listen for data in the ``topics``
   property.
 - MongoDB collection to write data to in the ``collection`` property.
-- The :doc:`converters </kafka-sink-data-formats>` required to
-  handle data formats in the ``[key|value].converter`` properties. Both
-  JSON + Schema and AVRO formats are supported.
-- Any :doc:`post processors </kafka-sink-postprocessors>` necessary
-  to modify the record before saving it to MongoDB.
 
-The following sample JSON payload instantiates a new connector with
-a specified CDC configuration when posted to the `Kafka Connect REST endpoint
-<https://docs.confluent.io/current/connect/references/restapi.html>`_:
+.. _cdc-debezium-example:
+
+Debezium Example
+~~~~~~~~~~~~~~~~
+
+The following sample JSON payload instantiates a new connector using
+Debezium with a specified CDC configuration when posted to the
+`Kafka Connect REST endpoint <https://docs.confluent.io/current/connect/references/restapi.html>`__:
 
 .. code-block:: json
 
    {
      "name": "mongo-sink-debezium-cdc",
     "config": {
-      "connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
+      "connection.uri": "mongodb://<mongodb-hostname>:27017/kafkaconnect?w=1&journal=true",
       "topics": "myreplset.kafkaconnect.mongosrc",
       "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
       "connector.class": "com.mongodb.kafka.connect.sink.MongoSinkConnector",
       "key.converter": "io.confluent.connect.avro.AvroConverter",
-      "key.converter.schema.registry.url": "http://localhost:8081",
+      "key.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
       "value.converter": "io.confluent.connect.avro.AvroConverter",
-      "value.converter.schema.registry.url": "http://localhost:8081",
+      "value.converter.schema.registry.url": "http://<schema-registry-hostname>:8081",
       "collection": "mongosink"
     }
   }
 
 .. include:: /includes/externalize-secrets.rst
+
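For reference, the change stream response documents that ``ChangeStreamHandler`` processes follow the format linked in the new section above. A minimal sketch of a single insert event; the field values are illustrative placeholders, not data from this commit:

.. code-block:: json

   {
     "_id": { "_data": "<resume-token>" },
     "operationType": "insert",
     "clusterTime": { "$timestamp": { "t": 1623502200, "i": 1 } },
     "ns": { "db": "kafkaconnect", "coll": "mongosrc" },
     "documentKey": { "_id": { "$oid": "60c72b2f9af1f20d1c8e4f3a" } },
     "fullDocument": { "_id": { "$oid": "60c72b2f9af1f20d1c8e4f3a" }, "name": "example" }
   }

The handler reads ``operationType`` (``insert``, ``update``, ``replace``, or ``delete``) to decide which write to perform against the destination collection.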

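The new Overview also points readers to the MongoDB Kafka source connector for publishing change stream data to a topic. A minimal sketch of a source connector registration payload that would feed the ``myreplset.kafkaconnect.mongosrc`` topic used by both sink examples; the connector name and hostname are placeholders, and any property not listed falls back to its default:

.. code-block:: json

   {
     "name": "mongo-source-changestream",
     "config": {
       "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
       "connection.uri": "mongodb://<mongodb-hostname>:27017",
       "database": "kafkaconnect",
       "collection": "mongosrc",
       "topic.prefix": "myreplset"
     }
   }

The source connector names its output topic ``<topic.prefix>.<database>.<collection>``, which is how the ``topics`` value in the sink examples resolves to this stream.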

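The "included handlers for Debezium" mentioned above cover relational sources as well as MongoDB. A sketch of the one-line config swap that would target a Debezium MySQL stream instead; these class paths are read from the mongo-kafka source tree and should be verified against your connector version:

.. code-block:: json

   {
     "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler"
   }

For a Debezium PostgreSQL stream, use ``com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler`` instead.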