diff --git a/src/cmap/commands.ts b/src/cmap/commands.ts index ee1e7b6a7f0..165e33a2f07 100644 --- a/src/cmap/commands.ts +++ b/src/cmap/commands.ts @@ -317,6 +317,9 @@ export class OpQueryResponse { bsonRegExp?: boolean; index?: number; + /** moreToCome is an OP_MSG only concept */ + moreToCome = false; + constructor( message: Buffer, msgHeader: MessageHeader, @@ -598,6 +601,7 @@ export class OpMsgResponse { fromCompressed?: boolean; responseFlags: number; checksumPresent: boolean; + /** Indicates the server will be sending more responses on this connection */ moreToCome: boolean; exhaustAllowed: boolean; useBigInt64: boolean; diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 6168d38ec04..8bd195ee1c5 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -562,6 +562,15 @@ export class Connection extends TypedEventEmitter { callback(err); } } + + exhaustCommand( + ns: MongoDBNamespace, + command: Document, + options: CommandOptions | undefined, + replyListener: Callback + ) { + return this.command(ns, command, options, replyListener); + } } /** @internal */ @@ -1156,6 +1165,15 @@ export class ModernConnection extends TypedEventEmitter { return document; } + + exhaustCommand( + _ns: MongoDBNamespace, + _command: Document, + _options: CommandOptions, + _replyListener: Callback + ) { + throw new Error('NODE-5742: not implemented.'); + } } const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; @@ -1253,7 +1271,7 @@ export async function* readMany( const response = await decompressResponse(message); yield response; - if (!('moreToCome' in response) || !response.moreToCome) { + if (!response.moreToCome) { return; } } diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 1cc7cfad7f1..137eae78e54 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -23,8 +23,6 @@ const kServer = Symbol('server'); /** @internal */ const kMonitorId = Symbol('monitorId'); /** @internal */ -const kConnection = Symbol('connection'); -/** @internal */ const kCancellationToken = Symbol('cancellationToken'); /** @internal */ const kRoundTripTime = Symbol('roundTripTime'); @@ -94,21 +92,17 @@ export class Monitor extends TypedEventEmitter { connectOptions: ConnectionOptions; isRunningInFaasEnv: boolean; [kServer]: Server; - [kConnection]?: Connection; + connection: Connection | null; [kCancellationToken]: CancellationToken; /** @internal */ [kMonitorId]?: MonitorInterval; rttPinger?: RTTPinger; - get connection(): Connection | undefined { - return this[kConnection]; - } - constructor(server: Server, options: MonitorOptions) { super(); this[kServer] = server; - this[kConnection] = undefined; + this.connection = null; this[kCancellationToken] = new CancellationToken(); this[kCancellationToken].setMaxListeners(Infinity); this[kMonitorId] = undefined; @@ -219,8 +213,8 @@ function resetMonitorState(monitor: Monitor) { monitor[kCancellationToken].emit('cancel'); - monitor[kConnection]?.destroy({ force: true }); - monitor[kConnection] = undefined; + monitor.connection?.destroy({ force: true }); + monitor.connection = null; } function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean { @@ -241,6 +235,7 @@ function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion function checkServer(monitor: Monitor, callback: Callback) { let start = now(); + let awaited: boolean; const topologyVersion = monitor[kServer].description.topologyVersion; const isAwaitable = useStreamingProtocol(monitor, topologyVersion); monitor.emit( @@ -248,9 +243,9 @@ function checkServer(monitor: Monitor, callback: Callback) { new ServerHeartbeatStartedEvent(monitor.address, isAwaitable) ); - function failureHandler(err: Error, awaited: boolean) { - monitor[kConnection]?.destroy({ force: true }); - monitor[kConnection] = undefined; + function onHeartbeatFailed(err: Error) { + monitor.connection?.destroy({ force: true }); + monitor.connection = null; monitor.emit( Server.SERVER_HEARTBEAT_FAILED, @@ -269,7 +264,39 @@ function checkServer(monitor: Monitor, callback: Callback) { callback(err); } - const connection = monitor[kConnection]; + function onHeartbeatSucceeded(hello: Document) { + if (!('isWritablePrimary' in hello)) { + // Provide hello-style response document. + hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; + } + + const duration = + isAwaitable && monitor.rttPinger + ? monitor.rttPinger.roundTripTime + : calculateDurationInMs(start); + + monitor.emit( + Server.SERVER_HEARTBEAT_SUCCEEDED, + new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable) + ); + + // If we are using the streaming protocol then we immediately issue another 'started' + // event, otherwise the "check" is complete and return to the main monitor loop. + if (isAwaitable) { + monitor.emit( + Server.SERVER_HEARTBEAT_STARTED, + new ServerHeartbeatStartedEvent(monitor.address, true) + ); + start = now(); + } else { + monitor.rttPinger?.close(); + monitor.rttPinger = undefined; + + callback(undefined, hello); + } + } + + const { connection } = monitor; if (connection && !connection.closed) { const { serverApi, helloOk } = connection; const connectTimeoutMS = monitor.options.connectTimeoutMS; @@ -299,41 +326,18 @@ function checkServer(monitor: Monitor, callback: Callback) { ); } - connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => { - if (err) { - return failureHandler(err, isAwaitable); - } - - if (!('isWritablePrimary' in hello)) { - // Provide hello-style response document. - hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND]; - } - - const duration = - isAwaitable && monitor.rttPinger - ? monitor.rttPinger.roundTripTime - : calculateDurationInMs(start); - - monitor.emit( - Server.SERVER_HEARTBEAT_SUCCEEDED, - new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable) - ); + if (isAwaitable) { + awaited = true; + return connection.exhaustCommand(ns('admin.$cmd'), cmd, options, (error, hello) => { + if (error) return onHeartbeatFailed(error); + return onHeartbeatSucceeded(hello); + }); + } - // If we are using the streaming protocol then we immediately issue another 'started' - // event, otherwise the "check" is complete and return to the main monitor loop. - if (isAwaitable) { - monitor.emit( - Server.SERVER_HEARTBEAT_STARTED, - new ServerHeartbeatStartedEvent(monitor.address, true) - ); - start = now(); - } else { - monitor.rttPinger?.close(); - monitor.rttPinger = undefined; - - callback(undefined, hello); - } - }); + awaited = false; + connection + .commandAsync(ns('admin.$cmd'), cmd, options) + .then(onHeartbeatSucceeded, onHeartbeatFailed); return; } @@ -341,9 +345,10 @@ function checkServer(monitor: Monitor, callback: Callback) { // connecting does an implicit `hello` connect(monitor.connectOptions, (err, conn) => { if (err) { - monitor[kConnection] = undefined; + monitor.connection = null; - failureHandler(err, false); + awaited = false; + onHeartbeatFailed(err); return; } @@ -357,7 +362,7 @@ function checkServer(monitor: Monitor, callback: Callback) { return; } - monitor[kConnection] = conn; + monitor.connection = conn; monitor.emit( Server.SERVER_HEARTBEAT_SUCCEEDED, new ServerHeartbeatSucceededEvent(