Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
=== TinkerPop 4.0.0 (Release Date: NOT OFFICIALLY RELEASED YET)

* Added Gremlator, a single page web application, that translates Gremlin into various programming languages like Javascript and Python.
* Connected HTTP streaming response deserialization to the traversal API in `gremlin-javascript`, enabling `next()` to return the first result without waiting for the full response.
* Changed `Client.stream()` in `gremlin-javascript` to return an `AsyncGenerator` for direct incremental consumption.
* Removed `readable-stream` dependency from `gremlin-javascript`.

[[release-4-0-0-beta-2]]
=== TinkerPop 4.0.0-beta.2 (April 1, 2026)
Expand Down
33 changes: 33 additions & 0 deletions docs/src/upgrade/release-4.x.x.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,39 @@ anonymized form. The original gremlator.com was a prototype built by TinkerPop c
previous implementation required Java and a running Gremlin Server, whereas the new version runs entirely in the
browser with no server infrastructure needed.

==== JS HTTP Streaming Response Support

The JavaScript driver now supports incremental HTTP streaming. Results are deserialized from the server response as
they arrive, rather than buffering the entire response before processing.

Traversal API terminal steps (`next()`, `toList()`, `hasNext()`) are now truly incremental. `next()`
returns the first result as soon as it is deserialized from the wire, without waiting for the full response.
In 3.x, `next()` waited for all WebSocket frames before returning.

`Client.stream()` now returns an `AsyncGenerator` for direct incremental consumption. This is a breaking change
from 3.x where `stream()` returned a Node.js `Readable`. The new return type works in both Node.js and browsers:

[source,javascript]
----
// 3.x — Readable stream (no longer supported)
// const stream = client.stream('g.V()');
// stream.on('data', (resultSet) => { ... });

// 4.0 — AsyncGenerator
for await (const item of client.stream('g.V()', null)) {
console.log(item);
if (someCondition) break; // stops reading from the HTTP stream
}
----

`Client.submit()` remains unchanged. It still buffers the full response and returns `Promise<ResultSet>`.

=== Upgrading for Providers

==== Graph System Providers

==== Graph Driver Providers

== TinkerPop 4.0.0-beta.2

*Release Date: April 1, 2026*
Expand Down
87 changes: 38 additions & 49 deletions gremlin-js/gremlin-javascript/lib/driver/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/

import Connection, { ConnectionOptions } from './connection.js';
import { Readable } from 'stream';
import {RequestMessage} from "./request-message.js";

