Skip to content

Commit 70dde84

Browse files
committed
Add detailed body-data tests and improve error handling accordingly
1 parent c617090 commit 70dde84

File tree

4 files changed

+451
-83
lines changed

4 files changed

+451
-83
lines changed

src/util/request-utils.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -676,24 +676,26 @@ function emitBodyDataEvents(
676676
let timestamp: number | undefined = undefined;
677677
let pendingContent: Uint8Array | undefined = undefined;
678678
let pendingContentTimer: NodeJS.Timeout | undefined = undefined;
679-
let endEmitted = false;
679+
let finished = false;
680680

681681
function flushPendingContent() {
682-
if (endEmitted) return; // Should never happen, but just in case
682+
if (finished) return; // Should never happen, but just in case
683683

684-
endEmitted = 'writableEnded' in message
684+
const hasEnded = 'writableEnded' in message
685685
? message.writableEnded
686686
: message.readableEnded;
687687

688+
finished = hasEnded || !!message.errored || message.destroyed;
689+
688690
callback(
689691
message.id,
690-
// We use the exact end timestamp where possible, but the first 'data'
692+
// We use the exact final timestamp where possible, but the first 'data'
691693
// event timestamp for every preceeding chunk.
692-
endEmitted
694+
finished
693695
? now()
694696
: (timestamp || now()),
695697
pendingContent || Buffer.alloc(0),
696-
endEmitted
698+
hasEnded
697699
);
698700

699701
timestamp = undefined;
@@ -724,6 +726,7 @@ function emitBodyDataEvents(
724726
}
725727
});
726728

729+
bodyStream.on('error', flushPendingContent);
727730
bodyStream.on('end', flushPendingContent);
728731
}
729732

test/integration/subscriptions/request-events.spec.ts

Lines changed: 233 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import * as _ from 'lodash';
2+
import { PassThrough } from 'stream';
23
import * as http from 'http';
34

