diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index b47f98ce64211c..919e3446c47236 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -1853,6 +1853,24 @@ Emitted when [`child_process.spawn()`][] encounters an error. Emitted when [`process.execve()`][] is invoked. +#### Streams + +> Stability: 1 - Experimental + + + +##### Event: `'stream.web.done'` + +* `stream` {ReadableStream} The stream that finished. + +Emitted when a [`ReadableStream`][] has finished, either because all data has +been read or because the stream was cancelled. The event fires once per stream, +after the stream has transitioned to the `'closed'` state. It is emitted for +both the async iterator (`for await (...)`) and the direct reader +(`reader.read()`, including BYOB readers) consumption paths. + #### Web Locks > Stability: 1 - Experimental @@ -1917,6 +1935,7 @@ Emitted when a new thread is created. [TracingChannel Channels]: #tracingchannel-channels [`'uncaughtException'`]: process.md#event-uncaughtexception [`BoundedChannel`]: #class-boundedchannel +[`ReadableStream`]: webstreams.md#class-readablestream [`TracingChannel`]: #class-tracingchannel [`asyncEnd` event]: #asyncendevent [`asyncStart` event]: #asyncstartevent diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 597135778a1e0f..be39117d04eb4b 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -136,6 +136,8 @@ const { Buffer } = require('buffer'); const assert = require('internal/assert'); +const streamDoneChannel = require('diagnostics_channel').channel('stream.web.done'); + const kCancel = Symbol('kCancel'); const kClose = Symbol('kClose'); const kChunk = Symbol('kChunk'); @@ -463,6 +465,8 @@ class ReadableStream { validateObject(options, 'options', kValidateObjectAllowObjectsAndNull); const preventCancel = !!(options?.preventCancel); + const stream = this; + // eslint-disable-next-line no-use-before-define const reader = new ReadableStreamDefaultReader(this); @@ -490,8 +494,11 @@ class ReadableStream { } const promise = PromiseWithResolvers(); - // eslint-disable-next-line no-use-before-define - readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise)); + readableStreamDefaultReaderRead( + reader, + // eslint-disable-next-line no-use-before-define + new ReadableStreamAsyncIteratorReadRequest(reader, state, promise, stream), + ); return promise.promise; } @@ -509,10 +516,16 @@ class ReadableStream { const result = readableStreamReaderGenericCancel(reader, value); readableStreamReaderGenericRelease(reader); await result; + if (streamDoneChannel.hasSubscribers) { + streamDoneChannel.publish({ stream }); + } return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution } readableStreamReaderGenericRelease(reader); + if (streamDoneChannel.hasSubscribers) { + streamDoneChannel.publish({ stream }); + } return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution } @@ -760,10 +773,11 @@ function createReadableStreamBYOBRequest(controller, view) { } class ReadableStreamAsyncIteratorReadRequest { - constructor(reader, state, promise) { + constructor(reader, state, promise, stream) { this.reader = reader; this.state = state; this.promise = promise; + this.stream = stream; } [kChunk](chunk) { @@ -775,6 +789,9 @@ class ReadableStreamAsyncIteratorReadRequest { this.state.current = undefined; this.state.done = true; readableStreamReaderGenericRelease(this.reader); + if (streamDoneChannel.hasSubscribers) { + streamDoneChannel.publish({ stream: this.stream }); + } this.promise.resolve({ done: true, value: undefined }); } @@ -888,6 +905,12 @@ class ReadableStreamDefaultReader { // Slow path: create request and go through normal flow const readRequest = new DefaultReadRequest(); readableStreamDefaultReaderRead(this, readRequest); + if (streamDoneChannel.hasSubscribers) { + PromisePrototypeThen(readRequest.promise, ({ done }) => { + if (done) + streamDoneChannel.publish({ stream }); + }); + } return readRequest.promise; } @@ -1028,6 +1051,13 @@ class ReadableStreamBYOBReader { } const readIntoRequest = new ReadIntoRequest(); readableStreamBYOBReaderRead(this, view, min, readIntoRequest); + if (streamDoneChannel.hasSubscribers) { + const stream = this[kState].stream; + PromisePrototypeThen(readIntoRequest.promise, ({ done }) => { + if (done) + streamDoneChannel.publish({ stream }); + }); + } return readIntoRequest.promise; } diff --git a/test/parallel/test-whatwg-webstreams-dc-events.mjs b/test/parallel/test-whatwg-webstreams-dc-events.mjs new file mode 100644 index 00000000000000..908e16c75d879f --- /dev/null +++ b/test/parallel/test-whatwg-webstreams-dc-events.mjs @@ -0,0 +1,70 @@ +// Flags: --expose-internals +import * as common from '../common/index.mjs'; +import assert from 'assert'; + +import util from 'internal/webstreams/util'; + +import { Readable } from 'stream'; + +import * as dc from 'diagnostics_channel'; + +{ + const readable = Readable.toWeb(Readable.from([1])); + + const channel = dc.channel('stream.web.done'); + const subscriber = common.mustCall(({ stream }) => { + assert.strictEqual(readable, stream); + assert.strictEqual(readable[util.kState].state, 'closed'); + }); + channel.subscribe(subscriber); + + const reader = readable.getReader(); + let result; + + while (!result?.done) { + result = await reader.read(); + } + + channel.unsubscribe(subscriber); +} + +{ + const readable = Readable.toWeb(Readable.from([1])); + + const channel = dc.channel('stream.web.done'); + const subscriber = common.mustCall(({ stream }) => { + assert.strictEqual(readable, stream); + assert.strictEqual(readable[util.kState].state, 'closed'); + }); + channel.subscribe(subscriber); + + // eslint-disable-next-line no-unused-vars + for await (const _ of readable) { /* drain */ } + + channel.unsubscribe(subscriber); +} + +{ + const readable = new ReadableStream({ + type: 'bytes', + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + controller.close(); + }, + }); + + const channel = dc.channel('stream.web.done'); + const subscriber = common.mustCall(({ stream }) => { + assert.strictEqual(readable, stream); + assert.strictEqual(readable[util.kState].state, 'closed'); + }); + channel.subscribe(subscriber); + + const reader = readable.getReader({ mode: 'byob' }); + let result; + while (!result?.done) { + result = await reader.read(new Uint8Array(8)); + } + + channel.unsubscribe(subscriber); +}