Skip to content

Use majority concern for all change stream test collections #1375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 51 additions & 44 deletions src/mongocxx/test/change_streams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ using bsoncxx::builder::basic::make_document;

using namespace mongocxx;

mongocxx::collection
make_test_coll(mongocxx::client& client, bsoncxx::stdx::string_view db_name, bsoncxx::stdx::string_view coll_name) {
write_concern wc_majority;
wc_majority.acknowledge_level(write_concern::level::k_majority);

read_concern rc_majority;
rc_majority.acknowledge_level(read_concern::level::k_majority);

auto db = client[db_name];
auto coll = db[coll_name];

coll.drop();
coll = db.create_collection(coll_name);

coll.write_concern(wc_majority);
coll.read_concern(rc_majority);

return coll;
}

// Create a single-item document.
// E.g. doc("foo", 123) creates {"foo":123}.
template <typename T>
Expand Down Expand Up @@ -91,9 +111,9 @@ auto const watch_interpose = [](mongoc_collection_t const*, bson_t const*, bson_

auto const destroy_interpose = [](mongoc_change_stream_t*) -> void {};

TEST_CASE("Change stream options") {
TEST_CASE("Change stream options", "[change_stream]") {
instance::current();
client mongodb_client{uri{}, test_util::add_test_server_api()};
client client{uri{}, test_util::add_test_server_api()};

if (!test_util::is_replica_set()) {
SKIP("change streams require replica set");
Expand All @@ -108,25 +128,20 @@ TEST_CASE("Change stream options") {
cs_opts.resume_after(resume_after.view());
cs_opts.start_after(start_after.view());

auto cs = mongodb_client.watch(cs_opts);
auto cs = client.watch(cs_opts);
REQUIRE_THROWS(cs.begin());
}
}

TEST_CASE("Spec Prose Tests") {
TEST_CASE("Spec Prose Tests", "[change_stream]") {
instance::current();
client client{uri{}, test_util::add_test_server_api()};

if (!test_util::is_replica_set()) {
SKIP("change streams require replica set");
}

auto db = client["db"];
auto coll = db["coll"];
coll.drop();

write_concern wc_majority;
wc_majority.majority(std::chrono::seconds(30));
auto coll = make_test_coll(client, "db", "coll");

// As a sanity check, we implement the first prose test. The behavior tested
// by the prose tests is implemented and tested by the C driver, so we won't
Expand All @@ -143,21 +158,18 @@ TEST_CASE("Spec Prose Tests") {
auto doc2 = make_document(kvp("b", 2));
auto doc3 = make_document(kvp("c", 3));

options::insert insert_opts{};
insert_opts.write_concern(wc_majority);

{
auto res = coll.insert_one(doc1.view(), insert_opts);
auto res = coll.insert_one(doc1.view());
REQUIRE(res);
REQUIRE(res->result().inserted_count() == 1);
}
{
auto res = coll.insert_one(doc2.view(), insert_opts);
auto res = coll.insert_one(doc2.view());
REQUIRE(res);
REQUIRE(res->result().inserted_count() == 1);
}
{
auto res = coll.insert_one(doc3.view(), insert_opts);
auto res = coll.insert_one(doc3.view());
REQUIRE(res);
REQUIRE(res->result().inserted_count() == 1);
}
Expand Down Expand Up @@ -188,14 +200,13 @@ TEST_CASE("Spec Prose Tests") {
}
}

TEST_CASE("Mock streams and error-handling") {
TEST_CASE("Mock streams and error-handling", "[change_stream]") {
MOCK_CHANGE_STREAM;

instance::current();
client mongodb_client{uri{}, test_util::add_test_server_api()};
client client{uri{}, test_util::add_test_server_api()};
options::change_stream options{};
database db = mongodb_client["streams"];
collection events = db["events"];
collection events = make_test_coll(client, "streams", "events");

// nop watch and destroy
collection_watch->interpose(watch_interpose).forever();
Expand Down Expand Up @@ -360,9 +371,9 @@ TEST_CASE("Mock streams and error-handling") {
return nullptr;
});

mongodb_client["db"]["collection"].watch(cs_pipeline, cs_opts);
mongodb_client["db"].watch(cs_pipeline, cs_opts);
mongodb_client.watch(cs_pipeline, cs_opts);
client["db"]["collection"].watch(cs_pipeline, cs_opts);
client["db"].watch(cs_pipeline, cs_opts);
client.watch(cs_pipeline, cs_opts);

// Ensure the interpose was called.
REQUIRE(collection_watch_called);
Expand All @@ -372,35 +383,33 @@ TEST_CASE("Mock streams and error-handling") {
}

// Put this before other tests which assume the collections already exists.
TEST_CASE("Create streams.events and assert we can read a single event", "[min36]") {
TEST_CASE("Create streams.events and assert we can read a single event", "[change_stream]") {
instance::current();
client mongodb_client{uri{}, test_util::add_test_server_api()};
client client{uri{}, test_util::add_test_server_api()};
if (!test_util::is_replica_set()) {
SKIP("change streams require replica set");
}

collection events = mongodb_client["streams"]["events"];
events.drop();
collection events = make_test_coll(client, "streams", "events");

events.insert_one(make_document(kvp("dummy", "doc")));
change_stream stream = events.watch();
events.insert_one(make_document(kvp("another", "event")));
REQUIRE(std::distance(stream.begin(), stream.end()) == 1);

// because we watch events2 in a test
auto events2 = mongodb_client["streams"]["events2"];
events2.drop();
auto events2 = make_test_coll(client, "streams", "events2");
}

TEST_CASE("Give an invalid pipeline", "[min36]") {
TEST_CASE("Give an invalid pipeline", "[change_stream]") {
instance::current();
client mongodb_client{uri{}, test_util::add_test_server_api()};
client client{uri{}, test_util::add_test_server_api()};
if (!test_util::is_replica_set()) {
SKIP("change streams require replica set");
}

options::change_stream options{};
collection events = mongodb_client["streams"]["events"];
collection events = make_test_coll(client, "streams", "events");

pipeline p;
p.match(make_document(kvp("$foo", -1)));
Expand All @@ -421,16 +430,15 @@ TEST_CASE("Give an invalid pipeline", "[min36]") {
}
}

TEST_CASE("Documentation Examples", "[min36]") {
TEST_CASE("Documentation Examples", "[change_stream]") {
instance::current();
mongocxx::pool pool{uri{}, options::pool(test_util::add_test_server_api())};
auto mongodb_client = pool.acquire();
auto client = pool.acquire();
if (!test_util::is_replica_set()) {
SKIP("change streams require replica set");
}

collection events = (*mongodb_client)["streams"]["events"];
collection inventory = events; // doc examples use this name
collection inventory = make_test_coll(*client, "streams", "events");

std::atomic_bool insert_thread_done;
insert_thread_done.store(false);
Expand Down Expand Up @@ -525,20 +533,19 @@ TEST_CASE("Documentation Examples", "[min36]") {

insert_thread_done = true;
insert_thread.join();
inventory.drop();
}

TEST_CASE("Watch 2 collections", "[min36]") {
TEST_CASE("Watch 2 collections", "[change_stream]") {
instance::current();
client mongodb_client{uri{}, test_util::add_test_server_api()};
client client{uri{}, test_util::add_test_server_api()};
if (!test_util::is_replica_set()) {
SKIP("change streams require replica set");
}

options::change_stream options{};

collection events = mongodb_client["streams"]["events"];
collection events2 = mongodb_client["streams"]["events2"];
collection events = make_test_coll(client, "streams", "events");
collection events2 = make_test_coll(client, "streams", "events2");

change_stream x = events.watch();
change_stream x2 = events.watch();
Expand Down Expand Up @@ -575,15 +582,15 @@ TEST_CASE("Watch 2 collections", "[min36]") {
}
}

TEST_CASE("Watch a Collection", "[min36]") {
TEST_CASE("Watch a Collection", "[change_stream]") {
instance::current();
client mongodb_client{uri{}, test_util::add_test_server_api()};
client client{uri{}, test_util::add_test_server_api()};
if (!test_util::is_replica_set()) {
SKIP("change streams require replica set");
}

options::change_stream options{};
collection events = mongodb_client["streams"]["events"];
collection events = make_test_coll(client, "streams", "events");

change_stream x = events.watch();

Expand Down