Commit 439f6aa

biniona-mongodb authored and schmalliso committed
(DOCSP-18200) SMTs + Heartbeats (#221)
1 parent 517eb50 commit 439f6aa

5 files changed (+74, -8 lines)

snooty.toml

Lines changed: 1 addition & 0 deletions
@@ -20,3 +20,4 @@ source-connector="MongoDB Kafka source connector"
 connector_driver_url_base="https://docs.mongodb.com/drivers/java/sync/v{+connector_driver_version+}/"
 pipeline-size = "2.4 GB"
 stable-api = "Stable API"
+default-heartbeat-topic = "__mongodb_heartbeats"

source/source-connector/configuration-properties.txt

Lines changed: 2 additions & 2 deletions
@@ -51,7 +51,7 @@ See the following categories for a list of related configuration properties:
      - Specify what data the connector should convert to Change Stream
        events.

-   * - :ref:`Error Handling and Resume Properties <source-configuration-error-handling>`
+   * - :ref:`Error Handling and Resuming from Interruption Properties <source-configuration-error-handling>`
      - Specify how the connector handles errors and resumes reading after an
        interruption.

@@ -69,6 +69,6 @@ See the following categories for a list of related configuration properties:
    Change Stream </source-connector/configuration-properties/change-stream>
    Output Format </source-connector/configuration-properties/output-format>
    Copy Existing </source-connector/configuration-properties/copy-existing>
-   Error Handling </source-connector/configuration-properties/error-handling>
+   Error Handling and Resuming from Interruption </source-connector/configuration-properties/error-handling>
    All Properties </source-connector/configuration-properties/all-properties>

source/source-connector/configuration-properties/all-properties.txt

Lines changed: 2 additions & 2 deletions
@@ -92,8 +92,8 @@ To view only the options related to copying data, see the
    :start-after: source-configuration-copy-existing-table-start
    :end-before: source-configuration-copy-existing-table-end

-Error Handling
---------------
+Error Handling and Resuming from Interruption
+---------------------------------------------

 .. include:: /source-connector/configuration-properties/error-handling.txt
    :start-after: source-configuration-error-handling-description-start

source/source-connector/configuration-properties/error-handling.txt

Lines changed: 67 additions & 4 deletions
@@ -1,8 +1,8 @@
 .. _source-configuration-error-handling:

-====================================
-Error Handling and Resume Properties
-====================================
+========================================================
+Error Handling and Resuming from Interruption Properties
+========================================================

 .. default-domain:: mongodb

@@ -133,7 +133,70 @@ Settings
        heartbeat messages. You must provide a positive value in the
        ``heartbeat.interval.ms`` setting to enable this feature.
        |
-       | **Default**: ``__mongodb_heartbeats``
+       | **Default**: ``{+default-heartbeat-topic+}``
        | **Accepted Values**: A valid Kafka topic name

 .. _source-configuration-error-handling-table-end:
+
+.. _source-configuration-error-handling-smt:
+
+Heartbeats with Single Message Transforms
+-----------------------------------------
+
+If you enable heartbeats and specify **Single Message Transforms (SMTs)** in your
+{+kc+} deployment, you must exclude your heartbeat messages from
+your SMTs. SMTs are a feature of {+kc+} that enables you to specify transformations on
+the messages that pass through your {+source-connector+} without having to deploy a
+stream processing application.
+
+To exclude heartbeat messages from your SMTs, you must create and apply a
+**predicate** to your SMTs. Predicates are a feature of SMTs that
+enables you to check whether a message matches a conditional statement before
+applying a transformation.
+
+The following configuration defines the ``IsHeartbeat`` predicate, which matches
+heartbeat messages sent to the default heartbeat topic:
+
+.. code-block:: properties
+
+   predicates=IsHeartbeat
+   predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
+   predicates.IsHeartbeat.pattern={+default-heartbeat-topic+}
+
+The following configuration uses the preceding predicate to exclude heartbeat
+messages from an ``ExtractField`` transformation:
+
+.. code-block:: properties
+
+   transforms=Extract
+   transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
+   transforms.Extract.field=<the field to extract from your {+ak+} key>
+   transforms.Extract.predicate=IsHeartbeat
+   transforms.Extract.negate=true
+
+   # apply the default key schema as the extract transformation requires a struct object
+   output.format.key=schema
+
+If you do not exclude your heartbeat messages from the preceding transformation,
+your connector raises the following error once it processes a heartbeat message:
+
+.. code-block:: none
+
+   ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
+   ...
+   Only Struct objects supported for [field extraction], found: java.lang.String
+
+To learn more about SMTs, see
+`How to Use Single Message Transforms in Kafka Connect <https://www.confluent.io/blog/kafka-connect-single-message-transformation-tutorial-with-examples/>`__
+from Confluent.
+
+To learn more about predicates, see
+`Filter (Apache Kafka) <https://docs.confluent.io/platform/current/connect/transforms/filter-ak.html#predicates>`__
+from Confluent.
+
+To learn more about the ``ExtractField`` transformation, see
+`ExtractField <https://docs.confluent.io/platform/current/connect/transforms/extractfield.html>`__
+from Confluent.
+
+To learn more about the default key schema, see the
+:ref:`<kafka-source-apply-schemas-default-schema>` page.
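
Editor's note: for context, the following is a minimal sketch of how the settings documented in this diff might fit together in a single source connector properties file. It is not part of the commit. The connection and namespace values are placeholders, ``documentKey`` is a hypothetical extraction target, and ``heartbeat.topic.name`` is assumed to be the setting that overrides the default heartbeat topic; only ``heartbeat.interval.ms``, the predicate and transform keys, and ``output.format.key=schema`` appear in the diff itself.

   # minimal sketch of a source connector config combining the documented settings
   connector.class=com.mongodb.kafka.connect.MongoSourceConnector
   # placeholder connection and namespace values
   connection.uri=mongodb://localhost:27017
   database=exampledb
   collection=examplecoll

   # a positive interval enables heartbeat messages (from the diff)
   heartbeat.interval.ms=10000
   # assumed name of the topic-override setting; the diff only confirms
   # the default topic __mongodb_heartbeats
   heartbeat.topic.name=__mongodb_heartbeats

   # predicate matching messages sent to the heartbeat topic (from the diff)
   predicates=IsHeartbeat
   predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
   predicates.IsHeartbeat.pattern=__mongodb_heartbeats

   # apply ExtractField only to messages that are not heartbeats
   transforms=Extract
   transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
   # hypothetical field to extract from the key
   transforms.Extract.field=documentKey
   transforms.Extract.predicate=IsHeartbeat
   transforms.Extract.negate=true

   # ExtractField requires a struct key, so apply the default key schema
   output.format.key=schema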

source/source-connector/fundamentals/specify-schema.txt

Lines changed: 2 additions & 0 deletions
@@ -54,6 +54,8 @@ or JSON Schema, see the :ref:`<intro-converters>` guide.
 To learn more about keys and values in Apache Kafka, see the
 `official Apache Kafka introduction <http://kafka.apache.org/intro#intro_concepts_and_terms>`__.

+.. _kafka-source-apply-schemas-default-schema:
+
 Default Schemas
 ---------------
