Skip to content

Commit 7a520cf

Browse files
authored
Implement keepalive on websockets (#2690)
2 parents 8581eb6 + 1338953 commit 7a520cf

6 files changed

Lines changed: 110 additions & 1 deletion

File tree

.changeset/lovely-moments-argue.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"postgraphile": patch
3+
"grafserv": patch
4+
---
5+
6+
Add keepalive to grafserv websocket connections.

grafast/grafserv/src/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ declare global {
146146
*/
147147
websockets?: boolean;
148148

149+
/**
150+
* Duration (in milliseconds) between pings. Set to `-1` to disable.
151+
*
152+
* @defaultValue `12_000`
153+
*/
154+
websocketKeepalive?: number;
155+
149156
/**
150157
* If you would like to customize the way in which errors are masked, you may
151158
* pass your own error masking function here. You can also import

grafast/grafserv/src/servers/fastify/v4/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
normalizeRequest,
2525
processHeaders,
2626
} from "../../../utils.js";
27+
import { DEFAULT_WEBSOCKET_KEEPALIVE } from "../../../websocketKeepalive.js";
2728

2829
declare global {
2930
namespace Grafast {
@@ -202,8 +203,11 @@ export class FastifyGrafserv extends GrafservBase {
202203
};
203204

204205
// Build websocket handler.
206+
const keepaliveInterval =
207+
this.getPreset().grafserv?.websocketKeepalive ??
208+
DEFAULT_WEBSOCKET_KEEPALIVE;
205209
const wsHandler = websockets
206-
? makeHandler(makeGraphQLWSConfig(this))
210+
? makeHandler(makeGraphQLWSConfig(this), keepaliveInterval)
207211
: undefined;
208212

209213
// Attach HTTP handler for POST requests.

grafast/grafserv/src/servers/node/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
makeGraphQLWSConfig,
2121
processHeaders,
2222
} from "../../utils.js";
23+
import { handleWebSocketKeepalive } from "../../websocketKeepalive.js";
2324

2425
declare global {
2526
namespace Grafast {
@@ -352,6 +353,8 @@ export async function makeNodeUpgradeHandler(instance: GrafservBase) {
352353
});
353354
};
354355
const onConnection = (socket: WebSocket, request: IncomingMessage) => {
356+
handleWebSocketKeepalive(socket, instance.getPreset());
357+
355358
// a new socket opened, let graphql-ws take over
356359
const closed = graphqlWsServer.opened(
357360
{
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
export const DEFAULT_WEBSOCKET_KEEPALIVE = 12_000;
2+
import type WebSocket from "ws";
3+
4+
export function handleWebSocketKeepalive(
5+
socket: WebSocket,
6+
preset: GraphileConfig.ResolvedPreset,
7+
): void {
8+
const keepaliveInterval =
9+
preset.grafserv?.websocketKeepalive ?? DEFAULT_WEBSOCKET_KEEPALIVE;
10+
if (!Number.isFinite(keepaliveInterval) || keepaliveInterval <= 0) {
11+
// Keepalive disabled
12+
return;
13+
}
14+
15+
/**
16+
* Sending a ping and waiting for a pong are mutually exclusive, so this
17+
* timer is used for both.
18+
*/
19+
let timer: NodeJS.Timeout | null = null;
20+
21+
/**
22+
* Cleans up the timer, always call this before re-assigning timer (to ensure
23+
* that an out-of-order pong doesn't cause two timers to run concurrently).
24+
*/
25+
const stopTimer = () => {
26+
if (timer != null) {
27+
clearTimeout(timer);
28+
timer = null;
29+
}
30+
};
31+
32+
/** First half of a heart beat - send ping */
33+
const sendPing = () => {
34+
stopTimer();
35+
// Schedule timeout
36+
timer = setTimeout(handleTimeout, keepaliveInterval);
37+
socket.ping();
38+
};
39+
/** Second half of a heart beat - receive pong */
40+
const handlePong = () => {
41+
stopTimer();
42+
// Schedule the next ping
43+
timer = setTimeout(sendPing, keepaliveInterval);
44+
};
45+
46+
/** Terminal handler, due to timeout */
47+
const handleTimeout = () => {
48+
stopTimer();
49+
releaseListeners();
50+
// Kill the socket (after we've released the listeners)
51+
socket.terminate();
52+
};
53+
/** Terminal handler, due to natural socket close */
54+
const handleClose = (_code: number, _reason: Buffer) => {
55+
stopTimer();
56+
releaseListeners();
57+
};
58+
59+
const releaseListeners = () => {
60+
socket.off("pong", handlePong);
61+
socket.off("close", handleClose);
62+
};
63+
socket.on("pong", handlePong);
64+
socket.on("close", handleClose);
65+
66+
// Schedule the first ping
67+
timer = setTimeout(sendPing, keepaliveInterval);
68+
}

postgraphile/postgraphile/graphile.config.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ const preset: GraphileConfig.Preset = {
427427
extend type Subscription {
428428
sub(topic: String!): Int
429429
gql: Int
430+
slow: String
430431
}
431432
`,
432433
objects: {
@@ -472,6 +473,26 @@ const preset: GraphileConfig.Preset = {
472473
[sleep],
473474
),
474475
},
476+
slow: {
477+
resolve: EXPORTABLE(
478+
() =>
479+
function resolve(e) {
480+
return e;
481+
},
482+
[],
483+
),
484+
subscribe: EXPORTABLE(
485+
(sleep) =>
486+
async function* subscribe() {
487+
while (true) {
488+
yield new Date().toISOString();
489+
// Wait two minutes between ticks
490+
await sleep(120000);
491+
}
492+
},
493+
[sleep],
494+
),
495+
},
475496
},
476497
},
477498
},

0 commit comments

Comments
 (0)