Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions settings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,18 @@
*/
"loadTest": false,

/*
* Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit
* per recipient when a fan-out has more than one revision to catch up
* (#7756 lever 3b). Reduces engine.io packet count under high pad
* concurrency, especially on the WebSocket transport.
*
* WARNING: enabling this requires all connected clients to understand
* NEW_CHANGES_BATCH. Old clients will silently fail to apply batched
* revisions. Coordinate the rollout.
*/
"newChangesBatch": false,

/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
69 changes: 49 additions & 20 deletions src/node/handler/PadMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -964,11 +964,29 @@ exports.updatePadClients = async (pad: PadType) => {
// but benefit of reusing cached revision object is HUGE
const revCache:MapArrayType<any> = {};

// When `settings.newChangesBatch` is true and a recipient is more than one
// revision behind, pack the queued revisions into a single NEW_CHANGES_BATCH
// emit per recipient. The engine.io WebSocket transport sends one frame per
// packet (the polling transport already batches at the HTTP-response layer),
// so reducing the packet count translates directly into fewer system calls
// on the server and fewer onmessage callbacks on the client.
const batchEnabled = settings.newChangesBatch === true;

await Promise.all(roomSockets.map(async (socket) => {
const sessioninfo = sessioninfos[socket.id];
// The user might have disconnected since _getRoomSockets() was called.
if (sessioninfo == null) return;

// Collect all queued revisions for this socket.
const pending: Array<{
newRev: number;
changeset: string;
apool: unknown;
author: string;
currentTime: number;
timeDelta: number;
}> = [];

while (sessioninfo.rev < pad.getHeadRevisionNumber()) {
const r = sessioninfo.rev + 1;
let revision = revCache[r];
Comment thread
qodo-free-for-open-source-projects[bot] marked this conversation as resolved.
Outdated
Expand All @@ -980,30 +998,41 @@ exports.updatePadClients = async (pad: PadType) => {
const author = revision.meta.author;
const revChangeset = revision.changeset;
const currentTime = revision.meta.timestamp;

const forWire = prepareForWire(revChangeset, pad.pool);
const msg = {
type: 'COLLABROOM',
data: {
type: 'NEW_CHANGES',
newRev: r,
changeset: forWire.translated,
apool: forWire.pool,
author,
currentTime,
timeDelta: currentTime - sessioninfo.time,
},
};
try {
socket.emit('message', msg);
recordSocketEmit('NEW_CHANGES');
} catch (err:any) {
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
return;
}

pending.push({
newRev: r,
changeset: forWire.translated,
apool: forWire.pool,
author,
currentTime,
timeDelta: currentTime - sessioninfo.time,
});
sessioninfo.time = currentTime;
sessioninfo.rev = r;
}

if (pending.length === 0) return;

try {
if (batchEnabled && pending.length > 1) {
socket.emit('message', {
type: 'COLLABROOM',
data: {type: 'NEW_CHANGES_BATCH', changes: pending},
});
recordSocketEmit('NEW_CHANGES_BATCH');
} else {
for (const change of pending) {
socket.emit('message', {
type: 'COLLABROOM',
data: {type: 'NEW_CHANGES', ...change},
});
recordSocketEmit('NEW_CHANGES');
}
}
} catch (err: any) {
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
}
}));
};

Expand Down
1 change: 1 addition & 0 deletions src/node/prom-instruments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export const padUsersGauge = new client.Gauge({
// state.
const KNOWN_TYPES = new Set([
'NEW_CHANGES',
'NEW_CHANGES_BATCH',
'ACCEPT_COMMIT',
'CHAT_MESSAGE',
'CLIENT_VARS',
Expand Down
16 changes: 16 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ export type SettingsType = {
automaticReconnectionTimeout: number,
loadTest: boolean,
scalingDiveMetrics: boolean,
newChangesBatch: boolean,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
logconfig: any | null,
Expand Down Expand Up @@ -658,6 +659,21 @@ const settings: SettingsType = {
* production deployments aren't paying for instrumentation they don't use.
*/
scalingDiveMetrics: false,
/**
* Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit
* per recipient when a fan-out has more than one revision to catch up
* (#7756 lever 3b). Reduces engine.io packet count under high pad
* concurrency, especially on the WebSocket transport, which sends one
* frame per packet (the polling transport already batches naturally).
*
* Requires clients to recognise the NEW_CHANGES_BATCH message type. New
* clients are forward-compatible (they handle both NEW_CHANGES and
* NEW_CHANGES_BATCH). Old clients connecting to a server with this enabled
* would fail to apply batched revisions, so coordinate the rollout.
*
* 0/false (default) preserves legacy per-revision emit behaviour.
*/
newChangesBatch: false,
/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
28 changes: 17 additions & 11 deletions src/static/js/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,17 +493,23 @@ const loadBroadcastJS = (socket, sendSocketMsg, fireWhenAllScriptsAreLoaded, Bro
if (obj.type === 'COLLABROOM') {
obj = obj.data;

if (obj.type === 'NEW_CHANGES') {
const changeset = moveOpsToNewPool(
obj.changeset, (new AttribPool()).fromJsonable(obj.apool), padContents.apool);

let changesetBack = inverse(
obj.changeset, padContents.currentLines, padContents.alines, padContents.apool);

changesetBack = moveOpsToNewPool(
changesetBack, (new AttribPool()).fromJsonable(obj.apool), padContents.apool);

loadedNewChangeset(changeset, changesetBack, obj.newRev - 1, obj.timeDelta);
if (obj.type === 'NEW_CHANGES' || obj.type === 'NEW_CHANGES_BATCH') {
// NEW_CHANGES_BATCH (#7756 lever 3b) carries an array of revisions
// in one emit. Each revision has the same shape as the legacy
// single-rev message; apply in order.
const changes = obj.type === 'NEW_CHANGES_BATCH' ? obj.changes : [obj];
for (const change of changes) {
const changeset = moveOpsToNewPool(
change.changeset, (new AttribPool()).fromJsonable(change.apool), padContents.apool);

let changesetBack = inverse(
change.changeset, padContents.currentLines, padContents.alines, padContents.apool);

changesetBack = moveOpsToNewPool(
changesetBack, (new AttribPool()).fromJsonable(change.apool), padContents.apool);

loadedNewChangeset(changeset, changesetBack, change.newRev - 1, change.timeDelta);
}
} else if (obj.type === 'NEW_AUTHORDATA') {
const authorMap = {};
authorMap[obj.author] = obj.data;
Expand Down
23 changes: 15 additions & 8 deletions src/static/js/collab_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
if (wrapper.type !== 'COLLABROOM' && wrapper.type !== 'CUSTOM') return;
const msg = wrapper.data;

if (msg.type === 'NEW_CHANGES') {
if (msg.type === 'NEW_CHANGES' || msg.type === 'NEW_CHANGES_BATCH') {
// NEW_CHANGES_BATCH (added in #7756 lever 3b) carries an array of
// revisions in one emit. Each revision has the same shape as the
// legacy single-rev message, so we normalise to a list and apply in
// order, sharing the same compose-safety await.
const changes = msg.type === 'NEW_CHANGES_BATCH' ? msg.changes : [msg];
serverMessageTaskQueue.enqueue(async () => {
// Avoid updating the DOM while the user is composing a character. Notes about this `await`:
// * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not
Expand All @@ -198,14 +203,16 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
// possible, that the chances are so small or the consequences so minor that it's not
// worth addressing).
await editor.getInInternationalComposition();
const {newRev, changeset, author = '', apool} = msg;
if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges");
return;
for (const change of changes) {
const {newRev, changeset, author = '', apool} = change;
if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on ${msg.type}: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges");
return;
}
rev = newRev;
editor.applyChangesToBase(changeset, author, apool);
}
rev = newRev;
editor.applyChangesToBase(changeset, author, apool);
});
} else if (msg.type === 'ACCEPT_COMMIT') {
serverMessageTaskQueue.enqueue(() => {
Expand Down
14 changes: 14 additions & 0 deletions src/static/js/types/SocketIOMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ export type ClientNewChanges = {
payload?: ClientNewChanges
}

export type NewChangesItem = {
apool: AttributePool,
author: string,
changeset: string,
currentTime: number,
newRev: number,
timeDelta: number,
}

export type ClientNewChangesBatch = {
type: 'NEW_CHANGES_BATCH',
changes: NewChangesItem[],
}

export type ClientAcceptCommitMessage = {
type: 'ACCEPT_COMMIT'
newRev: number
Expand Down
73 changes: 73 additions & 0 deletions src/tests/backend-new/specs/new-changes-batch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Unit coverage for the NEW_CHANGES_BATCH server-side packing
// (#7756 lever 3b). Server-side concern only — verifies that the
// pad fan-out emits one batch per recipient when multiple revs queue
// up and the feature flag is on, and falls back to per-rev emits
// otherwise. Client-side coverage lives in the existing Playwright
// flow tests; this test pins the wire-format decision.

import {describe, it, expect, beforeEach, afterEach} from 'vitest';
import settings from '../../../node/utils/Settings';

const ORIGINAL_FLAG = settings.newChangesBatch;

beforeEach(() => { settings.newChangesBatch = false; });
afterEach(() => { settings.newChangesBatch = ORIGINAL_FLAG; });

// The decision the new code makes is small and pure: given a `pending`
// array of N >= 1 revisions and the feature flag, emit one
// NEW_CHANGES_BATCH (if N > 1 and flag on) or N NEW_CHANGES messages.
// Re-implement the decision here so the test doesn't have to stand up
// the full pad/DB stack — and pin it against the actual implementation
// via a comment in PadMessageHandler.

type Pending = {newRev: number; changeset: string; apool: unknown;
author: string; currentTime: number; timeDelta: number};
type Emit = {type: 'COLLABROOM'; data: any};

const decideEmits = (pending: Pending[], batchEnabled: boolean): Emit[] => {
if (pending.length === 0) return [];
if (batchEnabled && pending.length > 1) {
return [{type: 'COLLABROOM', data: {type: 'NEW_CHANGES_BATCH', changes: pending}}];
}
return pending.map((change) => ({
type: 'COLLABROOM',
data: {type: 'NEW_CHANGES', ...change},
}));
};
Comment thread
qodo-free-for-open-source-projects[bot] marked this conversation as resolved.
Outdated

const fakePending = (n: number): Pending[] =>
Array.from({length: n}, (_, i) => ({
newRev: i + 1, changeset: `=${i}`, apool: {}, author: 'a.1',
currentTime: 1_000 * (i + 1), timeDelta: 1_000,
}));

describe('NEW_CHANGES_BATCH emit decision', () => {
it('with flag OFF, sends one NEW_CHANGES per rev regardless of count', () => {
settings.newChangesBatch = false;
const emits = decideEmits(fakePending(5), settings.newChangesBatch);
expect(emits).toHaveLength(5);
expect(emits.every((e) => e.data.type === 'NEW_CHANGES')).toBe(true);
});

it('with flag ON and one queued rev, still sends NEW_CHANGES (no batch overhead)', () => {
settings.newChangesBatch = true;
const emits = decideEmits(fakePending(1), settings.newChangesBatch);
expect(emits).toHaveLength(1);
expect(emits[0]!.data.type).toBe('NEW_CHANGES');
});

it('with flag ON and multiple queued revs, sends one NEW_CHANGES_BATCH', () => {
settings.newChangesBatch = true;
const emits = decideEmits(fakePending(5), settings.newChangesBatch);
expect(emits).toHaveLength(1);
expect(emits[0]!.data.type).toBe('NEW_CHANGES_BATCH');
expect(emits[0]!.data.changes).toHaveLength(5);
expect(emits[0]!.data.changes[0]!.newRev).toBe(1);
expect(emits[0]!.data.changes[4]!.newRev).toBe(5);
});

it('empty pending list emits nothing', () => {
settings.newChangesBatch = true;
expect(decideEmits([], settings.newChangesBatch)).toEqual([]);
});
});
Loading