PHPLIB-351: Cluster and DB-level change streams and startAtOperationTime #544
b1e8d8f
to
04ddc86
Compare
 * @param RuntimeException $exception
 * @return boolean
 */
private function isResumableError(RuntimeException $exception)
The criteria for a resumable error changed since the spec was first implemented in 1.3.0. See: Resumable Error.
}

if (in_array($exception->getCode(), [self::$errorCodeCappedPositionLost, self::$errorCodeCursorKilled, self::$errorCodeInterrupted])) {
    return false;
My read of the new resumable error criteria led me to conclude that checking the error message string for "not master" or "node is recovering" was redundant. This logic takes that into account and I've opened mongodb/specifications#365 to correct the spec.
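For illustration, here is a minimal sketch of the code-based check described above. The function name and constants are hypothetical, and the numeric values (136, 237, 11601 for the excluded errors, 10107 for "not master") are assumed server error codes rather than values taken from this diff. It only covers server errors that carry a code; other parts of the resumable error criteria are out of scope.

```php
<?php
// Assumed server error codes for the three errors the spec excludes.
const ERROR_CODE_CAPPED_POSITION_LOST = 136;
const ERROR_CODE_CURSOR_KILLED = 237;
const ERROR_CODE_INTERRUPTED = 11601;

/**
 * Hypothetical helper: a server error code is treated as resumable unless it
 * is one of the three excluded codes. No error message strings are inspected.
 */
function isResumableServerErrorCode(int $code): bool
{
    $nonResumable = [
        ERROR_CODE_CAPPED_POSITION_LOST,
        ERROR_CODE_CURSOR_KILLED,
        ERROR_CODE_INTERRUPTED,
    ];

    return ! in_array($code, $nonResumable, true);
}

// 10107 is assumed to be the "not master" code; it is not in the excluded
// list, so no separate message check is needed for it.
var_dump(isResumableServerErrorCode(10107));
var_dump(isResumableServerErrorCode(ERROR_CODE_CURSOR_KILLED));
```

Because the excluded codes do not overlap with the "not master" or "node is recovering" errors, a single code lookup is enough in this sketch.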
@@ -79,22 +92,44 @@ class Watch implements Executable
 * * resumeAfter (document): Specifies the logical starting point for the
 * new change stream.
 *
 * Using this option in conjunction with "startAtOperationTime" will
 * result in a server error. The options are mutually exclusive.
Per startAtOperationTime in the spec, we're not supposed to raise a client-side error if both options are used.
}

$this->databaseName = (string) $databaseName;
- $this->collectionName = (string) $collectionName;
+ $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
Aggregate takes care of handling the null collection name (per PHPLIB-358).
final public function commandSucceeded(CommandSucceededEvent $event)
{
    if ($event->getCommandName() !== 'aggregate') {
        return;
I considered throwing a LogicException here, as this class should never be registered as a CommandSubscriber except for the brief duration that we execute an aggregate command; however, I don't think that's necessary. Open to changing this if you'd prefer the extra sanity check, though.
$reply = $event->getReply();

if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
By this point, we've already verified that the server's wire version is >= 7. If operationTime isn't a Timestamp, something is quite wrong. I just wanted to be defensive.
Failing silently also makes sense as this resume behavior is internal behavior (akin to us creating an "implicit" session in the constructor and not bothering the user if that fails).
$operation = new Aggregate(
    'admin',
    null,
    [['$currentOp' => (object) []]]
I'm not specifying any options, nor am I asserting the response, as this is only testing that executing such a command with a null collection name results in { aggregate: 1 } in the command document.
@@ -130,6 +131,100 @@ function(stdClass $command) use (&$commands) {
    $this->assertSame($expectedCommands, $commands);
}

public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
{
    $operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
I originally inserted a document before starting the change stream, but that led to strange behavior. The initial aggregate command would not find the document (of course), but its response would include an operationTime based on the document's insertion, since that was the most recent entry in the oplog.
When we kill and resume the cursor in the subsequent test activity, that operation time would be used for startAtOperationTime in the resuming aggregate and the document would show up.
I was under the impression that using the operation time from aggregate was intended to prevent extra documents from showing up from before the launch point of the original change stream, but perhaps this is a known edge case.
Perhaps @daprahamian, @kevinAlbs, or @ShaneHarvey can chime in here.
@ShaneHarvey mentioned that he and @behackett talked about this some time ago and concluded that it was expected behavior (provided no other write operation occurs on the server between the insert and aggregate, and not enough time elapses for a periodic no-op entry to be created).
They tossed around the idea of having the driver manually increment the operation time returned by aggregate, but nothing came of that. Perhaps they talked to some folks on the server team and they advised against it.
Yeah I just talked with Bernie, incrementing the operation time does not really solve the problem. We can fix the issue in SPEC-1122.
Aye, SPEC-1122 ("Update startAtOperationTime during change stream iteration" for those without access) would help provided we're not resuming because the first getMore has failed; however, I'll admit that this is a fairly contrived edge case more likely to show up in a test suite than real life.
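To make the resume behavior discussed in this thread concrete, here is a rough sketch of how options for the resuming aggregate might be chosen. The helper name and parameters are hypothetical, and plain values stand in for the resume token and Timestamp; this is a simplification under those assumptions, not the code in this PR.

```php
<?php
/**
 * Hypothetical helper: pick the starting point for a resumed change stream.
 *
 * @param array      $options       Options used for the original change stream
 * @param mixed|null $resumeToken   _id of the last change document seen, if any
 * @param mixed|null $operationTime operationTime saved from the initial
 *                                  aggregate reply, if any
 */
function buildResumeOptions(array $options, $resumeToken, $operationTime): array
{
    // The two options are mutually exclusive, so start from a clean slate.
    unset($options['resumeAfter'], $options['startAtOperationTime']);

    if ($resumeToken !== null) {
        // At least one change was received: resume right after it.
        $options['resumeAfter'] = $resumeToken;
    } elseif ($operationTime !== null) {
        // No change received yet: fall back to the saved operation time.
        $options['startAtOperationTime'] = $operationTime;
    }

    return $options;
}

// Resuming before any result was received (the case this test covers).
var_dump(buildResumeOptions(['batchSize' => 10], null, 42));
```

In the edge case described above, the saved operation time can coincide with an insert that preceded the change stream, which is why that document may show up after the resume.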
@@ -484,16 +579,8 @@ public function testTypeMapOption(array $typeMap, $expectedChangeDocument)

$changeStream->next();
$this->assertTrue($changeStream->valid());
$changeDocument = $changeStream->current();

// Unset the resume token and namespace, which are intentionally omitted
Thanks to assertMatchesDocument(), we can ignore extra fields in the actual document that are not present in our expected value. This allows us to do without these manual unsets.
Does that also address the extra "clusterTime" information that made tests fail?
Yes, it does.
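As a rough illustration of the idea in this thread, the sketch below compares plain arrays and ignores any field of the actual document that the expected document does not mention. The function documentMatches() is a hypothetical stand-in, not the assertMatchesDocument() helper added in this PR, and it assumes documents have already been converted to PHP arrays.

```php
<?php
/**
 * Hypothetical stand-in for a "matches document" comparison: every key in
 * $expected must exist in $actual with an identical value. Keys that exist
 * only in $actual (e.g. "clusterTime") are ignored. Nested arrays are
 * compared recursively with the same rule.
 */
function documentMatches(array $expected, array $actual): bool
{
    foreach ($expected as $key => $value) {
        if (! array_key_exists($key, $actual)) {
            return false;
        }

        if (is_array($value) && is_array($actual[$key])) {
            if (! documentMatches($value, $actual[$key])) {
                return false;
            }
            continue;
        }

        if ($value !== $actual[$key]) {
            return false;
        }
    }

    return true;
}

$expected = ['operationType' => 'insert', 'fullDocument' => ['x' => 1]];
$actual = ['operationType' => 'insert', 'clusterTime' => 123, 'fullDocument' => ['_id' => 7, 'x' => 1]];
var_dump(documentMatches($expected, $actual));
```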
$extraKeys = [];

/* Avoid unsetting fields while we're iterating on the ArrayObject to
 * work around https://bugs.php.net/bug.php?id=70246 */
I had a great time discovering this bug. 😛
For fun I would say: @sgolemon, fix yo shit ;-)
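The workaround named in the code comment above (collect keys during iteration, unset afterwards) can be sketched as follows. The helper removeExtraKeys() is hypothetical and only demonstrates the pattern on a flat ArrayObject; it is not the code from this diff.

```php
<?php
/**
 * Hypothetical helper: drop every key of $actual that is not listed in
 * $expectedKeys. Keys are collected first and removed only after the loop,
 * so the ArrayObject is never modified while it is being iterated.
 */
function removeExtraKeys(ArrayObject $actual, array $expectedKeys): void
{
    $extraKeys = [];

    foreach ($actual as $key => $value) {
        if (! in_array($key, $expectedKeys, true)) {
            $extraKeys[] = $key;
        }
    }

    // Safe to mutate now that iteration has finished.
    foreach ($extraKeys as $key) {
        unset($actual[$key]);
    }
}

$document = new ArrayObject(['_id' => 1, 'clusterTime' => 99, 'x' => 2]);
removeExtraKeys($document, ['_id', 'x']);
var_dump($document->getArrayCopy());
```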
af295aa
to
20d9148
Compare
I've left some comments/questions, but LGTM.
@@ -289,7 +292,7 @@ public function execute(Server $server)
private function createCommand(Server $server)
{
    $cmd = [
-       'aggregate' => $this->collectionName,
+       'aggregate' => isset($this->collectionName) ? $this->collectionName : 1,
The use of "1" wasn't really documented: mongodb/docs#3357
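For readers unfamiliar with the collection-less form, the following standalone sketch shows the shape of the command document in both cases. The function buildAggregateCommand() is hypothetical and omits every other field the real createCommand() adds.

```php
<?php
/**
 * Hypothetical helper: build the core of an aggregate command. A null
 * collection name yields { aggregate: 1 }, the form used for database-level
 * and cluster-level aggregations.
 */
function buildAggregateCommand(?string $collectionName, array $pipeline): array
{
    return [
        'aggregate' => $collectionName !== null ? $collectionName : 1,
        'pipeline' => $pipeline,
    ];
}

// Collection-less aggregation, as in the $currentOp test in this PR.
var_dump(buildAggregateCommand(null, [['$currentOp' => (object) []]]));
```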
src/Operation/Watch.php
Outdated
 * specifying null for both the database and collection name.
 *
 * @param string|null $databaseName Database name
 * @param string|null $collectionName Collection name
 * @param array $pipeline List of pipeline operations
 * @param array $options Command options
 * @param Manager $manager Manager instance from the driver
Why is "Manager" the last option in this list, but the first argument in the method declaration?
Good catch! Looks like an old typo: https://github.com/mongodb/mongo-php-library/blame/master/src/Operation/Watch.php#L93
I'll fix it.
LGTM other than 1 small comment!
src/Operation/Watch.php
Outdated
}

/* In the absence of an explicit session, create one to ensure that the
 * initial aggregation and any resume attempts can use the same session
 * ("implicit from the user's perspective per PHPLIB-342). We can ignore
I think there's a " missing after 'perspective'
763f85d
to
21e1f30
Compare
MongoDB 4.0 adds a "clusterTime" field to change stream documents. Rather than add conditional logic to expect this in tests, implement a new comparison function that ignores fields not found in our expected document.
97bd301
to
ef31465
Compare
MongoDB\Driver\Exception\RuntimeException does not conflict in this file, so we can also remove its alias.
Skip redundant error message checks, since their codes do not overlap with the three errors specifically excluded by the spec and all other server errors are resumable. Deprecate the unused class constant, which can be removed in 2.0 (PHPLIB-360).
https://jira.mongodb.org/browse/PHPLIB-351
https://jira.mongodb.org/browse/PHPLIB-358