From 71357a31831d6701c6116232b74d5edc4de69050 Mon Sep 17 00:00:00 2001 From: FishEnjoyer2025 Date: Tue, 2 Jun 2026 17:27:53 -0500 Subject: [PATCH] feat: add external-replication plugin #72 --- plugins/external-replication/README.md | 49 ++++ plugins/external-replication/index.test.ts | 158 ++++++++++++ plugins/external-replication/index.ts | 278 +++++++++++++++++++++ 3 files changed, 485 insertions(+) create mode 100644 plugins/external-replication/README.md create mode 100644 plugins/external-replication/index.test.ts create mode 100644 plugins/external-replication/index.ts diff --git a/plugins/external-replication/README.md b/plugins/external-replication/README.md new file mode 100644 index 0000000..837ffca --- /dev/null +++ b/plugins/external-replication/README.md @@ -0,0 +1,49 @@ +# External Replication Plugin + +Pulls data from a StarbaseDB instance's configured **external** data source (e.g. a +Postgres on Supabase) into the **internal** Durable-Object SQLite database, turning the +instance into a close-to-edge read replica that can be queried locally instead of +round-tripping to the external database. + +Implements the pull-based replication described in +[#72](https://github.com/outerbase/starbasedb/issues/72). + +## Usage + +Register the plugin with the tables you want replicated: + +```ts +import { ExternalReplicationPlugin } from './plugins/external-replication' + +new ExternalReplicationPlugin({ + tables: [ + // Full snapshot every run: + { name: 'products' }, + // Incremental — only rows whose `updated_at` advanced since the last run: + { name: 'orders', cursorColumn: 'updated_at', batchSize: 2000 }, + ], +}) +``` + +- **`cursorColumn`** (optional): a monotonically-increasing column (`updated_at`, `id`, …). + When set, only new/changed rows are pulled each run and the last value is persisted in + the `_starbasedb_replication_state` table. Omit it to re-pull the whole table each run. +- **`batchSize`** (optional, default `5000`): rows pulled per run; large tables drain + across successive runs. + +Writes use `INSERT OR REPLACE`, so runs are **idempotent** and safe to retry. + +## Triggering + +- **Manual:** `POST /replicate` (all configured tables) or `POST /replicate/:table` (one + table). Admin only. +- **On an interval:** add a + [Cloudflare Cron Trigger](https://developers.cloudflare.com/workers/configuration/cron-triggers/) + and invoke the endpoint (or call `replicateAll(...)` directly) from your Worker's + `scheduled()` handler: + +```toml +# wrangler.toml +[triggers] +crons = ["*/5 * * * *"] # every 5 minutes +``` diff --git a/plugins/external-replication/index.test.ts b/plugins/external-replication/index.test.ts new file mode 100644 index 0000000..a495ad5 --- /dev/null +++ b/plugins/external-replication/index.test.ts @@ -0,0 +1,158 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +// Mock the two execution dependencies BEFORE importing the plugin (vi.mock is hoisted). +vi.mock('../../src/operation', () => ({ + executeExternalQuery: vi.fn(), +})) +vi.mock('../../src/export', () => ({ + executeOperation: vi.fn(), +})) + +import { ExternalReplicationPlugin } from './index' +import { executeExternalQuery } from '../../src/operation' +import { executeOperation } from '../../src/export' +import { DataSource } from '../../src/types' +import { StarbaseDBConfiguration } from '../../src/handler' + +const config = { role: 'admin' } as StarbaseDBConfiguration +const dataSource = { + rpc: {}, + source: 'external', + external: { dialect: 'postgresql' }, +} as unknown as DataSource + +function upsertCallFor(table: string) { + return vi + .mocked(executeOperation) + .mock.calls.find( + (args: any[]) => + Array.isArray(args[0]) && + typeof args[0][0]?.sql === 'string' && + args[0][0].sql.startsWith(`INSERT OR REPLACE INTO "${table}"`) + ) +} + +beforeEach(() => { + vi.clearAllMocks() + // Default: internal state lookups return nothing -> cursor starts null. + vi.mocked(executeOperation).mockResolvedValue([] as any) +}) + +describe('ExternalReplicationPlugin - buildSelectQuery', () => { + const plugin = new ExternalReplicationPlugin() + + it('full snapshot when no cursorColumn', () => { + const { sql, params } = plugin.buildSelectQuery({ name: 'users' }, null) + expect(sql).toBe('SELECT * FROM "users" LIMIT 5000') + expect(params).toEqual([]) + }) + + it('incremental first run (cursor column, no saved cursor) omits WHERE', () => { + const { sql, params } = plugin.buildSelectQuery( + { name: 'users', cursorColumn: 'updated_at' }, + null + ) + expect(sql).toBe( + 'SELECT * FROM "users" ORDER BY "updated_at" ASC LIMIT 5000' + ) + expect(params).toEqual([]) + }) + + it('incremental with saved cursor adds WHERE + param + custom batch', () => { + const { sql, params } = plugin.buildSelectQuery( + { name: 'users', cursorColumn: 'updated_at', batchSize: 100 }, + '2024-01-01' + ) + expect(sql).toBe( + 'SELECT * FROM "users" WHERE "updated_at" > ? ORDER BY "updated_at" ASC LIMIT 100' + ) + expect(params).toEqual(['2024-01-01']) + }) + + it('quotes identifiers defensively', () => { + const { sql } = plugin.buildSelectQuery({ name: 'we"ird' }, null) + expect(sql).toContain('"we""ird"') + }) +}) + +describe('ExternalReplicationPlugin - buildUpsertQueries', () => { + const plugin = new ExternalReplicationPlugin() + + it('builds an idempotent INSERT OR REPLACE per row', () => { + const writes = plugin.buildUpsertQueries('users', [ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + ]) + expect(writes).toHaveLength(2) + expect(writes[0].sql).toBe( + 'INSERT OR REPLACE INTO "users" ("id", "name") VALUES (?, ?)' + ) + expect(writes[0].params).toEqual([1, 'Alice']) + expect(writes[1].params).toEqual([2, 'Bob']) + }) +}) + +describe('ExternalReplicationPlugin - replicateTable', () => { + it('pulls from external, upserts into internal, advances the cursor', async () => { + const plugin = new ExternalReplicationPlugin() + vi.mocked(executeExternalQuery).mockResolvedValue([ + { id: 5, name: 'Eve' }, + { id: 6, name: 'Frank' }, + ]) + + const result = await plugin.replicateTable( + { name: 'users', cursorColumn: 'id' }, + dataSource, + config + ) + + expect(result).toEqual({ table: 'users', rowsReplicated: 2, cursor: 6 }) + expect(executeExternalQuery).toHaveBeenCalledTimes(1) + + const upsert = upsertCallFor('users') + expect(upsert).toBeTruthy() + expect(upsert![0]).toHaveLength(2) + + // cursor persisted to the state table + const stateWrite = vi + .mocked(executeOperation) + .mock.calls.find((a: any[]) => + a[0]?.[0]?.sql?.includes('_starbasedb_replication_state') + ) + expect(stateWrite).toBeTruthy() + }) + + it('no rows -> no upsert and rowsReplicated 0', async () => { + const plugin = new ExternalReplicationPlugin() + vi.mocked(executeExternalQuery).mockResolvedValue([]) + + const result = await plugin.replicateTable( + { name: 'users' }, + dataSource, + config + ) + + expect(result.rowsReplicated).toBe(0) + expect(upsertCallFor('users')).toBeFalsy() + }) + + it('resumes from a previously saved cursor', async () => { + const plugin = new ExternalReplicationPlugin() + // state lookup returns a saved cursor for this run + vi.mocked(executeOperation).mockResolvedValueOnce([] as any) // ensureStateTable + vi.mocked(executeOperation).mockResolvedValueOnce([ + { last_cursor: '100' }, + ] as any) // getLastCursor + vi.mocked(executeExternalQuery).mockResolvedValue([{ id: 101 }]) + + await plugin.replicateTable( + { name: 'events', cursorColumn: 'id' }, + dataSource, + config + ) + + const externalArgs = vi.mocked(executeExternalQuery).mock.calls[0][0] + expect(externalArgs.sql).toContain('WHERE "id" > ?') + expect(externalArgs.params).toEqual(['100']) + }) +}) diff --git a/plugins/external-replication/index.ts b/plugins/external-replication/index.ts new file mode 100644 index 0000000..9f40fc4 --- /dev/null +++ b/plugins/external-replication/index.ts @@ -0,0 +1,278 @@ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { DataSource } from '../../src/types' +import { executeExternalQuery } from '../../src/operation' +import { executeOperation } from '../../src/export' + +/** + * Configuration for a single table to replicate from the external data source + * into the internal (Durable Object) SQLite database. + */ +export interface ReplicationTableConfig { + /** Table name. Must exist on the external source and in the internal DB. */ + name: string + /** + * Optional monotonically-increasing column (e.g. `updated_at`, `id`) used to + * pull only NEW/CHANGED rows since the last run. When omitted, the whole + * table is re-pulled every run (full snapshot). + */ + cursorColumn?: string + /** Max rows pulled per run (large tables drain across runs). Default 5000. */ + batchSize?: number +} + +export interface ExternalReplicationOptions { + tables?: ReplicationTableConfig[] +} + +export interface ReplicationResult { + table: string + rowsReplicated: number + cursor: string | number | null +} + +/** Internal bookkeeping table that remembers the last replicated cursor per table. */ +export const REPLICATION_STATE_TABLE = '_starbasedb_replication_state' + +/** + * ExternalReplicationPlugin + * + * Pulls data from the instance's configured EXTERNAL data source (e.g. a Postgres + * on Supabase) into the INTERNAL Durable-Object SQLite database, so a StarbaseDB + * instance can serve as a close-to-edge read replica that can be queried instead + * of round-tripping to the external database. + * + * Design (per the maintainer's notes on the issue): + * - PULL based — the external source needs no changes / no per-provider push. + * - Per-table configuration; optional INCREMENTAL pulls via a `cursorColumn`. + * - Idempotent `INSERT OR REPLACE` upserts so re-runs and retries are safe. + * - Batched (`batchSize`) so very large tables drain across successive runs. + * - Exposes `POST /replicate` (all tables) and `POST /replicate/:table`. Invoke + * manually, or on an interval via a Cloudflare Cron Trigger (see README). + */ +export class ExternalReplicationPlugin extends StarbasePlugin { + public prefix: string = '/replicate' + public tables: ReplicationTableConfig[] + + constructor(opts?: ExternalReplicationOptions) { + super('starbasedb:external-replication', { requiresAuth: true }) + this.tables = opts?.tables ?? [] + } + + override async register(app: StarbaseApp) { + // Replicate every configured table. Admin only. + app.post(this.prefix, async (c) => { + const config = c.get('config') as StarbaseDBConfiguration + const dataSource = c.get('dataSource') as DataSource + + if (config?.role !== 'admin') { + return c.json({ error: 'Unauthorized request' }, 401) + } + + const tables = await this.replicateAll(dataSource, config) + return c.json({ success: true, tables }) + }) + + // Replicate a single configured table by name. + app.post(`${this.prefix}/:table`, async (c) => { + const config = c.get('config') as StarbaseDBConfiguration + const dataSource = c.get('dataSource') as DataSource + + if (config?.role !== 'admin') { + return c.json({ error: 'Unauthorized request' }, 401) + } + + const name = c.req.param('table') + const table = this.tables.find((t) => t.name === name) + if (!table) { + return c.json( + { + error: `Table '${name}' is not configured for replication`, + }, + 404 + ) + } + + const result = await this.replicateTable(table, dataSource, config) + return c.json({ success: true, ...result }) + }) + } + + /** Replicate all configured tables sequentially, returning a per-table summary. */ + async replicateAll( + dataSource: DataSource, + config: StarbaseDBConfiguration + ): Promise { + const results: ReplicationResult[] = [] + for (const table of this.tables) { + results.push(await this.replicateTable(table, dataSource, config)) + } + return results + } + + /** + * Replicate one table: pull a batch of new rows from the external source, + * upsert them into the internal database, then advance the saved cursor. + */ + async replicateTable( + table: ReplicationTableConfig, + dataSource: DataSource, + config: StarbaseDBConfiguration + ): Promise { + const externalDataSource: DataSource = { + ...dataSource, + source: 'external', + } + const internalDataSource: DataSource = { + ...dataSource, + source: 'internal', + external: undefined, + } + + await this.ensureStateTable(internalDataSource, config) + const lastCursor = await this.getLastCursor( + table.name, + internalDataSource, + config + ) + + // 1. Pull a batch of new/changed rows from the external source. + const { sql, params } = this.buildSelectQuery(table, lastCursor) + const rows: Record[] = await executeExternalQuery({ + sql, + params, + dataSource: externalDataSource, + config, + }) + + if (!rows || rows.length === 0) { + return { table: table.name, rowsReplicated: 0, cursor: lastCursor } + } + + // 2. Upsert the rows into the internal database in a single transaction. + const writes = this.buildUpsertQueries(table.name, rows) + await executeOperation(writes, internalDataSource, config) + + // 3. Advance + persist the cursor (only for incremental tables). + let cursor = lastCursor + if (table.cursorColumn) { + cursor = rows[rows.length - 1][table.cursorColumn] as + | string + | number + | null + await this.setLastCursor( + table.name, + cursor, + internalDataSource, + config + ) + } + + return { table: table.name, rowsReplicated: rows.length, cursor } + } + + /** Build the incremental SELECT against the external source. Pure + unit-testable. */ + buildSelectQuery( + table: ReplicationTableConfig, + lastCursor: string | number | null + ): { sql: string; params: unknown[] } { + const limit = table.batchSize ?? 5000 + const params: unknown[] = [] + let sql = `SELECT * FROM ${quoteIdent(table.name)}` + + if ( + table.cursorColumn && + lastCursor !== null && + lastCursor !== undefined + ) { + sql += ` WHERE ${quoteIdent(table.cursorColumn)} > ?` + params.push(lastCursor) + } + if (table.cursorColumn) { + sql += ` ORDER BY ${quoteIdent(table.cursorColumn)} ASC` + } + sql += ` LIMIT ${limit}` + + return { sql, params } + } + + /** Build idempotent INSERT-OR-REPLACE writes for the internal DB. Pure + unit-testable. */ + buildUpsertQueries( + tableName: string, + rows: Record[] + ): { sql: string; params: unknown[] }[] { + return rows.map((row) => { + const columns = Object.keys(row) + const placeholders = columns.map(() => '?').join(', ') + const columnList = columns.map(quoteIdent).join(', ') + return { + sql: `INSERT OR REPLACE INTO ${quoteIdent(tableName)} (${columnList}) VALUES (${placeholders})`, + params: columns.map((col) => row[col]), + } + }) + } + + private async ensureStateTable( + internalDataSource: DataSource, + config: StarbaseDBConfiguration + ) { + await executeOperation( + [ + { + sql: `CREATE TABLE IF NOT EXISTS ${REPLICATION_STATE_TABLE} (table_name TEXT PRIMARY KEY, last_cursor TEXT, last_synced_at TEXT)`, + }, + ], + internalDataSource, + config + ) + } + + private async getLastCursor( + tableName: string, + internalDataSource: DataSource, + config: StarbaseDBConfiguration + ): Promise { + const result = await executeOperation( + [ + { + sql: `SELECT last_cursor FROM ${REPLICATION_STATE_TABLE} WHERE table_name = ?`, + params: [tableName], + }, + ], + internalDataSource, + config + ) + + const rows = (Array.isArray(result) ? result.flat() : []) as { + last_cursor?: string | number | null + }[] + return rows[0]?.last_cursor ?? null + } + + private async setLastCursor( + tableName: string, + cursor: string | number | null, + internalDataSource: DataSource, + config: StarbaseDBConfiguration + ) { + await executeOperation( + [ + { + sql: `INSERT OR REPLACE INTO ${REPLICATION_STATE_TABLE} (table_name, last_cursor, last_synced_at) VALUES (?, ?, ?)`, + params: [ + tableName, + cursor === null ? null : String(cursor), + new Date().toISOString(), + ], + }, + ], + internalDataSource, + config + ) + } +} + +/** Quote a SQL identifier (table/column) defensively. */ +function quoteIdent(name: string): string { + return '"' + String(name).replace(/"/g, '""') + '"' +}