Skip to content

Commit 5d2a8a4

Browse files
committed
Merge pull request #528
2 parents 0331b37 + 5fd00fb commit 5d2a8a4

File tree

3 files changed

+105
-0
lines changed

3 files changed

+105
-0
lines changed

src/ChangeStream.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ public function next()
101101
$this->hasAdvanced = true;
102102
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
103103
}
104+
/* If the cursorId is 0, the server has invalidated the cursor so we
105+
* will never perform another getMore. This means that we cannot
106+
* resume and we can therefore unset the resumeCallable, which will
107+
* free any reference to Watch. This will also free the only
108+
* reference to an implicit session, since any such reference
109+
* belongs to Watch. */
110+
if ((string)$this->getCursorId() === '0') {
111+
$this->resumeCallable = null;
112+
}
104113
} catch (RuntimeException $e) {
105114
if (strpos($e->getMessage(), "not master") !== false) {
106115
$resumable = true;
@@ -130,6 +139,10 @@ public function rewind()
130139
$this->hasAdvanced = true;
131140
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
132141
}
142+
// As with next(), free the callable once we know it will never be used.
143+
if ((string)$this->getCursorId() === '0') {
144+
$this->resumeCallable = null;
145+
}
133146
} catch (RuntimeException $e) {
134147
if (strpos($e->getMessage(), "not master") !== false) {
135148
$resumable = true;

src/Operation/Watch.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
110110
}
111111
}
112112

113+
if ( ! isset($options['session'])) {
114+
try {
115+
$options['session'] = $manager->startSession();
116+
} catch (DriverRuntimeException $e) {}
117+
}
118+
113119
$this->databaseName = (string) $databaseName;
114120
$this->collectionName = (string) $collectionName;
115121
$this->pipeline = $pipeline;

tests/Operation/WatchFunctionalTest.php

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
use MongoDB\Driver\Server;
99
use MongoDB\Driver\Exception\ConnectionTimeoutException;
1010
use MongoDB\Exception\ResumeTokenException;
11+
use MongoDB\Operation\CreateCollection;
1112
use MongoDB\Operation\DatabaseCommand;
13+
use MongoDB\Operation\DropCollection;
1214
use MongoDB\Operation\InsertOne;
1315
use MongoDB\Operation\Watch;
1416
use MongoDB\Tests\CommandObserver;
@@ -588,6 +590,90 @@ public function testResumeTokenNotFoundAdvancesKey()
588590
$this->assertSame(2, $changeStream->key());
589591
}
590592

593+
public function testSessionPersistsAfterResume()
594+
{
595+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
596+
597+
$changeStream = null;
598+
$originalSession = null;
599+
$sessionAfterResume = [];
600+
$commands = [];
601+
602+
/* We want to ensure that the lsid of the initial aggregate matches the
603+
* lsid of any aggregates after the change stream resumes. After
604+
* PHPC-1152 is complete, we will ensure that the lsid of the initial
605+
* aggregate matches the lsid of any subsequent aggregates and getMores.
606+
*/
607+
(new CommandObserver)->observe(
608+
function() use ($operation, &$changeStream) {
609+
$changeStream = $operation->execute($this->getPrimaryServer());
610+
},
611+
function($changeStream) use (&$originalSession) {
612+
if (isset($changeStream->aggregate)) {
613+
$originalSession = bin2hex((string) $changeStream->lsid->id);
614+
}
615+
}
616+
);
617+
618+
$changeStream->rewind();
619+
$this->killChangeStreamCursor($changeStream);
620+
621+
(new CommandObserver)->observe(
622+
function() use (&$changeStream) {
623+
$changeStream->next();
624+
},
625+
function ($changeStream) use (&$sessionAfterResume, &$commands) {
626+
$commands[] = key((array) $changeStream);
627+
$sessionAfterResume[] = bin2hex((string) $changeStream->lsid->id);
628+
}
629+
);
630+
631+
$expectedCommands = [
632+
/* We expect a getMore to be issued because we are calling next(). */
633+
'getMore',
634+
/* Since we have killed the cursor, ChangeStream will resume by
635+
* issuing a new aggregate commmand. */
636+
'aggregate',
637+
/* When ChangeStream resumes, it overwrites its original cursor with
638+
* the new cursor resulting from the last aggregate command. This
639+
* removes the last reference to the old cursor, which causes the
640+
* driver to kill it (via mongoc_cursor_destroy()). */
641+
'killCursors',
642+
/* Finally, ChangeStream will rewind the new cursor as the last step
643+
* of the resume process. This results in one last getMore. */
644+
'getMore',
645+
];
646+
647+
$this->assertSame($expectedCommands, $commands);
648+
649+
foreach ($sessionAfterResume as $session) {
650+
$this->assertEquals($session, $originalSession);
651+
}
652+
}
653+
654+
public function testSessionFreed()
655+
{
656+
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName());
657+
$operation->execute($this->getPrimaryServer());
658+
659+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
660+
$changeStream = $operation->execute($this->getPrimaryServer());
661+
662+
$rc = new ReflectionClass($changeStream);
663+
$rp = $rc->getProperty('resumeCallable');
664+
$rp->setAccessible(true);
665+
666+
$this->assertNotNull($rp->getValue($changeStream));
667+
668+
// Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
669+
$operation = new DropCollection($this->getDatabaseName(), $this->getCollectionName());
670+
$operation->execute($this->getPrimaryServer());
671+
672+
$changeStream->next();
673+
674+
$this->assertNull($rp->getValue($changeStream));
675+
}
676+
591677
private function insertDocument($document)
592678
{
593679
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

0 commit comments

Comments
 (0)