File tree Expand file tree Collapse file tree 2 files changed +51
-3
lines changed Expand file tree Collapse file tree 2 files changed +51
-3
lines changed Original file line number Diff line number Diff line change @@ -310,8 +310,7 @@ function chunkInvalid(state, chunk) {
310
310
// 'readable' event will be triggered.
311
311
function needMoreData ( state ) {
312
312
return ! state . ended &&
313
- ( state . needReadable ||
314
- state . length < state . highWaterMark ||
313
+ ( state . length < state . highWaterMark ||
315
314
state . length === 0 ) ;
316
315
}
317
316
@@ -536,7 +535,17 @@ function emitReadable_(stream) {
536
535
if ( ! state . destroyed && ( state . length || state . ended ) ) {
537
536
stream . emit ( 'readable' ) ;
538
537
}
539
- state . needReadable = ! state . flowing && ! state . ended ;
538
+
539
+ // The stream needs another readable event if
540
+ // 1. It is not flowing, as the flow mechanism will take
541
+ // care of it.
542
+ // 2. It is not ended.
543
+ // 3. It is below the highWaterMark, so we can schedule
544
+ // another readable later.
545
+ state . needReadable =
546
+ ! state . flowing &&
547
+ ! state . ended &&
548
+ state . length <= state . highWaterMark ;
540
549
flow ( stream ) ;
541
550
}
542
551
Original file line number Diff line number Diff line change
1
+ 'use strict' ;
2
+
3
+ const common = require ( '../common' ) ;
4
+ const assert = require ( 'assert' ) ;
5
+ const stream = require ( 'stream' ) ;
6
+
7
+ let pushes = 0 ;
8
+ const total = 65500 + 40 * 1024 ;
9
+ const rs = new stream . Readable ( {
10
+ read : common . mustCall ( function ( ) {
11
+ if ( pushes ++ === 10 ) {
12
+ this . push ( null ) ;
13
+ return ;
14
+ }
15
+
16
+ const length = this . _readableState . length ;
17
+
18
+ // We are at most doing two full runs of _reads
19
+ // before stopping, because Readable is greedy
20
+ // to keep its buffer full
21
+ assert ( length <= total ) ;
22
+
23
+ this . push ( Buffer . alloc ( 65500 ) ) ;
24
+ for ( let i = 0 ; i < 40 ; i ++ ) {
25
+ this . push ( Buffer . alloc ( 1024 ) ) ;
26
+ }
27
+
28
+ // We will be over highWaterMark at this point
29
+ // but a new call to _read is scheduled anyway.
30
+ } , 11 )
31
+ } ) ;
32
+
33
+ const ws = stream . Writable ( {
34
+ write : common . mustCall ( function ( data , enc , cb ) {
35
+ setImmediate ( cb ) ;
36
+ } , 41 * 10 )
37
+ } ) ;
38
+
39
+ rs . pipe ( ws ) ;
You can’t perform that action at this time.
0 commit comments