diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index c35f5e53918..2b0e340e33a 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter { address: string; socketTimeoutMS: number; monitorCommands: boolean; + /** Indicates that the connection (including underlying TCP socket) has been closed. */ closed: boolean; - destroyed: boolean; lastHelloMS?: number; serverApi?: ServerApi; helloOk?: boolean; @@ -204,7 +204,6 @@ export class Connection extends TypedEventEmitter { this.monitorCommands = options.monitorCommands; this.serverApi = options.serverApi; this.closed = false; - this.destroyed = false; this[kHello] = null; this[kClusterTime] = null; @@ -294,56 +293,19 @@ export class Connection extends TypedEventEmitter { } onError(error: Error) { - if (this.closed) { - return; - } - - this[kStream].destroy(error); - - this.closed = true; - - for (const op of this[kQueue].values()) { - op.cb(error); - } - - this[kQueue].clear(); - this.emit(Connection.CLOSE); + this.cleanup(true, error); } onClose() { - if (this.closed) { - return; - } - - this.closed = true; - const message = `connection ${this.id} to ${this.address} closed`; - for (const op of this[kQueue].values()) { - op.cb(new MongoNetworkError(message)); - } - - this[kQueue].clear(); - this.emit(Connection.CLOSE); + this.cleanup(true, new MongoNetworkError(message)); } onTimeout() { - if (this.closed) { - return; - } - this[kDelayedTimeoutId] = setTimeout(() => { - this[kStream].destroy(); - - this.closed = true; - const message = `connection ${this.id} to ${this.address} timed out`; const beforeHandshake = this.hello == null; - for (const op of this[kQueue].values()) { - op.cb(new MongoNetworkTimeoutError(message, { beforeHandshake })); - } - - this[kQueue].clear(); - this.emit(Connection.CLOSE); + this.cleanup(true, new MongoNetworkTimeoutError(message, { beforeHandshake })); }, 1).unref(); // No need for this timer to hold the event loop open } @@ -364,7 +326,7 @@ export class Connection extends TypedEventEmitter { // First check if the map is of invalid size if (this[kQueue].size > 1) { - this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE)); + this.cleanup(true, new MongoRuntimeError(INVALID_QUEUE_SIZE)); } else { // Get the first orphaned operation description. const entry = this[kQueue].entries().next(); @@ -444,34 +406,67 @@ export class Connection extends TypedEventEmitter { } destroy(options: DestroyOptions, callback?: Callback): void { + if (this.closed) { + process.nextTick(() => callback?.()); + return; + } + if (typeof callback === 'function') { + this.once('close', () => process.nextTick(() => callback())); + } + + // load balanced mode requires that these listeners remain on the connection + // after cleanup on timeouts, errors or close so we remove them before calling + // cleanup. this.removeAllListeners(Connection.PINNED); this.removeAllListeners(Connection.UNPINNED); + const message = `connection ${this.id} to ${this.address} closed`; + this.cleanup(options.force, new MongoNetworkError(message)); + } - if (this[kStream] == null || this.destroyed) { - this.destroyed = true; - if (typeof callback === 'function') { - callback(); - } - + /** + * A method that cleans up the connection. When `force` is true, this method + * forcibly destroys the socket. + * + * If an error is provided, any in-flight operations will be closed with the error. + * + * This method does nothing if the connection is already closed. + */ + private cleanup(force: boolean, error?: Error): void { + if (this.closed) { return; } - if (options.force) { - this[kStream].destroy(); - this.destroyed = true; - if (typeof callback === 'function') { - callback(); + this.closed = true; + + const completeCleanup = () => { + for (const op of this[kQueue].values()) { + op.cb(error); } + this[kQueue].clear(); + + this.emit(Connection.CLOSE); + }; + + this[kStream].removeAllListeners(); + this[kMessageStream].removeAllListeners(); + + this[kMessageStream].destroy(); + + if (force) { + this[kStream].destroy(); + completeCleanup(); return; } - this[kStream].end(() => { - this.destroyed = true; - if (typeof callback === 'function') { - callback(); - } - }); + if (!this[kStream].writableEnded) { + this[kStream].end(() => { + this[kStream].destroy(); + completeCleanup(); + }); + } else { + completeCleanup(); + } } command( diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index c1801cf9e6c..8298b13c778 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -12,9 +12,11 @@ import { hasSessionSupport, isHello, MessageStream, + MongoNetworkError, MongoNetworkTimeoutError, MongoRuntimeError, - ns + ns, + OperationDescription } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils'; @@ -31,6 +33,8 @@ const connectionOptionsDefaults = { /** The absolute minimum socket API needed by Connection as of writing this test */ class FakeSocket extends EventEmitter { + destroyed = false; + writableEnded: boolean; address() { // is never called } @@ -39,6 +43,15 @@ class FakeSocket extends EventEmitter { } destroy() { // is called, has no side effects + this.writableEnded = true; + this.destroyed = true; + } + end(cb) { + this.writableEnded = true; + // nextTick to simulate I/O delay + if (typeof cb === 'function') { + process.nextTick(cb); + } } get remoteAddress() { return 'iLoveJavaScript'; @@ -48,12 +61,26 @@ class FakeSocket extends EventEmitter { } } +class InputStream extends Readable { + writableEnded: boolean; + constructor(options?) { + super(options); + } + + end(cb) { + this.writableEnded = true; + if (typeof cb === 'function') { + process.nextTick(cb); + } + } +} + describe('new Connection()', function () { let server; after(() => mock.cleanup()); before(() => mock.createServer().then(s => (server = s))); - it('should support fire-and-forget messages', function (done) { + it('supports fire-and-forget messages', function (done) { server.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -82,7 +109,7 @@ describe('new Connection()', function () { }); }); - it('should destroy streams which time out', function (done) { + it('destroys streams which time out', function (done) { server.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -113,7 +140,7 @@ describe('new Connection()', function () { }); }); - it('should throw a network error with kBeforeHandshake set to false on timeout after handshake', function (done) { + it('throws a network error with kBeforeHandshake set to false on timeout after handshake', function (done) { server.setMessageHandler(request => { const doc = request.document; if (isHello(doc)) { @@ -142,7 +169,7 @@ describe('new Connection()', function () { }); }); - it('should throw a network error with kBeforeHandshake set to true on timeout before handshake', function (done) { + it('throws a network error with kBeforeHandshake set to true on timeout before handshake', function (done) { server.setMessageHandler(() => { // respond to no requests to trigger timeout event }); @@ -175,7 +202,7 @@ describe('new Connection()', function () { context('when multiple hellos exist on the stream', function () { let callbackSpy; - const inputStream = new Readable(); + const inputStream = new InputStream(); const document = { ok: 1 }; const last = { isWritablePrimary: true }; @@ -374,69 +401,334 @@ describe('new Connection()', function () { }); }); - describe('onTimeout()', () => { + describe('when the socket times out', () => { let connection: sinon.SinonSpiedInstance; let clock: sinon.SinonFakeTimers; let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: sinon.SinonSpiedInstance; + let driverSocket: FakeSocket; let messageStream: MessageStream; - let kDelayedTimeoutId: symbol; + let callbackSpy; + let closeCount = 0; + let kDelayedTimeoutId; let NodeJSTimeoutClass: any; beforeEach(() => { timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers(); + clock = sinon.useFakeTimers({ + toFake: ['nextTick', 'setTimeout', 'clearTimeout'] + }); + driverSocket = new FakeSocket(); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = new Connection(driverSocket, connectionOptionsDefaults); + const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); + messageStream = connection[messageStreamSymbol]; + + callbackSpy = sinon.spy(); + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + connection.on('close', () => { + closeCount++; + }); + connection.once(Connection.PINNED, () => { + /* no-op */ + }); + connection.once(Connection.UNPINNED, () => { + /* no-op */ + }); + + // Stick an operation description in the queue. + const queueSymbol = getSymbolFrom(connection, 'queue'); + connection[queueSymbol].set(1, operationDescription); + + kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId'); NodeJSTimeoutClass = setTimeout(() => null, 1).constructor; + }); - driverSocket = sinon.spy(new FakeSocket()); + afterEach(() => { + closeCount = 0; + sinon.restore(); + timerSandbox.restore(); + clock.restore(); + }); + + context('delayed timeout for lambda behavior', () => { + let cleanupSpy; + let timeoutSpy; + beforeEach(() => { + cleanupSpy = sinon.spy(connection, 'cleanup'); + timeoutSpy = sinon.spy(connection, 'onTimeout'); + }); + + it('delays timeout errors by one tick', async () => { + expect(connection).to.have.property(kDelayedTimeoutId, null); + + driverSocket.emit('timeout'); + expect(cleanupSpy).to.not.have.been.called; + expect(connection) + .to.have.property(kDelayedTimeoutId) + .that.is.instanceOf(NodeJSTimeoutClass); + expect(connection).to.have.property('closed', false); + + clock.tick(1); + + expect(cleanupSpy).to.have.been.calledOnce; + expect(connection).to.have.property('closed', true); + }); + + it('clears timeout errors if more data is available', () => { + expect(connection).to.have.property(kDelayedTimeoutId, null); + + driverSocket.emit('timeout'); + expect(timeoutSpy).to.have.been.calledOnce; + expect(cleanupSpy).not.to.have.been.called; + expect(connection) + .to.have.property(kDelayedTimeoutId) + .that.is.instanceOf(NodeJSTimeoutClass); + + // emit a message before the clock ticks even once + // onMessage ignores unknown 'responseTo' value + messageStream.emit('message', { responseTo: null }); + + // New message before clock ticks 1 will clear the timeout + expect(connection).to.have.property(kDelayedTimeoutId, null); + + // ticking the clock should do nothing, there is no timeout anymore + clock.tick(1); + + expect(cleanupSpy).not.to.have.been.called; + expect(connection).to.have.property('closed', false); + expect(connection).to.have.property(kDelayedTimeoutId, null); + }); + }); + + context('when the timeout expires and no more data has come in', () => { + beforeEach(() => { + driverSocket.emit('timeout'); + clock.tick(1); + }); + + it('destroys the MessageStream', () => { + expect(messageStream.destroyed).to.be.true; + }); + + it('ends the socket', () => { + expect(driverSocket.writableEnded).to.be.true; + }); + + it('destroys the socket after ending it', () => { + expect(driverSocket.destroyed).to.be.true; + }); + + it('passes the error along to any callbacks in the operation description queue (asynchronously)', () => { + expect(callbackSpy).to.have.been.calledOnce; + const error = callbackSpy.firstCall.args[0]; + expect(error).to.be.instanceof(MongoNetworkTimeoutError); + }); + + it('emits a Connection.CLOSE event (asynchronously)', () => { + expect(closeCount).to.equal(1); + }); + + it('does NOT remove all Connection.PINNED listeners', () => { + expect(connection.listenerCount(Connection.PINNED)).to.equal(1); + }); + + it('does NOT remove all Connection.UNPINNED listeners', () => { + expect(connection.listenerCount(Connection.UNPINNED)).to.equal(1); + }); + + it('removes all listeners on the MessageStream', () => { + expect(messageStream.eventNames()).to.have.lengthOf(0); + }); + + it('removes all listeners on the socket', () => { + expect(driverSocket.eventNames()).to.have.lengthOf(0); + }); + }); + }); + + describe('when the MessageStream errors', () => { + let connection: sinon.SinonSpiedInstance; + let clock: sinon.SinonFakeTimers; + let timerSandbox: sinon.SinonFakeTimers; + let driverSocket: FakeSocket; + let messageStream: MessageStream; + let callbackSpy; + const error = new Error('something went wrong'); + let closeCount = 0; + + beforeEach(() => { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers({ + toFake: ['nextTick'] + }); + driverSocket = new FakeSocket(); // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection = new Connection(driverSocket, connectionOptionsDefaults); const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId'); messageStream = connection[messageStreamSymbol]; + + callbackSpy = sinon.spy(); + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + connection.on('close', () => { + closeCount++; + }); + + connection.once(Connection.PINNED, () => { + /* no-op */ + }); + connection.once(Connection.UNPINNED, () => { + /* no-op */ + }); + + // Stick an operation description in the queue. + const queueSymbol = getSymbolFrom(connection, 'queue'); + connection[queueSymbol].set(1, operationDescription); + + messageStream.emit('error', error); }); afterEach(() => { + closeCount = 0; + sinon.restore(); timerSandbox.restore(); clock.restore(); }); - it('should delay timeout errors by one tick', async () => { - expect(connection).to.have.property(kDelayedTimeoutId, null); + it('destroys the MessageStream synchronously', () => { + expect(messageStream.destroyed).to.be.true; + }); + + it('ends the socket', () => { + expect(driverSocket.writableEnded).to.be.true; + }); - driverSocket.emit('timeout'); - expect(connection.onTimeout).to.have.been.calledOnce; - expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); - expect(connection).to.have.property('closed', false); - expect(driverSocket.destroy).to.not.have.been.called; + it('destroys the socket after ending it (synchronously)', () => { + expect(driverSocket.destroyed).to.be.true; + }); - clock.tick(1); + it('passes the error along to any callbacks in the operation description queue (synchronously)', () => { + expect(callbackSpy).to.have.been.calledOnceWithExactly(error); + }); - expect(driverSocket.destroy).to.have.been.calledOnce; - expect(connection).to.have.property('closed', true); + it('emits a Connection.CLOSE event (synchronously)', () => { + expect(closeCount).to.equal(1); }); - it('should clear timeout errors if more data is available', () => { - expect(connection).to.have.property(kDelayedTimeoutId, null); + it('does NOT remove all Connection.PINNED listeners', () => { + expect(connection.listenerCount(Connection.PINNED)).to.equal(1); + }); - driverSocket.emit('timeout'); - expect(connection.onTimeout).to.have.been.calledOnce; - expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); + it('does NOT remove all Connection.UNPINNED listeners', () => { + expect(connection.listenerCount(Connection.UNPINNED)).to.equal(1); + }); - // emit a message before the clock ticks even once - // onMessage ignores unknown 'responseTo' value - messageStream.emit('message', { responseTo: null }); + it('removes all listeners on the MessageStream', () => { + expect(messageStream.eventNames()).to.have.lengthOf(0); + }); - // New message before clock ticks 1 will clear the timeout - expect(connection).to.have.property(kDelayedTimeoutId, null); + it('removes all listeners on the socket', () => { + expect(driverSocket.eventNames()).to.have.lengthOf(0); + }); + }); - // ticking the clock should do nothing, there is no timeout anymore - clock.tick(1); + describe('when the underlying socket closes', () => { + let connection: sinon.SinonSpiedInstance; + let clock: sinon.SinonFakeTimers; + let timerSandbox: sinon.SinonFakeTimers; + let driverSocket: FakeSocket; + let messageStream: MessageStream; + let callbackSpy; + let closeCount = 0; - expect(driverSocket.destroy).to.not.have.been.called; - expect(connection).to.have.property('closed', false); - expect(connection).to.have.property(kDelayedTimeoutId, null); + beforeEach(() => { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers({ + toFake: ['nextTick'] + }); + driverSocket = new FakeSocket(); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = new Connection(driverSocket, connectionOptionsDefaults); + const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); + messageStream = connection[messageStreamSymbol]; + + callbackSpy = sinon.spy(); + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + connection.on('close', () => { + closeCount++; + }); + + connection.once(Connection.PINNED, () => { + /* no-op */ + }); + connection.once(Connection.UNPINNED, () => { + /* no-op */ + }); + + // Stick an operation description in the queue. + const queueSymbol = getSymbolFrom(connection, 'queue'); + connection[queueSymbol].set(1, operationDescription); + + driverSocket.emit('close'); + }); + + afterEach(() => { + closeCount = 0; + sinon.restore(); + timerSandbox.restore(); + clock.restore(); + }); + + it('destroys the MessageStream synchronously', () => { + expect(messageStream.destroyed).to.be.true; + }); + + it('ends the socket', () => { + expect(driverSocket.writableEnded).to.be.true; + }); + + it('destroys the socket after ending it (synchronously)', () => { + expect(driverSocket.destroyed).to.be.true; + }); + + it('calls any callbacks in the queue with a MongoNetworkError (synchronously)', () => { + expect(callbackSpy).to.have.been.calledOnce; + const error = callbackSpy.firstCall.args[0]; + expect(error).to.be.instanceof(MongoNetworkError); + }); + + it('emits a Connection.CLOSE event (synchronously)', () => { + expect(closeCount).to.equal(1); + }); + + it('does NOT remove all Connection.PINNED listeners', () => { + expect(connection.listenerCount(Connection.PINNED)).to.equal(1); + }); + + it('does NOT remove all Connection.UNPINNED listeners', () => { + expect(connection.listenerCount(Connection.UNPINNED)).to.equal(1); + }); + + it('removes all listeners on the MessageStream', () => { + expect(messageStream.eventNames()).to.have.lengthOf(0); + }); + + it('removes all listeners on the socket', () => { + expect(driverSocket.eventNames()).to.have.lengthOf(0); }); }); @@ -491,4 +783,138 @@ describe('new Connection()', function () { }); }); }); + + describe('destroy()', () => { + let connection: sinon.SinonSpiedInstance; + let clock: sinon.SinonFakeTimers; + let timerSandbox: sinon.SinonFakeTimers; + let driverSocket: sinon.SinonSpiedInstance; + let messageStream: MessageStream; + beforeEach(() => { + timerSandbox = createTimerSandbox(); + clock = sinon.useFakeTimers({ + toFake: ['nextTick'] + }); + + driverSocket = sinon.spy(new FakeSocket()); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); + messageStream = sinon.spy(connection[messageStreamSymbol]); + }); + + afterEach(() => { + timerSandbox.restore(); + clock.restore(); + }); + + it('removes all Connection.PINNED listeners', () => { + connection.once(Connection.PINNED, () => { + /* no-op */ + }); + connection.destroy({ force: true }); + expect(connection.listenerCount(Connection.PINNED)).to.equal(0); + }); + + it('removes all Connection.UNPINNED listeners', () => { + connection.once(Connection.UNPINNED, () => { + /* no-op */ + }); + connection.destroy({ force: true }); + expect(connection.listenerCount(Connection.UNPINNED)).to.equal(0); + }); + + context('when a callback is provided', () => { + context('when the connection was already destroyed', () => { + let callbackSpy; + beforeEach(() => { + connection.destroy({ force: true }); + connection.cleanup.resetHistory(); + callbackSpy = sinon.spy(); + }); + it('does not attempt to cleanup the socket again', () => { + connection.destroy({ force: true }, callbackSpy); + expect(connection.cleanup).not.to.have.been.called; + }); + + it('calls the callback (asynchronously)', () => { + connection.destroy({ force: true }, callbackSpy); + expect(callbackSpy).not.to.have.been.called; + clock.runAll(); + expect(callbackSpy).to.have.been.called; + }); + }); + + context('when the connection was not destroyed', () => { + let callbackSpy; + beforeEach(() => { + callbackSpy = sinon.spy(); + }); + it('cleans up the connection', () => { + connection.destroy({ force: true }, callbackSpy); + expect(connection.cleanup).to.have.been.called; + }); + + it('calls the callback (asynchronously)', () => { + connection.destroy({ force: true }, callbackSpy); + expect(callbackSpy).not.to.have.been.called; + clock.runAll(); + expect(callbackSpy).to.have.been.called; + }); + }); + }); + + context('when options.force == true', function () { + it('destroys the tcp socket (synchronously)', () => { + expect(driverSocket.destroy).not.to.have.been.called; + connection.destroy({ force: true }); + expect(driverSocket.destroy).to.have.been.calledOnce; + }); + + it('does not call stream.end', () => { + connection.destroy({ force: true }); + expect(driverSocket.end).to.not.have.been.called; + }); + + it('destroys the messageStream (synchronously)', () => { + connection.destroy({ force: true }); + expect(messageStream.destroy).to.have.been.calledOnce; + }); + + it('when destroy({ force: true }) is called multiple times, it calls stream.destroy exactly once', () => { + connection.destroy({ force: true }); + connection.destroy({ force: true }); + connection.destroy({ force: true }); + expect(driverSocket.destroy).to.have.been.calledOnce; + }); + }); + + context('when options.force == false', function () { + it('destroys the tcp socket (asynchronously)', () => { + connection.destroy({ force: false }); + expect(driverSocket.destroy).not.to.have.been.called; + clock.tick(1); + expect(driverSocket.destroy).to.have.been.called; + }); + + it('ends the tcp socket (synchronously)', () => { + connection.destroy({ force: false }); + expect(driverSocket.end).to.have.been.calledOnce; + }); + + it('destroys the messageStream (synchronously)', () => { + connection.destroy({ force: false }); + expect(messageStream.destroy).to.have.been.calledOnce; + }); + + it('calls stream.end exactly once when destroy is called multiple times', () => { + connection.destroy({ force: false }); + connection.destroy({ force: false }); + connection.destroy({ force: false }); + connection.destroy({ force: false }); + clock.tick(1); + expect(driverSocket.end).to.have.been.calledOnce; + }); + }); + }); });