Skip to content

Commit 687157a

Browse files
Chris Choschmalliso
authored andcommitted
DOCSP-14128: write model strategy corrections (#77)
* DOCSP-14128: write model strategy corrections
1 parent efc16e0 commit 687157a

File tree

2 files changed

+27
-16
lines changed

2 files changed

+27
-16
lines changed

source/kafka-sink-postprocessors.txt

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -650,9 +650,10 @@ The post processor applied the following changes:
650650
renaming step if the replacement key already exists at the current level.
651651

652652
.. _custom-write-models:
653+
.. _custom-write-model-strategy:
653654

654-
Custom Write Models
655-
-------------------
655+
Custom Write Model Strategy
656+
---------------------------
656657

657658
A **write model** defines the behavior of bulk write operations made on a
658659
MongoDB collection. The default write model for the connector is
@@ -662,8 +663,8 @@ MongoDB collection. The default write model for the connector is
662663
set to upsert mode.
663664

664665
You can override the default write model by specifying a custom one in the
665-
``mongodb.writemodel.strategy`` configuration setting. The following
666-
strategies are provided with the connector:
666+
``writemodel.strategy`` configuration setting. The following strategies are
667+
provided with the connector:
667668

668669
.. list-table::
669670
:header-rows: 1
@@ -699,26 +700,36 @@ strategies are provided with the connector:
699700
- | Add ``_insertedTS`` (inserted timestamp) and ``_modifiedTS`` (modified timestamp) fields into documents that match the filters provided by the ``document.id.strategy`` setting.
700701
| Set the following configuration: ``writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy``
701702

702-
Create Your Own Custom Write Strategy
703-
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
703+
Create Your Own Custom Write Model Strategy
704+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
704705

705-
If none of the pre-built write strategy models suits your needs, you can
706-
create your own. A custom write strategy model is a Java class that implements
707-
``WriteModelStrategy``.
706+
If none of the pre-built write strategies suits your needs, you can
707+
create your own. A custom write model strategy is a Java class that
708+
implements ``WriteModelStrategy``.
708709

709710
To configure your Sink Connector to use the custom ``WriteModelStrategy``,
710711
follow the steps below:
711712

712713
1. Create a class that implements the ``WriteModelStrategy`` interface
713714
and overrides the ``createWriteModel(SinkDocument)`` method.
714715

715-
#. Compile the class to a ``.class`` file or JAR.
716+
#. Compile the class to a JAR file.
716717

717-
#. Add the class to the Class Path / Plugin Path for Kafka workers.
718+
#. Add the compiled JAR to the Class Path / Plugin Path for Kafka workers.
718719
For more information about plugin paths, see the `Confluent documentation
719720
<https://docs.confluent.io/current/connect/managing/community.html>`__.
720721

721-
#. Specify your Java class in the ``mongodb.writemodel.strategy``
722+
.. note::
723+
724+
Plugins are loaded in isolation, so when deploying a custom write
725+
strategy, both the connector JAR and the custom write model strategy
726+
JAR should be on the same path as shown in the following example:
727+
728+
| ``mongodb-kafka-connect/lib/mongo-kafka-connect-all.jar``
729+
| ``mongodb-kafka-connect/lib/custom-write-model-strategy.jar``
730+
731+
732+
#. Specify your Java class in the ``writemodel.strategy``
722733
:ref:`configuration setting <kafka-sink-properties>`.
723734

724735
The following is an example of a custom write strategy. It extracts the
@@ -736,18 +747,18 @@ in the MongoDB collection that matches the ID from the value document.
736747
*/
737748

738749
public class CustomWriteModelStrategy implements WriteModelStrategy {
739-
750+
740751
private static String ID = "_id";
741752
@Override
742753
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
743754
BsonDocument changeStreamDocument = document.getValueDoc()
744755
.orElseThrow(() -> new DataException("Missing value document"));
745-
756+
746757
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
747758
if (fullDocument.isEmpty()) {
748759
return null; // Return null to indicate no op.
749760
}
750-
761+
751762
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
752763
}
753764
}

source/kafka-sink-properties.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ data to sink to MongoDB. For an example configuration file, see
213213
- string
214214
- | The class that specifies the ``WriteModel`` to use for :manual:`Bulk
215215
Writes </core/bulk-write-operations/index.html>`. See :ref:`Custom
216-
Write Models <custom-write-models>` for more information.
216+
Write Model Strategy <custom-write-model-strategy>` for more information.
217217
|
218218
| **Default**: ``com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy``
219219
| **Accepted Values**: A fully qualified Java class name of a class that implements ``WriteModelStrategy``

0 commit comments

Comments
 (0)