Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,24 @@ Emitted when [`child_process.spawn()`][] encounters an error.

Emitted when [`process.execve()`][] is invoked.

#### Streams

> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->

##### Event: `'stream.web.done'`

* `stream` {ReadableStream} The stream that finished.

Emitted when a [`ReadableStream`][] has finished, either because all data has
been read or because the stream was cancelled. The event fires once per stream,
after the stream has transitioned to the `'closed'` state. It is emitted for
both the async iterator (`for await (...)`) and the direct reader
(`reader.read()`, including BYOB readers) consumption paths.

#### Web Locks

> Stability: 1 - Experimental
Expand Down Expand Up @@ -1917,6 +1935,7 @@ Emitted when a new thread is created.
[TracingChannel Channels]: #tracingchannel-channels
[`'uncaughtException'`]: process.md#event-uncaughtexception
[`BoundedChannel`]: #class-boundedchannel
[`ReadableStream`]: webstreams.md#class-readablestream
[`TracingChannel`]: #class-tracingchannel
[`asyncEnd` event]: #asyncendevent
[`asyncStart` event]: #asyncstartevent
Expand Down
45 changes: 42 additions & 3 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ const { Buffer } = require('buffer');

const assert = require('internal/assert');

let streamDoneChannel;
function getStreamDoneChannel() {
if (streamDoneChannel === undefined) {
streamDoneChannel = require('diagnostics_channel').channel('stream.web.done');
}
return streamDoneChannel;
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this lazy-loading, I had tried to just create the channel immediately. However, this resulted in the following error at build time:

$ make -j24
ninja -C out/Release 
ninja: Entering directory `out/Release'
[21/25] ACTION node: node_mksnapshot_9b7a2d2290b02e76d66661df74749f56
FAILED: gen/node_snapshot.cc 
cd ../../; out/Release/node_mksnapshot out/Release/gen/node_snapshot.cc
global handle not serialized: 0x2c1d47302cd9: [JS_API_OBJECT_TYPE] in OldSpace
 - map: 0x08188aa5fd61 <Map(HOLEY_ELEMENTS)> [FastProperties]
 - prototype: 0x12df1ab07b79 <Object map = 0x8188aa5fd19>
 - elements: 0x20e18df01329 <FixedArray[0]> [HOLEY_ELEMENTS]
 - embedder fields: 1
 - properties: 0x20e18df01329 <FixedArray[0]>
 - All own properties (excluding elements): {}
 - embedder fields = {
    21965, aligned pointer: 0x55cde06a73c0
 }

global handle not serialized: 0x2c1d47302b49: [JS_OBJECT_TYPE] in OldSpace
 - map: 0x0f4b222452b9 <Map(HOLEY_ELEMENTS)> [FastProperties]
 - prototype: 0x2c1d473027a9 <Channel map = 0xf4b22245229>
 - elements: 0x20e18df01329 <FixedArray[0]> [HOLEY_ELEMENTS]
 - properties: 0x20e18df01329 <FixedArray[0]>
 - All own properties (excluding elements): {
    0x218677ef8c49: [String] in OldSpace: #_subscribers: 0x20e18df015b9 <undefined> (const data field 0), location: in-object
    0x20e18df058f1: [String] in ReadOnlySpace: #name: 0x218677ede9d1 <String[15]: #stream.web.done> (const data field 1), location: in-object
 }



#
# Fatal error in , line 0
# Check failed: handle_checker.CheckGlobalAndEternalHandles().
#
#
#
#FailureMessage Object: 0x7ffefc883920
 1: 0x55cddae43c35  [out/Release/node_mksnapshot]
 2: 0x55cddbcc2926 V8_Fatal(char const*, ...) [out/Release/node_mksnapshot]
 3: 0x55cddb396b05 v8::SnapshotCreator::CreateBlob(v8::SnapshotCreator::FunctionCodeHandling) [out/Release/node_mksnapshot]
 4: 0x55cddae6f680 node::SnapshotBuilder::Generate(node::SnapshotData*, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) [out/Release/node_mksnapshot]
 5: 0x55cddae707f4 node::SnapshotBuilder::Generate(std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >, std::vector<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >) [out/Release/node_mksnapshot]
 6: 0x55cddad822f2 BuildSnapshot(int, char**) [out/Release/node_mksnapshot]
 7: 0x7f4802c9dfd0  [/lib/x86_64-linux-gnu/libc.so.6]
 8: 0x7f4802c9e07d __libc_start_main [/lib/x86_64-linux-gnu/libc.so.6]
 9: 0x55cdda8c9e05 _start [out/Release/node_mksnapshot]
Trace/breakpoint trap (core dumped)
ninja: build stopped: subcommand failed.
make: *** [Makefile:127: node] Error 1

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that might be related to the native diagnostics_channel interface that @RafaelGSS added. I suspect there might be some issue because it's trying to snapshot a channel and can't do so because of native bindings. 🤔


const kCancel = Symbol('kCancel');
const kClose = Symbol('kClose');
const kChunk = Symbol('kChunk');
Expand Down Expand Up @@ -463,6 +471,9 @@ class ReadableStream {
validateObject(options, 'options', kValidateObjectAllowObjectsAndNull);
const preventCancel = !!(options?.preventCancel);

const stream = this;
const channel = getStreamDoneChannel();

// eslint-disable-next-line no-use-before-define
const reader = new ReadableStreamDefaultReader(this);

Expand Down Expand Up @@ -490,8 +501,11 @@ class ReadableStream {
}
const promise = PromiseWithResolvers();

// eslint-disable-next-line no-use-before-define
readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise));
readableStreamDefaultReaderRead(
reader,
// eslint-disable-next-line no-use-before-define
new ReadableStreamAsyncIteratorReadRequest(reader, state, promise, stream, channel),
);
return promise.promise;
}

Expand All @@ -509,10 +523,16 @@ class ReadableStream {
const result = readableStreamReaderGenericCancel(reader, value);
readableStreamReaderGenericRelease(reader);
await result;
if (channel.hasSubscribers) {
channel.publish({ stream });
}
return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution
}

readableStreamReaderGenericRelease(reader);
if (channel.hasSubscribers) {
channel.publish({ stream });
}
return { done: true, value }; // eslint-disable-line node-core/avoid-prototype-pollution
}

Expand Down Expand Up @@ -760,10 +780,12 @@ function createReadableStreamBYOBRequest(controller, view) {
}

class ReadableStreamAsyncIteratorReadRequest {
constructor(reader, state, promise) {
constructor(reader, state, promise, stream, channel) {
this.reader = reader;
this.state = state;
this.promise = promise;
this.stream = stream;
this.channel = channel;
}

[kChunk](chunk) {
Expand All @@ -775,6 +797,9 @@ class ReadableStreamAsyncIteratorReadRequest {
this.state.current = undefined;
this.state.done = true;
readableStreamReaderGenericRelease(this.reader);
if (this.channel.hasSubscribers) {
this.channel.publish({ stream: this.stream });
}
this.promise.resolve({ done: true, value: undefined });
}

Expand Down Expand Up @@ -888,6 +913,13 @@ class ReadableStreamDefaultReader {
// Slow path: create request and go through normal flow
const readRequest = new DefaultReadRequest();
readableStreamDefaultReaderRead(this, readRequest);
if (getStreamDoneChannel().hasSubscribers) {
const stream = this[kState].stream;
PromisePrototypeThen(readRequest.promise, ({ done }) => {
if (done)
getStreamDoneChannel().publish({ stream });
});
}
return readRequest.promise;
}

Expand Down Expand Up @@ -1028,6 +1060,13 @@ class ReadableStreamBYOBReader {
}
const readIntoRequest = new ReadIntoRequest();
readableStreamBYOBReaderRead(this, view, min, readIntoRequest);
if (getStreamDoneChannel().hasSubscribers) {
const stream = this[kState].stream;
PromisePrototypeThen(readIntoRequest.promise, ({ done }) => {
if (done)
getStreamDoneChannel().publish({ stream });
});
}
return readIntoRequest.promise;
}

Expand Down
70 changes: 70 additions & 0 deletions test/parallel/test-whatwg-webstreams-dc-events.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Flags: --expose-internals
import * as common from '../common/index.mjs';
import assert from 'assert';

import util from 'internal/webstreams/util';

import { Readable } from 'stream';

import * as dc from 'diagnostics_channel';

{
const readable = Readable.toWeb(Readable.from([1]));

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

const reader = readable.getReader();
let result;

while (!result?.done) {
result = await reader.read();
}

channel.unsubscribe(subscriber);
}

{
const readable = Readable.toWeb(Readable.from([1]));

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

// eslint-disable-next-line no-unused-vars
for await (const _ of readable) { /* drain */ }

channel.unsubscribe(subscriber);
}

{
const readable = new ReadableStream({
type: 'bytes',
start(controller) {
controller.enqueue(new Uint8Array([1, 2, 3]));
controller.close();
},
});

const channel = dc.channel('stream.web.done');
const subscriber = common.mustCall(({ stream }) => {
assert.strictEqual(readable, stream);
assert.strictEqual(readable[util.kState].state, 'closed');
});
channel.subscribe(subscriber);

const reader = readable.getReader({ mode: 'byob' });
let result;
while (!result?.done) {
result = await reader.read(new Uint8Array(8));
}

channel.unsubscribe(subscriber);
}
Loading