Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 55 additions & 60 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
address: string;
socketTimeoutMS: number;
monitorCommands: boolean;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
closed: boolean;
destroyed: boolean;
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
Expand Down Expand Up @@ -204,7 +204,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.monitorCommands = options.monitorCommands;
this.serverApi = options.serverApi;
this.closed = false;
this.destroyed = false;
this[kHello] = null;
this[kClusterTime] = null;

Expand Down Expand Up @@ -294,56 +293,19 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

onError(error: Error) {
if (this.closed) {
return;
}

this[kStream].destroy(error);

this.closed = true;

for (const op of this[kQueue].values()) {
op.cb(error);
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
this.cleanup(true, error);
}

onClose() {
if (this.closed) {
return;
}

this.closed = true;

const message = `connection ${this.id} to ${this.address} closed`;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkError(message));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
this.cleanup(true, new MongoNetworkError(message));
}

onTimeout() {
if (this.closed) {
return;
}

this[kDelayedTimeoutId] = setTimeout(() => {
this[kStream].destroy();

this.closed = true;

const message = `connection ${this.id} to ${this.address} timed out`;
const beforeHandshake = this.hello == null;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkTimeoutError(message, { beforeHandshake }));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
this.cleanup(true, new MongoNetworkTimeoutError(message, { beforeHandshake }));
}, 1).unref(); // No need for this timer to hold the event loop open
}

Expand All @@ -364,7 +326,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

// First check if the map is of invalid size
if (this[kQueue].size > 1) {
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
this.cleanup(true, new MongoRuntimeError(INVALID_QUEUE_SIZE));
} else {
// Get the first orphaned operation description.
const entry = this[kQueue].entries().next();
Expand Down Expand Up @@ -444,34 +406,67 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

destroy(options: DestroyOptions, callback?: Callback): void {
if (this.closed) {
process.nextTick(() => callback?.());
return;
}
if (typeof callback === 'function') {
this.once('close', () => process.nextTick(() => callback()));
}

// load balanced mode requires that these listeners remain on the connection
// after cleanup on timeouts, errors or close so we remove them before calling
// cleanup.
this.removeAllListeners(Connection.PINNED);
this.removeAllListeners(Connection.UNPINNED);
const message = `connection ${this.id} to ${this.address} closed`;
this.cleanup(options.force, new MongoNetworkError(message));
}

if (this[kStream] == null || this.destroyed) {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}

/**
* A method that cleans up the connection. When `force` is true, this method
* forcibly destroys the socket.
*
* If an error is provided, any in-flight operations will be closed with the error.
*
* This method does nothing if the connection is already closed.
*/
private cleanup(force: boolean, error?: Error): void {
if (this.closed) {
return;
}

if (options.force) {
this[kStream].destroy();
this.destroyed = true;
if (typeof callback === 'function') {
callback();
this.closed = true;

const completeCleanup = () => {
for (const op of this[kQueue].values()) {
op.cb(error);
}

this[kQueue].clear();

this.emit(Connection.CLOSE);
};

this[kStream].removeAllListeners();
this[kMessageStream].removeAllListeners();

this[kMessageStream].destroy();

if (force) {
this[kStream].destroy();
completeCleanup();
return;
}

this[kStream].end(() => {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}
});
if (!this[kStream].writableEnded) {
this[kStream].end(() => {
this[kStream].destroy();
completeCleanup();
});
} else {
completeCleanup();
}
}

command(
Expand Down
Loading