Skip to content

Commit 67d7c74

Browse files
committed
Response Streaming in JS
Add streaming HTTP response handling to gremlin-javascript, enabling incremental result consumption via fetch response.body ReadableStream. - Add StreamReader abstraction for async byte reading over streaming and buffered sources - Refactor all ~20 GraphBinary serializers from sync deserialize(buffer) to async deserialize(reader) - Refactor GraphBinaryReader.readResponse() to use StreamReader - Add Connection.stream() using fetch response.body ReadableStream - Add Client.stream() returning AsyncGenerator - Wire Traversal API (next(), hasNext(), toList()) to streaming for incremental consumption, matching Go GLV behavior - submit() remains non-streaming, buffers full response - Remove dead readable-stream dependency and Readable imports
1 parent b08a6f0 commit 67d7c74

44 files changed

Lines changed: 2452 additions & 1963 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGELOG.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
2525
[[release-4-0-0]]
2626
=== TinkerPop 4.0.0 (Release Date: NOT OFFICIALLY RELEASED YET)
2727
28+
* Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response.
29+
* Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption.
30+
* Removed `readable-stream` dependency from `gremlin-javascript`.
31+
2832
[[release-4-0-0-beta-2]]
2933
=== TinkerPop 4.0.0-beta.2 (April 1, 2026)
3034

docs/src/upgrade/release-4.x.x.asciidoc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,41 @@ image::gremlins-wildest-dreams.png[width=185]
3030
Please see the link:https://github.com/apache/tinkerpop/blob/4.0.0/CHANGELOG.asciidoc#release-4-0-0[changelog] for a
3131
complete list of all the modifications that are part of this release.
3232
33+
=== Upgrading for Users
34+
35+
==== JS HTTP Streaming Response Support
36+
37+
The JavaScript driver now supports incremental HTTP streaming. Results are deserialized from the server response as
38+
they arrive, rather than buffering the entire response before processing.
39+
40+
Traversal API terminal steps (`next()`, `toList()`, `hasNext()`) are now truly incremental. `next()`
41+
returns the first result as soon as it is deserialized from the wire, without waiting for the full response.
42+
In 3.x, `next()` waited for all WebSocket frames before returning.
43+
44+
`Client.stream()` now returns an `AsyncGenerator` for direct incremental consumption. This is a breaking change
45+
from 3.x where `stream()` returned a Node.js `Readable`. The new return type works in both Node.js and browsers:
46+
47+
[source,javascript]
48+
----
49+
// 3.x — Readable stream (no longer supported)
50+
// const stream = client.stream('g.V()');
51+
// stream.on('data', (resultSet) => { ... });
52+
53+
// 4.0 — AsyncGenerator
54+
for await (const item of client.stream('g.V()', null)) {
55+
console.log(item);
56+
if (someCondition) break; // stops reading from the HTTP stream
57+
}
58+
----
59+
60+
`Client.submit()` remains unchanged. It still buffers the full response and returns `Promise<ResultSet>`.
61+
62+
=== Upgrading for Providers
63+
64+
==== Graph System Providers
65+
66+
==== Graph Driver Providers
67+
3368
== TinkerPop 4.0.0-beta.2
3469
3570
*Release Date: April 1, 2026*

gremlin-js/gremlin-javascript/lib/driver/client.ts

Lines changed: 38 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919

2020
import Connection, { ConnectionOptions } from './connection.js';
21-
import { Readable } from 'stream';
2221
import {RequestMessage} from "./request-message.js";
2322

