Skip to content

Commit c42a804

Browse files
Chris Choschmalliso
authored andcommitted
DOCSP-15399: add deleteone business key strategy (#93)
* DOCSP-15399: add deleteone business key strategy
1 parent c701ec9 commit c42a804

File tree

1 file changed

+116
-14
lines changed

1 file changed

+116
-14
lines changed

source/kafka-sink-postprocessors.txt

Lines changed: 116 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ provided with this connector:
173173
- | Full Path: ``com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``
174174
| Generates a random UUID as a string.
175175

176+
176177
You can assign the ``document.id.strategy`` property as follows:
177178

178179
.. code-block:: properties
@@ -449,6 +450,10 @@ specified.
449450
[key|value].projection.type=BlockList
450451
[key|value].projection.list=**.high
451452

453+
The record contains the following data after applying the projection:
454+
455+
.. code-block:: json
456+
452457
{
453458
"city": "Springfield",
454459
"population: {
@@ -504,7 +509,7 @@ RenameByMapping Example
504509
^^^^^^^^^^^^^^^^^^^^^^^
505510

506511
The ``RenameByMapping`` post processor setting specifies one or more
507-
objects that assigns fields matching a string to a new name in a Key or Value
512+
objects that assign fields matching a string to a new name in a Key or Value
508513
document.
509514

510515
Each object contains the text to match in the ``oldName`` element and the
@@ -683,19 +688,29 @@ provided with the connector:
683688

684689
.. seealso::
685690

686-
Example of usage in :ref:`writemodel-strategy-business-key`.
691+
Example of usage in :ref:`ReplaceOneBusinessKey example <replaceonebusinesskey-example>`.
692+
687693
* - DeleteOneDefaultStrategy
688694
- | Deletes at most one document that matches the id specified by the ``document.id.strategy`` setting, only when the document contains a null value record.
689695
| Implicitly specified when the configuration setting ``mongodb.delete.on.null.values=true`` is set.
690696
| You can set this explicitly with the following configuration: ``writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy``
691697

698+
* - DeleteOneBusinessKeyStrategy
699+
- | Deletes at most one document that matches filters provided in the ``document.id.strategy`` setting.
700+
| Set the following configuration: ``writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy``
701+
702+
.. seealso::
703+
704+
Example of usage in :ref:`DeleteOneBusinessKey example <deleteonebusinesskey-example>`.
705+
692706
* - UpdateOneTimestampsStrategy
693707
- | Add ``_insertedTS`` (inserted timestamp) and ``_modifiedTS`` (modified timestamp) fields into documents.
694708
| Set the following configuration: ``writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy``
695709

696710
.. seealso::
697711

698-
Example of usage in :ref:`writemodel-strategy-timestamps`.
712+
Example of usage in :ref:`WriteModel Strategy: Inserted and Modified Timestamps<writemodel-strategy-timestamps>`.
713+
699714
* - UpdateOneBusinessKeyTimestampStrategy
700715
- | Add ``_insertedTS`` (inserted timestamp) and ``_modifiedTS`` (modified timestamp) fields into documents that match the filters provided by the ``document.id.strategy`` setting.
701716
| Set the following configuration: ``writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy``
@@ -768,11 +783,19 @@ in the MongoDB collection that matches the ID from the value document.
768783
WriteModel Strategy: Business Keys
769784
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
770785

771-
A business key is a value comprised of data within the sink record that
772-
identifies it as a unique document. This example defines a business key
773-
using data contained within multiple fields in the record as well as
774-
instructs the post processor to generate *BSON ObjectIds* for inserts,
775-
but not for updates.
786+
A **business key** is a value comprised of data within the sink record that
787+
identifies it as a unique document. The following examples demonstrate
788+
WriteModel strategies that define business keys using one or more fields
789+
from the sink record.
790+
791+
.. _replaceonebusinesskey-example:
792+
793+
ReplaceOneBusinessKeyStrategy Example
794+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
795+
796+
In this example, we assemble a business key from multiple fields of a
797+
record and instruct the post processor to generate BSON **ObjectId**
798+
instances for inserts, but not for updates.
776799

777800
Follow the steps below to configure this strategy:
778801

@@ -784,7 +807,7 @@ Follow the steps below to configure this strategy:
784807
#. In the connector configuration, specify the
785808
``ReplaceOneBusinessKeyStrategy`` writemodel strategy.
786809

787-
For this example, we track airplane capacity by the flight number and
810+
Suppose our task is to track airplane capacity by the flight number and
788811
airport location represented by ``flight_no`` and ``airport_code``,
789812
respectively. An example message contains the following:
790813

@@ -802,7 +825,7 @@ respectively. An example message contains the following:
802825
To implement the strategy, we first create a unique index on the
803826
``flight_no`` and ``airport_code`` fields in the MongoDB shell:
804827

805-
.. code-block:: none
828+
.. code-block:: javascript
806829

807830
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
808831

@@ -850,6 +873,86 @@ values without changing the ``_id`` field.
850873
}
851874
}
852875

