Skip to content

Commit 205c542

Browse files
author
Chris Cho
authored
Merge pull request #2 from ccho-mongodb/DOCSP-22437-tutorial-setup
DOCSP-22437: Tutorials branch
2 parents c3ab30a + 776ab04 commit 205c542

File tree

9 files changed

+1025
-1
lines changed

9 files changed

+1025
-1
lines changed
Loading

source/includes/tutorials/setup.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.. note:: Before You Get Started
2+
3+
Before you start this tutorial, you must complete the steps in the
4+
:ref:`<kafka-tutorials-docker-setup>`.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
After you complete this tutorial, you can free resources by stopping the
2+
tutorial Docker containers and MongoDB data. Navigate to the tutorial
3+
directory "mongodb-kafka-base" that you created in the setup step and run the
4+
following command:
5+
6+
.. tabs::
7+
8+
.. tab:: Linux or Mac OS
9+
:tabid: shell
10+
11+
.. code-block:: none
12+
:copyable: true
13+
14+
sh stop.sh
15+
16+
.. tab:: Windows
17+
:tabid: powershell
18+
19+
.. code-block:: none
20+
:copyable: true
21+
22+
powershell ./stop.ps1
23+
24+
To restart the containers, follow the same steps required to start them
25+
in the :ref:`Tutorial Setup <tutorial-setup-run-environment>`.
26+
16 KB
Binary file not shown.

source/tutorials.txt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,25 @@ Tutorials
66
:titlesonly:
77
:maxdepth: 1
88

9+
Tutorial Setup </tutorials/tutorial-setup>
10+
Explore MongoDB Change Streams </tutorials/explore-change-streams>
11+
Getting Started with the MongoDB Kafka Source Connector </tutorials/source-connector>
12+
Getting Started with the MongoDB Kafka Sink Connector </tutorials/sink-connector>
913
Replicate Data with a Change Data Capture Handler </tutorials/replicate-with-cdc>
1014
Migrate an Existing Collection to a Time Series Collection </tutorials/migrate-time-series>
1115

12-
Read the following sections to learn of how to perform specific tasks with the {+mkc+}:
16+
The tutorials in this section show you how to perform specific tasks with
17+
the {+mkc+}.
18+
19+
For instructions on setting up your environment for the following tutorials,
20+
read the :ref:`Tutorial Setup <kafka-tutorials-docker-setup>` first:
21+
22+
- :ref:`Explore Change Streams <kafka-tutorial-explore-change-streams>`
23+
- :ref:`Getting Started with the MongoDB Kafka Source Connector <kafka-tutorial-source-connector>`
24+
- :ref:`Getting Started with the MongoDB Kafka Sink Connector <kafka-tutorial-sink-connector>`
25+
26+
Since the following tutorials contain different development environments,
27+
follow the setup instructions contained within them:
1328