2423
export type RequestOptions = {
@@ -70,67 +69,57 @@ export default class Client {
7069
}
7170

7271
/**
73-
* Configuration specific to the current request.
74-
* @typedef {Object} RequestOptions
75-
* @property {any} bindings - The parameter bindings to apply to the script.
76-
* @property {String} language - The language of the script to execute. Defaults to 'gremlin-lang'.
77-
* @property {String} accept - The MIME type expected in the response.
78-
* @property {Boolean} bulkResults - Indicates whether results should be returned in bulk format.
79-
* @property {Object} params - Additional parameters to include with the request.
80-
* @property {Number} batchSize - The size in which the result of a request is to be 'batched' back to the client.
81-
* @property {String} userAgent - The user agent string to send with the request.
82-
* @property {Number} evaluationTimeout - The timeout for the evaluation of the request.
83-
* @property {String} materializeProperties - Indicates whether element properties should be returned or not.
84-
*/
85-
86-
/**
87-
* Send a request to the Gremlin Server.
72+
* Send a request to the Gremlin Server and buffer the entire response.
8873
* @param {string} message The script to send
8974
* @param {Object|null} [bindings] The script bindings, if any.
9075
* @param {RequestOptions} [requestOptions] Configuration specific to the current request.
91-
* @returns {Promise}
92-
*/ //TODO:: tighten return type to Promise<ResultSet>
76+
* @returns {Promise<ResultSet>}
77+
*/
9378
submit(message: string, bindings: any | null, requestOptions?: RequestOptions): Promise<any> {
94-
const requestBuilder = RequestMessage.build(message)
95-
.addG(this.options.traversalSource || 'g')
96-
97-
if (requestOptions?.language) {
98-
requestBuilder.addLanguage(requestOptions.language);
99-
}
100-
if (requestOptions?.bindings) {
101-
requestBuilder.addBindings(requestOptions.bindings);
102-
}
103-
if (bindings) {
104-
requestBuilder.addBindings(bindings);
105-
}
106-
if (requestOptions?.materializeProperties) {
107-
requestBuilder.addMaterializeProperties(requestOptions.materializeProperties);
108-
}
109-
if (requestOptions?.evaluationTimeout) {
110-
requestBuilder.addTimeoutMillis(requestOptions.evaluationTimeout);
111-
}
112-
if (requestOptions?.bulkResults) {
113-
requestBuilder.addBulkResults(requestOptions.bulkResults);
114-
}
115-
116-
return this._connection.submit(requestBuilder.create());
79+
return this._connection.submit(this.#buildRequest(message, bindings, requestOptions));
11780
}
11881

11982
/**
120-
* Send a request to the Gremlin Server and receive a stream for the results.
83+
* Send a request to the Gremlin Server and stream results incrementally.
84+
* Returns an AsyncGenerator that yields individual result items as they are
85+
* deserialized from the response. For bulked responses, yields Traverser objects.
12186
* @param {string} message The script to send
122-
* @param {Object} [bindings] The script bindings, if any.
87+
* @param {Object|null} [bindings] The script bindings, if any.
12388
* @param {RequestOptions} [requestOptions] Configuration specific to the current request.
124-
* @returns {ReadableStream}
89+
* @returns {AsyncGenerator<any>}
12590
*/
126-
//TODO:: Update stream() to mirror submit()
127-
stream(message: string, bindings: any, requestOptions?: RequestOptions): Readable {
128-
throw new Error("Stream not yet implemented");
91+
async *stream(message: string, bindings: any | null, requestOptions?: RequestOptions): AsyncGenerator<any> {
92+
return yield* this._connection.stream(this.#buildRequest(message, bindings, requestOptions));
93+
}
94+
95+
#buildRequest(message: string, bindings: any | null, requestOptions?: RequestOptions): RequestMessage {
96+
const requestBuilder = RequestMessage.build(message)
97+
.addG(this.options.traversalSource || 'g');
98+
99+
if (requestOptions?.language) {
100+
requestBuilder.addLanguage(requestOptions.language);
101+
}
102+
if (requestOptions?.bindings) {
103+
requestBuilder.addBindings(requestOptions.bindings);
104+
}
105+
if (bindings) {
106+
requestBuilder.addBindings(bindings);
107+
}
108+
if (requestOptions?.materializeProperties) {
109+
requestBuilder.addMaterializeProperties(requestOptions.materializeProperties);
110+
}
111+
if (requestOptions?.evaluationTimeout) {
112+
requestBuilder.addTimeoutMillis(requestOptions.evaluationTimeout);
113+
}
114+
if (requestOptions?.bulkResults) {
115+
requestBuilder.addBulkResults(requestOptions.bulkResults);
116+
}
117+
118+
return requestBuilder.create();
129119
}
130120

131121
/**
132122
* Closes the underlying connection
133-
* send session close request before connection close if session mode
134123
* @returns {Promise}
135124
*/
136125
close(): Promise<void> {
@@ -147,7 +136,7 @@ export default class Client {
147136
}
148137

149138
/**
150-
* Removes a previowsly added event listener to the connection
139+
* Removes a previously added event listener to the connection
151140
* @param {String} event The event name that you want to listen to.
152141
* @param {Function} handler The event handler to be removed.
153142
*/

gremlin-js/gremlin-javascript/lib/driver/connection.ts

Lines changed: 85 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,19 @@ import { Buffer } from 'buffer';
2525
import { EventEmitter } from 'eventemitter3';
2626
import type { Agent } from 'node:http';
2727
import ioc from '../structure/io/binary/GraphBinary.js';
28+
import StreamReader from '../structure/io/binary/internals/StreamReader.js';
2829
import * as utils from '../utils.js';
2930
import ResultSet from './result-set.js';
3031
import {RequestMessage} from "./request-message.js";
31-
import {Readable} from "stream";
3232
import ResponseError from './response-error.js';
3333
import { Traverser } from '../process/traversal.js';
3434

35-
const { DeferredPromise } = utils;
3635
const { graphBinaryReader, graphBinaryWriter } = ioc;
3736

3837
const responseStatusCode = {
3938
success: 200,
4039
noContent: 204,
4140
partialContent: 206,
42-
authenticationChallenge: 407,
4341
};
4442

4543
export type HttpRequest = {
@@ -115,20 +113,87 @@ export default class Connection extends EventEmitter {
115113
return Promise.resolve();
116114
}
117115

118-
/** @override */
119-
submit(request: RequestMessage) {
120-
// The user may not want the body to be serialized if they are using an interceptor.
116+
/**
117+
* Send a request and buffer the entire response. Returns a Promise<ResultSet>.
118+
*/
119+
async submit(request: RequestMessage) {
121120
const body = this._writer ? this._writer.writeRequest(request) : request;
122-
123-
return this.#makeHttpRequest(body)
124-
.then((response) => {
125-
return this.#handleResponse(response);
126-
});
121+
const response = await this.#makeHttpRequest(body);
122+
return this.#handleResponse(response);
127123
}
128124

129-
/** @override */
130-
stream(request: RequestMessage): Readable {
131-
throw new Error('stream() is not yet implemented');
125+
/**
126+
* Send a request and stream the response incrementally.
127+
* Returns an AsyncGenerator that yields deserialized result items.
128+
* For bulked responses, yields Traverser objects.
129+
*
130+
* In the GraphBinary v4 streaming protocol, the server sends the status after all
131+
* result data. If the server encounters an error mid-traversal, values yielded before
132+
* the error are valid partial results. A ResponseError is thrown after the last value
133+
* has been yielded.
134+
*
135+
* @param {RequestMessage} request
136+
* @returns {AsyncGenerator<any>}
137+
*/
138+
async *stream(request: RequestMessage): AsyncGenerator<any> {
139+
const body = this._writer ? this._writer.writeRequest(request) : request;
140+
const abortController = new AbortController();
141+
142+
let response: Response;
143+
try {
144+
response = await this.#makeHttpRequest(body, abortController.signal);
145+
} catch (e: any) {
146+
throw new Error(`Stream request failed: ${e.message}`, { cause: e });
147+
}
148+
149+
if (!response.ok) {
150+
// For error responses, buffer and parse the error body
151+
const buffer = Buffer.from(await response.arrayBuffer());
152+
const errorMessage = `Server returned HTTP ${response.status}: ${response.statusText}`;
153+
const reader = this.#getReaderForContentType(response.headers.get("Content-Type"));
154+
155+
if (reader) {
156+
try {
157+
const deserialized = await reader.readResponse(buffer);
158+
const attributes = new Map();
159+
if (deserialized.status.exception) {
160+
attributes.set('exceptions', deserialized.status.exception);
161+
attributes.set('stackTrace', deserialized.status.exception);
162+
}
163+
throw new ResponseError(errorMessage, {
164+
code: deserialized.status.code,
165+
message: deserialized.status.message || response.statusText,
166+
attributes,
167+
});
168+
} catch (err) {
169+
if (err instanceof ResponseError) throw err;
170+
}
171+
}
172+
173+
throw new ResponseError(errorMessage, {
174+
code: response.status,
175+
message: response.statusText,
176+
attributes: new Map(),
177+
});
178+
}
179+
180+
if (!response.body) {
181+
// 204 No Content — nothing to yield
182+
return;
183+
}
184+
185+
const streamReader = StreamReader.fromReadableStream(response.body);
186+
187+
let completed = false;
188+
try {
189+
yield* this._reader.readResponseStream(streamReader);
190+
completed = true;
191+
} finally {
192+
if (!completed) {
193+
// Consumer broke out early or an error occurred — abort to release the connection
194+
abortController.abort();
195+
}
196+
}
132197
}
133198

134199
#getReaderForContentType(contentType: string | null) {
@@ -143,7 +208,7 @@ export default class Connection extends EventEmitter {
143208
return null;
144209
}
145210

146-
async #makeHttpRequest(body: any): Promise<Response> {
211+
async #makeHttpRequest(body: any, signal?: AbortSignal): Promise<Response> {
147212
const headers: Record<string, string> = {
148213
'Accept': this._reader.mimeType
149214
};
@@ -164,6 +229,7 @@ export default class Connection extends EventEmitter {
164229
headers[key] = Array.isArray(value) ? value.join(', ') : value;
165230
});
166231
}
232+
167233
let httpRequest: HttpRequest = {
168234
url: this.url,
169235
method: 'POST',
@@ -183,6 +249,7 @@ export default class Connection extends EventEmitter {
183249
method: httpRequest.method,
184250
headers: httpRequest.headers,
185251
body: httpRequest.body,
252+
signal,
186253
});
187254
}
188255

@@ -196,7 +263,7 @@ export default class Connection extends EventEmitter {
196263

197264
try {
198265
if (reader) {
199-
const deserialized = reader.readResponse(buffer);
266+
const deserialized = await reader.readResponse(buffer);
200267
const attributes = new Map();
201268
if (deserialized.status.exception) {
202269
attributes.set('exceptions', deserialized.status.exception);
@@ -232,9 +299,9 @@ export default class Connection extends EventEmitter {
232299
throw new Error(`Response Content-Type '${contentType}' does not match the configured reader (expected '${this._reader.mimeType}')`);
233300
}
234301

235-
const deserialized = reader.readResponse(buffer);
302+
const deserialized = await reader.readResponse(buffer);
236303

237-
if (deserialized.status.code && deserialized.status.code !== 200 && deserialized.status.code !== 204 && deserialized.status.code !== 206) {
304+
if (deserialized.status.code && deserialized.status.code !== responseStatusCode.success && deserialized.status.code !== responseStatusCode.noContent && deserialized.status.code !== responseStatusCode.partialContent) {
238305
const attributes = new Map();
239306
if (deserialized.status.exception) {
240307
attributes.set('exceptions', deserialized.status.exception);

gremlin-js/gremlin-javascript/lib/driver/driver-remote-connection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import * as rcModule from './remote-connection.js';
2525
const RemoteConnection = rcModule.RemoteConnection;
2626
const RemoteTraversal = rcModule.RemoteTraversal;
27-
import * as utils from '../utils.js';
2827
import Client, { RequestOptions } from './client.js';
2928
import GremlinLang from '../process/gremlin-lang.js';
3029
import { ConnectionOptions } from './connection.js';
@@ -57,6 +56,14 @@ export default class DriverRemoteConnection extends RemoteConnection {
5756

5857
/** @override */
5958
submit(gremlinLang: GremlinLang) {
59+
const { gremlin, requestOptions } = this.#buildRequestArgs(gremlinLang);
60+
61+
// Use streaming internally — returns an AsyncGenerator backed RemoteTraversal
62+
const generator = this._client.stream(gremlin, null, requestOptions);
63+
return Promise.resolve(new RemoteTraversal(generator));
64+
}
65+
66+
#buildRequestArgs(gremlinLang: GremlinLang) {
6067
gremlinLang.addG(this.options.traversalSource || 'g');
6168

6269
let requestOptions: RequestOptions | undefined = undefined;
@@ -93,8 +100,7 @@ export default class DriverRemoteConnection extends RemoteConnection {
93100
requestOptions.params = Object.fromEntries(params);
94101
}
95102

96-
return this._client.submit(gremlinLang.getGremlin(), null, requestOptions)
97-
.then((result) => new RemoteTraversal(result.toArray()));
103+
return { gremlin: gremlinLang.getGremlin(), requestOptions };
98104
}
99105

100106
override commit() {

0 commit comments

Comments
 (0)