Skip to content

SWIFT-1610, SWIFT-1378, SWIFT-1632: Spec tests sync + expose clusterTime on ChangeStreamEvent #775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Sources/MongoSwift/ChangeStreamEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
/// Only present for server versions 6.0 and above.
public let wallTime: Date?

/// The cluster time at which the change occurred. Only present for server versions 4.0 and above.
public let clusterTime: BSONTimestamp?

/**
* Always present for operations of type `insert` and `replace`. Also present for operations of type `update` if
* the user has specified `.updateLookup` for the `fullDocument` option in the `ChangeStreamOptions` used to create
Expand All @@ -136,7 +139,7 @@ public struct ChangeStreamEvent<T: Codable>: Codable {
public let fullDocument: T?

private enum CodingKeys: String, CodingKey {
case operationType, _id, ns, to, documentKey, updateDescription, wallTime, fullDocument
case operationType, _id, ns, to, documentKey, updateDescription, wallTime, clusterTime, fullDocument
}

// Custom decode method to work around the fact that `invalidate` events do not have an `ns` field in the raw
Expand Down Expand Up @@ -164,6 +167,7 @@ public struct ChangeStreamEvent<T: Codable>: Codable {

self.documentKey = try container.decodeIfPresent(BSONDocument.self, forKey: .documentKey)
self.wallTime = try container.decodeIfPresent(Date.self, forKey: .wallTime)
self.clusterTime = try container.decodeIfPresent(BSONTimestamp.self, forKey: .clusterTime)
self.updateDescription = try container.decodeIfPresent(UpdateDescription.self, forKey: .updateDescription)
self.fullDocument = try container.decodeIfPresent(T.self, forKey: .fullDocument)
}
Expand Down
12 changes: 5 additions & 7 deletions Sources/TestsCommon/APMUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,11 @@ public class TestCommandMonitor: CommandEventHandler {
}

public enum EventType: String, Decodable {
case commandStartedEvent, commandSucceededEvent, commandFailedEvent,
connectionCreatedEvent, connectionReadyEvent, connectionClosedEvent,
connectionCheckedInEvent, connectionCheckedOutEvent, connectionCheckOutFailedEvent,
poolCreatedEvent, poolReadyEvent, poolClearedEvent, poolClosedEvent,
topologyDescriptionChanged, topologyOpening, topologyClosed, serverDescriptionChanged,
serverOpening, serverClosed, serverHeartbeatStarted, serverHeartbeatSucceeded,
serverHeartbeatFailed
case commandStartedEvent, commandSucceededEvent, commandFailedEvent, connectionCreatedEvent, connectionReadyEvent,
connectionClosedEvent, connectionCheckedInEvent, connectionCheckOutStartedEvent, connectionCheckedOutEvent,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only actual change here mod reorganizing lines was adding connectionCheckOutStartedEvent which one of the tests I synced uses

connectionCheckOutFailedEvent, poolCreatedEvent, poolReadyEvent, poolClearedEvent, poolClosedEvent,
topologyDescriptionChanged, topologyOpening, topologyClosed, serverDescriptionChanged, serverOpening,
serverClosed, serverHeartbeatStarted, serverHeartbeatSucceeded, serverHeartbeatFailed
}

extension CommandEvent {
Expand Down
6 changes: 3 additions & 3 deletions Tests/MongoSwiftTests/AsyncAwaitTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ extension MongoClient {
}

internal func getUnmetRequirement(_ testRequirement: TestRequirement) async throws -> UnmetRequirement? {
async let topologyType = try self.topologyType()
async let serverVersion = try self.serverVersion()
async let params = try self.serverParameters()
async let topologyType = try await self.topologyType()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happened to notice while adding new test using this that this method was actually still calling the blocking methods that wait() to get these values. oops

async let serverVersion = try await self.serverVersion()
async let params = try await self.serverParameters()
return try await testRequirement.getUnmetRequirement(givenCurrent: serverVersion, topologyType, params)
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/MongoSwiftTests/AsyncAwaitTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ final class MongoCursorAsyncAwaitTests: MongoSwiftTestCase {
}

testAsync {
let opts = CreateCollectionOptions(capped: true, size: 5)
let opts = CreateCollectionOptions(capped: true, max: 5, size: 100_000)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unrelated but this test starting failing against latest once SERVER-67246 went in. size is actually supposed to be the max size in bytes, not the number of documents (that's what max is for).
previously the server forced you to have a minimum max size of 4096, so the size option here was basically ignored. but that restriction was removed and it is now valid to actually have a 5 byte capped collection. due to this, no documents could actually fit in the collection so this test started to fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, i'm surprised the previously incorrect value was silently ignored by the server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's interesting; per the docs here:

If the size field is less than or equal to 4096, then the collection will have a cap of 4096 bytes.

try await self.withTestNamespace(collectionOptions: opts) { _, _, coll in
try await coll.insertMany([["x": 1], ["x": 2], ["x": 3]])

Expand Down
28 changes: 27 additions & 1 deletion Tests/MongoSwiftTests/ChangeStreamTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ final class ChangeStreamTests: MongoSwiftTestCase {
// TODO: SWIFT-1458 Unskip.
"change-streams-showExpandedEvents.json",
// TODO: SWIFT-1472 Unskip.
"change-streams-pre_and_post_images.json"
"change-streams-pre_and_post_images.json",
// TODO: SWIFT-1614 Unskip.
"change-streams-disambiguatedPaths.json"
]
let tests = try retrieveSpecTestFiles(
specName: "change-streams",
Expand All @@ -182,5 +184,29 @@ final class ChangeStreamTests: MongoSwiftTestCase {
let testRunner = try await UnifiedTestRunner()
try await testRunner.runFiles(tests)
}

func testClusterTime() async throws {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while there is a spec test, we run all of the spec tests with a ChangeStream<BSONDocument> since some of them do projections, etc. and don't match the model type. so this test is to assert we correctly deserialize the clusterTime when ChangeStreamEvent is used.

try await self.withTestClient { client in
// cluster time is only included as of 4.0.
let requirement = TestRequirement(
minServerVersion: ServerVersion(major: 4, minor: 0, patch: 0),
acceptableTopologies: [.replicaSet, .sharded, .shardedReplicaSet, .loadBalanced]
)
let unmetRequirement = try await client.getUnmetRequirement(requirement)
guard unmetRequirement == nil else {
printSkipMessage(testName: self.name, unmetRequirement: unmetRequirement!)
return
}
let db = client.db(Self.testDatabase)
try await db.collection(self.getCollectionName()).drop()
let coll = try await db.createCollection(self.getCollectionName())

let stream = try await coll.watch()

_ = try await coll.insertOne(["x": 1])
let event = try await stream.next()
expect(event?.clusterTime).toNot(beNil())
}
}
}
#endif
5 changes: 3 additions & 2 deletions Tests/MongoSwiftTests/Failpoint.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ extension FailPoint {
/// Enables the failpoint, and returns a `EnabledFailpoint` which can handle disabling when needed
internal func enable(
using client: MongoClient,
options: RunCommandOptions? = nil
options: RunCommandOptions? = nil,
session: ClientSession? = nil
) async throws -> EnabledFailpoint {
try await client.db("admin").runCommand(self.failPoint, options: options)
try await client.db("admin").runCommand(self.failPoint, options: options, session: session)
return EnabledFailpoint(failPoint: self, client: client)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ struct UnifiedFailPoint: UnifiedOperationProtocol {
/// The client entity to use for setting the failpoint.
let client: String

/// Optional name of a session entity to use for setting the failpoint.
let session: String?

static var knownArguments: Set<String> {
["failPoint", "client"]
["failPoint", "client", "session"]
}

func execute(on _: UnifiedOperation.Object, context: Context) async throws -> UnifiedOperationResult {
let testClient = try context.entities.getEntity(id: self.client).asTestClient()
let session = try context.entities.resolveSession(id: self.session)
let opts = RunCommandOptions(readPreference: .primary)
let fpGuard = try await self.failPoint.enable(using: testClient.client, options: opts)
let fpGuard = try await self.failPoint.enable(using: testClient.client, options: opts, session: session)
context.enabledFailPoints.append(fpGuard)
return .none
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
{
"description": "change-streams-clusterTime",
"schemaVersion": "1.3",
"createEntities": [
{
"client": {
"id": "client0",
"useMultipleMongoses": false
}
},
{
"database": {
"id": "database0",
"client": "client0",
"databaseName": "database0"
}
},
{
"collection": {
"id": "collection0",
"database": "database0",
"collectionName": "collection0"
}
}
],
"runOnRequirements": [
{
"minServerVersion": "4.0.0",
"topologies": [
"replicaset",
"sharded-replicaset",
"load-balanced",
"sharded"
]
}
],
"initialData": [
{
"collectionName": "collection0",
"databaseName": "database0",
"documents": []
}
],
"tests": [
{
"description": "clusterTime is present",
"operations": [
{
"name": "createChangeStream",
"object": "collection0",
"arguments": {
"pipeline": []
},
"saveResultAsEntity": "changeStream0"
},
{
"name": "insertOne",
"object": "collection0",
"arguments": {
"document": {
"_id": 1
}
}
},
{
"name": "iterateUntilDocumentOrError",
"object": "changeStream0",
"expectResult": {
"ns": {
"db": "database0",
"coll": "collection0"
},
"clusterTime": {
"$$exists": true
}
}
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
description: "change-streams-clusterTime"
schemaVersion: "1.3"
createEntities:
- client:
id: &client0 client0
useMultipleMongoses: false
- database:
id: &database0 database0
client: *client0
databaseName: *database0
- collection:
id: &collection0 collection0
database: *database0
collectionName: *collection0

runOnRequirements:
- minServerVersion: "4.0.0"
topologies: [ replicaset, sharded-replicaset, load-balanced, sharded ]

initialData:
- collectionName: *collection0
databaseName: *database0
documents: []

tests:
- description: "clusterTime is present"
operations:
- name: createChangeStream
object: *collection0
arguments: { pipeline: [] }
saveResultAsEntity: &changeStream0 changeStream0
- name: insertOne
object: *collection0
arguments:
document: { _id: 1 }
- name: iterateUntilDocumentOrError
object: *changeStream0
expectResult:
ns: { db: *database0, coll: *collection0 }
clusterTime: { $$exists: true }
Loading