From 0ebf3704d85df8061e4a15c9db0e5a8a202ce88c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 13 Dec 2023 14:36:49 -0500 Subject: [PATCH 01/12] chore(NODE-5771): benchmark new connection --- modern.cjs | 42 +++++++++++++++++++++++++++ old.cjs | 42 +++++++++++++++++++++++++++ test/benchmarks/driverBench/common.js | 6 +++- test/benchmarks/driverBench/index.js | 16 ++++++++++ 4 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 modern.cjs create mode 100644 old.cjs diff --git a/modern.cjs b/modern.cjs new file mode 100644 index 00000000000..2f6c9b84ac0 --- /dev/null +++ b/modern.cjs @@ -0,0 +1,42 @@ +const totalStart = performance.now(); +/* eslint-disable no-console */ +/* eslint-disable @typescript-eslint/no-var-requires */ +const process = require('node:process'); +const { MongoClient } = require('./lib/index.js'); +const { ModernConnection } = require('./lib/cmap/connection.js'); + +const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); + +const client = new MongoClient(process.env.MONGODB_URI, { connectionType: ModernConnection }); + +async function main() { + console.log('modern connection'); + + const db = client.db('test'); + let collection = db.collection('test'); + await collection.drop().catch(() => null); + collection = await db.createCollection('test'); + await collection.insertOne(tweet); + + const total = 10_000; + + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + + const start = performance.now() - totalStart; + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + const end = performance.now() - totalStart; + + console.log( + `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` + ); + console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); + console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); + + await client.close(); +} + +main().catch(console.error); diff --git a/old.cjs b/old.cjs new file mode 100644 index 00000000000..3dcc7914e69 --- /dev/null +++ b/old.cjs @@ -0,0 +1,42 @@ +const totalStart = performance.now(); +/* eslint-disable no-console */ +/* eslint-disable @typescript-eslint/no-var-requires */ +const process = require('node:process'); +const { MongoClient } = require('./lib/index.js'); +const { Connection } = require('./lib/cmap/connection.js'); + +const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); + +const client = new MongoClient(process.env.MONGODB_URI, { connectionType: Connection }); + +async function main() { + console.log('old connection'); + + const db = client.db('test'); + let collection = db.collection('test'); + await collection.drop().catch(() => null); + collection = await db.createCollection('test'); + await collection.insertOne(tweet); + + const total = 10_000; + + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + + const start = performance.now() - totalStart; + for (let i = 0; i < total; i++) { + await collection.findOne(); + } + const end = performance.now() - totalStart; + + console.log( + `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` + ); + console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); + console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); + + await client.close(); +} + +main().catch(console.error); diff --git a/test/benchmarks/driverBench/common.js b/test/benchmarks/driverBench/common.js index e194ea91874..45415c6023e 100644 --- a/test/benchmarks/driverBench/common.js +++ b/test/benchmarks/driverBench/common.js @@ -6,6 +6,8 @@ const { Readable } = require('stream'); const { pipeline } = require('stream/promises'); const { MongoClient } = require('../../..'); const { GridFSBucket } = require('../../..'); +// eslint-disable-next-line no-restricted-modules +const { ModernConnection, Connection } = require('../../../lib/cmap/connection'); // eslint-disable-next-line no-restricted-modules const { MONGODB_ERROR_CODES } = require('../../../lib/error'); @@ -25,7 +27,9 @@ function loadSpecString(filePath) { } function makeClient() { - this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://localhost:27017'); + this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017', { + connectionType: Connection + }); } function connectClient() { diff --git a/test/benchmarks/driverBench/index.js b/test/benchmarks/driverBench/index.js index 1a1d847822f..09e2d5148ac 100644 --- a/test/benchmarks/driverBench/index.js +++ b/test/benchmarks/driverBench/index.js @@ -1,6 +1,7 @@ 'use strict'; const MongoBench = require('../mongoBench'); +const os = require('node:os'); const Runner = MongoBench.Runner; @@ -11,6 +12,21 @@ const { inspect } = require('util'); const { writeFile } = require('fs/promises'); const { makeParallelBenchmarks, makeSingleBench, makeMultiBench } = require('../mongoBench/suites'); +const hw = os.cpus(); +const ram = os.totalmem() / 1024 ** 3; +const platform = { name: hw[0].model, cores: hw.length, ram: `${ram}GB` }; + +const systemInfo = () => + [ + `Connection`, + `\n- cpu: ${platform.name}`, + `- cores: ${platform.cores}`, + `- arch: ${os.arch()}`, + `- os: ${process.platform} (${os.release()})`, + `- ram: ${platform.ram}` + ].join('\n'); +console.log(systemInfo()); + function average(arr) { return arr.reduce((x, y) => x + y, 0) / arr.length; } From 12bb88ab7aa88a9a751907449722bf48270d065e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 13 Dec 2023 17:56:52 -0500 Subject: [PATCH 02/12] perf: remove done signal for abortable helper --- src/cmap/connection.ts | 2 +- src/utils.ts | 45 ++++++++++++++++++------------------------ 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 8d1842c7943..837f13a192a 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1243,7 +1243,7 @@ export async function writeCommand( connection: ModernConnection, command: WriteProtocolMessageType, options: Partial> & { - signal?: AbortSignal; + signal: AbortSignal; } ): Promise { const finalCommand = diff --git a/src/utils.ts b/src/utils.ts index 05b786c2eb8..9c06d106796 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1314,36 +1314,29 @@ export function isHostMatch(match: RegExp, host?: string): boolean { */ export async function abortable( promise: Promise, - { signal }: { signal?: AbortSignal } = {} + { signal }: { signal: AbortSignal } ): Promise { - const { abort, done } = aborted(signal); + const { promise: aborted, reject } = promiseWithResolvers(); + + function rejectOnAbort() { + reject(signal.reason); + } + signal.addEventListener('abort', rejectOnAbort, { once: true }); + try { - return await Promise.race([promise, abort]); + return await Promise.race([promise, aborted]); } finally { - done.abort(); + signal.removeEventListener('abort', rejectOnAbort); } } -/** - * Takes an AbortSignal and creates a promise that will reject when the signal aborts - * If the argument provided is nullish the returned promise will **never** resolve. - * Also returns a done controller - abort the done controller when your task is done to remove the abort listeners - * @param signal - an optional abort signal to link to a promise rejection - */ -function aborted(signal?: AbortSignal): { - abort: Promise; - done: AbortController; -} { - const done = new AbortController(); - if (signal?.aborted) { - return { abort: Promise.reject(signal.reason), done }; - } - const abort = new Promise((_, reject) => - signal?.addEventListener('abort', () => reject(signal.reason), { - once: true, - // @ts-expect-error: @types/node erroneously claim this does not exist - signal: done.signal - }) - ); - return { abort, done }; +function promiseWithResolvers() { + let resolve: Parameters>[0]>[0]; + let reject: Parameters>[0]>[1]; + const promise = new Promise(function withResolversExecutor(promiseResolve, promiseReject) { + resolve = promiseResolve; + reject = promiseReject; + }); + // @ts-expect-error: TS does not know what I know + return { promise, resolve, reject } as const; } From c36c10362134eaf151392e28eedb456e793ebf5b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 13 Dec 2023 18:01:58 -0500 Subject: [PATCH 03/12] perf: create promisified socketWrite once per connection --- src/cmap/connection.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 837f13a192a..003d23eb0a2 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -819,6 +819,8 @@ export class ModernConnection extends TypedEventEmitter { /** @event */ static readonly UNPINNED = UNPINNED; + socketWrite: (buffer: Uint8Array, options: { signal: AbortSignal }) => Promise; + constructor(stream: Stream, options: ConnectionOptions) { super(); @@ -839,6 +841,11 @@ export class ModernConnection extends TypedEventEmitter { this.socket.on('error', this.onError.bind(this)); this.socket.on('close', this.onClose.bind(this)); this.socket.on('timeout', this.onTimeout.bind(this)); + + const socketWrite = promisify(this.socket.write.bind(this.socket)); + this.socketWrite = (buffer, options) => { + return abortable(socketWrite(buffer), options); + }; } async commandAsync(...args: Parameters) { @@ -1256,9 +1263,7 @@ export async function writeCommand( const buffer = Buffer.concat(await finalCommand.toBin()); - const socketWriteFn = promisify(connection.socket.write.bind(connection.socket)); - - return abortable(socketWriteFn(buffer), options); + return connection.socketWrite(buffer, options); } /** From bb83edbf9ecdaba6c7dbcc576191637f95e34923 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 13 Dec 2023 18:21:30 -0500 Subject: [PATCH 04/12] perf: workaround creating many AbortControllers --- src/cmap/connection.ts | 30 ++-------- src/cmap/wire_protocol/on_data.ts | 96 +++++++++++++++++++++++++++++++ src/index.ts | 1 - src/utils.ts | 2 +- 4 files changed, 101 insertions(+), 28 deletions(-) create mode 100644 src/cmap/wire_protocol/on_data.ts diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 003d23eb0a2..a61ad87843b 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,4 +1,3 @@ -import { on } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; @@ -61,6 +60,7 @@ import type { ClientMetadata } from './handshake/client_metadata'; import { MessageStream, type OperationDescription } from './message_stream'; import { StreamDescription, type StreamDescriptionOptions } from './stream_description'; import { decompressResponse } from './wire_protocol/compression'; +import { onData } from './wire_protocol/on_data'; import { getReadPreference, isSharded } from './wire_protocol/shared'; /** @internal */ @@ -1052,9 +1052,6 @@ export class ModernConnection extends TypedEventEmitter { signal: this.controller.signal }); - // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners - this.controller = new AbortController(); - if (options.noResponse) { yield { ok: 1 }; return; @@ -1080,9 +1077,6 @@ export class ModernConnection extends TypedEventEmitter { } } - // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners - this.controller = new AbortController(); - yield document; this.controller.signal.throwIfAborted(); @@ -1205,11 +1199,11 @@ const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; */ export async function* readWireProtocolMessages( connection: ModernConnection, - { signal }: { signal?: AbortSignal } = {} + { signal }: { signal: AbortSignal } ): AsyncGenerator { const bufferPool = new BufferPool(); const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize; - for await (const [chunk] of on(connection.socket, 'data', { signal })) { + for await (const chunk of onData(connection.socket, { signal })) { if (connection.delayedTimeoutId) { clearTimeout(connection.delayedTimeoutId); connection.delayedTimeoutId = null; @@ -1277,7 +1271,7 @@ export async function writeCommand( */ export async function* readMany( connection: ModernConnection, - options: { signal?: AbortSignal } = {} + options: { signal: AbortSignal } ): AsyncGenerator { for await (const message of readWireProtocolMessages(connection, options)) { const response = await decompressResponse(message); @@ -1288,19 +1282,3 @@ export async function* readMany( } } } - -/** - * @internal - * - * Reads a single wire protocol message out of a connection. - */ -export async function read( - connection: ModernConnection, - options: { signal?: AbortSignal } = {} -): Promise { - for await (const value of readMany(connection, options)) { - return value; - } - - throw new MongoRuntimeError('unable to read message off of connection'); -} diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts new file mode 100644 index 00000000000..c9308771670 --- /dev/null +++ b/src/cmap/wire_protocol/on_data.ts @@ -0,0 +1,96 @@ +import { type EventEmitter } from 'events'; + +import { List, promiseWithResolvers } from '../../utils'; + +export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) { + const signal = options.signal; + signal.throwIfAborted(); + + // Preparing controlling queues and variables + const unconsumedEvents = new List(); + const unconsumedPromises = new List< + Omit>>, 'promise'> + >(); + let error: Error | null = null; + let finished = false; + + const iterator: AsyncGenerator = { + next() { + // First, we consume all unread events + const value = unconsumedEvents.shift(); + if (value != null) { + return Promise.resolve({ value, done: false }); + } + + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error != null) { + const p = Promise.reject(error); + // Only the first element errors + error = null; + return p; + } + + // If the iterator is finished, resolve to done + if (finished) return closeHandler(); + + // Wait until an event happens + const { promise, resolve, reject } = promiseWithResolvers>(); + unconsumedPromises.push({ resolve, reject }); + return promise; + }, + + return() { + return closeHandler(); + }, + + throw(err: Error) { + errorHandler(err); + return Promise.resolve({ value: undefined, done: true }); + }, + + [Symbol.asyncIterator]() { + return this; + } + }; + + // Adding event handlers + emitter.on('data', eventHandler); + emitter.on('error', errorHandler); + signal.addEventListener('abort', abortListener, { once: true }); + + return iterator; + + function abortListener() { + errorHandler(signal.reason); + } + + function eventHandler(value: Buffer) { + const promise = unconsumedPromises.shift(); + if (promise != null) promise.resolve({ value, done: false }); + else unconsumedEvents.push(value); + } + + function errorHandler(err: Error) { + const promise = unconsumedPromises.shift(); + if (promise != null) promise.reject(err); + else error = err; + void closeHandler(); + } + + function closeHandler() { + // Adding event handlers + emitter.off('data', eventHandler); + emitter.off('error', errorHandler); + signal.removeEventListener('abort', abortListener); + finished = true; + const doneResult = { value: undefined, done: finished } as const; + + for (const promise of unconsumedPromises) { + promise.resolve(doneResult); + } + + return Promise.resolve(doneResult); + } +} diff --git a/src/index.ts b/src/index.ts index 06ccc1bee2e..6b7179bffd1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -270,7 +270,6 @@ export type { DestroyOptions, ModernConnection, ProxyOptions, - read, readMany, writeCommand } from './cmap/connection'; diff --git a/src/utils.ts b/src/utils.ts index 9c06d106796..704df6e0013 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1330,7 +1330,7 @@ export async function abortable( } } -function promiseWithResolvers() { +export function promiseWithResolvers() { let resolve: Parameters>[0]>[0]; let reject: Parameters>[0]>[1]; const promise = new Promise(function withResolversExecutor(promiseResolve, promiseReject) { From f169290891655cfc0894471c23f0c73df4d6a85d Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 01:23:13 -0500 Subject: [PATCH 05/12] perf: add sized message transform stream --- src/cmap/connection.ts | 177 ++++++++++++++++++----------------------- src/index.ts | 4 +- 2 files changed, 79 insertions(+), 102 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index a61ad87843b..64f937283fa 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,3 +1,4 @@ +import { type Readable, Transform, type TransformCallback } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; @@ -786,7 +787,6 @@ export class ModernConnection extends TypedEventEmitter { /** @internal */ authContext?: AuthContext; - /**@internal */ delayedTimeoutId: NodeJS.Timeout | null = null; /** @internal */ [kDescription]: StreamDescription; @@ -794,9 +794,12 @@ export class ModernConnection extends TypedEventEmitter { [kGeneration]: number; /** @internal */ [kLastUseTime]: number; - /** @internal */ - socket: Stream; - controller: AbortController; + + private socket: Stream; + private controller: AbortController; + private messageStream: Readable; + private socketWrite: (buffer: Uint8Array, options: { signal: AbortSignal }) => Promise; + /** @internal */ [kHello]: Document | null; /** @internal */ @@ -819,8 +822,6 @@ export class ModernConnection extends TypedEventEmitter { /** @event */ static readonly UNPINNED = UNPINNED; - socketWrite: (buffer: Uint8Array, options: { signal: AbortSignal }) => Promise; - constructor(stream: Stream, options: ConnectionOptions) { super(); @@ -838,13 +839,17 @@ export class ModernConnection extends TypedEventEmitter { this.socket = stream; this.controller = new AbortController(); - this.socket.on('error', this.onError.bind(this)); + + this.messageStream = this.socket + .on('error', this.onError.bind(this)) + .pipe(new SizedMessageTransform({ connection: this })) + .on('error', this.onError.bind(this)); this.socket.on('close', this.onClose.bind(this)); this.socket.on('timeout', this.onTimeout.bind(this)); const socketWrite = promisify(this.socket.write.bind(this.socket)); - this.socketWrite = (buffer, options) => { - return abortable(socketWrite(buffer), options); + this.socketWrite = async buffer => { + return abortable(socketWrite(buffer), { signal: this.controller.signal }); }; } @@ -1046,10 +1051,9 @@ export class ModernConnection extends TypedEventEmitter { } try { - await writeCommand(this, message, { + await this.writeCommand(message, { agreedCompressor: this.description.compressor ?? 'none', - zlibCompressionLevel: this.description.zlibCompressionLevel, - signal: this.controller.signal + zlibCompressionLevel: this.description.zlibCompressionLevel }); if (options.noResponse) { @@ -1059,7 +1063,7 @@ export class ModernConnection extends TypedEventEmitter { this.controller.signal.throwIfAborted(); - for await (const response of readMany(this, { signal: this.controller.signal })) { + for await (const response of this.readMany()) { this.socket.setTimeout(0); response.parse(options); @@ -1182,103 +1186,78 @@ export class ModernConnection extends TypedEventEmitter { }; exhaustLoop().catch(replyListener); } -} -const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; - -/** - * @internal - * - * This helper reads chucks of data out of a socket and buffers them until it has received a - * full wire protocol message. - * - * By itself, produces an infinite async generator of wire protocol messages and consumers must end - * the stream by calling `return` on the generator. - * - * Note that `for-await` loops call `return` automatically when the loop is exited. - */ -export async function* readWireProtocolMessages( - connection: ModernConnection, - { signal }: { signal: AbortSignal } -): AsyncGenerator { - const bufferPool = new BufferPool(); - const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize; - for await (const chunk of onData(connection.socket, { signal })) { - if (connection.delayedTimeoutId) { - clearTimeout(connection.delayedTimeoutId); - connection.delayedTimeoutId = null; + /** + * @internal + * + * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method + * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired). + */ + async writeCommand( + command: WriteProtocolMessageType, + options: Partial> + ): Promise { + const finalCommand = + options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) + ? command + : new OpCompressedRequest(command, { + agreedCompressor: options.agreedCompressor ?? 'none', + zlibCompressionLevel: options.zlibCompressionLevel ?? 0 + }); + + const buffer = Buffer.concat(await finalCommand.toBin()); + + return this.socketWrite(buffer, { signal: this.controller.signal }); + } + + /** + * @internal + * + * Returns an async generator that yields full wire protocol messages from the underlying socket. This function + * yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request + * by calling `return` on the generator. + * + * Note that `for-await` loops call `return` automatically when the loop is exited. + */ + async *readMany(): AsyncGenerator { + for await (const message of onData(this.messageStream, { signal: this.controller.signal })) { + const response = await decompressResponse(message); + yield response; + + if (!response.moreToCome) { + return; + } } + } +} - bufferPool.append(chunk); - const sizeOfMessage = bufferPool.getInt32(); +/** @internal */ +export class SizedMessageTransform extends Transform { + bufferPool: BufferPool; + connection: ModernConnection; + + constructor({ connection }: { connection: ModernConnection }) { + super({ objectMode: false }); + this.bufferPool = new BufferPool(); + this.connection = connection; + } + override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void { + this.bufferPool.append(chunk); + const sizeOfMessage = this.bufferPool.getInt32(); if (sizeOfMessage == null) { - continue; + return callback(); } if (sizeOfMessage < 0) { - throw new MongoParseError(`Invalid message size: ${sizeOfMessage}`); + return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`)); } - if (sizeOfMessage > maxBsonMessageSize) { - throw new MongoParseError( - `Invalid message size: ${sizeOfMessage}, max allowed: ${maxBsonMessageSize}` - ); + if (sizeOfMessage > this.bufferPool.length) { + return callback(); } - if (sizeOfMessage > bufferPool.length) { - continue; - } - - yield bufferPool.read(sizeOfMessage); - } -} - -/** - * @internal - * - * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method - * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired). - */ -export async function writeCommand( - connection: ModernConnection, - command: WriteProtocolMessageType, - options: Partial> & { - signal: AbortSignal; - } -): Promise { - const finalCommand = - options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) - ? command - : new OpCompressedRequest(command, { - agreedCompressor: options.agreedCompressor ?? 'none', - zlibCompressionLevel: options.zlibCompressionLevel ?? 0 - }); - - const buffer = Buffer.concat(await finalCommand.toBin()); - - return connection.socketWrite(buffer, options); -} - -/** - * @internal - * - * Returns an async generator that yields full wire protocol messages from the underlying socket. This function - * yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request - * by calling `return` on the generator. - * - * Note that `for-await` loops call `return` automatically when the loop is exited. - */ -export async function* readMany( - connection: ModernConnection, - options: { signal: AbortSignal } -): AsyncGenerator { - for await (const message of readWireProtocolMessages(connection, options)) { - const response = await decompressResponse(message); - yield response; - - if (!response.moreToCome) { - return; - } + const message = this.bufferPool.read(sizeOfMessage); + return callback(null, message); } } diff --git a/src/index.ts b/src/index.ts index 6b7179bffd1..5ef232cf687 100644 --- a/src/index.ts +++ b/src/index.ts @@ -269,9 +269,7 @@ export type { ConnectionOptions, DestroyOptions, ModernConnection, - ProxyOptions, - readMany, - writeCommand + ProxyOptions } from './cmap/connection'; export type { CloseOptions, From 9d09defea54a01a35d461ce4a2378a8a014323dc Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 01:24:32 -0500 Subject: [PATCH 06/12] chore: run bench with new connection --- test/benchmarks/driverBench/common.js | 2 +- test/benchmarks/driverBench/index.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/benchmarks/driverBench/common.js b/test/benchmarks/driverBench/common.js index 45415c6023e..3f50fc29cbf 100644 --- a/test/benchmarks/driverBench/common.js +++ b/test/benchmarks/driverBench/common.js @@ -28,7 +28,7 @@ function loadSpecString(filePath) { function makeClient() { this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017', { - connectionType: Connection + connectionType: ModernConnection }); } diff --git a/test/benchmarks/driverBench/index.js b/test/benchmarks/driverBench/index.js index 09e2d5148ac..fd8f79ffd50 100644 --- a/test/benchmarks/driverBench/index.js +++ b/test/benchmarks/driverBench/index.js @@ -18,12 +18,12 @@ const platform = { name: hw[0].model, cores: hw.length, ram: `${ram}GB` }; const systemInfo = () => [ - `Connection`, + `ModernConnection`, `\n- cpu: ${platform.name}`, `- cores: ${platform.cores}`, `- arch: ${os.arch()}`, `- os: ${process.platform} (${os.release()})`, - `- ram: ${platform.ram}` + `- ram: ${platform.ram}\n` ].join('\n'); console.log(systemInfo()); From e1a54923c4ef5533a50794af444f1fc2287380a0 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 10:17:51 -0500 Subject: [PATCH 07/12] chore: fix unit --- src/cmap/connection.ts | 4 ++-- src/utils.ts | 4 +++- test/unit/utils.test.ts | 15 --------------- 3 files changed, 5 insertions(+), 18 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 64f937283fa..6f04b9c1cf7 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -798,7 +798,7 @@ export class ModernConnection extends TypedEventEmitter { private socket: Stream; private controller: AbortController; private messageStream: Readable; - private socketWrite: (buffer: Uint8Array, options: { signal: AbortSignal }) => Promise; + private socketWrite: (buffer: Uint8Array) => Promise; /** @internal */ [kHello]: Document | null; @@ -1207,7 +1207,7 @@ export class ModernConnection extends TypedEventEmitter { const buffer = Buffer.concat(await finalCommand.toBin()); - return this.socketWrite(buffer, { signal: this.controller.signal }); + return this.socketWrite(buffer); } /** diff --git a/src/utils.ts b/src/utils.ts index 704df6e0013..f46ca54a5dd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1321,7 +1321,9 @@ export async function abortable( function rejectOnAbort() { reject(signal.reason); } - signal.addEventListener('abort', rejectOnAbort, { once: true }); + + if (signal.aborted) rejectOnAbort(); + else signal.addEventListener('abort', rejectOnAbort, { once: true }); try { return await Promise.race([promise, aborted]); diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 980daf498a6..8be3753be72 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1174,21 +1174,6 @@ describe('driver utils', function () { const badError = new Error('unexpected bad error!'); const expectedValue = "don't panic"; - context('when not given a signal', () => { - it('returns promise fulfillment if the promise resolves or rejects', async () => { - expect(await abortable(Promise.resolve(expectedValue))).to.equal(expectedValue); - expect(await abortable(Promise.reject(goodError)).catch(e => e)).to.equal(goodError); - }); - - it('pends indefinitely if the promise is never settled', async () => { - const forever = abortable(new Promise(() => null)); - // Assume 100ms is good enough to prove "forever" - expect(await Promise.race([forever, sleep(100).then(() => expectedValue)])).to.equal( - expectedValue - ); - }); - }); - context('always removes the abort listener it attaches', () => { let controller; let removeEventListenerSpy; From b126f8b51dbf2e86bc851ab7e7d4618f05ea46f1 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 11:34:43 -0500 Subject: [PATCH 08/12] chore: clean up test files --- modern.cjs | 42 ------------------------------------------ old.cjs | 42 ------------------------------------------ 2 files changed, 84 deletions(-) delete mode 100644 modern.cjs delete mode 100644 old.cjs diff --git a/modern.cjs b/modern.cjs deleted file mode 100644 index 2f6c9b84ac0..00000000000 --- a/modern.cjs +++ /dev/null @@ -1,42 +0,0 @@ -const totalStart = performance.now(); -/* eslint-disable no-console */ -/* eslint-disable @typescript-eslint/no-var-requires */ -const process = require('node:process'); -const { MongoClient } = require('./lib/index.js'); -const { ModernConnection } = require('./lib/cmap/connection.js'); - -const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); - -const client = new MongoClient(process.env.MONGODB_URI, { connectionType: ModernConnection }); - -async function main() { - console.log('modern connection'); - - const db = client.db('test'); - let collection = db.collection('test'); - await collection.drop().catch(() => null); - collection = await db.createCollection('test'); - await collection.insertOne(tweet); - - const total = 10_000; - - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - - const start = performance.now() - totalStart; - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - const end = performance.now() - totalStart; - - console.log( - `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` - ); - console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); - console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); - - await client.close(); -} - -main().catch(console.error); diff --git a/old.cjs b/old.cjs deleted file mode 100644 index 3dcc7914e69..00000000000 --- a/old.cjs +++ /dev/null @@ -1,42 +0,0 @@ -const totalStart = performance.now(); -/* eslint-disable no-console */ -/* eslint-disable @typescript-eslint/no-var-requires */ -const process = require('node:process'); -const { MongoClient } = require('./lib/index.js'); -const { Connection } = require('./lib/cmap/connection.js'); - -const tweet = require('./test/benchmarks/driverBench/spec/single_and_multi_document/tweet.json'); - -const client = new MongoClient(process.env.MONGODB_URI, { connectionType: Connection }); - -async function main() { - console.log('old connection'); - - const db = client.db('test'); - let collection = db.collection('test'); - await collection.drop().catch(() => null); - collection = await db.createCollection('test'); - await collection.insertOne(tweet); - - const total = 10_000; - - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - - const start = performance.now() - totalStart; - for (let i = 0; i < total; i++) { - await collection.findOne(); - } - const end = performance.now() - totalStart; - - console.log( - `end - start = ms time for 10k findOne calls (script boot: ${totalStart.toFixed(3)})` - ); - console.log(`${end.toFixed(3)} - ${start.toFixed(3)} = ${(end - start).toFixed(4)}`); - console.log(`avg findOne: ${((end - start) / total).toFixed(3)} ms`); - - await client.close(); -} - -main().catch(console.error); From f34e4b0eda5ad75a8ec3baa8cd933d0eaa286a09 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 11:43:59 -0500 Subject: [PATCH 09/12] chore: try to fix github syntax highlighting? --- src/cmap/wire_protocol/on_data.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index c9308771670..1a46de246e3 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -2,15 +2,18 @@ import { type EventEmitter } from 'events'; import { List, promiseWithResolvers } from '../../utils'; +type PendingPromises = Omit< + ReturnType>>, + 'promise' +>; + export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) { const signal = options.signal; signal.throwIfAborted(); // Preparing controlling queues and variables const unconsumedEvents = new List(); - const unconsumedPromises = new List< - Omit>>, 'promise'> - >(); + const unconsumedPromises = new List(); let error: Error | null = null; let finished = false; From 161f8823f56b2c2657daa9f215e23f6416010837 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 13:09:50 -0500 Subject: [PATCH 10/12] chore: lint fixes --- src/utils.ts | 5 ++--- test/benchmarks/driverBench/common.js | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/utils.ts b/src/utils.ts index f46ca54a5dd..2c8fd45663f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1333,12 +1333,11 @@ export async function abortable( } export function promiseWithResolvers() { - let resolve: Parameters>[0]>[0]; - let reject: Parameters>[0]>[1]; + let resolve!: Parameters>[0]>[0]; + let reject!: Parameters>[0]>[1]; const promise = new Promise(function withResolversExecutor(promiseResolve, promiseReject) { resolve = promiseResolve; reject = promiseReject; }); - // @ts-expect-error: TS does not know what I know return { promise, resolve, reject } as const; } diff --git a/test/benchmarks/driverBench/common.js b/test/benchmarks/driverBench/common.js index 3f50fc29cbf..b41ab406343 100644 --- a/test/benchmarks/driverBench/common.js +++ b/test/benchmarks/driverBench/common.js @@ -7,7 +7,7 @@ const { pipeline } = require('stream/promises'); const { MongoClient } = require('../../..'); const { GridFSBucket } = require('../../..'); // eslint-disable-next-line no-restricted-modules -const { ModernConnection, Connection } = require('../../../lib/cmap/connection'); +const { ModernConnection } = require('../../../lib/cmap/connection'); // eslint-disable-next-line no-restricted-modules const { MONGODB_ERROR_CODES } = require('../../../lib/error'); From dd856540d6fd99032dac1090b696d38b754c47ac Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 15 Dec 2023 13:25:59 -0500 Subject: [PATCH 11/12] fix: delayedTimeoutId --- src/cmap/connection.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 6f04b9c1cf7..7e256eb6fc2 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1242,6 +1242,11 @@ export class SizedMessageTransform extends Transform { this.connection = connection; } override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void { + if (this.connection.delayedTimeoutId != null) { + clearTimeout(this.connection.delayedTimeoutId); + this.connection.delayedTimeoutId = null; + } + this.bufferPool.append(chunk); const sizeOfMessage = this.bufferPool.getInt32(); From cd7cb9b36d2062da6d95692d7d2894ea920101b6 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 19 Dec 2023 13:24:32 -0500 Subject: [PATCH 12/12] docs: add comments to onData --- src/cmap/wire_protocol/on_data.ts | 39 ++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts index 1a46de246e3..04c82f709d3 100644 --- a/src/cmap/wire_protocol/on_data.ts +++ b/src/cmap/wire_protocol/on_data.ts @@ -2,19 +2,46 @@ import { type EventEmitter } from 'events'; import { List, promiseWithResolvers } from '../../utils'; +/** + * @internal + * An object holding references to a promise's resolve and reject functions. + */ type PendingPromises = Omit< ReturnType>>, 'promise' >; +/** + * onData is adapted from Node.js' events.on helper + * https://nodejs.org/api/events.html#eventsonemitter-eventname-options + * + * Returns an AsyncIterator that iterates each 'data' event emitted from emitter. + * It will reject upon an error event or if the provided signal is aborted. + */ export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) { const signal = options.signal; - signal.throwIfAborted(); - // Preparing controlling queues and variables + // Setup pending events and pending promise lists + /** + * When the caller has not yet called .next(), we store the + * value from the event in this list. Next time they call .next() + * we pull the first value out of this list and resolve a promise with it. + */ const unconsumedEvents = new List(); + /** + * When there has not yet been an event, a new promise will be created + * and implicitly stored in this list. When an event occurs we take the first + * promise in this list and resolve it. + */ const unconsumedPromises = new List(); + + /** + * Stored an error created by an error event. + * This error will turn into a rejection for the subsequent .next() call + */ let error: Error | null = null; + + /** Set to true only after event listeners have been removed. */ let finished = false; const iterator: AsyncGenerator = { @@ -61,7 +88,13 @@ export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) // Adding event handlers emitter.on('data', eventHandler); emitter.on('error', errorHandler); - signal.addEventListener('abort', abortListener, { once: true }); + + if (signal.aborted) { + // If the signal is aborted, set up the first .next() call to be a rejection + queueMicrotask(abortListener); + } else { + signal.addEventListener('abort', abortListener, { once: true }); + } return iterator;