diff --git a/packages/evm-api-worker/package.json b/packages/evm-api-worker/package.json index 6d7cd6469..d01ce9d4b 100644 --- a/packages/evm-api-worker/package.json +++ b/packages/evm-api-worker/package.json @@ -23,12 +23,12 @@ "dependencies": { "@mainsail/constants": "workspace:*", "@mainsail/container": "workspace:*", - "@mainsail/kernel": "workspace:*", - "joi": "18.2.1" + "@mainsail/kernel": "workspace:*" }, "devDependencies": { "@mainsail/contracts": "workspace:*", "@mainsail/test-runner": "workspace:*", + "esmock": "2.7.5", "uvu": "0.5.6" }, "engines": { diff --git a/packages/evm-api-worker/source/handlers/commit.test.ts b/packages/evm-api-worker/source/handlers/commit.test.ts new file mode 100644 index 000000000..a8dbd31f7 --- /dev/null +++ b/packages/evm-api-worker/source/handlers/commit.test.ts @@ -0,0 +1,32 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { CommitHandler } from "./commit"; + +describe<{ + app: Application; + handler: CommitHandler; + stateStore: any; + logger: any; +}>("CommitHandler", ({ assert, beforeEach, it, spy }) => { + beforeEach((context) => { + context.stateStore = { setBlockNumber: () => {} }; + context.logger = { error: () => {} }; + + context.app = new Application(); + context.app.bind(Identifiers.State.Store).toConstantValue(context.stateStore); + context.app.bind(Identifiers.Services.Log.Service).toConstantValue(context.logger); + + context.handler = context.app.resolve(CommitHandler); + }); + + it("sets the block number on the state store", async ({ handler, stateStore }) => { + const setBlockNumber = spy(stateStore, "setBlockNumber"); + + await handler.handle(123); + + setBlockNumber.calledOnce(); + setBlockNumber.calledWith(123); + }); +}); diff --git a/packages/evm-api-worker/source/handlers/set-peer-count.test.ts b/packages/evm-api-worker/source/handlers/set-peer-count.test.ts new file mode 100644 index 000000000..5c5e3350a --- /dev/null +++ b/packages/evm-api-worker/source/handlers/set-peer-count.test.ts @@ -0,0 +1,26 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { SetPeerCountHandler } from "./set-peer-count"; + +describe<{ + app: Application; + handler: SetPeerCountHandler; + state: any; +}>("SetPeerCountHandler", ({ assert, beforeEach, it }) => { + beforeEach((context) => { + context.state = { peerCount: 0 }; + + context.app = new Application(); + context.app.bind(Identifiers.Evm.State).toConstantValue(context.state); + + context.handler = context.app.resolve(SetPeerCountHandler); + }); + + it("stores the peer count on the evm state", async ({ handler, state }) => { + await handler.handle(7); + + assert.equal(state.peerCount, 7); + }); +}); diff --git a/packages/evm-api-worker/source/handlers/start.test.ts b/packages/evm-api-worker/source/handlers/start.test.ts new file mode 100644 index 000000000..9d46e9661 --- /dev/null +++ b/packages/evm-api-worker/source/handlers/start.test.ts @@ -0,0 +1,96 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { StartHandler } from "./start"; + +describe<{ + app: Application; + handler: StartHandler; + store: any; + httpServer: any; + httpsServer: any; + enabled: { http: boolean; https: boolean }; +}>("StartHandler", ({ beforeEach, it, spy }) => { + beforeEach((context) => { + context.store = { setBlockNumber: () => {} }; + context.httpServer = { boot: async () => {} }; + context.httpsServer = { boot: async () => {} }; + context.enabled = { http: false, https: false }; + + const configuration = { + getRequired: (key: string) => + key === "server.http.enabled" ? context.enabled.http : context.enabled.https, + }; + + // Application binds itself as Application.Instance, so the handler resolves the servers + // off the same container the test binds them into. + context.app = new Application(); + context.app.bind(Identifiers.State.Store).toConstantValue(context.store); + context.app.bind(Identifiers.Evm.API.HTTP).toConstantValue(context.httpServer); + context.app.bind(Identifiers.Evm.API.HTTPS).toConstantValue(context.httpsServer); + context.app + .bind(Identifiers.ServiceProvider.Configuration) + .toConstantValue(configuration) + .whenTagged("plugin", "api-evm"); + + context.handler = context.app.resolve(StartHandler); + }); + + it("sets the block number", async ({ handler, store }) => { + const setBlockNumber = spy(store, "setBlockNumber"); + + await handler.handle(42); + + setBlockNumber.calledOnce(); + setBlockNumber.calledWith(42); + }); + + it("does not boot any server when neither http nor https is enabled", async ({ + handler, + httpServer, + httpsServer, + }) => { + const http = spy(httpServer, "boot"); + const https = spy(httpsServer, "boot"); + + await handler.handle(42); + + http.neverCalled(); + https.neverCalled(); + }); + + it("boots only the http server when http is enabled", async ({ handler, enabled, httpServer, httpsServer }) => { + enabled.http = true; + const http = spy(httpServer, "boot"); + const https = spy(httpsServer, "boot"); + + await handler.handle(42); + + http.calledOnce(); + https.neverCalled(); + }); + + it("boots only the https server when https is enabled", async ({ handler, enabled, httpServer, httpsServer }) => { + enabled.https = true; + const http = spy(httpServer, "boot"); + const https = spy(httpsServer, "boot"); + + await handler.handle(42); + + http.neverCalled(); + https.calledOnce(); + }); + + it("boots both servers when http and https are enabled", async ({ handler, enabled, httpServer, httpsServer }) => { + enabled.http = true; + enabled.https = true; + const http = spy(httpServer, "boot"); + const https = spy(httpsServer, "boot"); + + await handler.handle(42); + + http.calledOnce(); + https.calledOnce(); + }); +}); diff --git a/packages/evm-api-worker/source/service-provider.test.ts b/packages/evm-api-worker/source/service-provider.test.ts new file mode 100644 index 000000000..b57318f47 --- /dev/null +++ b/packages/evm-api-worker/source/service-provider.test.ts @@ -0,0 +1,105 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application, Ipc } from "@mainsail/kernel"; +import { EventEmitter } from "events"; +import esmock from "esmock"; +import Joi from "joi"; +import { PassThrough } from "stream"; + +import { describe } from "@mainsail/test-runner"; + +// Records every `new Worker(...)` so the factory test can assert how the thread is spawned. +const constructions: any[][] = []; + +// Stand-in for worker_threads.Worker: an EventEmitter exposing the stdout/stderr streams and +// threadId that Ipc.Subprocess reads, so the real Subprocess wraps it without a real thread. +class FakeWorker extends EventEmitter { + public threadId = 1; + public readonly stdout = new PassThrough(); + public readonly stderr = new PassThrough(); + + public constructor(...arguments_: any[]) { + super(); + constructions.push(arguments_); + } + + public postMessage(): void {} + public async terminate(): Promise { + return 0; + } +} + +// Load the provider with worker_threads.Worker swapped for the fake; the real Ipc.Subprocess +// and ./worker.js stay in place. +const { ServiceProvider } = await esmock("./service-provider", { + worker_threads: { Worker: FakeWorker }, +}); + +describe<{ + app: Application; + serviceProvider: any; + worker: any; +}>("ServiceProvider", ({ assert, beforeEach, it, spy, stub }) => { + beforeEach((context) => { + constructions.length = 0; + context.worker = { boot: async () => {}, dispose: async () => {} }; + + context.app = new Application(); + context.app.bind(Identifiers.Config.Flags).toConstantValue({ network: "testnet" }); + // Ipc.Subprocess resolves the logger from the container when the factory runs. + context.app.bind(Identifiers.Services.Log.Service).toConstantValue({ debug: () => {}, error: () => {} }); + + context.serviceProvider = context.app.resolve(ServiceProvider); + + // register() resolves the WorkerInstance, whose @postConstruct would invoke the factory + // and spawn. Intercept that resolution so only the explicit factory call below runs it. + stub(context.app, "resolve").returnValue(context.worker); + }); + + it("register binds the worker subprocess factory and the worker", async (context) => { + assert.false(context.app.isBound(Identifiers.Evm.WorkerSubprocess.Factory)); + assert.false(context.app.isBound(Identifiers.Evm.Worker)); + + await context.serviceProvider.register(); + + assert.true(context.app.isBound(Identifiers.Evm.WorkerSubprocess.Factory)); + assert.true(context.app.isBound(Identifiers.Evm.Worker)); + assert.function(context.app.get(Identifiers.Evm.WorkerSubprocess.Factory)); + assert.equal(context.app.get(Identifiers.Evm.Worker), context.worker); + }); + + it("the subprocess factory spawns the worker script with piped stdio and wraps it in an Ipc.Subprocess", async (context) => { + await context.serviceProvider.register(); + + const factory = context.app.get(Identifiers.Evm.WorkerSubprocess.Factory) as () => Ipc.Subprocess; + const subprocess = factory(); + + assert.length(constructions, 1); + const [scriptPath, options] = constructions[0]; + assert.true(scriptPath.endsWith("worker-script.js")); + assert.equal(options, { stderr: true, stdout: true }); + assert.instance(subprocess, Ipc.Subprocess); + }); + + it("boot delegates to the worker with the flags and the thread name", async (context) => { + await context.serviceProvider.register(); + const boot = spy(context.worker, "boot"); + + await context.serviceProvider.boot(); + + boot.calledOnce(); + boot.calledWith({ network: "testnet", thread: "evm-api" }); + }); + + it("dispose delegates to the worker", async (context) => { + await context.serviceProvider.register(); + const dispose = spy(context.worker, "dispose"); + + await context.serviceProvider.dispose(); + + dispose.calledOnce(); + }); + + it("is required", async (context) => { + assert.true(await context.serviceProvider.required()); + }); +}); diff --git a/packages/evm-api-worker/source/service-provider.ts b/packages/evm-api-worker/source/service-provider.ts index 5a8d1cc20..c8f586525 100644 --- a/packages/evm-api-worker/source/service-provider.ts +++ b/packages/evm-api-worker/source/service-provider.ts @@ -3,7 +3,6 @@ import type { Contracts } from "@mainsail/contracts"; import { Identifiers } from "@mainsail/constants"; import { inject, injectable } from "@mainsail/container"; import { Ipc, Providers } from "@mainsail/kernel"; -import Joi from "joi"; import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; @@ -40,8 +39,4 @@ export class ServiceProvider extends Providers.ServiceProvider { public async required(): Promise { return true; } - - public configSchema(): Joi.AnySchema { - return Joi.object({}).required().unknown(true); - } } diff --git a/packages/evm-api-worker/source/worker-handler.test.ts b/packages/evm-api-worker/source/worker-handler.test.ts new file mode 100644 index 000000000..a785f34d7 --- /dev/null +++ b/packages/evm-api-worker/source/worker-handler.test.ts @@ -0,0 +1,70 @@ +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { CommitHandler, SetPeerCountHandler, StartHandler } from "./handlers/index.js"; +import { WorkerScriptHandler } from "./worker-handler"; + +describe<{ + subject: WorkerScriptHandler; + handler: any; + resolve: any; +}>("WorkerScriptHandler", ({ beforeEach, it, spy, stub }) => { + beforeEach((context) => { + // WorkerScriptHandler owns a private `new Application()`; stub the prototype so the + // handler resolutions and lifecycle calls stay in-process. + context.handler = { handle: async () => {} }; + context.resolve = stub(Application.prototype, "resolve").returnValue(context.handler); + + context.subject = new WorkerScriptHandler(); + }); + + it("boot bootstraps the app with the flags and boots it", async ({ subject }) => { + const bootstrap = stub(Application.prototype, "bootstrap").resolvedValue(undefined); + const boot = stub(Application.prototype, "boot").resolvedValue(undefined); + const flags = { network: "testnet" } as any; + + await subject.boot(flags); + + bootstrap.calledWith({ flags }); + boot.calledOnce(); + }); + + it("dispose terminates the app", async ({ subject }) => { + const terminate = stub(Application.prototype, "terminate").resolvedValue(undefined); + + await subject.dispose(); + + terminate.calledOnce(); + }); + + it("start resolves the StartHandler and forwards the height", async ({ subject, handler, resolve }) => { + const handle = spy(handler, "handle"); + + await subject.start(42); + + resolve.calledWith(StartHandler); + handle.calledWith(42); + }); + + it("setPeerCount resolves the SetPeerCountHandler and forwards the count", async ({ + subject, + handler, + resolve, + }) => { + const handle = spy(handler, "handle"); + + await subject.setPeerCount(5); + + resolve.calledWith(SetPeerCountHandler); + handle.calledWith(5); + }); + + it("commit resolves the CommitHandler and forwards the height", async ({ subject, handler, resolve }) => { + const handle = spy(handler, "handle"); + + await subject.commit(99); + + resolve.calledWith(CommitHandler); + handle.calledWith(99); + }); +}); diff --git a/packages/evm-api-worker/source/worker-script.test.ts b/packages/evm-api-worker/source/worker-script.test.ts new file mode 100644 index 000000000..f7cf7777b --- /dev/null +++ b/packages/evm-api-worker/source/worker-script.test.ts @@ -0,0 +1,10 @@ +import { describe } from "@mainsail/test-runner"; + +// worker-script.ts is the worker thread entrypoint: importing it wires an Ipc.Handler to a +// WorkerScriptHandler. On the main thread parentPort is null, so the handler registers no +// listener — the import should simply complete without throwing. +describe("WorkerScript", ({ assert, it }) => { + it("loads without throwing", async () => { + await assert.resolves(() => import("./worker-script.js")); + }); +}); diff --git a/packages/evm-api-worker/source/worker.test.ts b/packages/evm-api-worker/source/worker.test.ts new file mode 100644 index 000000000..43fc1bf15 --- /dev/null +++ b/packages/evm-api-worker/source/worker.test.ts @@ -0,0 +1,140 @@ +import { Events, Identifiers } from "@mainsail/constants"; +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { Worker } from "./worker"; + +describe<{ + app: Application; + worker: Worker; + ipc: any; + eventDispatcher: any; + p2pRepository: any; +}>("Worker", ({ assert, beforeEach, it, spy }) => { + beforeEach((context) => { + context.ipc = { + dispose: async () => 0, + drain: async () => {}, + getQueueSize: () => 3, + kill: async () => 7, + registerEventHandler: () => {}, + sendRequest: async () => {}, + }; + context.eventDispatcher = { listen: () => {} }; + context.p2pRepository = { getPeers: () => [] }; + + context.app = new Application(); + // The injected factory hands back our fake subprocess instead of spawning a thread. + context.app.bind(Identifiers.Evm.WorkerSubprocess.Factory).toConstantValue(() => context.ipc); + context.app.bind(Identifiers.Services.EventDispatcher.Service).toConstantValue(context.eventDispatcher); + context.app.bind(Identifiers.P2P.Peer.Repository).toConstantValue(context.p2pRepository); + + context.worker = context.app.resolve(Worker); + }); + + it("initialize subscribes to the peer added and removed events", ({ app, eventDispatcher }) => { + // initialize() ran during resolve() in beforeEach; assert its side effects on a fresh resolve. + const listen = spy(eventDispatcher, "listen"); + const fresh = app.resolve(Worker); + + listen.calledTimes(2); + listen.calledNthWith(0, Events.PeerEvent.Added, fresh); + listen.calledNthWith(1, Events.PeerEvent.Removed, fresh); + }); + + it("boot sends a single boot request and memoizes it", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + const flags = { thread: "evm-api" } as any; + + await worker.boot(flags); + await worker.boot(flags); + + sendRequest.calledOnce(); + sendRequest.calledWith("boot", flags); + }); + + it("dispose drains, requests an inner dispose, then terminates the subprocess", async ({ worker, ipc }) => { + const drain = spy(ipc, "drain"); + const sendRequest = spy(ipc, "sendRequest"); + const dispose = spy(ipc, "dispose"); + + await worker.dispose(); + + drain.calledOnce(); + sendRequest.calledWith("dispose"); + dispose.calledOnce(); + }); + + it("dispose still terminates the subprocess when the inner dispose request fails", async ({ worker, ipc }) => { + ipc.sendRequest = async () => { + throw new Error("worker already gone"); + }; + const dispose = spy(ipc, "dispose"); + + await assert.resolves(() => worker.dispose()); + + dispose.calledOnce(); + }); + + it("dispose is memoized across calls", async ({ worker, ipc }) => { + const drain = spy(ipc, "drain"); + + await worker.dispose(); + await worker.dispose(); + + drain.calledOnce(); + }); + + it("kill terminates the subprocess and returns its exit code", async ({ worker, ipc }) => { + const kill = spy(ipc, "kill"); + + assert.equal(await worker.kill(), 7); + kill.calledOnce(); + }); + + it("getQueueSize reports the subprocess queue size", ({ worker }) => { + assert.equal(worker.getQueueSize(), 3); + }); + + it("registerEventHandler forwards to the subprocess", ({ worker, ipc }) => { + const register = spy(ipc, "registerEventHandler"); + const callback = () => {}; + + worker.registerEventHandler("some-event", callback); + + register.calledWith("some-event", callback); + }); + + it("start requests start with the block number", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.start(42); + + sendRequest.calledWith("start", 42); + }); + + it("onCommit requests commit with the unit block number", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.onCommit({ blockNumber: 99 } as any); + + sendRequest.calledWith("commit", 99); + }); + + it("setPeerCount requests setPeerCount with the count", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.setPeerCount(5); + + sendRequest.calledWith("setPeerCount", 5); + }); + + it("handle relays the current peer count to the subprocess", async ({ worker, ipc, p2pRepository }) => { + p2pRepository.getPeers = () => [{}, {}, {}]; + const sendRequest = spy(ipc, "sendRequest"); + + await worker.handle({ data: {}, name: "peer.added" }); + + sendRequest.calledWith("setPeerCount", 3); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 07fc34c97..4dbb00e1b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2053,9 +2053,6 @@ importers: '@mainsail/kernel': specifier: workspace:* version: link:../kernel - joi: - specifier: 18.2.1 - version: 18.2.1 devDependencies: '@mainsail/contracts': specifier: workspace:* @@ -2063,6 +2060,9 @@ importers: '@mainsail/test-runner': specifier: workspace:* version: link:../test-runner + esmock: + specifier: 2.7.5 + version: 2.7.5 uvu: specifier: 0.5.6 version: 0.5.6