Skip to content

Commit affbd1f

Browse files
chore: rework connection and add tests
1 parent 9208eba commit affbd1f

File tree

2 files changed

+313
-127
lines changed

2 files changed

+313
-127
lines changed

src/cmap/connection.ts

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -292,16 +292,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
292292
this[kLastUseTime] = now();
293293
}
294294

295-
/**
296-
*
297-
* @param options
298-
*/
299-
_cleanup(options: { force: boolean; errorFactory?: () => Error; callback?: Callback }) {
300-
const { callback, errorFactory, force } = options;
295+
_cleanup(options: { force: boolean; errorFactory?: () => Error }) {
296+
const { errorFactory, force } = options;
301297
if (this.closed) {
302-
if (callback) {
303-
process.nextTick(callback);
304-
}
305298
return;
306299
}
307300

@@ -315,10 +308,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
315308
this[kQueue].clear();
316309

317310
this.emit(Connection.CLOSE);
318-
319-
if (callback) {
320-
process.nextTick(callback);
321-
}
322311
};
323312

324313
this.removeAllListeners(Connection.PINNED);
@@ -335,7 +324,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
335324
}
336325

337326
if (!this[kStream].writableEnded) {
338-
this[kStream].end(completeCleanup);
327+
this[kStream].end(() => {
328+
this[kStream].destroy();
329+
completeCleanup();
330+
});
339331
} else {
340332
completeCleanup();
341333
}
@@ -349,10 +341,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
349341
}
350342

351343
onClose() {
352-
if (this.closed) {
353-
return;
354-
}
355-
356344
const message = `connection ${this.id} to ${this.address} closed`;
357345
this._cleanup({ force: false, errorFactory: () => new MongoNetworkError(message) });
358346
}
@@ -389,7 +377,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
389377

390378
// First check if the map is of invalid size
391379
if (this[kQueue].size > 1) {
392-
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
380+
this._cleanup({
381+
force: true,
382+
errorFactory: () => new MongoRuntimeError(INVALID_QUEUE_SIZE)
383+
});
393384
} else {
394385
// Get the first orphaned operation description.
395386
const entry = this[kQueue].entries().next();
@@ -469,7 +460,13 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
469460
}
470461

471462
destroy(options: DestroyOptions, callback?: Callback): void {
472-
this._cleanup({ ...options, callback });
463+
if (this.closed) {
464+
process.nextTick(() => callback?.());
465+
return;
466+
}
467+
this.once('close', () => process.nextTick(() => callback?.()));
468+
const message = `connection ${this.id} to ${this.address} closed`;
469+
this._cleanup({ ...options, errorFactory: () => new MongoNetworkError(message) });
473470
}
474471

475472
command(

0 commit comments

Comments
 (0)