1429
- :doc:`Replicate Data with a Change Data Capture Handler </tutorials/replicate-with-cdc>`
1530
- :doc:`Migrate an Existing Collection to a Time Series Collection </tutorials/migrate-time-series>`
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
.. _kafka-tutorial-explore-change-streams:
2+
3+
==============================
4+
Explore MongoDB Change Streams
5+
==============================
6+
7+
.. contents:: On this page
8+
:local:
9+
:backlinks: none
10+
:depth: 1
11+
:class: singlecol
12+
13+
Follow this tutorial to learn how to create a change stream on a MongoDB
14+
collection and observe the change events it creates.
15+
16+
.. include:: /includes/tutorials/setup.rst
17+
18+
Explore Change Streams
19+
----------------------
20+
21+
.. procedure::
22+
:style: connected
23+
24+
.. step:: Connect to the Docker Container
25+
26+
Create two interactive shell sessions on the tutorial Docker
27+
Container, each in a separate window.
28+
29+
.. code-block:: bash
30+
:copyable: true
31+
:caption: This command starts an interactive shell called shell1
32+
33+
docker run --rm --name shell1 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash
34+
35+
.. code-block:: bash
36+
:copyable: true
37+
:caption: This command starts an interactive shell called shell2
38+
39+
docker run --rm --name shell2 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash
40+
41+
.. step:: Open a Change Stream
42+
43+
In **shell1**, create a Python script to open a change stream using
44+
the PyMongo driver.
45+
46+
.. code-block:: bash
47+
:copyable: true
48+
49+
nano openchangestream.py
50+
51+
Paste the following code into the file and save the changes:
52+
53+
.. code-block:: python
54+
:copyable: true
55+
56+
import os
57+
import pymongo
58+
from bson.json_util import dumps
59+
60+
client = pymongo.MongoClient('mongodb://mongo1')
61+
db = client.get_database(name='Tutorial1')
62+
with db.orders.watch() as stream:
63+
print('\nChange Stream is opened on the Tutorial1.orders namespace. Currently watching ...\n\n')
64+
for change in stream:
65+
print(dumps(change, indent=2))
66+
67+
Run the Python script:
68+
69+
.. code-block:: bash
70+
:copyable: true
71+
72+
python3 openchangestream.py
73+
74+
The script outputs the following message after it starts successfully:
75+
76+
.. code-block:: bash
77+
:copyable: false
78+
79+
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
80+
81+
.. step:: Trigger a Change Event
82+
83+
In **shell2**, connect to MongoDB using ``mongosh``, the MongoDB
84+
shell, using the following command:
85+
86+
.. code-block:: bash
87+
:copyable: true
88+
89+
mongosh "mongodb://mongo1"
90+
91+
After you connect successfully, you should see the following
92+
MongoDB shell prompt:
93+
94+
.. code-block::
95+
:copyable: false
96+
97+
rs0 [direct: primary] test>
98+
99+
At the prompt, type the following commands:
100+
101+
.. code-block:: javascript
102+
:copyable: true
103+
104+
use Tutorial1
105+
db.orders.insertOne( { 'test' : 1 } )
106+
107+
After entering the preceding commands, switch to **shell1** to view
108+
the change stream output, which should resemble the following:
109+
110+
.. code-block:: json
111+
:copyable: true
112+
:emphasize-lines: 5
113+
114+
{
115+
"_id": {
116+
"_data": "826264..."
117+
},
118+
"operationType": "insert",
119+
"clusterTime": {
120+
"$timestamp": {
121+
"t": 1650754657,
122+
"i": 1
123+
}
124+
},
125+
"fullDocument": {
126+
"_id": {
127+
"$oid": "62648461d9440c0c72a2202c"
128+
},
129+
"test": 1
130+
},
131+
"ns": {
132+
"db": "Tutorial1",
133+
"coll": "orders"
134+
},
135+
"documentKey": {
136+
"_id": {
137+
"$oid": "62648461d9440c0c72a2202c"
138+
}
139+
}
140+
}
141+
142+
To stop the script, press :kbd:`Ctrl+C`.
143+
144+
By the end of this step, you've successfully triggered and observed a
145+
change stream event.
146+
147+
.. step:: Open a Filtered Change Stream
148+
149+
You can apply a filter to a change stream by passing it an aggregation
150+
pipeline.
151+
152+
In **shell1**, create a new Python script to open a filtered change
153+
stream using the PyMongo driver.
154+
155+
.. code-block:: bash
156+
:copyable: true
157+
158+
nano pipeline.py
159+
160+
Paste the following code into the file and save the changes:
161+
162+
.. code-block:: python
163+
:copyable: true
164+
:emphasize-lines: 6
165+
166+
import os
167+
import pymongo
168+
from bson.json_util import dumps
169+
client = pymongo.MongoClient('mongodb://mongo1')
170+
db = client.get_database(name='Tutorial1')
171+
pipeline = [ { "$match": { "$and": [ { "fullDocument.type":"temp" }, { "fullDocument.value":{ "$gte":100 } } ] } } ]
172+
with db.sensors.watch(pipeline=pipeline) as stream:
173+
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
174+
for change in stream:
175+
print(dumps(change, indent=2))
176+
177+
Run the Python script:
178+
179+
.. code-block:: python
180+
181+
python3 pipeline.py
182+
183+
The script outputs the following message after it starts successfully:
184+
185+
.. code-block::
186+
:copyable: false
187+
188+
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
189+
190+
.. step:: Observe the Filtered Change Stream
191+
192+
Return to your **shell2** session which should be connected to
193+
MongoDB using ``mongosh``.
194+
195+
At the prompt, type the following commands:
196+
197+
.. code-block:: javascript
198+
199+
use Tutorial1
200+
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
201+
202+
As indicated by the script output, the change stream creates a change
203+
event because it matches the following pipeline:
204+
205+
.. code-block:: json
206+
:copyable: false
207+
208+
[ { "$match": { "$and": [ { "fullDocument.type":"temp" }, { "fullDocument.value":{ "$gte":100 } } ] } } ]
209+
210+
Try inserting the following documents in in **shell2** to verify the
211+
change stream only produces events when the documents match the filter:
212+
213+
.. code-block:: javascript
214+
215+
db.sensors.insertOne( { 'type' : 'temp', 'value':99 } )
216+
db.sensors.insertOne( { 'type' : 'pressure', 'value':22 } )
217+
218+
.. step:: (Optional) Stop the Docker Containers
219+
220+
.. include:: /includes/tutorials/stop-containers.rst
221+
222+
Summary
223+
-------
224+
225+
In this tutorial, you created a change stream on MongoDB and observed the
226+
output. The {+source-connector+} reads the change events from a change
227+
stream that you configure, and writes them to a Kafka topic.
228+
229+
To learn how to configure a change stream and Kafka topic for a source
230+
connector, proceed to the :ref:`<kafka-tutorial-source-connector>`
231+
tutorial.
232+
233+
Learn More
234+
----------
235+
236+
Read the following resources to learn more about concepts mentioned in
237+
this tutorial:
238+
239+
- :ref:`Change Streams and the Source Connector <kafka-source-change-streams>`
240+
- :manual:`Modify Change Stream Output </changeStreams/#modify-change-stream-output>`
241+
- :mongosh:`MongoDB Shell (mongosh) </>`
242+

0 commit comments

Comments
 (0)