export type RequestOptions = {
Expand Down Expand Up @@ -70,67 +69,57 @@ export default class Client {
}

/**
* Configuration specific to the current request.
* @typedef {Object} RequestOptions
* @property {any} bindings - The parameter bindings to apply to the script.
* @property {String} language - The language of the script to execute. Defaults to 'gremlin-lang'.
* @property {String} accept - The MIME type expected in the response.
* @property {Boolean} bulkResults - Indicates whether results should be returned in bulk format.
* @property {Object} params - Additional parameters to include with the request.
* @property {Number} batchSize - The size in which the result of a request is to be 'batched' back to the client.
* @property {String} userAgent - The user agent string to send with the request.
* @property {Number} evaluationTimeout - The timeout for the evaluation of the request.
* @property {String} materializeProperties - Indicates whether element properties should be returned or not.
*/

/**
* Send a request to the Gremlin Server.
* Send a request to the Gremlin Server and buffer the entire response.
* @param {string} message The script to send
* @param {Object|null} [bindings] The script bindings, if any.
* @param {RequestOptions} [requestOptions] Configuration specific to the current request.
* @returns {Promise}
*/ //TODO:: tighten return type to Promise<ResultSet>
* @returns {Promise<ResultSet>}
*/
submit(message: string, bindings: any | null, requestOptions?: RequestOptions): Promise<any> {
const requestBuilder = RequestMessage.build(message)
.addG(this.options.traversalSource || 'g')

if (requestOptions?.language) {
requestBuilder.addLanguage(requestOptions.language);
}
if (requestOptions?.bindings) {
requestBuilder.addBindings(requestOptions.bindings);
}
if (bindings) {
requestBuilder.addBindings(bindings);
}
if (requestOptions?.materializeProperties) {
requestBuilder.addMaterializeProperties(requestOptions.materializeProperties);
}
if (requestOptions?.evaluationTimeout) {
requestBuilder.addTimeoutMillis(requestOptions.evaluationTimeout);
}
if (requestOptions?.bulkResults) {
requestBuilder.addBulkResults(requestOptions.bulkResults);
}

return this._connection.submit(requestBuilder.create());
return this._connection.submit(this.#buildRequest(message, bindings, requestOptions));
}

/**
* Send a request to the Gremlin Server and receive a stream for the results.
* Send a request to the Gremlin Server and stream results incrementally.
* Returns an AsyncGenerator that yields individual result items as they are
* deserialized from the response. For bulked responses, yields Traverser objects.
* @param {string} message The script to send
* @param {Object} [bindings] The script bindings, if any.
* @param {Object|null} [bindings] The script bindings, if any.
* @param {RequestOptions} [requestOptions] Configuration specific to the current request.
* @returns {ReadableStream}
* @returns {AsyncGenerator<any>}
*/
//TODO:: Update stream() to mirror submit()
stream(message: string, bindings: any, requestOptions?: RequestOptions): Readable {
throw new Error("Stream not yet implemented");
async *stream(message: string, bindings: any | null, requestOptions?: RequestOptions): AsyncGenerator<any> {
return yield* this._connection.stream(this.#buildRequest(message, bindings, requestOptions));
}

#buildRequest(message: string, bindings: any | null, requestOptions?: RequestOptions): RequestMessage {
const requestBuilder = RequestMessage.build(message)
.addG(this.options.traversalSource || 'g');

if (requestOptions?.language) {
requestBuilder.addLanguage(requestOptions.language);
}
if (requestOptions?.bindings) {
requestBuilder.addBindings(requestOptions.bindings);
}
if (bindings) {
requestBuilder.addBindings(bindings);
}
if (requestOptions?.materializeProperties) {
requestBuilder.addMaterializeProperties(requestOptions.materializeProperties);
}
if (requestOptions?.evaluationTimeout) {
requestBuilder.addTimeoutMillis(requestOptions.evaluationTimeout);
}
if (requestOptions?.bulkResults) {
requestBuilder.addBulkResults(requestOptions.bulkResults);
}

return requestBuilder.create();
}

/**
* Closes the underlying connection
* send session close request before connection close if session mode
* @returns {Promise}
*/
close(): Promise<void> {
Expand All @@ -147,7 +136,7 @@ export default class Client {
}

/**
* Removes a previowsly added event listener to the connection
* Removes a previously added event listener to the connection
* @param {String} event The event name that you want to listen to.
* @param {Function} handler The event handler to be removed.
*/
Expand Down
103 changes: 85 additions & 18 deletions gremlin-js/gremlin-javascript/lib/driver/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,19 @@ import { Buffer } from 'buffer';
import { EventEmitter } from 'eventemitter3';
import type { Agent } from 'node:http';
import ioc from '../structure/io/binary/GraphBinary.js';
import StreamReader from '../structure/io/binary/internals/StreamReader.js';
import * as utils from '../utils.js';
import ResultSet from './result-set.js';
import {RequestMessage} from "./request-message.js";
import {Readable} from "stream";
import ResponseError from './response-error.js';
import { Traverser } from '../process/traversal.js';

const { DeferredPromise } = utils;
const { graphBinaryReader, graphBinaryWriter } = ioc;

const responseStatusCode = {
success: 200,
noContent: 204,
partialContent: 206,
authenticationChallenge: 407,
};

export type HttpRequest = {
Expand Down Expand Up @@ -115,20 +113,87 @@ export default class Connection extends EventEmitter {
return Promise.resolve();
}

/** @override */
submit(request: RequestMessage) {
// The user may not want the body to be serialized if they are using an interceptor.
/**
* Send a request and buffer the entire response. Returns a Promise<ResultSet>.
*/
async submit(request: RequestMessage) {
const body = this._writer ? this._writer.writeRequest(request) : request;

return this.#makeHttpRequest(body)
.then((response) => {
return this.#handleResponse(response);
});
const response = await this.#makeHttpRequest(body);
return this.#handleResponse(response);
}

/** @override */
stream(request: RequestMessage): Readable {
throw new Error('stream() is not yet implemented');
/**
* Send a request and stream the response incrementally.
* Returns an AsyncGenerator that yields deserialized result items.
* For bulked responses, yields Traverser objects.
*
* In the GraphBinary v4 streaming protocol, the server sends the status after all
* result data. If the server encounters an error mid-traversal, values yielded before
* the error are valid partial results. A ResponseError is thrown after the last value
* has been yielded.
*
* @param {RequestMessage} request
* @returns {AsyncGenerator<any>}
*/
async *stream(request: RequestMessage): AsyncGenerator<any> {
const body = this._writer ? this._writer.writeRequest(request) : request;
const abortController = new AbortController();

let response: Response;
try {
response = await this.#makeHttpRequest(body, abortController.signal);
} catch (e: any) {
throw new Error(`Stream request failed: ${e.message}`, { cause: e });
}

if (!response.ok) {
// For error responses, buffer and parse the error body
const buffer = Buffer.from(await response.arrayBuffer());
const errorMessage = `Server returned HTTP ${response.status}: ${response.statusText}`;
const reader = this.#getReaderForContentType(response.headers.get("Content-Type"));

if (reader) {
try {
const deserialized = await reader.readResponse(buffer);
const attributes = new Map();
if (deserialized.status.exception) {
attributes.set('exceptions', deserialized.status.exception);
attributes.set('stackTrace', deserialized.status.exception);
}
throw new ResponseError(errorMessage, {
code: deserialized.status.code,
message: deserialized.status.message || response.statusText,
attributes,
});
} catch (err) {
if (err instanceof ResponseError) throw err;
}
}

throw new ResponseError(errorMessage, {
code: response.status,
message: response.statusText,
attributes: new Map(),
});
}

if (!response.body) {
// 204 No Content — nothing to yield
return;
}

const streamReader = StreamReader.fromReadableStream(response.body);

let completed = false;
try {
yield* this._reader.readResponseStream(streamReader);
completed = true;
} finally {
if (!completed) {
// Consumer broke out early or an error occurred — abort to release the connection
abortController.abort();
}
}
}

#getReaderForContentType(contentType: string | null) {
Expand All @@ -143,7 +208,7 @@ export default class Connection extends EventEmitter {
return null;
}

async #makeHttpRequest(body: any): Promise<Response> {
async #makeHttpRequest(body: any, signal?: AbortSignal): Promise<Response> {
const headers: Record<string, string> = {
'Accept': this._reader.mimeType
};
Expand All @@ -164,6 +229,7 @@ export default class Connection extends EventEmitter {
headers[key] = Array.isArray(value) ? value.join(', ') : value;
});
}

let httpRequest: HttpRequest = {
url: this.url,
method: 'POST',
Expand All @@ -183,6 +249,7 @@ export default class Connection extends EventEmitter {
method: httpRequest.method,
headers: httpRequest.headers,
body: httpRequest.body,
signal,
});
}

Expand All @@ -196,7 +263,7 @@ export default class Connection extends EventEmitter {

try {
if (reader) {
const deserialized = reader.readResponse(buffer);
const deserialized = await reader.readResponse(buffer);
const attributes = new Map();
if (deserialized.status.exception) {
attributes.set('exceptions', deserialized.status.exception);
Expand Down Expand Up @@ -232,9 +299,9 @@ export default class Connection extends EventEmitter {
throw new Error(`Response Content-Type '${contentType}' does not match the configured reader (expected '${this._reader.mimeType}')`);
}

const deserialized = reader.readResponse(buffer);
const deserialized = await reader.readResponse(buffer);

if (deserialized.status.code && deserialized.status.code !== 200 && deserialized.status.code !== 204 && deserialized.status.code !== 206) {
if (deserialized.status.code && deserialized.status.code !== responseStatusCode.success && deserialized.status.code !== responseStatusCode.noContent && deserialized.status.code !== responseStatusCode.partialContent) {
const attributes = new Map();
if (deserialized.status.exception) {
attributes.set('exceptions', deserialized.status.exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import * as rcModule from './remote-connection.js';
const RemoteConnection = rcModule.RemoteConnection;
const RemoteTraversal = rcModule.RemoteTraversal;
import * as utils from '../utils.js';
import Client, { RequestOptions } from './client.js';
import GremlinLang from '../process/gremlin-lang.js';
import { ConnectionOptions } from './connection.js';
Expand Down Expand Up @@ -57,6 +56,14 @@ export default class DriverRemoteConnection extends RemoteConnection {

/** @override */
submit(gremlinLang: GremlinLang) {
const { gremlin, requestOptions } = this.#buildRequestArgs(gremlinLang);

// Use streaming internally — returns an AsyncGenerator backed RemoteTraversal
const generator = this._client.stream(gremlin, null, requestOptions);
return Promise.resolve(new RemoteTraversal(generator));
}

#buildRequestArgs(gremlinLang: GremlinLang) {
gremlinLang.addG(this.options.traversalSource || 'g');

let requestOptions: RequestOptions | undefined = undefined;
Expand Down Expand Up @@ -93,8 +100,7 @@ export default class DriverRemoteConnection extends RemoteConnection {
requestOptions.params = Object.fromEntries(params);
}

return this._client.submit(gremlinLang.getGremlin(), null, requestOptions)
.then((result) => new RemoteTraversal(result.toArray()));
return { gremlin: gremlinLang.getGremlin(), requestOptions };
}

override commit() {
Expand Down
Loading
Loading