Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/node/hooks/express/socketio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ const socketSessionMiddleware = (args: any) => (socket: any, next: Function) =>
};

export const expressCreateServer = (hookName:string, args:ArgsExpressType, cb:Function) => {
// Engine.io socket flush deferral (#7756 / #7767). Apply BEFORE building
// the socket.io Server so the patched Socket prototype is in effect when
// the Server creates its engine.
if (settings.engineFlushDefer === true) {
// eslint-disable-next-line @typescript-eslint/no-require-imports
require('../../utils/EngineFlushDeferral').installEngineFlushDeferral();
}

// init socket.io and redirect all requests to the MessageHandler
// there shouldn't be a browser that isn't compatible to all
// transports in this list at once
Expand Down
95 changes: 95 additions & 0 deletions src/node/utils/EngineFlushDeferral.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Engine.io socket flush deferral — #7756 / #7767 deeper investigation
// after the simple WS transport-level packing prototype (#7772) showed
// that the writeBuffer almost never accumulates because flush() drains
// immediately on `transport.writable === true`.
//
// engine.io's Socket.sendPacket(...) ends with:
//
// this.writeBuffer.push(packet);
// if (callback) this.packetsFn.push(callback);
// this.flush(); // <-- synchronous
//
// flush() reads writeBuffer and hands it to transport.send. For
// WebSocket, transport.writable is true again within microseconds of
// each write, so each sendPacket() call drains a buffer of size 1. The
// transport.send([packets]) function then iterates packets and writes
// one WS frame per packet — which is what the polling transport's
// natural encodePayload batching avoids.
//
// This patch coalesces synchronous-task sendPacket calls onto a single
// microtask-scheduled flush. Inside the same JS task, multiple
// sendPacket() calls accumulate in writeBuffer; the queued microtask
// then calls flush() once with the whole batch. The transport's
// send([batch]) sees N > 1 packets and the WS payload-encoding fast
// path (also added by lever 8) coalesces them into one frame.
//
// Microtask deferral adds zero meaningful wall-clock latency:
// microtasks drain before the next macrotask, so any consumer waiting
// on the next setImmediate / setTimeout / I/O callback still sees the
// flush completed.
//
// Forward-compatible. Existing clients receive identical wire bytes
// because the engine.io packet encoding is unchanged; the difference
// is only how many engine.io packets share one transport-level send
// call. The WS transport's send([packets]) path is then where lever 8
// (or this patch's accompanying engine-packing branch) decides
// whether to ship them as N frames or one payload-encoded frame.
//
// Gated by settings.engineFlushDefer. Default off; production unaffected.

import log4js from 'log4js';

const logger = log4js.getLogger('engine-flush-defer');

let installed = false;

const SCHEDULED = Symbol('engineFlushScheduled');

export const installEngineFlushDeferral = (): void => {
if (installed) return;
installed = true;

let SocketProto: {sendPacket: (...a: unknown[]) => unknown};
try {
// eslint-disable-next-line @typescript-eslint/no-require-imports
SocketProto = require('engine.io/build/socket').Socket.prototype;
} catch (err: any) {
logger.warn(`Unable to install engine.io flush deferral (module not found): ${err && err.message || err}`);
return;
}
if (typeof SocketProto.sendPacket !== 'function') {
logger.warn('engine.io Socket shape unexpected; skipping flush deferral patch');
return;
}

// Re-implementing sendPacket inline rather than wrapping the original
// so the single closing `this.flush()` becomes a microtask-coalesced
// schedule. The body is intentionally a near-verbatim copy of the
// engine.io 6.6.5 implementation so future engine.io upgrades that
// change packet-shape semantics still need re-vetting.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
SocketProto.sendPacket = function (this: any, type: any, data: any, options: any, callback: any) {
if ('function' === typeof options) {
callback = options;
options = {};
}
if ('closing' === this.readyState || 'closed' === this.readyState) return;

options = options || {};
options.compress = options.compress !== false;
const packet: any = {type, options};
if (data !== undefined) packet.data = data;
this.emit('packetCreate', packet);
this.writeBuffer.push(packet);
if ('function' === typeof callback) this.packetsFn.push(callback);

if (this[SCHEDULED]) return;
this[SCHEDULED] = true;
queueMicrotask(() => {
this[SCHEDULED] = false;
this.flush();
});
};

logger.info('engine.io socket flush deferral enabled (#7756 / #7767)');
};
14 changes: 14 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ export type SettingsType = {
automaticReconnectionTimeout: number,
loadTest: boolean,
scalingDiveMetrics: boolean,
engineFlushDefer: boolean,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
logconfig: any | null,
Expand Down Expand Up @@ -658,6 +659,19 @@ const settings: SettingsType = {
* production deployments aren't paying for instrumentation they don't use.
*/
scalingDiveMetrics: false,
/**
* Defer engine.io socket flush onto the next microtask so multiple
* sendPacket() calls within the same task accumulate in the writeBuffer
* before drain. Pairs with engine.io's existing transport.send([packets])
* fast path so a batched send produces fewer WebSocket frames.
*
* Adds no meaningful wall-clock latency — microtasks drain before any
* subsequent macrotask. Backward-compatible at the wire level; existing
* clients receive identical packet bytes.
*
* Default false. Enable only when scoring under the scaling dive.
*/
engineFlushDefer: false,
Comment thread
qodo-free-for-open-source-projects[bot] marked this conversation as resolved.
/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
Loading