Skip to content
Merged
4 changes: 4 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,10 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean {
return true;
}

if (error instanceof MongoServerSelectionError) {
return true;
}

if (wireVersion != null && wireVersion >= 9) {
// DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable
if (error.code === MONGODB_ERROR_CODES.CursorNotFound) {
Expand Down
179 changes: 176 additions & 3 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { strict as assert } from 'assert';
import { UUID } from 'bson';
import { expect } from 'chai';
import { on, once } from 'events';
import { gte, lt } from 'semver';
Expand All @@ -14,6 +15,7 @@ import {
type CommandStartedEvent,
type Db,
isHello,
LEGACY_HELLO_COMMAND,
Long,
MongoAPIError,
MongoChangeStreamError,
Expand Down Expand Up @@ -45,6 +47,19 @@ const pipeline = [
{ $addFields: { comment: 'The documentKey field has been projected out of this document.' } }
];

async function forcePrimaryStepDown(client: MongoClient) {
await client
.db('admin')
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
await client
.db('admin')
.command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true });

// wait for secondary to become primary but also allow previous primary to become next primary
// in subsequent test runs
await sleep(15_000);
}

describe('Change Streams', function () {
let client: MongoClient;
let collection: Collection;
Expand Down Expand Up @@ -2003,9 +2018,11 @@ describe('Change Streams', function () {

describe('ChangeStream resumability', function () {
let client: MongoClient;
let utilClient: MongoClient;
let collection: Collection;
let changeStream: ChangeStream;
let aggregateEvents: CommandStartedEvent[] = [];
let appName: string;

const changeStreamResumeOptions: ChangeStreamOptions = {
fullDocument: 'updateLookup',
Expand Down Expand Up @@ -2055,22 +2072,36 @@ describe('ChangeStream resumability', function () {
beforeEach(async function () {
const dbName = 'resumabilty_tests';
const collectionName = 'foo';
const utilClient = this.configuration.newClient();

utilClient = this.configuration.newClient();

// 3.6 servers do not support creating a change stream on a database that doesn't exist
await utilClient
.db(dbName)
.dropDatabase()
.catch(e => e);
await utilClient.db(dbName).createCollection(collectionName);
await utilClient.close();

client = this.configuration.newClient({ monitorCommands: true });
// we are going to switch primary in tests and cleanup of failpoints is difficult,
// so generating unique appname instead of cleaning for each test is an easier solution
appName = new UUID().toString();

client = this.configuration.newClient(
{},
{
monitorCommands: true,
serverSelectionTimeoutMS: 10_000,
heartbeatFrequencyMS: 5_000,
appName: appName
}
);
client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents));
collection = client.db(dbName).collection(collectionName);
});

afterEach(async function () {
await changeStream.close();
await utilClient.close();
await client.close();
aggregateEvents = [];
});
Expand Down Expand Up @@ -2228,6 +2259,38 @@ describe('ChangeStream resumability', function () {
expect(changeStream.closed).to.be.true;
});
});

context('when the error is not a server error', function () {
it(
'should resume on ServerSelectionError',
{ requires: { topology: ['replicaset'] } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await collection.insertOne({ a: 1 });

await utilClient.db('admin').command({
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
closeConnection: true,
appName: appName
}
} as FailCommandFailPoint);

await forcePrimaryStepDown(utilClient);

const change = await changeStream.next();
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });

expect(aggregateEvents).to.have.lengthOf(2);
const [e1, e2] = aggregateEvents;
expect(e1.address).to.not.equal(e2.address);
}
);
});
});

context('#hasNext', function () {
Expand Down Expand Up @@ -2541,6 +2604,37 @@ describe('ChangeStream resumability', function () {
expect(changeStream.closed).to.be.true;
});
});

context('when the error is not a server error', function () {
it(
'should resume on ServerSelectionError',
{ requires: { topology: ['replicaset'] } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await collection.insertOne({ a: 1 });

await utilClient.db('admin').command({
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
closeConnection: true,
appName: appName
}
} as FailCommandFailPoint);
await forcePrimaryStepDown(utilClient);

const change = await changeStream.tryNext();
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });

expect(aggregateEvents).to.have.lengthOf(2);
const [e1, e2] = aggregateEvents;
expect(e1.address).to.not.equal(e2.address);
}
);
});
});

context('#asyncIterator', function () {
Expand Down Expand Up @@ -2677,6 +2771,41 @@ describe('ChangeStream resumability', function () {
}
});
});

context('when the error is not a server error', function () {
it(
'should resume on ServerSelectionError',
{ requires: { topology: ['replicaset'] } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
const changeStreamIterator = changeStream[Symbol.asyncIterator]();

await collection.insertOne({ a: 1 });

await utilClient.db('admin').command({
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
closeConnection: true,
appName: appName
}
} as FailCommandFailPoint);
await forcePrimaryStepDown(utilClient);

const change = await changeStreamIterator.next();
expect(change.value).to.containSubset({
operationType: 'insert',
fullDocument: { a: 1 }
});

expect(aggregateEvents).to.have.lengthOf(2);
const [e1, e2] = aggregateEvents;
expect(e1.address).to.not.equal(e2.address);
}
);
});
});
});

Expand Down Expand Up @@ -2866,6 +2995,50 @@ describe('ChangeStream resumability', function () {
expect(changeStream.closed).to.be.true;
});
});

context('when the error is not a server error', function () {
it(
'should resume on ServerSelectionError',
{ requires: { topology: ['replicaset'] } },
async function () {
changeStream = collection.watch([]);

const changes = on(changeStream, 'change');
await once(changeStream.cursor, 'init');

await collection.insertOne({ a: 1 });

const change = await changes.next();
expect(change.value[0]).to.containSubset({
operationType: 'insert',
fullDocument: { a: 1 }
});

await utilClient.db('admin').command({
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
closeConnection: true,
appName: appName
}
} as FailCommandFailPoint);
await forcePrimaryStepDown(utilClient);

await collection.insertOne({ a: 2 });

const change2 = await changes.next();
expect(change2.value[0]).to.containSubset({
operationType: 'insert',
fullDocument: { a: 2 }
});

expect(aggregateEvents).to.have.lengthOf(2);
const [e1, e2] = aggregateEvents;
expect(e1.address).to.not.equal(e2.address);
}
);
});
});

it(
Expand Down