876+
.. _deleteonebusinesskey-example:
877+
878+
DeleteOneBusinessKeyStrategy Example
879+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
880+
881+
In this example, we show how to use the ``DeleteOneBusinessKeyStrategy``
882+
to do the following:
883+
884+
- Assemble a business key from a single field from a record.
885+
- Instruct the post processor to delete a document in the target
886+
collection that contains a value matching the one in the business key.
887+
888+
Suppose our task is to track airline passenger ticket reservations and
889+
cancellations. We can use the ``DeleteOneBusinessKeyStrategy`` to delete
890+
reservations when customers cancel or when reservations go unclaimed.
891+
892+
The following MongoDB document represents a ticket reservation in the
893+
``tickets`` database and ``reservations`` collection:
894+
895+
.. code-block:: json
896+
897+
{
898+
"_id": ObjectId('5db0b81199b7fbcc89631d07'),
899+
"flight_no": "Z920",
900+
"airport_code": "CDG",
901+
"reservation_no": "RT211583W"
902+
}
903+
904+
The following steps show how we can configure the value of the
905+
``reservation_no`` field as the business key and create the delete
906+
strategy:
907+
908+
#. Create a :manual:`unique index </core/index-unique>` that corresponds to
909+
your business key in your target MongoDB collection.
910+
#. In the connector configuration, specify the ``PartialValueStrategy``
911+
as the id strategy to identify the field that you want to use as the
912+
business key.
913+
#. In the connector configuration, specify the
914+
``DeleteOneBusinessKeyStrategy`` writemodel strategy.
915+
#. Specify the source topic and target MongoDB database and collection.
916+
917+
To implement the ``DeleteOneBusinessKeyStrategy`` with our connector, first
918+
we create a unique index on the ``reservation_no`` field in the MongoDB shell:
919+
920+
.. code-block:: javascript
921+
922+
db.collection.createIndex({ "reservation_no": 1 }, { unique: true })
923+
924+
Next, we specify the ``PartialValueStrategy`` strategy as the id strategy
925+
and use the ``projection`` setting to consider only the ``reservation_no``
926+
field with the following configuration:
927+
928+
.. code-block:: properties
929+
930+
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
931+
document.id.strategy.partial.value.projection.list=reservation_no
932+
document.id.strategy.partial.value.projection.type=AllowList
933+
934+
We specify the strategy in the configuration as follows:
935+
936+
.. code-block:: properties
937+
938+
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
939+
940+
We define the Kafka topic from which we receive cancellations and the
941+
target ticket reservation database and collection in the configuration
942+
file as follows:
943+
944+
.. code-block:: properties
945+
946+
topics=ReservationCancellations
947+
database=tickets
948+
collection=reservations
949+
950+
When the connector receives sink data from the "ReservationCancellations"
951+
topic, it extracts the business key using the strategy configuration
952+
settings and performs a delete operation on the target collection. If the
953+
``reservation_no`` field contains a value of "RT211583W", the connector
954+
deletes deletes the matching document from the collection.
955+
853956
.. _writemodel-strategy-timestamps:
854957

855958
WriteModel Strategy: Inserted and Modified Timestamps
@@ -904,11 +1007,10 @@ The MongoDB document inserted after processing the initial message for the
9041007
train includes the following data:
9051008

9061009
.. code-block:: json
907-
:emphasize-lines: 3-4
9081010

9091011
{
9101012
"_id": "MN-1234",
911-
"_insertedTS": ISODate('2019-10-23T15:08:000Z"),
1013+
"_insertedTS": ISODate("2019-10-23T15:08:000Z"),
9121014
"_modifiedTS": ISODate("2019-10-23T15:08:000Z"),
9131015
"start": "Beacon",
9141016
"destination": "Grand Central"
@@ -919,13 +1021,13 @@ After an hour, the train reports its current location along its route. The
9191021
``position`` and ``_modifiedTS`` fields are updated:
9201022

9211023
.. code-block:: json
922-
:emphasize-lines: 4,7
9231024

9241025
{
9251026
"_id": "MN-1234",
926-
"_insertedTS": ISODate('2019-10-23T15:08:000Z"),
1027+
"_insertedTS": ISODate("2019-10-23T15:08:000Z"),
9271028
"_modifiedTS": ISODate("2019-10-23T16:08:000Z"),
9281029
"start": "Beacon",
9291030
"destination": "Grand Central"
9301031
"position": [ 41.156, -73.870 ]
9311032
}
1033+

0 commit comments

Comments
 (0)