-
Notifications
You must be signed in to change notification settings - Fork 266
PHPLIB-451: ChangeStream::rewind() should never execute getMore #636
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/Model/TailableCursorIterator.php
Outdated
* @param Cursor $cursor | ||
* @param boolean $isFirstBatchIsEmpty | ||
*/ | ||
public function __construct(Cursor $cursor, $isFirstBatchIsEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$isFirstBatchIsEmpty
is a little oddly phrased to me. Would $isFirstBatchEmpty
make more sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Total brain fart. This was originally firstBatchIsEmpty
and I must have forgotten to rephrase that when I switched to an is
prefix. Will fix.
src/Operation/Watch.php
Outdated
return; | ||
} | ||
|
||
$this->operationTime = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to set operationTime
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just being a bit defensive here. If aggregate
fails, I didn't want any stale values to remain in place. Granted, if it does fail then the change stream will expose the exception and won't bother to resume anyway.
Now that you bring this up, I do think there's a subtle behavioral difference with this PR in that we now always collect operationTime
from every aggregate
response. Previously, we would only collect operationTime
(and use APM) for the very first aggregate
command executed by a Watch instance, and skip APM if $this->operationTime
was already set going into a second aggregate
execution (i.e. resume attempt). Resume Process in the spec does refer to "original aggregation", but I wasn't sure if that was simply to differentiate from a getMore
command or it was actually specifying not to capture an operation time from a resuming aggregate
.
I don't see why a resuming aggregate
's operation time would be undesirable, but it would probably be best to revert this to the original behavior and only capture operationTime
on the very first aggregate
.
Note: this will be a moot point for 4.2+, since postBatchOperationTime
will be available for empty batches. That said, it's still very relevant for a 4.0 server.
/* If we've already captured an operation time or the server does not | ||
* support resuming from an operation time (e.g. MongoDB 3.6), execute | ||
* the aggregation directly and return its cursor. */ | ||
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original logic was actually a bit backwards, but it worked in practice.
The server has reported operation times for as long as change streams have been supported (since 3.6); however, the startAtOperationTime
option has only been supported since 4.0. Previously, createResumeCallable
would decide to use startAtOperationTime
purely by whether $this->operationTime
was set, as we would only ever set that if this condition was true and we ran the aggregate
command through APM.
With this PR, we need to always check whether aggregate
's firstBatch
is empty and therefore always run through APM. That means that $this->operationTime
was always captured and could result in the driver attempting to use startAtOperationTime
to resume against a 3.6 server (in the absence of a resume token).
To address this, I moved the server_supports_feature
check to createResumeCallable
before we decide to use that option. That fixes the issue with resuming against a 3.6 server when no changes have been returned. This also means that the APM callback can continue collecting $this->operationTime
all the time. Alternatively, I suppose we could have added a server_supports_feature
within the APM callback and decide whether to ignore the operation time, but createResumeCallable
seems more straightforward since that's when we actually attempt to specify the option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, thanks!
This will avoid spurious test failures due to timing and ensure that each acknowledged write is returned in the next change stream iteration.
This can be used to ensure rewinding is a NOP (i.e. getMore will not be executed) for tailable cursors with an empty firstBatch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me!
Tests were revised to no longer expect rewind() to encounter an error and resume. In most tests, rewind() is a NOP because the change stream has no events to return upon creation. Since Watch now always executes aggregate commands with APM, the check for startAtOperationTime support was moved to createResumeCallable(). We continue to only capture operationTime from the very first aggregate command. Various assertions for current() returning null were also changed to instead check valid(). Both forms are equivalent, but checking valid() is more consistent with our iteration examples.
https://jira.mongodb.org/browse/PHPLIB-451
https://jira.mongodb.org/browse/PHPLIB-416
A few highlights from the commit messages:
w:majority
to avoid spurious test failures due to timing.getMore
will not be executed) for tailable cursors with an emptyfirstBatch
.rewind()
to encounter an error and resume. In most tests,rewind()
is a NOP because the change stream has no events to return upon creation.startAtOperationTime
support was moved tocreateResumeCallable()
.current()
returning null were also changed to instead checkvalid()
. Both forms are equivalent, but checkingvalid()
is more consistent with our iteration examples.