Skip to content

Commit 358ce92

Browse files
authored
Added UpdateOneDefaultStrategy (#144)
A WriteStrategy that performs a `$set` upsert operation. For scenarios where the topic data contains fragments of the desired data in MongoDB KAFKA-255
1 parent c6397c0 commit 358ce92

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* Original Work: Apache License, Version 2.0, Copyright 2017 Hans-Peter Grahsl.
17+
*/
18+
19+
package com.mongodb.kafka.connect.sink.writemodel.strategy;
20+
21+
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.ID_FIELD;
22+
23+
import org.apache.kafka.connect.errors.DataException;
24+
25+
import org.bson.BsonDocument;
26+
import org.bson.BsonValue;
27+
28+
import com.mongodb.client.model.UpdateOneModel;
29+
import com.mongodb.client.model.UpdateOptions;
30+
import com.mongodb.client.model.WriteModel;
31+
32+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
33+
34+
public class UpdateOneDefaultStrategy implements WriteModelStrategy {
35+
36+
private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
37+
38+
@Override
39+
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
40+
BsonDocument vd =
41+
document
42+
.getValueDoc()
43+
.orElseThrow(
44+
() ->
45+
new DataException(
46+
"Could not build the WriteModel,the value document was missing unexpectedly"));
47+
48+
BsonValue idValue = vd.get(ID_FIELD);
49+
if (idValue == null) {
50+
throw new DataException(
51+
"Could not build the WriteModel,the `_id` field was missing unexpectedly");
52+
}
53+
vd.remove(ID_FIELD);
54+
return new UpdateOneModel<>(
55+
new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd), UPDATE_OPTIONS);
56+
}
57+
}

src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class WriteModelStrategyTest {
5656
new ReplaceOneDefaultStrategy();
5757
private static final ReplaceOneBusinessKeyStrategy REPLACE_ONE_BUSINESS_KEY_STRATEGY =
5858
new ReplaceOneBusinessKeyStrategy();
59+
private static final UpdateOneDefaultStrategy UPDATE_ONE_DEFAULT_STRATEGY =
60+
new UpdateOneDefaultStrategy();
5961
private static final UpdateOneTimestampsStrategy UPDATE_ONE_TIMESTAMPS_STRATEGY =
6062
new UpdateOneTimestampsStrategy();
6163
private static final UpdateOneBusinessKeyTimestampStrategy
@@ -231,6 +233,31 @@ void testReplaceOneBusinessKeyStrategyPartialWithValidSinkDocument() {
231233
"replacement expected to be done in upsert mode");
232234
}
233235

236+
@Test
237+
@DisplayName(
238+
"when sink document is valid for UpdateOneDefaultStrategy then correct UpdateOneModel")
239+
void testUpdateOneDefaultStrategyWithValidSinkDocument() {
240+
WriteModel<BsonDocument> result =
241+
UPDATE_ONE_DEFAULT_STRATEGY.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
242+
assertTrue(result instanceof UpdateOneModel, "result expected to be of type UpdateOneModel");
243+
244+
UpdateOneModel<BsonDocument> writeModel = (UpdateOneModel<BsonDocument>) result;
245+
246+
BsonDocument setValuesDocument = VALUE_DOC.clone();
247+
setValuesDocument.remove("_id");
248+
BsonDocument expectedSetDocument = new BsonDocument("$set", setValuesDocument);
249+
250+
assertEquals(
251+
expectedSetDocument,
252+
writeModel.getUpdate(),
253+
"replacement doc not matching what is expected");
254+
assertTrue(
255+
writeModel.getFilter() instanceof BsonDocument,
256+
"filter expected to be of type BsonDocument");
257+
assertEquals(ID_FILTER, writeModel.getFilter());
258+
assertTrue(writeModel.getOptions().isUpsert(), "update expected to be done in upsert mode");
259+
}
260+
234261
@Test
235262
@DisplayName(
236263
"when sink document is valid for UpdateOneTimestampsStrategy then correct UpdateOneModel")
@@ -412,6 +439,10 @@ void testIEmptyOrMissingSinkDocumentData() {
412439
() ->
413440
REPLACE_ONE_BUSINESS_KEY_PARTIAL_STRATEGY.createWriteModel(
414441
SINK_DOCUMENT_EMPTY)),
442+
() ->
443+
assertThrows(
444+
DataException.class,
445+
() -> UPDATE_ONE_DEFAULT_STRATEGY.createWriteModel(SINK_DOCUMENT_EMPTY)),
415446
() ->
416447
assertThrows(
417448
DataException.class,

0 commit comments

Comments
 (0)