From 3f65dd74d3f84977a5a300a37f0327ef619289a7 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 8 Sep 2025 08:58:32 +0200 Subject: [PATCH 01/10] test(NODE-6858): add tests for ServerSelectionError --- .../change-streams/change_stream.test.ts | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index c6156ab9549..b9913f4e319 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -14,6 +14,7 @@ import { type CommandStartedEvent, type Db, isHello, + LEGACY_HELLO_COMMAND, Long, MongoAPIError, MongoChangeStreamError, @@ -2001,7 +2002,7 @@ describe('Change Streams', function () { }); }); -describe('ChangeStream resumability', function () { +describe.only('ChangeStream resumability', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; @@ -2228,6 +2229,65 @@ describe('ChangeStream resumability', function () { expect(changeStream.closed).to.be.true; }); }); + + context.only('when the error is not a server error', function () { + let client1: MongoClient; + let client2: MongoClient; + + beforeEach(async function () { + client1 = this.configuration.newClient( + {}, + { serverSelectionTimeoutMS: 1000, appName: 'client-errors' } + ); + client2 = this.configuration.newClient(); + + collection = client1.db('client-errors').collection('test'); + }); + + afterEach(async function () { + await client2.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'off', + data: { appName: 'client-errors' } + } as FailCommandFailPoint); + + await client1?.close(); + await client2?.close(); + }); + + it( + 'should resume on ServerSelectionError', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + + await client2.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + handshakeCommands: true, + failInternalCommands: true, + appName: 'client-errors' + } + } as FailCommandFailPoint); + await client2 + .db('admin') + .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.secondary }); + await client2 + .db('admin') + .command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true }); + // await sleep(15_000); + + const change = await changeStream.next(); + expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); + } + ); + }); }); context('#hasNext', function () { From 8535a7cc2eca42d10ee6cfaa3997c1781e64b847 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Fri, 12 Sep 2025 15:23:39 +0200 Subject: [PATCH 02/10] test(NODE-6858): add tests for all iterator methods --- src/error.ts | 4 + .../change-streams/change_stream.test.ts | 145 +++++++++++++----- 2 files changed, 114 insertions(+), 35 deletions(-) diff --git a/src/error.ts b/src/error.ts index 08e4b86d949..0e882db07e7 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1551,6 +1551,10 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean { return true; } + // if (error instanceof MongoServerSelectionError) { + // return true; + // } + if (wireVersion != null && wireVersion >= 9) { // DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable if (error.code === MONGODB_ERROR_CODES.CursorNotFound) { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index b9913f4e319..0ef1c69bf89 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -27,6 +27,7 @@ import * as mock from '../../tools/mongodb-mock/index'; import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder'; import { type FailCommandFailPoint, sleep } from '../../tools/utils'; import { delay, filterForCommands } from '../shared'; +import { UUID } from 'bson'; const initIteratorMode = async (cs: ChangeStream) => { const initEvent = once(cs.cursor, 'init'); @@ -2007,6 +2008,7 @@ describe.only('ChangeStream resumability', function () { let collection: Collection; let changeStream: ChangeStream; let aggregateEvents: CommandStartedEvent[] = []; + let appName: string; const changeStreamResumeOptions: ChangeStreamOptions = { fullDocument: 'updateLookup', @@ -2065,7 +2067,15 @@ describe.only('ChangeStream resumability', function () { await utilClient.db(dbName).createCollection(collectionName); await utilClient.close(); - client = this.configuration.newClient({ monitorCommands: true }); + // we are going to switch primary in tests and cleanup of failpoints is difficult, + // so generating unique appname instead of cleaning for each test is an easier solution + appName = new UUID().toString(); + + client = this.configuration.newClient({ + monitorCommands: true, + serverSelectionTimeoutMS: 5_000, + appName: appName + }); client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents)); collection = client.db(dbName).collection(collectionName); }); @@ -2230,41 +2240,19 @@ describe.only('ChangeStream resumability', function () { }); }); - context.only('when the error is not a server error', function () { - let client1: MongoClient; - let client2: MongoClient; - - beforeEach(async function () { - client1 = this.configuration.newClient( - {}, - { serverSelectionTimeoutMS: 1000, appName: 'client-errors' } - ); - client2 = this.configuration.newClient(); - - collection = client1.db('client-errors').collection('test'); - }); - - afterEach(async function () { - await client2.db('admin').command({ - configureFailPoint: 'failCommand', - mode: 'off', - data: { appName: 'client-errors' } - } as FailCommandFailPoint); - - await client1?.close(); - await client2?.close(); - }); - + context('when the error is not a server error', function () { + // This test requires a replica set to call replSetFreeze command it( 'should resume on ServerSelectionError', - { requires: { topology: '!single' } }, + { requires: { topology: ['replicaset'] } }, async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); await collection.insertOne({ a: 1 }); - await client2.db('admin').command({ + // mimic the node termination by closing the connection and failing on heartbeat + await client.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { @@ -2272,19 +2260,22 @@ describe.only('ChangeStream resumability', function () { closeConnection: true, handshakeCommands: true, failInternalCommands: true, - appName: 'client-errors' + appName: appName } } as FailCommandFailPoint); - await client2 - .db('admin') - .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.secondary }); - await client2 + // force new election in the cluster + await client .db('admin') - .command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true }); - // await sleep(15_000); + .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); + await client.db('admin').command({ replSetStepDown: 30, force: true }); + await sleep(1500); const change = await changeStream.next(); expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); } ); }); @@ -2601,6 +2592,46 @@ describe.only('ChangeStream resumability', function () { expect(changeStream.closed).to.be.true; }); }); + + context('when the error is not a server error', function () { + // This test requires a replica set to call replSetFreeze command + it( + 'should resume on ServerSelectionError', + { requires: { topology: ['replicaset'] } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + + // mimic the node termination by closing the connection and failing on heartbeat + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + handshakeCommands: true, + failInternalCommands: true, + appName: appName + } + } as FailCommandFailPoint); + // force new election in the cluster + await client + .db('admin') + .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); + await client.db('admin').command({ replSetStepDown: 30, force: true }); + await sleep(1500); + + const change = await changeStream.tryNext(); + expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); + } + ); + }); }); context('#asyncIterator', function () { @@ -2737,6 +2768,50 @@ describe.only('ChangeStream resumability', function () { } }); }); + + context('when the error is not a server error', function () { + // This test requires a replica set to call replSetFreeze command + it( + 'should resume on ServerSelectionError', + { requires: { topology: ['replicaset'] } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + await collection.insertOne({ a: 1 }); + + // mimic the node termination by closing the connection and failing on heartbeat + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + handshakeCommands: true, + failInternalCommands: true, + appName: appName + } + } as FailCommandFailPoint); + // force new election in the cluster + await client + .db('admin') + .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); + await client.db('admin').command({ replSetStepDown: 30, force: true }); + await sleep(1500); + + const change = await changeStreamIterator.next(); + expect(change.value).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); + } + ); + }); }); }); From 45adf9cd4b8abcbe4e4c32faa7a7e43947891de5 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Fri, 12 Sep 2025 18:04:14 +0200 Subject: [PATCH 03/10] test(NODE-6858): handle MongoServerSelectionError --- src/error.ts | 6 +++--- test/integration/change-streams/change_stream.test.ts | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/error.ts b/src/error.ts index 0e882db07e7..29492c58989 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1551,9 +1551,9 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean { return true; } - // if (error instanceof MongoServerSelectionError) { - // return true; - // } + if (error instanceof MongoServerSelectionError) { + return true; + } if (wireVersion != null && wireVersion >= 9) { // DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 0ef1c69bf89..e9a02482489 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2003,7 +2003,7 @@ describe('Change Streams', function () { }); }); -describe.only('ChangeStream resumability', function () { +describe('ChangeStream resumability', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; From 535b29c10ebe7f7f7a87f1b1da586d651245930b Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 15 Sep 2025 09:37:06 +0200 Subject: [PATCH 04/10] test(NODE-6858): lower heartbeat to handle primary termination --- src/error.ts | 6 ++--- .../change-streams/change_stream.test.ts | 25 ++++++++++++------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/error.ts b/src/error.ts index 29492c58989..0e882db07e7 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1551,9 +1551,9 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean { return true; } - if (error instanceof MongoServerSelectionError) { - return true; - } + // if (error instanceof MongoServerSelectionError) { + // return true; + // } if (wireVersion != null && wireVersion >= 9) { // DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index e9a02482489..5eab4e02218 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2003,7 +2003,7 @@ describe('Change Streams', function () { }); }); -describe('ChangeStream resumability', function () { +describe.only('ChangeStream resumability', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; @@ -2071,11 +2071,15 @@ describe('ChangeStream resumability', function () { // so generating unique appname instead of cleaning for each test is an easier solution appName = new UUID().toString(); - client = this.configuration.newClient({ - monitorCommands: true, - serverSelectionTimeoutMS: 5_000, - appName: appName - }); + client = this.configuration.newClient( + {}, + { + monitorCommands: true, + serverSelectionTimeoutMS: 5_000, + heartbeatFrequencyMS: 500, + appName: appName + } + ); client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents)); collection = client.db(dbName).collection(collectionName); }); @@ -2268,7 +2272,8 @@ describe('ChangeStream resumability', function () { .db('admin') .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); await client.db('admin').command({ replSetStepDown: 30, force: true }); - await sleep(1500); + + await sleep(500); const change = await changeStream.next(); expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); @@ -2621,7 +2626,8 @@ describe('ChangeStream resumability', function () { .db('admin') .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); await client.db('admin').command({ replSetStepDown: 30, force: true }); - await sleep(1500); + + await sleep(500); const change = await changeStream.tryNext(); expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); @@ -2798,7 +2804,8 @@ describe('ChangeStream resumability', function () { .db('admin') .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); await client.db('admin').command({ replSetStepDown: 30, force: true }); - await sleep(1500); + + await sleep(500); const change = await changeStreamIterator.next(); expect(change.value).to.containSubset({ From f8b9a0d99bcac8dfef74e451043e54ebaa4e9909 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 15 Sep 2025 10:14:09 +0200 Subject: [PATCH 05/10] test(NODE-6858): add tests for event emmitter based iteration --- src/error.ts | 6 +- .../change-streams/change_stream.test.ts | 68 +++++++++++++++---- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/src/error.ts b/src/error.ts index 0e882db07e7..29492c58989 100644 --- a/src/error.ts +++ b/src/error.ts @@ -1551,9 +1551,9 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean { return true; } - // if (error instanceof MongoServerSelectionError) { - // return true; - // } + if (error instanceof MongoServerSelectionError) { + return true; + } if (wireVersion != null && wireVersion >= 9) { // DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 5eab4e02218..826e23cc6ec 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2255,23 +2255,19 @@ describe.only('ChangeStream resumability', function () { await collection.insertOne({ a: 1 }); - // mimic the node termination by closing the connection and failing on heartbeat await client.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], closeConnection: true, - handshakeCommands: true, - failInternalCommands: true, appName: appName } } as FailCommandFailPoint); - // force new election in the cluster await client .db('admin') .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); - await client.db('admin').command({ replSetStepDown: 30, force: true }); + await client.db('admin').command({ replSetStepDown: 5, force: true }); await sleep(500); @@ -2609,23 +2605,19 @@ describe.only('ChangeStream resumability', function () { await collection.insertOne({ a: 1 }); - // mimic the node termination by closing the connection and failing on heartbeat await client.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], closeConnection: true, - handshakeCommands: true, - failInternalCommands: true, appName: appName } } as FailCommandFailPoint); - // force new election in the cluster await client .db('admin') .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); - await client.db('admin').command({ replSetStepDown: 30, force: true }); + await client.db('admin').command({ replSetStepDown: 5, force: true }); await sleep(500); @@ -2787,23 +2779,19 @@ describe.only('ChangeStream resumability', function () { await collection.insertOne({ a: 1 }); - // mimic the node termination by closing the connection and failing on heartbeat await client.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], closeConnection: true, - handshakeCommands: true, - failInternalCommands: true, appName: appName } } as FailCommandFailPoint); - // force new election in the cluster await client .db('admin') .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); - await client.db('admin').command({ replSetStepDown: 30, force: true }); + await client.db('admin').command({ replSetStepDown: 5, force: true }); await sleep(500); @@ -3008,6 +2996,56 @@ describe.only('ChangeStream resumability', function () { expect(changeStream.closed).to.be.true; }); }); + + context('when the error is not a server error', function () { + // This test requires a replica set to call replSetFreeze command + it( + 'should resume on ServerSelectionError', + { requires: { topology: ['replicaset'] } }, + async function () { + changeStream = collection.watch([]); + + const changes = on(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + await collection.insertOne({ a: 1 }); + + const change = await changes.next(); + expect(change.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); + + await client.db('admin').command({ + configureFailPoint: 'failCommand', + mode: 'alwaysOn', + data: { + failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND], + closeConnection: true, + appName: appName + } + } as FailCommandFailPoint); + await client + .db('admin') + .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); + await client.db('admin').command({ replSetStepDown: 5, force: true }); + + await sleep(500); + + await collection.insertOne({ a: 2 }); + + const change2 = await changes.next(); + expect(change2.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); + + expect(aggregateEvents).to.have.lengthOf(2); + const [e1, e2] = aggregateEvents; + expect(e1.address).to.not.equal(e2.address); + } + ); + }); }); it( From cb164dc9558d1dffd3b353b7e516bc0f6ce0d407 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 15 Sep 2025 12:22:54 +0200 Subject: [PATCH 06/10] test(NODE-6858): adjust periods for election as well as heartbeat and server selection --- .../change-streams/change_stream.test.ts | 60 +++++++++---------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 826e23cc6ec..f5cdff8f593 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -47,6 +47,19 @@ const pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; +async function forcePrimaryStepDown(client: MongoClient) { + await client + .db('admin') + .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); + await client + .db('admin') + .command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true }); + + // wait for secondary to become primary but also allow previous primary to become next primary + // in subsequent test runs + await sleep(15_000); +} + describe('Change Streams', function () { let client: MongoClient; let collection: Collection; @@ -2005,6 +2018,7 @@ describe('Change Streams', function () { describe.only('ChangeStream resumability', function () { let client: MongoClient; + let utilClient: MongoClient; let collection: Collection; let changeStream: ChangeStream; let aggregateEvents: CommandStartedEvent[] = []; @@ -2058,14 +2072,15 @@ describe.only('ChangeStream resumability', function () { beforeEach(async function () { const dbName = 'resumabilty_tests'; const collectionName = 'foo'; - const utilClient = this.configuration.newClient(); + + utilClient = this.configuration.newClient(); + // 3.6 servers do not support creating a change stream on a database that doesn't exist await utilClient .db(dbName) .dropDatabase() .catch(e => e); await utilClient.db(dbName).createCollection(collectionName); - await utilClient.close(); // we are going to switch primary in tests and cleanup of failpoints is difficult, // so generating unique appname instead of cleaning for each test is an easier solution @@ -2075,8 +2090,8 @@ describe.only('ChangeStream resumability', function () { {}, { monitorCommands: true, - serverSelectionTimeoutMS: 5_000, - heartbeatFrequencyMS: 500, + serverSelectionTimeoutMS: 10_000, + heartbeatFrequencyMS: 5_000, appName: appName } ); @@ -2086,6 +2101,7 @@ describe.only('ChangeStream resumability', function () { afterEach(async function () { await changeStream.close(); + await utilClient.close(); await client.close(); aggregateEvents = []; }); @@ -2255,7 +2271,7 @@ describe.only('ChangeStream resumability', function () { await collection.insertOne({ a: 1 }); - await client.db('admin').command({ + await utilClient.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { @@ -2264,12 +2280,8 @@ describe.only('ChangeStream resumability', function () { appName: appName } } as FailCommandFailPoint); - await client - .db('admin') - .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); - await client.db('admin').command({ replSetStepDown: 5, force: true }); - await sleep(500); + await forcePrimaryStepDown(utilClient); const change = await changeStream.next(); expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); @@ -2605,7 +2617,7 @@ describe.only('ChangeStream resumability', function () { await collection.insertOne({ a: 1 }); - await client.db('admin').command({ + await utilClient.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { @@ -2614,12 +2626,7 @@ describe.only('ChangeStream resumability', function () { appName: appName } } as FailCommandFailPoint); - await client - .db('admin') - .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); - await client.db('admin').command({ replSetStepDown: 5, force: true }); - - await sleep(500); + await forcePrimaryStepDown(utilClient); const change = await changeStream.tryNext(); expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } }); @@ -2779,7 +2786,7 @@ describe.only('ChangeStream resumability', function () { await collection.insertOne({ a: 1 }); - await client.db('admin').command({ + await utilClient.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { @@ -2788,12 +2795,7 @@ describe.only('ChangeStream resumability', function () { appName: appName } } as FailCommandFailPoint); - await client - .db('admin') - .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); - await client.db('admin').command({ replSetStepDown: 5, force: true }); - - await sleep(500); + await forcePrimaryStepDown(utilClient); const change = await changeStreamIterator.next(); expect(change.value).to.containSubset({ @@ -2998,7 +3000,6 @@ describe.only('ChangeStream resumability', function () { }); context('when the error is not a server error', function () { - // This test requires a replica set to call replSetFreeze command it( 'should resume on ServerSelectionError', { requires: { topology: ['replicaset'] } }, @@ -3016,7 +3017,7 @@ describe.only('ChangeStream resumability', function () { fullDocument: { a: 1 } }); - await client.db('admin').command({ + await utilClient.db('admin').command({ configureFailPoint: 'failCommand', mode: 'alwaysOn', data: { @@ -3025,12 +3026,7 @@ describe.only('ChangeStream resumability', function () { appName: appName } } as FailCommandFailPoint); - await client - .db('admin') - .command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY }); - await client.db('admin').command({ replSetStepDown: 5, force: true }); - - await sleep(500); + await forcePrimaryStepDown(utilClient); await collection.insertOne({ a: 2 }); From 9fc2da210938849df46f76c68b07775c1cb33ca1 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 15 Sep 2025 12:43:43 +0200 Subject: [PATCH 07/10] test(NODE-6858): run all tests --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index f5cdff8f593..577df9eb221 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2016,7 +2016,7 @@ describe('Change Streams', function () { }); }); -describe.only('ChangeStream resumability', function () { +describe('ChangeStream resumability', function () { let client: MongoClient; let utilClient: MongoClient; let collection: Collection; From b7851896980464c57b0dd7d63a87314ab1193230 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 15 Sep 2025 13:07:32 +0200 Subject: [PATCH 08/10] test(NODE-6858): fix lint --- test/integration/change-streams/change_stream.test.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 577df9eb221..32da9530831 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'assert'; +import { UUID } from 'bson'; import { expect } from 'chai'; import { on, once } from 'events'; import { gte, lt } from 'semver'; @@ -27,7 +28,6 @@ import * as mock from '../../tools/mongodb-mock/index'; import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder'; import { type FailCommandFailPoint, sleep } from '../../tools/utils'; import { delay, filterForCommands } from '../shared'; -import { UUID } from 'bson'; const initIteratorMode = async (cs: ChangeStream) => { const initEvent = once(cs.cursor, 'init'); @@ -2261,7 +2261,6 @@ describe('ChangeStream resumability', function () { }); context('when the error is not a server error', function () { - // This test requires a replica set to call replSetFreeze command it( 'should resume on ServerSelectionError', { requires: { topology: ['replicaset'] } }, @@ -2607,7 +2606,6 @@ describe('ChangeStream resumability', function () { }); context('when the error is not a server error', function () { - // This test requires a replica set to call replSetFreeze command it( 'should resume on ServerSelectionError', { requires: { topology: ['replicaset'] } }, @@ -2775,7 +2773,6 @@ describe('ChangeStream resumability', function () { }); context('when the error is not a server error', function () { - // This test requires a replica set to call replSetFreeze command it( 'should resume on ServerSelectionError', { requires: { topology: ['replicaset'] } }, From 6e0381bb1b4db0b70c628d5f6a0aec78158ff822 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Tue, 16 Sep 2025 11:59:22 +0200 Subject: [PATCH 09/10] test(NODE-6858): run test on sharded & lb topologies Co-authored-by: Durran Jordan --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 32da9530831..c0f8d0ddf5e 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2263,7 +2263,7 @@ describe('ChangeStream resumability', function () { context('when the error is not a server error', function () { it( 'should resume on ServerSelectionError', - { requires: { topology: ['replicaset'] } }, + { requires: { topology: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'] } }, async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); From 4f2f5f04611409183224312beb788ec4d73b1589 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Tue, 16 Sep 2025 12:59:41 +0200 Subject: [PATCH 10/10] test(NODE-6858): run tests only on rs setup (revert prev commit) This reverts commit 6e0381bb1b4db0b70c628d5f6a0aec78158ff822. --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index c0f8d0ddf5e..32da9530831 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2263,7 +2263,7 @@ describe('ChangeStream resumability', function () { context('when the error is not a server error', function () { it( 'should resume on ServerSelectionError', - { requires: { topology: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'] } }, + { requires: { topology: ['replicaset'] } }, async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream);