Commit ec4cf0a

Chris Cho authored and schmalliso committed
DOCSP-15809 custom pipeline (#155)
* DOCSP-15809: Usage Example Custom Pipeline
1 parent e6c0007 · commit ec4cf0a

File tree

2 files changed: +109 -16 lines changed


source/source-connector/fundamentals/change-streams.txt

Lines changed: 25 additions & 12 deletions
@@ -1,3 +1,5 @@
+.. _kafka-source-change-streams:
+
 ==============
 Change Streams
 ==============
@@ -14,14 +16,14 @@ Overview
 --------
 
 In this guide, you can learn about **change streams** and how they are
-used in a {+mkc+} source connector. 
+used in a {+mkc+} source connector.
 
 Change Streams
 --------------
 
 Change streams are a feature of MongoDB that allow you to receive real-time
 updates on data changes. Change streams return
-**change event documents**. A change event document is a document in your 
+**change event documents**. A change event document is a document in your
 **oplog** that contains idempotent instructions to recreate a change that
 occurred in your MongoDB deployment as well as the metadata related to that
 change.
@@ -36,24 +38,35 @@ works.
 Change streams are available for replica sets and sharded clusters. A
 standalone MongoDB instance cannot produce a change stream.
 
-For more information on change streams, see the following MongoDB manual entries:
+For more information on the oplog, see the MongoDB manual entry on the
+:manual:`Replica Set Oplog </core/replica-set-oplog/>`.
+
+You can use an aggregation pipeline to modify change stream events in one or
+more of the following ways:
+
+- Filter change events by operation type
+- Project specific fields
+- Update the value of fields
+- Add additional fields
+- Trim the amount of data generated by the change stream
 
-- :manual:`Change streams </changeStreams/>`
-- :manual:`Oplog </core/replica-set-oplog/>`
+For a list of aggregation operators you can use with a change stream, see
+the guide on :manual:`Modify Change Stream Output <changeStreams/#modify-change-stream-output>`
+in the MongoDB manual.
 
 Change Event Structure
 ~~~~~~~~~~~~~~~~~~~~~~
 
 You can find the complete structure of change event documents, including
-descriptions of all fields, 
-:ref:`in the MongoDB manual <change-stream-output>`. 
+descriptions of all fields,
+:ref:`in the MongoDB manual <change-stream-output>`.
 
 .. note:: The Full Document Option
 
    If you want Kafka Connect to receive just the document created or modified
   from your change operation, use the ``publish.full.document.only=true``
-   option. For more information see our
-   guide on <TODO: Link to source connector properties>.
+   option. For more information, see the :ref:`<source-configuration-change-stream>`
+   page.
 
 Source Connectors
 -----------------
@@ -67,7 +80,7 @@ connector closes its change stream when you stop it.
 
 Your source connector uses the MongoDB Java driver to create a change
 stream. For more information, see this guide on change streams in the MongoDB
-java driver. <TODO: Link to Watch For Changes Usage Example>
+Java driver. <TODO: Link to Watch For Changes Usage Example>
 
 Resume Tokens
 ~~~~~~~~~~~~~
@@ -76,7 +89,7 @@ Your connector stores a **resume token** to keep track of what changes
 it has processed. A resume token is a piece of data that references
 the ``_id`` field of a change event document in your MongoDB oplog.
 Your connector only processes relevant change event documents written to the oplog after the
-document referenced by its resume token. 
+document referenced by its resume token.
 
 If your source connector does not have a resume token, such as when you start
 the connector for the first time, your connector processes relevant change
@@ -89,5 +102,5 @@ events written to the oplog after it first connects to MongoDB.
 
 If your source connector's resume token does not correspond to any entry in your
 oplog, your connector has an invalid resume token. To learn how to recover from an
-invalid resume token, 
+invalid resume token,
 :ref:`see our troubleshooting guide <kafka-troubleshoot-recover-invalid-resume-token>`.
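The fundamentals page above states that the source connector uses the MongoDB Java driver to create its change stream and that each change event's ``_id`` serves as the resume token. Below is a minimal sketch of that mechanism using the plain Java driver directly, assuming a replica set reachable at ``mongodb://localhost:27017`` and a hypothetical ``test.events`` namespace; it illustrates the driver API, not the connector's actual implementation.

// Minimal sketch: watch a collection's change stream with an aggregation
// pipeline, the same mechanism the source connector uses internally.
// The connection string and the "test.events" namespace are placeholders.
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.Collections;
import java.util.List;

public class WatchForInserts {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> events =
                    client.getDatabase("test").getCollection("events");

            // Only surface insert events, mirroring the filtering that the
            // connector's pipeline setting performs.
            List<Bson> pipeline = Collections.singletonList(
                    Aggregates.match(Filters.eq("operationType", "insert")));

            ChangeStreamIterable<Document> stream = events.watch(pipeline);

            BsonDocument resumeToken = null;
            for (ChangeStreamDocument<Document> event : stream) {
                // Each event's _id is its resume token; the connector persists
                // this value so it can pick up where it left off after a restart.
                resumeToken = event.getResumeToken();
                System.out.println(event.getFullDocument());
            }
            // To resume later from the saved token:
            // events.watch(pipeline).resumeAfter(resumeToken);
        }
    }
}

The ``Aggregates.match`` stage here corresponds to the ``$match`` stage accepted by the connector's ``pipeline`` setting, shown in the next file's diff.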
Lines changed: 84 additions & 4 deletions
@@ -1,5 +1,85 @@
-===============
-Custom Pipeline
-===============
+.. _source-usage-example-custom-pipeline:
 
-TODO:
+============================================
+Customize a Pipeline to Filter Change Events
+============================================
+
+.. default-domain:: mongodb
+
+This usage example demonstrates how to configure a **pipeline** to
+customize the data that your source connector consumes. A pipeline is a
+MongoDB aggregation pipeline composed of instructions to the database to
+filter or transform data.
+
+MongoDB notifies the connector of data changes that match your aggregation
+pipeline on a **change stream**. A change stream is a sequence of events that
+describe data changes a client made to a MongoDB deployment in real-time.
+For more information, see the MongoDB Server manual entry on
+:manual:`Change Streams </changeStreams/>`.
+
+
+Example
+-------
+
+Suppose you're an event coordinator who needs to collect names and arrival times
+of each guest at a specific event. Whenever a guest checks into the event,
+an application inserts a new document that contains the following details:
+
+.. code-block:: json
+   :copyable: false
+
+   {
+     "_id": ObjectId(...),
+     "eventId": 321,
+     "name": "Dorothy Gale",
+     "arrivalTime": 2021-10-31T20:30:00.245Z
+   }
+
+You can define your connector ``pipeline`` setting to instruct the change
+stream to filter the change event information as follows:
+
+- Create change events for insert operations and omit events for all other
+  types of operations.
+- Create change events only for documents that match the ``fullDocument.eventId``
+  value "321" and omit all other documents.
+- Omit the ``_id`` and ``eventId`` fields from the ``fullDocument`` object
+  using a projection.
+
+To apply these transformations, assign the following aggregation pipeline
+to your ``pipeline`` setting:
+
+.. code-block:: properties
+
+   pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
+
+.. important::
+
+   Make sure that the results of the pipeline contain the top-level ``_id``
+   field of the ``payload`` object, which MongoDB uses as the value of the
+   :manual:`resume token </changeStreams/#resume-a-change-stream>`.
+
+When the application inserts the sample document, your configured
+connector publishes the following record to your Kafka topic:
+
+.. code-block:: json
+   :copyable: false
+   :emphasize-lines: 7,8
+
+   {
+     ...
+     "payload": {
+       _id: { _data: ... },
+       "operationType": "insert",
+       "fullDocument": {
+         "name": "Dorothy Gale",
+         "arrivalTime": "2021-10-31T20:30:00.245Z",
+       },
+       "ns": { ... },
+       "documentKey": {
+         _id: {"$oid": ... }
+       }
+     }
+   }
+
+For more information on managing change streams with the source connector, see
+the connector documentation on :ref:`Change Streams <kafka-source-change-streams>`.
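For context, the ``pipeline`` value added above is one property in a complete source connector configuration. The following is a hedged sketch of how it might sit alongside the connection settings; the URI, database name, and collection name are placeholder assumptions, not values from this commit.

# Hypothetical source connector configuration; the connection URI and the
# database/collection names are placeholders.
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=test
collection=guestCheckIns
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
# Optionally publish only the inserted document instead of the full change
# event; note that enabling this changes the record shape shown in the
# example output above.
# publish.full.document.only=true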
