Skip to content

Commit 890b71f

Browse files
committed
PHPLIB-342: Change streams should use the same session when resuming
1 parent 0331b37 commit 890b71f

File tree

3 files changed

+64
-0
lines changed

3 files changed

+64
-0
lines changed

src/ChangeStream.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ public function next()
101101
$this->hasAdvanced = true;
102102
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
103103
}
104+
// If the cursorId is 0, the server has invalidated the cursor so we
105+
// will never perform another getMore. This means that we cannot
106+
// resume and we can therefore unset the resumeCallable, which will
107+
// free any reference to Watch. This will also free the only
108+
// reference to an implicit session, since any such reference
109+
// belongs to Watch.
110+
if ($this->getCursorId() === 0) {
111+
unset($this->resumeCallable);
112+
}
104113
} catch (RuntimeException $e) {
105114
if (strpos($e->getMessage(), "not master") !== false) {
106115
$resumable = true;
@@ -130,6 +139,15 @@ public function rewind()
130139
$this->hasAdvanced = true;
131140
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
132141
}
142+
// If the cursorId is 0, the server has invalidated the cursor so we
143+
// will never perform another getMore. This means that we cannot
144+
// resume and we can therefore unset the resumeCallable, which will
145+
// free any reference to Watch. This will also free the only
146+
// reference to an implicit session, since any such reference
147+
// belongs to Watch.
148+
if ($this->getCursorId() === 0) {
149+
unset($this->resumeCallable);
150+
}
133151
} catch (RuntimeException $e) {
134152
if (strpos($e->getMessage(), "not master") !== false) {
135153
$resumable = true;

src/Operation/Watch.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
110110
}
111111
}
112112

113+
if ( ! isset($options['session'])) {
114+
try {
115+
$options['session'] = $manager->startSession();
116+
} catch (DriverRuntimeException $e) {}
117+
}
118+
113119
$this->databaseName = (string) $databaseName;
114120
$this->collectionName = (string) $collectionName;
115121
$this->pipeline = $pipeline;

tests/Operation/WatchFunctionalTest.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,46 @@ public function testResumeTokenNotFoundAdvancesKey()
588588
$this->assertSame(2, $changeStream->key());
589589
}
590590

591+
public function testSessionPersistsAfterResume()
592+
{
593+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
594+
595+
$changeStream = null;
596+
$originalSession = null;
597+
$sessionAfterResume = null;
598+
599+
// We want to ensure that the lsid of the initial aggregate matches the
600+
// lsid of any aggregates after the change stream resumes. After
601+
// PHPC-1152 is complete, we will ensure that the lsid of the initial
602+
// aggregate matches the lsid of any subsequent aggregates and getMores.
603+
(new CommandObserver)->observe(
604+
function() use ($operation, &$changeStream) {
605+
$changeStream = $operation->execute($this->getPrimaryServer());
606+
},
607+
function($changeStream) use (&$originalSession) {
608+
if (isset($changeStream->aggregate)) {
609+
$originalSession = $changeStream->lsid;
610+
}
611+
}
612+
);
613+
614+
$changeStream->rewind();
615+
$this->killChangeStreamCursor($changeStream);
616+
617+
(new CommandObserver)->observe(
618+
function() use (&$changeStream) {
619+
$changeStream->next();
620+
},
621+
function ($changeStream) use (&$sessionAfterResume) {
622+
if (isset($changeStream->aggregate)) {
623+
$sessionAfterResume = $changeStream->lsid;
624+
}
625+
}
626+
);
627+
628+
$this->assertEquals($sessionAfterResume, $originalSession);
629+
}
630+
591631
private function insertDocument($document)
592632
{
593633
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

0 commit comments

Comments
 (0)