Skip to content

Commit 234d565

Browse files
committed
Merge pull request #632
2 parents b42924e + 8f1f3a3 commit 234d565

File tree

2 files changed

+45
-23
lines changed

2 files changed

+45
-23
lines changed

src/Operation/Watch.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
class Watch implements Executable, /* @internal */ CommandSubscriber
4949
{
50-
private static $wireVersionForOperationTime = 7;
50+
private static $wireVersionForStartAtOperationTime = 7;
5151

5252
const FULL_DOCUMENT_DEFAULT = 'default';
5353
const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
@@ -267,9 +267,9 @@ private function createResumeCallable(Manager $manager)
267267
private function executeAggregate(Server $server)
268268
{
269269
/* If we've already captured an operation time or the server does not
270-
* support returning an operation time (e.g. MongoDB 3.6), execute the
271-
* aggregation directly and return its cursor. */
272-
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForOperationTime)) {
270+
* support resuming from an operation time (e.g. MongoDB 3.6), execute
271+
* the aggregation directly and return its cursor. */
272+
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
273273
return $this->aggregate->execute($server);
274274
}
275275

tests/Operation/WatchFunctionalTest.php

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
class WatchFunctionalTest extends FunctionalTestCase
2323
{
24+
private static $wireVersionForStartAtOperationTime = 7;
25+
2426
private $defaultOptions = ['maxAwaitTimeMS' => 500];
2527

2628
public function setUp()
@@ -134,6 +136,8 @@ function(array $event) use (&$commands) {
134136

135137
public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
136138
{
139+
$this->skipIfStartAtOperationTimeNotSupported();
140+
137141
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
138142

139143
$operationTime = null;
@@ -150,7 +154,9 @@ function (array $event) use (&$events) {
150154

151155
$this->assertCount(1, $events);
152156
$this->assertSame('aggregate', $events[0]['started']->getCommandName());
153-
$operationTime = $events[0]['succeeded']->getReply()->operationTime;
157+
$reply = $events[0]['succeeded']->getReply();
158+
$this->assertObjectHasAttribute('operationTime', $reply);
159+
$operationTime = $reply->operationTime;
154160
$this->assertInstanceOf(TimestampInterface::class, $operationTime);
155161

156162
$this->assertNull($changeStream->current());
@@ -398,20 +404,29 @@ public function testResumeMultipleTimesInSuccession()
398404

399405
$this->insertDocument(['_id' => 1]);
400406

407+
/* Insert a document and advance the change stream to ensure we capture
408+
* a resume token. This is necessary when startAtOperationTime is not
409+
* supported (i.e. 3.6 server version). */
410+
$changeStream->next();
411+
$this->assertTrue($changeStream->valid());
412+
$this->assertSame(0, $changeStream->key());
413+
414+
$this->insertDocument(['_id' => 2]);
415+
401416
/* Killing the cursor and advancing when there is a result will test
402417
* that next()'s resume attempt picks up the latest change. */
403418
$this->killChangeStreamCursor($changeStream);
404419

405420
$changeStream->next();
406421
$this->assertTrue($changeStream->valid());
407-
$this->assertSame(0, $changeStream->key());
422+
$this->assertSame(1, $changeStream->key());
408423

409424
$expectedResult = [
410425
'_id' => $changeStream->current()->_id,
411426
'operationType' => 'insert',
412-
'fullDocument' => ['_id' => 1],
427+
'fullDocument' => ['_id' => 2],
413428
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
414-
'documentKey' => ['_id' => 1],
429+
'documentKey' => ['_id' => 2],
415430
];
416431

417432
$this->assertMatchesDocument($expectedResult, $changeStream->current());
@@ -424,48 +439,48 @@ public function testResumeMultipleTimesInSuccession()
424439

425440
$changeStream->rewind();
426441
$this->assertTrue($changeStream->valid());
427-
$this->assertSame(0, $changeStream->key());
442+
$this->assertSame(1, $changeStream->key());
428443

429444
$expectedResult = [
430445
'_id' => $changeStream->current()->_id,
431446
'operationType' => 'insert',
432-
'fullDocument' => ['_id' => 1],
447+
'fullDocument' => ['_id' => 2],
433448
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
434-
'documentKey' => ['_id' => 1],
449+
'documentKey' => ['_id' => 2],
435450
];
436451

437452
$this->assertMatchesDocument($expectedResult, $changeStream->current());
438453

439-
$this->insertDocument(['_id' => 2]);
454+
$this->insertDocument(['_id' => 3]);
440455

441456
$changeStream->next();
442457
$this->assertTrue($changeStream->valid());
443-
$this->assertSame(1, $changeStream->key());
458+
$this->assertSame(2, $changeStream->key());
444459

445460
$expectedResult = [
446461
'_id' => $changeStream->current()->_id,
447462
'operationType' => 'insert',
448-
'fullDocument' => ['_id' => 2],
463+
'fullDocument' => ['_id' => 3],
449464
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
450-
'documentKey' => ['_id' => 2],
465+
'documentKey' => ['_id' => 3],
451466
];
452467

453468
$this->assertMatchesDocument($expectedResult, $changeStream->current());
454469

455470
$this->killChangeStreamCursor($changeStream);
456471

457-
$this->insertDocument(['_id' => 3]);
472+
$this->insertDocument(['_id' => 4]);
458473

459474
$changeStream->next();
460475
$this->assertTrue($changeStream->valid());
461-
$this->assertSame(2, $changeStream->key());
476+
$this->assertSame(3, $changeStream->key());
462477

463478
$expectedResult = [
464479
'_id' => $changeStream->current()->_id,
465480
'operationType' => 'insert',
466-
'fullDocument' => ['_id' => 3],
481+
'fullDocument' => ['_id' => 4],
467482
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
468-
'documentKey' => ['_id' => 3],
483+
'documentKey' => ['_id' => 4],
469484
];
470485

471486
$this->assertMatchesDocument($expectedResult, $changeStream->current());
@@ -476,18 +491,18 @@ public function testResumeMultipleTimesInSuccession()
476491
* we'll see {_id: 3} returned again. */
477492
$this->killChangeStreamCursor($changeStream);
478493

479-
$this->insertDocument(['_id' => 4]);
494+
$this->insertDocument(['_id' => 5]);
480495

481496
$changeStream->next();
482497
$this->assertTrue($changeStream->valid());
483-
$this->assertSame(3, $changeStream->key());
498+
$this->assertSame(4, $changeStream->key());
484499

485500
$expectedResult = [
486501
'_id' => $changeStream->current()->_id,
487502
'operationType' => 'insert',
488-
'fullDocument' => ['_id' => 4],
503+
'fullDocument' => ['_id' => 5],
489504
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
490-
'documentKey' => ['_id' => 4],
505+
'documentKey' => ['_id' => 5],
491506
];
492507

493508
$this->assertMatchesDocument($expectedResult, $changeStream->current());
@@ -944,4 +959,11 @@ private function killChangeStreamCursor(ChangeStream $changeStream)
944959
$operation = new DatabaseCommand($this->getDatabaseName(), $command);
945960
$operation->execute($this->getPrimaryServer());
946961
}
962+
963+
private function skipIfStartAtOperationTimeNotSupported()
964+
{
965+
if (!\MongoDB\server_supports_feature($this->getPrimaryServer(), self::$wireVersionForStartAtOperationTime)) {
966+
$this->markTestSkipped('Operation time is not supported');
967+
}
968+
}
947969
}

0 commit comments

Comments
 (0)