Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions packages/contracts/source/contracts/kernel/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ export type RequestCallbacks<T = unknown> = RequestCallback<T>;

export type EventCallback<T = unknown> = (data: T) => void;

export interface Handler<T extends object> {
handleRequest<K extends Requests<T>>(method: K): void;
}

export interface Subprocess {
dispose(): Promise<number>;
drain(): Promise<void>;
Expand Down
1 change: 1 addition & 0 deletions packages/kernel/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"@types/split2": "4.2.3",
"@types/tmp": "0.2.6",
"capture-console": "1.0.2",
"esmock": "2.7.5",
"moment-timezone": "0.6.2",
"tmp": "0.2.5",
"uvu": "0.5.6"
Expand Down
27 changes: 27 additions & 0 deletions packages/kernel/source/ipc/emit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { describe } from "@mainsail/test-runner";
import esmock from "esmock";

// `emit` talks to the worker_threads parentPort, which is null on the main thread (where
// tests run). esmock lets us substitute a fake parentPort so the postMessage call is observable.
describe<{
load: (parentPort: unknown) => Promise<{ emit: (event: string, data: unknown) => void }>;
}>("Emit", ({ assert, beforeEach, it }) => {
beforeEach((context) => {
context.load = (parentPort) => esmock("./emit", { worker_threads: { parentPort } });
});

it("posts a {data, event} message to the parent port", async (context) => {
const posted: unknown[] = [];
const { emit } = await context.load({ postMessage: (message: unknown) => posted.push(message) });

emit("block.applied", { height: 10 });

assert.equal(posted, [{ data: { height: 10 }, event: "block.applied" }]);
});

it("is a no-op when there is no parent port", async (context) => {
const { emit } = await context.load(null);

assert.not.throws(() => emit("block.applied", { height: 10 }));
});
});
113 changes: 113 additions & 0 deletions packages/kernel/source/ipc/handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { describe } from "@mainsail/test-runner";
import { EventEmitter } from "events";
import esmock from "esmock";

// The Handler listens on the worker_threads parentPort, which is null on the main thread
// where tests run. A fake EventEmitter parentPort lets us drive incoming messages and capture
// the replies the handler posts back.
class FakeParentPort extends EventEmitter {
public readonly posted: any[] = [];

public postMessage(message: unknown): void {
this.posted.push(message);
}
}

const flush = (): Promise<void> => new Promise((resolve) => setImmediate(resolve));

describe<{
parentPort: FakeParentPort;
makeHandler: <T extends object>(handler: T) => Promise<unknown>;
}>("Handler", ({ assert, beforeEach, it }) => {
beforeEach((context) => {
context.parentPort = new FakeParentPort();
context.makeHandler = async (handler) => {
const { Handler } = await esmock("./handler", { worker_threads: { parentPort: context.parentPort } });
return new Handler(handler);
};
});

it("registers a single message listener on the parent port", async (context) => {
await context.makeHandler({});

assert.equal(context.parentPort.listenerCount("message"), 1);
});

it("invokes the requested method and posts the result keyed by id", async (context) => {
await context.makeHandler({ add: (a: number, b: number) => a + b });

context.parentPort.emit("message", { args: [2, 3], id: "req-1", method: "add" });
await flush();

assert.equal(context.parentPort.posted, [{ id: "req-1", result: 5 }]);
});

it("awaits async handler methods before replying", async (context) => {
await context.makeHandler({ slow: async () => "done" });

context.parentPort.emit("message", { args: [], id: "req-2", method: "slow" });
await flush();

assert.equal(context.parentPort.posted, [{ id: "req-2", result: "done" }]);
});

it("forwards the handler's arguments in order", async (context) => {
let received: unknown[] = [];
await context.makeHandler({ record: (...arguments_: unknown[]) => (received = arguments_) });

context.parentPort.emit("message", { args: ["a", 1, { x: true }], id: "req-3", method: "record" });
await flush();

assert.equal(received, ["a", 1, { x: true }]);
});

it("posts an error reply when the method is not defined on the handler", async (context) => {
await context.makeHandler({});

context.parentPort.emit("message", { args: [], id: "req-4", method: "missing" });
await flush();

assert.equal(context.parentPort.posted, [
{ error: "Method missing is not defined on the handler", id: "req-4" },
]);
});

it("posts an error reply when the handler throws", async (context) => {
await context.makeHandler({
boom: () => {
throw new Error("custom-error");
},
});

context.parentPort.emit("message", { args: [], id: "req-5", method: "boom" });
await flush();

assert.equal(context.parentPort.posted, [{ error: "custom-error", id: "req-5" }]);
});

it("posts an error reply when an async handler rejects", async (context) => {
await context.makeHandler({
boom: async () => {
throw new Error("custom-async-error");
},
});

context.parentPort.emit("message", { args: [], id: "req-6", method: "boom" });
await flush();

assert.equal(context.parentPort.posted, [{ error: "custom-async-error", id: "req-6" }]);
});

it("normalizes a non-Error thrown value into an error reply", async (context) => {
await context.makeHandler({
boom: () => {
throw "just a string";
},
});

context.parentPort.emit("message", { args: [], id: "req-7", method: "boom" });
await flush();

assert.equal(context.parentPort.posted, [{ error: "just a string", id: "req-7" }]);
});
});
8 changes: 1 addition & 7 deletions packages/kernel/source/ipc/handler.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import type { Contracts } from "@mainsail/contracts";

import { ensureError } from "@mainsail/utils";
import { parentPort } from "worker_threads";

export class Handler<T extends object> implements Contracts.Kernel.IPC.Handler<T> {
export class Handler<T extends object> {
private readonly handler: T;

public constructor(handler: T) {
this.handler = handler;

this.handleRequest();
}

public handleRequest(): void {
parentPort?.on("message", (message) => {
void this.#onMessage(message);
});
Expand Down
Loading
Loading