45
import {
@@ -7,15 +8,18 @@ import {
78
getRemote,
89
InitiatedRequest,
910
CompletedRequest,
10-
TimingEvents
11+
TimingEvents,
12+
BodyData,
13+
AbortedRequest
1114
} from "../../..";
1215
import {
1316
expect,
1417
fetch,
1518
nodeOnly,
1619
getDeferred,
1720
sendRawRequest,
18-
defaultNodeConnectionHeader
21+
defaultNodeConnectionHeader,
22+
delay
1923
} from "../../test-utils";
2024

2125
// Headers we ignore when checking the received values, because they can vary depending
@@ -429,3 +433,230 @@ describe("Request subscriptions", () => {
429433
});
430434
});
431435
});
436+
437+
describe("Request body data subscriptions", () => {
438+
439+
const server = getLocal();
440+
441+
beforeEach(() => server.start());
442+
afterEach(() => server.stop());
443+
444+
it("should fire a single ended chunk for small non-streamed bodies", async () => {
445+
const dataEvents: BodyData[] = [];
446+
await server.on('request-body-data', (event) => dataEvents.push(event));
447+
448+
await server.forPost('/mocked-endpoint').thenReply(200, "hello world");
449+
450+
await fetch(server.urlFor("/mocked-endpoint"), {
451+
method: 'POST',
452+
body: 'small POST body'
453+
});
454+
await delay(5); // Delay for events to be received
455+
456+
expect(dataEvents).to.have.length(1);
457+
expect(dataEvents[0].content.toString()).to.equal('small POST body');
458+
expect(dataEvents[0].isEnded).to.equal(true);
459+
expect(dataEvents[0].eventTimestamp).to.be.a('number');
460+
expect(dataEvents[0].id).to.be.a('string');
461+
});
462+
463+
it("should fire immediate-empty ended chunks for empty bodies", async () => {
464+
const dataEvents: BodyData[] = [];
465+
await server.on('request-body-data', (event) => dataEvents.push(event));
466+
467+
await server.forPost('/mocked-endpoint').thenReply(200, "hello world");
468+
469+
await fetch(server.urlFor("/mocked-endpoint"), {
470+
method: 'POST'
471+
// No body
472+
});
473+
await delay(5); // Delay for events to be received
474+
475+
expect(dataEvents).to.have.length(1);
476+
expect(dataEvents[0].content.byteLength).to.equal(0);
477+
expect(dataEvents[0].isEnded).to.equal(true);
478+
expect(dataEvents[0].eventTimestamp).to.be.a('number');
479+
expect(dataEvents[0].id).to.be.a('string');
480+
});
481+
482+
nodeOnly(() => {
483+
484+
// Mildly difficult to do streaming well in browsers, and the above covers the basic
485+
// functionality, so we just test this node only:
486+
487+
it("should stream body chunks as they are received", async () => {
488+
const dataEvents: BodyData[] = [];
489+
await server.on('request-body-data', (event) => dataEvents.push(event));
490+
491+
await server.forAnyRequest().waitForRequestBody().thenReply(200);
492+
493+
const req = http.request(server.url, {
494+
method: 'POST',
495+
});
496+
req.flushHeaders();
497+
498+
await delay(20);
499+
expect(dataEvents.length).to.equal(0);
500+
501+
req.write('hello');
502+
await delay(25);
503+
expect(dataEvents.length).to.equal(1);
504+
expect(dataEvents[0].content.toString()).to.equal('hello');
505+
expect(dataEvents[0].isEnded).to.equal(false);
506+
expect(dataEvents[0].eventTimestamp).to.be.a('number');
507+
expect(dataEvents[0].id).to.be.a('string');
508+
509+
req.write('world');
510+
await delay(25);
511+
expect(dataEvents.length).to.equal(2);
512+
expect(dataEvents[1].content.toString()).to.equal('world');
513+
expect(dataEvents[1].isEnded).to.equal(false);
514+
expect(dataEvents[1].eventTimestamp).to.be.greaterThan(dataEvents[0].eventTimestamp);
515+
expect(dataEvents[1].id).to.be.a('string');
516+
517+
req.end();
518+
await delay(25);
519+
expect(dataEvents.length).to.equal(3);
520+
expect(dataEvents[2].content.byteLength).to.equal(0);
521+
expect(dataEvents[2].isEnded).to.equal(true);
522+
expect(dataEvents[2].eventTimestamp).to.be.greaterThan(dataEvents[1].eventTimestamp);
523+
expect(dataEvents[2].id).to.be.a('string');
524+
});
525+
526+
it("should batch streamed body chunks", async () => {
527+
const dataEvents: BodyData[] = [];
528+
await server.on('request-body-data', (event) => dataEvents.push(event));
529+
530+
await server.forAnyRequest().waitForRequestBody().thenReply(200);
531+
532+
const req = http.request(server.url, {
533+
method: 'POST',
534+
});
535+
req.flushHeaders();
536+
537+
req.write('hello');
538+
await delay(5);
539+
req.write('world');
540+
await delay(25);
541+
expect(dataEvents.length).to.equal(1);
542+
expect(dataEvents[0].content.toString()).to.equal('helloworld');
543+
expect(dataEvents[0].isEnded).to.equal(false);
544+
expect(dataEvents[0].eventTimestamp).to.be.a('number');
545+
expect(dataEvents[0].id).to.be.a('string');
546+
547+
req.end();
548+
await delay(25);
549+
expect(dataEvents).to.have.length(2);
550+
expect(dataEvents[1].content.byteLength).to.equal(0);
551+
expect(dataEvents[1].isEnded).to.equal(true);
552+
expect(dataEvents[1].eventTimestamp).to.be.greaterThan(dataEvents[0].eventTimestamp);
553+
expect(dataEvents[1].id).to.equal(dataEvents[0].id);
554+
});
555+
556+
it("should batch streamed body chunks but emit immediately on end", async () => {
557+
const dataEvents: BodyData[] = [];
558+
await server.on('request-body-data', (event) => dataEvents.push(event));
559+
560+
await server.forAnyRequest().waitForRequestBody().thenReply(200);
561+
562+
const req = http.request(server.url, {
563+
method: 'POST',
564+
});
565+
req.flushHeaders();
566+
567+
await delay(25);
568+
expect(dataEvents).to.deep.equal([]);
569+
570+
req.write('hello');
571+
await delay(5);
572+
expect(dataEvents).to.have.length(0);
573+
req.end('world');
574+
await delay(5);
575+
576+
expect(dataEvents).to.have.length(1);
577+
expect(dataEvents[0].content).to.deep.equal(Buffer.from('helloworld'));
578+
expect(dataEvents[0].isEnded).to.equal(true);
579+
expect(dataEvents[0].eventTimestamp).to.be.a('number');
580+
expect(dataEvents[0].id).to.be.a('string');
581+
});
582+
583+
it("should just stop (without ended) if aborted server-side while still streaming", async () => {
584+
const dataEvents: BodyData[] = [];
585+
await server.on('request-body-data', (event) => dataEvents.push(event));
586+
587+
let requestEvent: CompletedRequest | undefined;
588+
await server.on('request', (r) => { requestEvent = r });
589+
590+
const abortEvent = getDeferred<AbortedRequest>();
591+
await server.on('abort', (r) => abortEvent.resolve(r));
592+
593+
const stream = new PassThrough();
594+
await server.forAnyRequest().thenStream(200, stream);
595+
596+
const req = http.request(server.url, {
597+
method: 'POST',
598+
});
599+
req.flushHeaders();
600+
601+
await delay(25);
602+
expect(dataEvents).to.deep.equal([]);
603+
604+
req.write('hello'); // Start writing request stream
605+
await delay(1);
606+
stream.destroy(new Error('OH NO')); // Kill server response stream
607+
608+
const abort = await abortEvent; // Abort event does fire
609+
expect(abort.error?.code).to.equal('STREAM_RULE_ERROR'); // Server error
610+
611+
await delay(25);
612+
expect(dataEvents).to.have.length(1);
613+
expect(dataEvents[0].content.toString()).to.deep.equal('hello');
614+
expect(dataEvents[0].isEnded).to.equal(false); // Not ended
615+
expect(dataEvents[0].eventTimestamp).to.be.a('number');
616+
expect(dataEvents[0].id).to.be.a('string');
617+
618+
expect(requestEvent).to.equal(undefined); // No request event fired
619+
});
620+
621+
it("should just stop (without ended) if aborted client-side mid-stream", async () => {
622+
const dataEvents: BodyData[] = [];
623+
await server.on('request-body-data', (event) => dataEvents.push(event));
624+
625+
let requestEvent: CompletedRequest | undefined;
626+
await server.on('request', (r) => { requestEvent = r });
627+
628+
const abortEvent = getDeferred<AbortedRequest>();
629+
await server.on('abort', (r) => abortEvent.resolve(r));
630+
631+
await server.forAnyRequest().waitForRequestBody().thenReply(200);
632+
633+
const req = http.request(server.url, {
634+
method: 'POST',
635+
});
636+
req.flushHeaders();
637+
req.on('error', () => {});
638+
639+
await delay(25);
640+
expect(dataEvents).to.deep.equal([]);
641+
642+
req.write('hello');
643+
await delay(5);
644+
req.destroy();
645+
await delay(25);
646+
647+
expect(dataEvents).to.have.length(1);
648+
expect(dataEvents[0].content.toString()).to.equal('hello');
649+
expect(dataEvents[0].isEnded).to.equal(false);
650+
expect(dataEvents[0].eventTimestamp).to.be.a('number');
651+
expect(dataEvents[0].id).to.be.a('string');
652+
653+
await delay(25);
654+
expect(dataEvents).to.have.length(1); // No end event
655+
expect(requestEvent).to.equal(undefined); // No response event
656+
657+
const abort = await abortEvent; // Abort even _is_ fired however.
658+
expect(abort.error).to.equal(undefined); // Client close - not server error
659+
});
660+
});
661+
});
662+

0 commit comments

Comments
 (0)