diff --git a/src/error.ts b/src/error.ts index 08e4b86d949..29492c58989 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1551,6 +1551,10 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean { return true; } + if (error instanceof MongoServerSelectionError) { + return true; + } + if (wireVersion != null && wireVersion >= 9) { // DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable if (error.code === MONGODB_ERROR_CODES.CursorNotFound) { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index c6156ab9549..32da9530831 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'assert'; +import { UUID } from 'bson'; import { expect } from 'chai'; import { on, once } from 'events'; import { gte, lt } from 'semver'; @@ -14,6 +15,7 @@ import { type CommandStartedEvent, type Db, isHello, + LEGACY_HELLO_COMMAND, Long, MongoAPIError, MongoChangeStreamError, @@ -45,6 +47,19 @@ const pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; +async function forcePrimaryStepDown(client: MongoClient) { + await client + .db('admin') + .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); + await client + .db('admin') + .command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true }); + + // wait for secondary to become primary but also allow previous primary to become next primary + // in subsequent test runs + await sleep(15_000); +} + describe('Change Streams', function () { let client: MongoClient; let collection: Collection; @@ -2003,9 +2018,11 @@ describe('Change Streams', function () { describe('ChangeStream resumability', function () { let client: MongoClient; + let utilClient: MongoClient; let collection: Collection; let changeStream: ChangeStream; let aggregateEvents: CommandStartedEvent[] = []; + let appName: string; const changeStreamResumeOptions: ChangeStreamOptions = { fullDocument: 'updateLookup', @@ -2055,22 +2072,36 @@ describe('ChangeStream resumability', function () { beforeEach(async function () { const dbName = 'resumabilty_tests'; const collectionName = 'foo'; - const utilClient = this.configuration.newClient(); + + utilClient = this.configuration.newClient(); + // 3.6 servers do not support creating a change stream on a database that doesn't exist await utilClient .db(dbName) .dropDatabase() .catch(e => e); await utilClient.db(dbName).createCollection(collectionName); - await utilClient.close(); - client = this.configuration.newClient({ monitorCommands: true }); + // we are going to switch primary in tests and cleanup of failpoints is difficult, + // so generating unique appname instead of cleaning for each test is an easier solution + appName = new UUID().toString(); + + client = this.configuration.newClient( + {}, + { + monitorCommands: true, + serverSelectionTimeoutMS: 10_000, + heartbeatFrequencyMS: 5_000, + appName: appName + } + ); client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents)); collection = client.db(dbName).collection(collectionName); }); afterEach(async function () { await changeStream.close(); + await utilClient.close(); await client.close(); aggregateEvents = []; }); @@ -2228,6 +2259,38 @@ describe('ChangeStream resumability', function () { expect(changeStream.closed).to.be.true; }); }); + + context('when the error is not a server error', function () { + it( + 'should resume on ServerSelectionError', + { requires: { topology: ['replicaset'] } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + + await utilClient.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + appName: appName + } + } as FailCommandFailPoint); + + await forcePrimaryStepDown(utilClient); + + const change = await changeStream.next(); + expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); + } + ); + }); }); context('#hasNext', function () { @@ -2541,6 +2604,37 @@ describe('ChangeStream resumability', function () { expect(changeStream.closed).to.be.true; }); }); + + context('when the error is not a server error', function () { + it( + 'should resume on ServerSelectionError', + { requires: { topology: ['replicaset'] } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + + await utilClient.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + appName: appName + } + } as FailCommandFailPoint); + await forcePrimaryStepDown(utilClient); + + const change = await changeStream.tryNext(); + expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); + } + ); + }); }); context('#asyncIterator', function () { @@ -2677,6 +2771,41 @@ describe('ChangeStream resumability', function () { } }); }); + + context('when the error is not a server error', function () { + it( + 'should resume on ServerSelectionError', + { requires: { topology: ['replicaset'] } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + await collection.insertOne({ a: 1 }); + + await utilClient.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + appName: appName + } + } as FailCommandFailPoint); + await forcePrimaryStepDown(utilClient); + + const change = await changeStreamIterator.next(); + expect(change.value).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); + } + ); + }); }); }); @@ -2866,6 +2995,50 @@ describe('ChangeStream resumability', function () { expect(changeStream.closed).to.be.true; }); }); + + context('when the error is not a server error', function () { + it( + 'should resume on ServerSelectionError', + { requires: { topology: ['replicaset'] } }, + async function () { + changeStream = collection.watch([]); + + const changes = on(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + await collection.insertOne({ a: 1 }); + + const change = await changes.next(); + expect(change.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); + + await utilClient.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + appName: appName + } + } as FailCommandFailPoint); + await forcePrimaryStepDown(utilClient); + + await collection.insertOne({ a: 2 }); + + const change2 = await changes.next(); + expect(change2.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); + } + ); + }); }); it(