From b5a02d130f8da6235f64a719e8d81903c5531076 Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sat, 30 May 2026 12:54:07 +0200 Subject: [PATCH 1/8] feat(infra): add Cosmos cluster storage --- .changeset/calm-cosmos-cluster.md | 5 + packages/infra/src/ClusterCosmos.ts | 945 ++++++++++++++++++++++++++++ 2 files changed, 950 insertions(+) create mode 100644 .changeset/calm-cosmos-cluster.md create mode 100644 packages/infra/src/ClusterCosmos.ts diff --git a/.changeset/calm-cosmos-cluster.md b/.changeset/calm-cosmos-cluster.md new file mode 100644 index 000000000..6ca46d941 --- /dev/null +++ b/.changeset/calm-cosmos-cluster.md @@ -0,0 +1,5 @@ +--- +"@effect-app/infra": patch +--- + +Add Cosmos DB backed storage layers for Effect Cluster message and runner state. diff --git a/packages/infra/src/ClusterCosmos.ts b/packages/infra/src/ClusterCosmos.ts new file mode 100644 index 000000000..11dc71caf --- /dev/null +++ b/packages/infra/src/ClusterCosmos.ts @@ -0,0 +1,945 @@ +import * as Arr from "effect-app/Array" +import * as Effect from "effect-app/Effect" +import * as Layer from "effect-app/Layer" +import * as Option from "effect-app/Option" +import * as Duration from "effect/Duration" +import * as Redacted from "effect/Redacted" +import { PersistenceError } from "effect/unstable/cluster/ClusterError" +import type * as Envelope from "effect/unstable/cluster/Envelope" +import * as MessageStorage from "effect/unstable/cluster/MessageStorage" +import { SaveResultEncoded } from "effect/unstable/cluster/MessageStorage" +import type * as Reply from "effect/unstable/cluster/Reply" +import * as RunnerStorage from "effect/unstable/cluster/RunnerStorage" +import * as ShardId from "effect/unstable/cluster/ShardId" +import * as ShardingConfig from "effect/unstable/cluster/ShardingConfig" +import * as Snowflake from "effect/unstable/cluster/Snowflake" +import { CosmosClient, CosmosClientLayer } from "./cosmos-client.js" +import { annotateCosmosResponse, annotateDb } from "./otel.js" + +export interface ClusterCosmosConfig { + readonly url: Redacted.Redacted + readonly dbName: string + readonly prefix?: string +} + +type MessageKind = "Request" | "AckChunk" | "Interrupt" +type CosmosQueryValue = string | number | boolean | null | Array +type CosmosParameter = { readonly name: string; readonly value: CosmosQueryValue } + +interface MessageDoc { + readonly id: string + readonly _partitionKey: string + readonly type: "message" + readonly rowid: string + readonly messageId: string | null + readonly shardId: string + readonly entityType: string + readonly entityId: string + readonly kind: MessageKind + readonly tag: string | null + readonly payload: unknown + readonly headers: Record | null + readonly traceId?: string | undefined + readonly spanId?: string | undefined + readonly sampled?: boolean | undefined + processed: boolean + readonly requestId: string + readonly replyId: string | null + lastReplyId: string | null + lastRead: number | null + readonly deliverAt: number | null + readonly _etag?: string +} + +type ReplyDoc = WithExitReplyDoc | ChunkReplyDoc + +interface ReplyDocBase { + readonly id: string + readonly _partitionKey: string + readonly type: "reply" + readonly rowid: string + readonly requestId: string + acked: boolean +} + +interface WithExitReplyDoc extends ReplyDocBase { + readonly kind: "WithExit" + readonly payload: Reply.WithExitEncoded["exit"] + readonly sequence: null +} + +interface ChunkReplyDoc extends ReplyDocBase { + readonly kind: "Chunk" + readonly payload: Reply.ChunkEncoded["values"] + readonly sequence: number +} + +interface RunnerDoc { + readonly id: string + readonly _partitionKey: "runner" + readonly type: "runner" + readonly address: string + runner: string + healthy: boolean + lastHeartbeat: number +} + +interface LockDoc { + readonly id: string + readonly _partitionKey: "lock" + readonly type: "lock" + readonly shardId: string + address: string + acquiredAt: number + readonly _etag?: string +} + +const withTracerDisabled = Effect.withTracerEnabled(false) +const refailPersistence = (effect: Effect.Effect) => PersistenceError.refail(effect) +const messagePartition = (shardId: string) => `message::${shardId}` +const messageDocId = (envelope: Envelope.Encoded, primaryKey: string | null) => + primaryKey === null ? envelopeId(envelope) : `primary::${primaryKey}` +const replyPartition = (requestId: string) => `reply::${requestId}` +const runnerDocId = (address: string) => `runner::${address}` +const lockDocId = (shardId: string) => `lock::${shardId}` +const tenMinutes = Duration.toMillis(Duration.minutes(10)) + +const isCosmosStatus = (u: unknown, code: number) => + typeof u === "object" && u !== null && "code" in u && u.code === code + +const isConflict = (u: unknown) => isCosmosStatus(u, 409) +const isNotFound = (u: unknown) => isCosmosStatus(u, 404) +const isPreconditionFailed = (u: unknown) => isCosmosStatus(u, 412) + +const respBytes = ( + resp: { diagnostics?: { clientSideRequestStatistics?: { totalResponsePayloadLengthInBytes?: number } } } +) => resp.diagnostics?.clientSideRequestStatistics?.totalResponsePayloadLengthInBytes ?? 0 + +const annotateItem = (resp: { + readonly requestCharge?: number + readonly statusCode?: number + readonly diagnostics?: { + readonly clientSideRequestStatistics?: { readonly totalResponsePayloadLengthInBytes?: number } + } +}) => + annotateCosmosResponse({ + requestCharge: resp.requestCharge, + statusCode: resp.statusCode, + responseBytes: respBytes(resp) + }) + +const annotateFeed = (resp: { + readonly resources: readonly unknown[] + readonly requestCharge?: number + readonly diagnostics?: { + readonly clientSideRequestStatistics?: { readonly totalResponsePayloadLengthInBytes?: number } + } +}) => + annotateCosmosResponse({ + requestCharge: resp.requestCharge, + returnedRows: resp.resources.length, + responseBytes: respBytes(resp) + }) + +const envelopeId = (envelope: Envelope.Encoded) => envelope._tag === "Request" ? envelope.requestId : envelope.id + +const envelopeToDoc = ( + envelope: Envelope.Encoded, + primaryKey: string | null, + deliverAt: number | null +): MessageDoc => { + switch (envelope._tag) { + case "Request": + return { + id: messageDocId(envelope, primaryKey), + _partitionKey: messagePartition(ShardId.toString(envelope.address.shardId)), + type: "message", + rowid: envelope.requestId, + messageId: primaryKey, + shardId: ShardId.toString(envelope.address.shardId), + entityType: envelope.address.entityType, + entityId: envelope.address.entityId, + kind: "Request", + tag: envelope.tag, + payload: envelope.payload, + headers: envelope.headers, + traceId: envelope.traceId, + spanId: envelope.spanId, + sampled: envelope.sampled, + processed: false, + requestId: envelope.requestId, + replyId: null, + lastReplyId: null, + lastRead: null, + deliverAt + } + case "AckChunk": + return { + id: envelope.id, + _partitionKey: messagePartition(ShardId.toString(envelope.address.shardId)), + type: "message", + rowid: envelope.id, + messageId: primaryKey, + shardId: ShardId.toString(envelope.address.shardId), + entityType: envelope.address.entityType, + entityId: envelope.address.entityId, + kind: "AckChunk", + tag: null, + payload: null, + headers: null, + processed: false, + requestId: envelope.requestId, + replyId: envelope.replyId, + lastReplyId: null, + lastRead: null, + deliverAt + } + case "Interrupt": + return { + id: envelope.id, + _partitionKey: messagePartition(ShardId.toString(envelope.address.shardId)), + type: "message", + rowid: envelope.id, + messageId: primaryKey, + shardId: ShardId.toString(envelope.address.shardId), + entityType: envelope.address.entityType, + entityId: envelope.address.entityId, + kind: "Interrupt", + tag: null, + payload: null, + headers: null, + processed: false, + requestId: envelope.requestId, + replyId: null, + lastReplyId: null, + lastRead: null, + deliverAt + } + } +} + +const envelopeFromDoc = ( + doc: MessageDoc, + lastSentReply: Option.Option +): { + readonly envelope: Envelope.Encoded + readonly lastSentReply: Option.Option +} => { + switch (doc.kind) { + case "Request": { + const envelope: Envelope.PartialRequestEncoded = { + _tag: "Request", + requestId: doc.requestId, + address: { + shardId: shardIdFromString(doc.shardId), + entityType: doc.entityType, + entityId: doc.entityId + }, + tag: doc.tag ?? "", + payload: doc.payload, + headers: doc.headers ?? {}, + ...(doc.traceId !== undefined && { traceId: doc.traceId }), + ...(doc.spanId !== undefined && { spanId: doc.spanId }), + ...(doc.sampled !== undefined && { sampled: doc.sampled }) + } + return { + envelope, + lastSentReply + } + } + case "AckChunk": + return { + envelope: { + _tag: "AckChunk", + id: doc.id, + requestId: doc.requestId, + replyId: doc.replyId ?? "", + address: { + shardId: shardIdFromString(doc.shardId), + entityType: doc.entityType, + entityId: doc.entityId + } + }, + lastSentReply: Option.none() + } + case "Interrupt": + return { + envelope: { + _tag: "Interrupt", + id: doc.id, + requestId: doc.requestId, + address: { + shardId: shardIdFromString(doc.shardId), + entityType: doc.entityType, + entityId: doc.entityId + } + }, + lastSentReply: Option.none() + } + } +} + +const replyToDoc = (reply: Reply.Encoded): ReplyDoc => + reply._tag === "WithExit" + ? { + id: reply.id, + _partitionKey: replyPartition(reply.requestId), + type: "reply", + rowid: reply.id, + kind: "WithExit", + requestId: reply.requestId, + payload: reply.exit, + sequence: null, + acked: false + } + : { + id: reply.id, + _partitionKey: replyPartition(reply.requestId), + type: "reply", + rowid: reply.id, + kind: "Chunk", + requestId: reply.requestId, + payload: reply.values, + sequence: reply.sequence, + acked: false + } + +const replyFromDoc = (doc: ReplyDoc): Reply.Encoded => + doc.kind === "WithExit" + ? { + _tag: "WithExit", + id: doc.id, + requestId: doc.requestId, + exit: doc.payload + } + : { + _tag: "Chunk", + id: doc.id, + requestId: doc.requestId, + values: doc.payload, + sequence: doc.sequence ?? 0 + } + +const shardIdFromString = (shardId: string): Envelope.Encoded["address"]["shardId"] => + ShardId.fromStringEncoded(shardId) + +const makeMachineId = (address: string) => { + let hash = 0 + for (let i = 0; i < address.length; i++) { + hash = Math.imul(31, hash) + address.charCodeAt(i) | 0 + } + return Math.abs(hash) +} + +const createContainer = (prefix: string) => + Effect.fnUntraced(function*() { + const { db } = yield* CosmosClient + const containerId = `${prefix}cluster` + yield* Effect.promise(() => + db.containers.createIfNotExists({ + id: containerId, + partitionKey: { paths: ["/_partitionKey"], version: 2 } + }) + ) + return db.container(containerId) + }) + +export const makeMessageStorage = Effect.fnUntraced(function*(options?: { + readonly prefix?: string | undefined +}) { + const prefix = options?.prefix ?? "cluster-" + const container = yield* createContainer(prefix)() + const containerId = `${prefix}cluster` + const annotate = (operation: string) => + annotateDb({ operation, system: "cosmosdb", collection: containerId, entity: "cluster-message-storage" }) + + const readMessage = (id: string, partitionKey: string) => + Effect.promise(() => container.item(id, partitionKey).read()).pipe( + Effect.tap(annotateItem), + Effect.map((resp) => Option.fromNullishOr(resp.resource)), + Effect.catchIf(isNotFound, () => Effect.succeed(Option.none())) + ) + + const queryMessages = (query: string, parameters: ReadonlyArray) => + Effect + .promise(() => container.items.query({ query, parameters: Array.from(parameters) }).fetchAll()) + .pipe( + Effect.tap(annotateFeed), + Effect.map((resp) => resp.resources) + ) + + const queryReplies = (query: string, parameters: ReadonlyArray) => + Effect + .promise(() => container.items.query({ query, parameters: Array.from(parameters) }).fetchAll()) + .pipe( + Effect.tap(annotateFeed), + Effect.map((resp) => resp.resources) + ) + + const lastReply = (replyId: string | null) => + replyId === null + ? Effect.succeed(Option.none()) + : queryReplies("SELECT * FROM c WHERE c.type = 'reply' AND c.id = @id", [{ name: "@id", value: replyId }]).pipe( + Effect.map((docs) => Option.map(Option.fromNullishOr(docs[0]), replyFromDoc)) + ) + + const markReplyAcked = (requestId: string, replyId: string) => + Effect.promise(() => container.item(replyId, replyPartition(requestId)).read()).pipe( + Effect.flatMap((resp) => { + const doc = resp.resource + if (!doc) return Effect.void + doc.acked = true + return Effect + .promise(() => + container.item(replyId, replyPartition(requestId)).replace(doc, { + accessCondition: { type: "IfMatch", condition: doc._etag ?? "" } + }) + ) + .pipe(Effect.tap(annotateItem), Effect.asVoid) + }), + Effect.catchIf(isNotFound, () => Effect.void), + Effect.catchIf(isPreconditionFailed, () => Effect.void) + ) + + const replaceMessage = (doc: MessageDoc) => + Effect + .promise(() => + container.item(doc.id, doc._partitionKey).replace(doc, { + accessCondition: { type: "IfMatch", condition: doc._etag ?? "" } + }) + ) + .pipe(Effect.tap(annotateItem), Effect.asVoid) + + return yield* MessageStorage.makeEncoded({ + saveEnvelope: ({ deliverAt, envelope, primaryKey }) => + Effect + .gen(function*() { + const doc = envelopeToDoc(envelope, primaryKey, deliverAt) + if (envelope._tag === "AckChunk") { + yield* markReplyAcked(envelope.requestId, envelope.replyId) + const pendingAcks = yield* queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND c.kind = 'AckChunk' AND c.processed = false AND c.requestId = @requestId", + [{ name: "@requestId", value: envelope.requestId }] + ) + yield* Effect.forEach(pendingAcks, (ack) => { + ack.processed = true + return replaceMessage(ack) + }, { discard: true }) + } + return yield* Effect.promise(() => container.items.create(doc)).pipe( + Effect.tap(annotateItem), + Effect.as(SaveResultEncoded.Success()), + Effect.catchIf(isConflict, () => + readMessage(doc.id, doc._partitionKey).pipe( + Effect.flatMap((found) => + Option.match(found, { + onNone: () => Effect.succeed(SaveResultEncoded.Success()), + onSome: (existing) => + lastReply(existing.lastReplyId).pipe( + Effect.map((lastReceivedReply) => + SaveResultEncoded.Duplicate({ + originalId: Snowflake.Snowflake(existing.requestId), + lastReceivedReply + }) + ) + ) + }) + ) + )) + ) + }) + .pipe(annotate("saveEnvelope"), refailPersistence, withTracerDisabled), + + saveReply: (reply) => + Effect + .gen(function*() { + const doc = replyToDoc(reply) + yield* Effect.promise(() => container.items.create(doc)).pipe( + Effect.tap(annotateItem), + Effect.catchIf(isConflict, () => Effect.void) + ) + const messages = yield* queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND c.requestId = @requestId", + [{ name: "@requestId", value: reply.requestId }] + ) + yield* Effect.forEach(messages, (message) => { + if (reply._tag === "WithExit") { + message.processed = true + } else if (message.id !== reply.requestId && message.kind !== "Request") { + return Effect.void + } + message.lastReplyId = reply.id + return replaceMessage(message).pipe(Effect.catchIf(isPreconditionFailed, () => Effect.void)) + }, { discard: true }) + }) + .pipe(annotate("saveReply"), refailPersistence, withTracerDisabled), + + clearReplies: (requestId) => + Effect + .gen(function*() { + const id = String(requestId) + const replies = yield* queryReplies( + "SELECT * FROM c WHERE c.type = 'reply' AND c.requestId = @requestId AND c.kind = 'Chunk'", + [ + { name: "@requestId", value: id } + ] + ) + yield* Effect.forEach(replies, (reply) => + Effect + .promise(() => container.item(reply.id, reply._partitionKey).delete()) + .pipe( + Effect.tap(annotateItem), + Effect.catchIf(isNotFound, () => Effect.void) + ), { discard: true }) + const messages = yield* queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND c.requestId = @requestId", + [{ name: "@requestId", value: id }] + ) + yield* Effect.forEach(messages, (message) => { + if (message.kind === "Interrupt") { + return Effect.promise(() => container.item(message.id, message._partitionKey).delete()).pipe( + Effect.tap(annotateItem), + Effect.catchIf(isNotFound, () => Effect.void) + ) + } + message.processed = false + message.lastReplyId = null + message.lastRead = null + return replaceMessage(message).pipe(Effect.catchIf(isPreconditionFailed, () => Effect.void)) + }, { discard: true }) + }) + .pipe(annotate("clearReplies"), refailPersistence, withTracerDisabled), + + requestIdForPrimaryKey: (primaryKey) => + queryMessages("SELECT * FROM c WHERE c.type = 'message' AND c.messageId = @primaryKey", [ + { name: "@primaryKey", value: primaryKey } + ]) + .pipe( + Effect.map((docs) => Option.map(Option.fromNullishOr(docs[0]?.requestId), Snowflake.Snowflake)), + annotate("requestIdForPrimaryKey"), + refailPersistence, + withTracerDisabled + ), + + repliesFor: (requestIds) => + queryReplies( + "SELECT * FROM c WHERE c.type = 'reply' AND ARRAY_CONTAINS(@requestIds, c.requestId) AND (c.kind = 'WithExit' OR (c.kind = 'Chunk' AND c.acked = false)) ORDER BY c.rowid", + [{ name: "@requestIds", value: Array.from(requestIds) }] + ) + .pipe( + Effect.map(Arr.map(replyFromDoc)), + annotate("repliesFor"), + refailPersistence, + withTracerDisabled + ), + + repliesForUnfiltered: (requestIds) => + queryReplies( + "SELECT * FROM c WHERE c.type = 'reply' AND ARRAY_CONTAINS(@requestIds, c.requestId) ORDER BY c.rowid", + [{ name: "@requestIds", value: Array.from(requestIds) }] + ) + .pipe( + Effect.map(Arr.map(replyFromDoc)), + annotate("repliesForUnfiltered"), + refailPersistence, + withTracerDisabled + ), + + unprocessedMessages: (shardIds, now) => + queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND ARRAY_CONTAINS(@shardIds, c.shardId) AND c.processed = false AND (NOT IS_DEFINED(c.lastRead) OR IS_NULL(c.lastRead) OR c.lastRead < @lastReadBefore) AND (NOT IS_DEFINED(c.deliverAt) OR IS_NULL(c.deliverAt) OR c.deliverAt <= @now) ORDER BY c.rowid", + [ + { name: "@shardIds", value: Array.from(shardIds) }, + { name: "@lastReadBefore", value: now - tenMinutes }, + { name: "@now", value: now } + ] + ) + .pipe( + Effect.flatMap((docs) => collectUnprocessed(docs, now, lastReply, replaceMessage, queryReplies)), + annotate("unprocessedMessages"), + refailPersistence, + withTracerDisabled + ), + + unprocessedMessagesById: (messageIds, now) => + queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND (ARRAY_CONTAINS(@messageIds, c.id) OR ARRAY_CONTAINS(@messageIds, c.requestId)) AND c.processed = false AND (NOT IS_DEFINED(c.deliverAt) OR IS_NULL(c.deliverAt) OR c.deliverAt <= @now) ORDER BY c.rowid", + [ + { name: "@messageIds", value: Array.from(messageIds, String) }, + { name: "@now", value: now } + ] + ) + .pipe( + Effect.flatMap((docs) => collectUnprocessed(docs, now, lastReply, replaceMessage, queryReplies)), + annotate("unprocessedMessagesById"), + refailPersistence, + withTracerDisabled + ), + + resetAddress: (address) => + queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND c.processed = false AND c.shardId = @shardId AND c.entityType = @entityType AND c.entityId = @entityId", + [ + { name: "@shardId", value: ShardId.toString(address.shardId) }, + { name: "@entityType", value: address.entityType }, + { name: "@entityId", value: address.entityId } + ] + ) + .pipe( + Effect.flatMap((docs) => + Effect.forEach(docs, (doc) => { + doc.lastRead = null + return replaceMessage(doc).pipe(Effect.catchIf(isPreconditionFailed, () => Effect.void)) + }, { discard: true }) + ), + annotate("resetAddress"), + refailPersistence, + withTracerDisabled + ), + + clearAddress: (address) => + queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND c.entityType = @entityType AND c.entityId = @entityId", + [ + { name: "@entityType", value: address.entityType }, + { name: "@entityId", value: address.entityId } + ] + ) + .pipe( + Effect.flatMap((messages) => + Effect.forEach(messages, (message) => + queryReplies("SELECT * FROM c WHERE c.type = 'reply' AND c.requestId = @requestId", [ + { + name: "@requestId", + value: message + .requestId + } + ]) + .pipe( + Effect + .flatMap((replies) => + Effect + .forEach(replies, (reply) => + Effect + .promise(() => + container.item(reply.id, reply._partitionKey).delete() + ) + .pipe( + Effect.tap(annotateItem), + Effect.catchIf(isNotFound, () => Effect.void) + ), { discard: true }) + ), + Effect.andThen( + Effect.promise(() => container.item(message.id, message._partitionKey).delete()).pipe( + Effect.tap(annotateItem), + Effect.catchIf(isNotFound, () => Effect.void) + ) + ) + ), { discard: true }) + ), + annotate("clearAddress"), + refailPersistence, + withTracerDisabled + ), + + resetShards: (shardIds) => + queryMessages( + "SELECT * FROM c WHERE c.type = 'message' AND c.processed = false AND ARRAY_CONTAINS(@shardIds, c.shardId)", + [{ name: "@shardIds", value: Array.from(shardIds) }] + ) + .pipe( + Effect.flatMap((docs) => + Effect.forEach(docs, (doc) => { + doc.lastRead = null + return replaceMessage(doc).pipe(Effect.catchIf(isPreconditionFailed, () => Effect.void)) + }, { discard: true }) + ), + annotate("resetShards"), + refailPersistence, + withTracerDisabled + ), + + withTransaction: (effect) => effect + }) +}) + +const collectUnprocessed = ( + docs: ReadonlyArray, + now: number, + lastReply: (replyId: string | null) => Effect.Effect>, + replaceMessage: (doc: MessageDoc) => Effect.Effect, + queryReplies: ( + query: string, + parameters: ReadonlyArray + ) => Effect.Effect> +) => + Effect.gen(function*() { + const messages: Array<{ + readonly envelope: Envelope.Encoded + readonly lastSentReply: Option.Option + }> = [] + for (const doc of docs) { + const replies = yield* queryReplies( + "SELECT * FROM c WHERE c.type = 'reply' AND c.requestId = @requestId AND (c.kind = 'WithExit' OR (c.kind = 'Chunk' AND c.acked = false))", + [{ name: "@requestId", value: doc.requestId }] + ) + if (Arr.isArrayNonEmpty(replies)) continue + const sentReply = yield* lastReply(doc.lastReplyId) + doc.lastRead = now + yield* replaceMessage(doc).pipe(Effect.catchIf(isPreconditionFailed, () => Effect.void)) + messages.push(envelopeFromDoc(doc, sentReply)) + } + return messages + }) + +export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { + readonly prefix?: string | undefined +}) { + const prefix = options?.prefix ?? "cluster-" + const container = yield* createContainer(prefix)() + const config = yield* ShardingConfig.ShardingConfig + const expires = Duration.toMillis(Duration.fromInputUnsafe(config.shardLockExpiration)) + const containerId = `${prefix}cluster` + const annotate = (operation: string) => + annotateDb({ operation, system: "cosmosdb", collection: containerId, entity: "cluster-runner-storage" }) + + const queryRunners = (query: string, parameters: ReadonlyArray) => + Effect + .promise(() => + container + .items + .query({ query, parameters: Array.from(parameters) }, { partitionKey: "runner" }) + .fetchAll() + ) + .pipe(Effect.tap(annotateFeed), Effect.map((resp) => resp.resources)) + + const readLock = (shardId: string) => + Effect.promise(() => container.item(lockDocId(shardId), "lock").read()).pipe( + Effect.tap(annotateItem), + Effect.map((resp) => Option.fromNullishOr(resp.resource)), + Effect.catchIf(isNotFound, () => Effect.succeed(Option.none())) + ) + + const writeLock = (doc: LockDoc) => + Effect + .promise(() => + container.item(doc.id, "lock").replace(doc, { + accessCondition: { type: "IfMatch", condition: doc._etag ?? "" } + }) + ) + .pipe( + Effect.tap(annotateItem), + Effect.as(true), + Effect.catchIf(isPreconditionFailed, () => Effect.succeed(false)) + ) + + const createLock = (address: string, shardId: string, now: number) => + Effect + .promise(() => + container.items.create({ + id: lockDocId(shardId), + _partitionKey: "lock", + type: "lock", + shardId, + address, + acquiredAt: now + }) + ) + .pipe( + Effect.tap(annotateItem), + Effect.as(true), + Effect.catchIf(isConflict, () => Effect.succeed(false)) + ) + + const tryAcquire = (address: string, shardId: string, now: number) => + readLock(shardId).pipe( + Effect.flatMap((lock) => + Option.match(lock, { + onNone: () => createLock(address, shardId, now), + onSome: (doc) => { + if (doc.address !== address && now - doc.acquiredAt <= expires) { + return Effect.succeed(false) + } + doc.address = address + doc.acquiredAt = now + return writeLock(doc) + } + }) + ), + Effect.map((acquired) => acquired ? Option.some(shardId) : Option.none()) + ) + + return RunnerStorage.makeEncoded({ + getRunners: Effect.sync(() => Date.now()).pipe( + Effect.flatMap((now) => + queryRunners("SELECT * FROM c WHERE c.type = 'runner' AND c.lastHeartbeat > @expiresAt", [ + { name: "@expiresAt", value: now - expires } + ]) + ), + Effect.map((docs) => docs.map((doc) => [doc.runner, doc.healthy] as const)), + annotate("getRunners"), + refailPersistence, + withTracerDisabled + ), + + register: (address, runner, healthy) => + Effect.sync(() => Date.now()).pipe( + Effect.flatMap((now) => + Effect + .promise(() => + container.items.upsert({ + id: runnerDocId(address), + _partitionKey: "runner", + type: "runner", + address, + runner, + healthy, + lastHeartbeat: now + }) + ) + .pipe(Effect.tap(annotateItem)) + ), + Effect.as(makeMachineId(address)), + annotate("register"), + refailPersistence, + withTracerDisabled + ), + + unregister: (address) => + Effect.promise(() => container.item(runnerDocId(address), "runner").delete()).pipe( + Effect.tap(annotateItem), + Effect.catchIf(isNotFound, () => Effect.void), + annotate("unregister"), + refailPersistence, + withTracerDisabled + ), + + setRunnerHealth: (address, healthy) => + Effect.promise(() => container.item(runnerDocId(address), "runner").read()).pipe( + Effect.flatMap((resp) => { + const doc = resp.resource + if (!doc) return Effect.void + doc.healthy = healthy + return Effect.promise(() => container.item(doc.id, "runner").replace(doc)).pipe(Effect.tap(annotateItem)) + }), + Effect.asVoid, + Effect.catchIf(isNotFound, () => Effect.void), + annotate("setRunnerHealth"), + refailPersistence, + withTracerDisabled + ), + + acquire: (address, shardIds) => + Effect.sync(() => Date.now()).pipe( + Effect.flatMap((now) => Effect.forEach(shardIds, (shardId) => tryAcquire(address, shardId, now))), + Effect.map(Arr.getSomes), + annotate("acquire"), + refailPersistence, + withTracerDisabled + ), + + refresh: (address, shardIds) => + Effect + .gen(function*() { + const now = Date.now() + yield* Effect.promise(() => container.item(runnerDocId(address), "runner").read()).pipe( + Effect.flatMap((resp) => { + const doc = resp.resource + if (!doc) return Effect.void + doc.lastHeartbeat = now + return Effect.promise(() => container.item(doc.id, "runner").replace(doc)).pipe(Effect.tap(annotateItem)) + }), + Effect.catchIf(isNotFound, () => Effect.void) + ) + const refreshed = yield* Effect.forEach(shardIds, (shardId) => + readLock(shardId).pipe( + Effect.flatMap((lock) => + Option.match(lock, { + onNone: () => Effect.succeed(Option.none()), + onSome: (doc) => { + if (doc.address !== address) return Effect.succeed(Option.none()) + doc.acquiredAt = now + return writeLock(doc).pipe(Effect.map((ok) => ok ? Option.some(shardId) : Option.none())) + } + }) + ) + )) + return Arr.getSomes(refreshed) + }) + .pipe(annotate("refresh"), refailPersistence, withTracerDisabled), + + release: (address, shardId) => + readLock(shardId).pipe( + Effect.flatMap((lock) => + Option.match(lock, { + onNone: () => Effect.void, + onSome: (doc) => + doc.address === address + ? Effect.promise(() => container.item(doc.id, "lock").delete()).pipe( + Effect.tap(annotateItem), + Effect.catchIf(isNotFound, () => Effect.void), + Effect.asVoid + ) + : Effect.void + }) + ), + annotate("release"), + refailPersistence, + withTracerDisabled + ), + + releaseAll: (address) => + Effect + .promise(() => + container + .items + .query({ + query: "SELECT * FROM c WHERE c.type = 'lock' AND c.address = @address", + parameters: [{ name: "@address", value: address }] + }, { partitionKey: "lock" }) + .fetchAll() + ) + .pipe( + Effect.tap(annotateFeed), + Effect.flatMap((resp) => + Effect.forEach(resp.resources, (doc) => + Effect.promise(() => container.item(doc.id, "lock").delete()).pipe( + Effect.tap(annotateItem), + Effect.catchIf(isNotFound, () => Effect.void) + ), { discard: true }) + ), + annotate("releaseAll"), + refailPersistence, + withTracerDisabled + ) + }) +}) + +export const layerMessageStorage = (options?: { + readonly prefix?: string | undefined +}): Layer.Layer => + Layer.effect(MessageStorage.MessageStorage, makeMessageStorage(options)).pipe( + Layer.provide(Snowflake.layerGenerator) + ) + +export const layerRunnerStorage = (options?: { + readonly prefix?: string | undefined +}): Layer.Layer => + Layer.effect(RunnerStorage.RunnerStorage, makeRunnerStorage(options)) + +export const layerStorage = (options?: { + readonly prefix?: string | undefined +}): Layer.Layer< + MessageStorage.MessageStorage | RunnerStorage.RunnerStorage, + never, + CosmosClient | ShardingConfig.ShardingConfig +> => Layer.merge(layerMessageStorage(options), layerRunnerStorage(options)) + +export const layerCosmos = (config: ClusterCosmosConfig): Layer.Layer< + MessageStorage.MessageStorage | RunnerStorage.RunnerStorage, + never, + ShardingConfig.ShardingConfig +> => + layerStorage({ prefix: config.prefix }).pipe( + Layer.provide(CosmosClientLayer(Redacted.value(config.url), config.dbName)) + ) From f0a52c9868983efd04ac463317cf2fc38f23c0a7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 30 May 2026 11:27:46 +0000 Subject: [PATCH 2/8] docs(infra): add Cosmos cluster storage comparison --- packages/infra/docs/cluster-storage.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 packages/infra/docs/cluster-storage.md diff --git a/packages/infra/docs/cluster-storage.md b/packages/infra/docs/cluster-storage.md new file mode 100644 index 000000000..cbe4d3a6e --- /dev/null +++ b/packages/infra/docs/cluster-storage.md @@ -0,0 +1,26 @@ +# Cluster storage backends (Cosmos vs SQL) + +`@effect-app/infra` now ships Cosmos-backed cluster storage: + +- [`ClusterCosmos.layerMessageStorage`](../src/ClusterCosmos.ts) +- [`ClusterCosmos.layerRunnerStorage`](../src/ClusterCosmos.ts) +- [`ClusterCosmos.layer`](../src/ClusterCosmos.ts) +- [`ClusterCosmos.layerCosmos`](../src/ClusterCosmos.ts) + +The closest baseline in Effect is `SqlMessageStorage` + `SqlRunnerStorage`. + +## Comparison + +| Aspect | `ClusterCosmos` | `SqlMessageStorage` + `SqlRunnerStorage` | +| ------------------- | ------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------- | +| Backend dependency | Azure Cosmos DB | SQL database via `SqlClient` | +| Schema management | Container-only document model (no SQL migrations) | Creates / migrates SQL tables (`messages`, `replies`, `runners`, `locks`, migrations table) | +| Message/reply model | JSON docs split by partition key (`message::shardId`, `reply::requestId`) | Relational rows with SQL indexes and joins | +| Locking strategy | Optimistic concurrency (`_etag`) on lock docs | Dialect-aware SQL locking (including advisory locks on pg/mysql when enabled) | +| Horizontal behavior | Throughput/cost depends on partitioning and cross-partition queries | Throughput/cost depends on SQL indexing, query plans, and connection limits | +| Operational fit | Best when Cosmos is already your system DB | Best when the cluster already runs on SQL infrastructure | + +## Practical guidance + +- Pick **`ClusterCosmos`** when your platform standard is Cosmos and you want to avoid introducing SQL just for cluster storage. +- Pick **SQL storage** when you already have strong SQL ops tooling and prefer table/migration based durability. From 4fac76d0f1a4050d536ff809126d0f537e90d10f Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sat, 30 May 2026 16:09:38 +0200 Subject: [PATCH 3/8] test(infra): cover Cosmos cluster storage --- packages/infra/test/cluster-cosmos.test.ts | 298 +++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 packages/infra/test/cluster-cosmos.test.ts diff --git a/packages/infra/test/cluster-cosmos.test.ts b/packages/infra/test/cluster-cosmos.test.ts new file mode 100644 index 000000000..41b4558db --- /dev/null +++ b/packages/infra/test/cluster-cosmos.test.ts @@ -0,0 +1,298 @@ +import { assert, describe, expect, it } from "@effect/vitest" +import { Context, Effect, Exit, Fiber, Latch, Layer, Option, Redacted, Schema } from "effect" +import { TestClock } from "effect/testing" +import { EntityAddress, EntityId, EntityType, Envelope, Message, MessageStorage, Reply, Runner, RunnerAddress, RunnerStorage, ShardId, ShardingConfig, Snowflake } from "effect/unstable/cluster" +import { Headers } from "effect/unstable/http" +import { Rpc, RpcSchema } from "effect/unstable/rpc" +import { layerCosmos } from "../src/ClusterCosmos.js" + +const cosmosUrl = process.env["COSMOS_TEST_URL"] +const cosmosDb = process.env["COSMOS_TEST_DB"] ?? "cluster-test" + +const layerFor = (label: string) => + layerCosmos({ + url: Redacted.make(cosmosUrl ?? ""), + dbName: cosmosDb, + prefix: `test-cluster-${Date.now()}-${label}-` + }) + .pipe( + Layer.provideMerge(Snowflake.layerGenerator), + Layer.provide(ShardingConfig.layerDefaults) + ) + +describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { + it.effect("deduplicates keyed requests and returns the last reply", () => + Effect + .gen(function*() { + const storage = yield* MessageStorage.MessageStorage + const request = yield* makeStreamRequest(123) + + const saved = yield* storage.saveRequest(request) + assert.strictEqual(saved._tag, "Success") + + const chunk = yield* makeChunkReply(request, 0) + yield* storage.saveReply(chunk) + + const duplicateWithChunk = yield* storage.saveRequest( + yield* makeStreamRequest(123) + ) + assert(duplicateWithChunk._tag === "Duplicate" && Option.isSome(duplicateWithChunk.lastReceivedReply)) + assert.strictEqual(duplicateWithChunk.lastReceivedReply.value._tag, "Chunk") + + const ackChunk = yield* makeAckChunk(request, chunk) + yield* storage.saveEnvelope(ackChunk) + const repliesAfterAck = yield* storage.repliesFor([request]) + assert.strictEqual(repliesAfterAck.length, 0) + + yield* storage.saveReply(yield* makeStreamReply(request)) + const duplicateWithExit = yield* storage.saveRequest( + yield* makeStreamRequest(123) + ) + assert(duplicateWithExit._tag === "Duplicate" && Option.isSome(duplicateWithExit.lastReceivedReply)) + assert.strictEqual(duplicateWithExit.lastReceivedReply.value._tag, "WithExit") + }) + .pipe(Effect.provide(layerFor("message-duplicate")))) + + it.effect("marks reads, resets shards, and excludes completed requests", () => + Effect + .gen(function*() { + const storage = yield* MessageStorage.MessageStorage + const request1 = yield* makeRequest({ payload: { id: 1 } }) + const request2 = yield* makeRequest({ payload: { id: 2 } }) + yield* storage.saveRequest(request1) + yield* storage.saveRequest(request2) + + let messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) + assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)), [1, 2]) + + messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) + assert.strictEqual(messages.length, 0) + + yield* storage.resetShards([request1.envelope.address.shardId]) + messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) + assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)), [1, 2]) + + yield* storage.saveReply(yield* makeReply(request1)) + yield* storage.resetShards([request1.envelope.address.shardId]) + messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) + assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)), [2]) + }) + .pipe(Effect.provide(layerFor("message-unprocessed")))) + + it.effect("notifies registered reply handlers", () => + Effect + .gen(function*() { + const storage = yield* MessageStorage.MessageStorage + const latch = yield* Latch.make() + const request = yield* makeRequest() + yield* storage.saveRequest(request) + + const fiber = yield* storage + .registerReplyHandler( + new Message.OutgoingRequest({ + ...request, + respond: () => latch.open + }) + ) + .pipe(Effect.forkChild) + + yield* TestClock.adjust(1) + yield* storage.saveReply(yield* makeReply(request)) + yield* latch.await + yield* Fiber.await(fiber) + }) + .pipe(Effect.provide(layerFor("message-handler")))) +}) + +describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { + it.effect("registers runners and tracks health", () => + Effect + .gen(function*() { + const storage = yield* RunnerStorage.RunnerStorage + const runner = Runner.make({ + address: runnerAddress1, + groups: ["default"], + weight: 1 + }) + + const machineId1 = yield* storage.register(runner, true) + const machineId2 = yield* storage.register(runner, true) + assert.deepStrictEqual(machineId2, machineId1) + expect(yield* storage.getRunners).toEqual([[runner, true]]) + + yield* storage.setRunnerHealth(runnerAddress1, false) + expect(yield* storage.getRunners).toEqual([[runner, false]]) + + yield* storage.unregister(runnerAddress1) + expect(yield* storage.getRunners).toEqual([]) + }) + .pipe(Effect.provide(layerFor("runner-register")))) + + it.effect("acquires, refreshes, releases, and re-acquires shard locks", () => + Effect + .gen(function*() { + const storage = yield* RunnerStorage.RunnerStorage + const shards = [ + ShardId.make("default", 1), + ShardId.make("default", 2), + ShardId.make("default", 3) + ] + + let acquired = yield* storage.acquire(runnerAddress1, shards) + assert.deepStrictEqual(acquired.map((shard) => shard.id), [1, 2, 3]) + + acquired = yield* storage.acquire(runnerAddress2, shards) + assert.deepStrictEqual(acquired.map((shard) => shard.id), []) + + const refreshed = yield* storage.refresh(runnerAddress1, shards) + assert.deepStrictEqual(refreshed.map((shard) => shard.id), [1, 2, 3]) + + yield* storage.release(runnerAddress1, ShardId.make("default", 2)) + acquired = yield* storage.acquire(runnerAddress2, shards) + assert.deepStrictEqual(acquired.map((shard) => shard.id), [2]) + + yield* storage.releaseAll(runnerAddress1) + acquired = yield* storage.acquire(runnerAddress2, shards) + assert.deepStrictEqual(acquired.map((shard) => shard.id), [1, 2, 3]) + }) + .pipe(Effect.provide(layerFor("runner-locks")))) +}) + +const GetUserRpc = Rpc.make("GetUser", { + payload: { id: Schema.Number } +}) + +class StreamRpc extends Rpc.make("StreamTest", { + success: RpcSchema.Stream(Schema.Void, Schema.Never), + payload: { + id: Schema.Number + }, + primaryKey: (value) => value.id.toString() +}) {} + +const makeRequest = Effect.fnUntraced(function*(options?: { readonly payload?: { readonly id: number } }) { + const snowflake = yield* Snowflake.Generator + return new Message.OutgoingRequest({ + envelope: Envelope.makeRequest({ + requestId: snowflake.nextUnsafe(), + address: EntityAddress.make({ + shardId: ShardId.make("default", 1), + entityType: EntityType.make("test"), + entityId: EntityId.make("1") + }), + tag: GetUserRpc._tag, + payload: options?.payload ?? { id: 123 }, + traceId: "noop", + spanId: "noop", + sampled: false, + headers: Headers.empty + }), + annotations: GetUserRpc.annotations, + context: Context.empty(), + rpc: GetUserRpc, + lastReceivedReply: Option.none(), + respond() { + return Effect.void + } + }) +}) + +const makeStreamRequest = Effect.fnUntraced(function*(id: number) { + const snowflake = yield* Snowflake.Generator + return new Message.OutgoingRequest({ + envelope: Envelope.makeRequest({ + requestId: snowflake.nextUnsafe(), + address: EntityAddress.make({ + shardId: ShardId.make("default", 1), + entityType: EntityType.make("test"), + entityId: EntityId.make("1") + }), + tag: StreamRpc._tag, + payload: StreamRpc.payloadSchema.make({ id }), + traceId: "noop", + spanId: "noop", + sampled: false, + headers: Headers.empty + }), + annotations: StreamRpc.annotations, + context: Context.empty(), + rpc: StreamRpc, + lastReceivedReply: Option.none(), + respond() { + return Effect.void + } + }) +}) + +const makeReply = Effect.fnUntraced(function*(request: Message.OutgoingRequest) { + const snowflake = yield* Snowflake.Generator + return new Reply.ReplyWithContext({ + reply: new Reply.WithExit({ + id: snowflake.nextUnsafe(), + requestId: request.envelope.requestId, + exit: Exit.void + }), + context: request.context, + rpc: request.rpc + }) +}) + +const makeStreamReply = Effect.fnUntraced(function*(request: Message.OutgoingRequest) { + const snowflake = yield* Snowflake.Generator + return new Reply.ReplyWithContext({ + reply: new Reply.WithExit({ + id: snowflake.nextUnsafe(), + requestId: request.envelope.requestId, + exit: Exit.void + }), + context: request.context, + rpc: request.rpc + }) +}) + +const makeAckChunk = Effect.fnUntraced(function*( + request: Message.OutgoingRequest, + chunk: Reply.ReplyWithContext +) { + const snowflake = yield* Snowflake.Generator + return new Message.OutgoingEnvelope({ + envelope: new Envelope.AckChunk({ + id: snowflake.nextUnsafe(), + address: request.envelope.address, + requestId: chunk.reply.requestId, + replyId: chunk.reply.id + }), + rpc: request.rpc + }) +}) + +const makeChunkReply = Effect.fnUntraced(function*( + request: Message.OutgoingRequest, + sequence: number +) { + const snowflake = yield* Snowflake.Generator + return new Reply.ReplyWithContext({ + reply: new Reply.Chunk({ + id: snowflake.nextUnsafe(), + requestId: request.envelope.requestId, + sequence, + values: [undefined] + }), + context: request.context, + rpc: request.rpc + }) +}) + +const requestPayloadId = (message: Message.Incoming) => { + if (message.envelope._tag !== "Request") { + throw new Error(`Expected Request envelope`) + } + const envelope = message.envelope + assert(typeof envelope.payload === "object" && envelope.payload !== null) + assert("id" in envelope.payload) + assert.strictEqual(typeof envelope.payload.id, "number") + return envelope.payload.id +} + +const runnerAddress1 = RunnerAddress.make("localhost", 1234) +const runnerAddress2 = RunnerAddress.make("localhost", 5678) From b6e28a4da7c04a2982054f3570f2208c1d727bdc Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sat, 30 May 2026 16:39:49 +0200 Subject: [PATCH 4/8] fix(infra): tolerate Cosmos container conflict --- packages/infra/src/ClusterCosmos.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/infra/src/ClusterCosmos.ts b/packages/infra/src/ClusterCosmos.ts index 11dc71caf..ff32a1aec 100644 --- a/packages/infra/src/ClusterCosmos.ts +++ b/packages/infra/src/ClusterCosmos.ts @@ -335,12 +335,14 @@ const createContainer = (prefix: string) => Effect.fnUntraced(function*() { const { db } = yield* CosmosClient const containerId = `${prefix}cluster` - yield* Effect.promise(() => - db.containers.createIfNotExists({ - id: containerId, - partitionKey: { paths: ["/_partitionKey"], version: 2 } - }) - ) + yield* Effect + .promise(() => + db.containers.create({ + id: containerId, + partitionKey: { paths: ["/_partitionKey"], version: 2 } + }) + ) + .pipe(Effect.catchIf(isConflict, () => Effect.void)) return db.container(containerId) }) From c5a7c7457bf1bf03e64766c01e75f043ca2370c8 Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sat, 30 May 2026 16:46:14 +0200 Subject: [PATCH 5/8] fix(infra): harden Cosmos cluster tests --- packages/infra/src/ClusterCosmos.ts | 105 +++++++++++---------- packages/infra/test/cluster-cosmos.test.ts | 78 +++++++++------ 2 files changed, 103 insertions(+), 80 deletions(-) diff --git a/packages/infra/src/ClusterCosmos.ts b/packages/infra/src/ClusterCosmos.ts index ff32a1aec..6ee7f555c 100644 --- a/packages/infra/src/ClusterCosmos.ts +++ b/packages/infra/src/ClusterCosmos.ts @@ -2,6 +2,7 @@ import * as Arr from "effect-app/Array" import * as Effect from "effect-app/Effect" import * as Layer from "effect-app/Layer" import * as Option from "effect-app/Option" +import * as Cause from "effect/Cause" import * as Duration from "effect/Duration" import * as Redacted from "effect/Redacted" import { PersistenceError } from "effect/unstable/cluster/ClusterError" @@ -96,16 +97,19 @@ interface LockDoc { const withTracerDisabled = Effect.withTracerEnabled(false) const refailPersistence = (effect: Effect.Effect) => PersistenceError.refail(effect) +const cosmosId = (id: string) => encodeURIComponent(id) const messagePartition = (shardId: string) => `message::${shardId}` const messageDocId = (envelope: Envelope.Encoded, primaryKey: string | null) => - primaryKey === null ? envelopeId(envelope) : `primary::${primaryKey}` + cosmosId(primaryKey === null ? envelopeId(envelope) : `primary::${primaryKey}`) const replyPartition = (requestId: string) => `reply::${requestId}` -const runnerDocId = (address: string) => `runner::${address}` -const lockDocId = (shardId: string) => `lock::${shardId}` +const runnerDocId = (address: string) => cosmosId(`runner::${address}`) +const lockDocId = (shardId: string) => cosmosId(`lock::${shardId}`) const tenMinutes = Duration.toMillis(Duration.minutes(10)) -const isCosmosStatus = (u: unknown, code: number) => - typeof u === "object" && u !== null && "code" in u && u.code === code +const isCosmosStatus = (u: unknown, code: number): boolean => + Cause.isUnknownError(u) + ? isCosmosStatus(u.cause, code) + : typeof u === "object" && u !== null && "code" in u && u.code === code const isConflict = (u: unknown) => isCosmosStatus(u, 409) const isNotFound = (u: unknown) => isCosmosStatus(u, 404) @@ -175,7 +179,7 @@ const envelopeToDoc = ( } case "AckChunk": return { - id: envelope.id, + id: cosmosId(envelope.id), _partitionKey: messagePartition(ShardId.toString(envelope.address.shardId)), type: "message", rowid: envelope.id, @@ -196,7 +200,7 @@ const envelopeToDoc = ( } case "Interrupt": return { - id: envelope.id, + id: cosmosId(envelope.id), _partitionKey: messagePartition(ShardId.toString(envelope.address.shardId)), type: "message", rowid: envelope.id, @@ -251,7 +255,7 @@ const envelopeFromDoc = ( return { envelope: { _tag: "AckChunk", - id: doc.id, + id: doc.rowid, requestId: doc.requestId, replyId: doc.replyId ?? "", address: { @@ -266,7 +270,7 @@ const envelopeFromDoc = ( return { envelope: { _tag: "Interrupt", - id: doc.id, + id: doc.rowid, requestId: doc.requestId, address: { shardId: shardIdFromString(doc.shardId), @@ -282,7 +286,7 @@ const envelopeFromDoc = ( const replyToDoc = (reply: Reply.Encoded): ReplyDoc => reply._tag === "WithExit" ? { - id: reply.id, + id: cosmosId(reply.id), _partitionKey: replyPartition(reply.requestId), type: "reply", rowid: reply.id, @@ -293,7 +297,7 @@ const replyToDoc = (reply: Reply.Encoded): ReplyDoc => acked: false } : { - id: reply.id, + id: cosmosId(reply.id), _partitionKey: replyPartition(reply.requestId), type: "reply", rowid: reply.id, @@ -308,13 +312,13 @@ const replyFromDoc = (doc: ReplyDoc): Reply.Encoded => doc.kind === "WithExit" ? { _tag: "WithExit", - id: doc.id, + id: doc.rowid, requestId: doc.requestId, exit: doc.payload } : { _tag: "Chunk", - id: doc.id, + id: doc.rowid, requestId: doc.requestId, values: doc.payload, sequence: doc.sequence ?? 0 @@ -336,7 +340,7 @@ const createContainer = (prefix: string) => const { db } = yield* CosmosClient const containerId = `${prefix}cluster` yield* Effect - .promise(() => + .tryPromise(() => db.containers.create({ id: containerId, partitionKey: { paths: ["/_partitionKey"], version: 2 } @@ -350,13 +354,13 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { readonly prefix?: string | undefined }) { const prefix = options?.prefix ?? "cluster-" - const container = yield* createContainer(prefix)() + const container = yield* createContainer(prefix)().pipe(Effect.orDie) const containerId = `${prefix}cluster` const annotate = (operation: string) => annotateDb({ operation, system: "cosmosdb", collection: containerId, entity: "cluster-message-storage" }) const readMessage = (id: string, partitionKey: string) => - Effect.promise(() => container.item(id, partitionKey).read()).pipe( + Effect.tryPromise(() => container.item(id, partitionKey).read()).pipe( Effect.tap(annotateItem), Effect.map((resp) => Option.fromNullishOr(resp.resource)), Effect.catchIf(isNotFound, () => Effect.succeed(Option.none())) @@ -364,7 +368,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { const queryMessages = (query: string, parameters: ReadonlyArray) => Effect - .promise(() => container.items.query({ query, parameters: Array.from(parameters) }).fetchAll()) + .tryPromise(() => container.items.query({ query, parameters: Array.from(parameters) }).fetchAll()) .pipe( Effect.tap(annotateFeed), Effect.map((resp) => resp.resources) @@ -372,7 +376,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { const queryReplies = (query: string, parameters: ReadonlyArray) => Effect - .promise(() => container.items.query({ query, parameters: Array.from(parameters) }).fetchAll()) + .tryPromise(() => container.items.query({ query, parameters: Array.from(parameters) }).fetchAll()) .pipe( Effect.tap(annotateFeed), Effect.map((resp) => resp.resources) @@ -381,19 +385,20 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { const lastReply = (replyId: string | null) => replyId === null ? Effect.succeed(Option.none()) - : queryReplies("SELECT * FROM c WHERE c.type = 'reply' AND c.id = @id", [{ name: "@id", value: replyId }]).pipe( - Effect.map((docs) => Option.map(Option.fromNullishOr(docs[0]), replyFromDoc)) - ) + : queryReplies("SELECT * FROM c WHERE c.type = 'reply' AND c.rowid = @id", [{ name: "@id", value: replyId }]) + .pipe( + Effect.map((docs) => Option.map(Option.fromNullishOr(docs[0]), replyFromDoc)) + ) const markReplyAcked = (requestId: string, replyId: string) => - Effect.promise(() => container.item(replyId, replyPartition(requestId)).read()).pipe( + Effect.tryPromise(() => container.item(cosmosId(replyId), replyPartition(requestId)).read()).pipe( Effect.flatMap((resp) => { const doc = resp.resource if (!doc) return Effect.void doc.acked = true return Effect - .promise(() => - container.item(replyId, replyPartition(requestId)).replace(doc, { + .tryPromise(() => + container.item(cosmosId(replyId), replyPartition(requestId)).replace(doc, { accessCondition: { type: "IfMatch", condition: doc._etag ?? "" } }) ) @@ -405,7 +410,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { const replaceMessage = (doc: MessageDoc) => Effect - .promise(() => + .tryPromise(() => container.item(doc.id, doc._partitionKey).replace(doc, { accessCondition: { type: "IfMatch", condition: doc._etag ?? "" } }) @@ -428,7 +433,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { return replaceMessage(ack) }, { discard: true }) } - return yield* Effect.promise(() => container.items.create(doc)).pipe( + return yield* Effect.tryPromise(() => container.items.create(doc)).pipe( Effect.tap(annotateItem), Effect.as(SaveResultEncoded.Success()), Effect.catchIf(isConflict, () => @@ -456,7 +461,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { Effect .gen(function*() { const doc = replyToDoc(reply) - yield* Effect.promise(() => container.items.create(doc)).pipe( + yield* Effect.tryPromise(() => container.items.create(doc)).pipe( Effect.tap(annotateItem), Effect.catchIf(isConflict, () => Effect.void) ) @@ -488,7 +493,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { ) yield* Effect.forEach(replies, (reply) => Effect - .promise(() => container.item(reply.id, reply._partitionKey).delete()) + .tryPromise(() => container.item(reply.id, reply._partitionKey).delete()) .pipe( Effect.tap(annotateItem), Effect.catchIf(isNotFound, () => Effect.void) @@ -499,7 +504,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { ) yield* Effect.forEach(messages, (message) => { if (message.kind === "Interrupt") { - return Effect.promise(() => container.item(message.id, message._partitionKey).delete()).pipe( + return Effect.tryPromise(() => container.item(message.id, message._partitionKey).delete()).pipe( Effect.tap(annotateItem), Effect.catchIf(isNotFound, () => Effect.void) ) @@ -623,7 +628,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { Effect .forEach(replies, (reply) => Effect - .promise(() => + .tryPromise(() => container.item(reply.id, reply._partitionKey).delete() ) .pipe( @@ -632,7 +637,7 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { ), { discard: true }) ), Effect.andThen( - Effect.promise(() => container.item(message.id, message._partitionKey).delete()).pipe( + Effect.tryPromise(() => container.item(message.id, message._partitionKey).delete()).pipe( Effect.tap(annotateItem), Effect.catchIf(isNotFound, () => Effect.void) ) @@ -665,15 +670,15 @@ export const makeMessageStorage = Effect.fnUntraced(function*(options?: { }) }) -const collectUnprocessed = ( +const collectUnprocessed = ( docs: ReadonlyArray, now: number, - lastReply: (replyId: string | null) => Effect.Effect>, - replaceMessage: (doc: MessageDoc) => Effect.Effect, + lastReply: (replyId: string | null) => Effect.Effect, E>, + replaceMessage: (doc: MessageDoc) => Effect.Effect, queryReplies: ( query: string, parameters: ReadonlyArray - ) => Effect.Effect> + ) => Effect.Effect, E> ) => Effect.gen(function*() { const messages: Array<{ @@ -698,7 +703,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { readonly prefix?: string | undefined }) { const prefix = options?.prefix ?? "cluster-" - const container = yield* createContainer(prefix)() + const container = yield* createContainer(prefix)().pipe(Effect.orDie) const config = yield* ShardingConfig.ShardingConfig const expires = Duration.toMillis(Duration.fromInputUnsafe(config.shardLockExpiration)) const containerId = `${prefix}cluster` @@ -707,7 +712,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { const queryRunners = (query: string, parameters: ReadonlyArray) => Effect - .promise(() => + .tryPromise(() => container .items .query({ query, parameters: Array.from(parameters) }, { partitionKey: "runner" }) @@ -716,7 +721,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { .pipe(Effect.tap(annotateFeed), Effect.map((resp) => resp.resources)) const readLock = (shardId: string) => - Effect.promise(() => container.item(lockDocId(shardId), "lock").read()).pipe( + Effect.tryPromise(() => container.item(lockDocId(shardId), "lock").read()).pipe( Effect.tap(annotateItem), Effect.map((resp) => Option.fromNullishOr(resp.resource)), Effect.catchIf(isNotFound, () => Effect.succeed(Option.none())) @@ -724,7 +729,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { const writeLock = (doc: LockDoc) => Effect - .promise(() => + .tryPromise(() => container.item(doc.id, "lock").replace(doc, { accessCondition: { type: "IfMatch", condition: doc._etag ?? "" } }) @@ -737,7 +742,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { const createLock = (address: string, shardId: string, now: number) => Effect - .promise(() => + .tryPromise(() => container.items.create({ id: lockDocId(shardId), _partitionKey: "lock", @@ -788,7 +793,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { Effect.sync(() => Date.now()).pipe( Effect.flatMap((now) => Effect - .promise(() => + .tryPromise(() => container.items.upsert({ id: runnerDocId(address), _partitionKey: "runner", @@ -808,7 +813,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { ), unregister: (address) => - Effect.promise(() => container.item(runnerDocId(address), "runner").delete()).pipe( + Effect.tryPromise(() => container.item(runnerDocId(address), "runner").delete()).pipe( Effect.tap(annotateItem), Effect.catchIf(isNotFound, () => Effect.void), annotate("unregister"), @@ -817,12 +822,12 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { ), setRunnerHealth: (address, healthy) => - Effect.promise(() => container.item(runnerDocId(address), "runner").read()).pipe( + Effect.tryPromise(() => container.item(runnerDocId(address), "runner").read()).pipe( Effect.flatMap((resp) => { const doc = resp.resource if (!doc) return Effect.void doc.healthy = healthy - return Effect.promise(() => container.item(doc.id, "runner").replace(doc)).pipe(Effect.tap(annotateItem)) + return Effect.tryPromise(() => container.item(doc.id, "runner").replace(doc)).pipe(Effect.tap(annotateItem)) }), Effect.asVoid, Effect.catchIf(isNotFound, () => Effect.void), @@ -844,12 +849,14 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { Effect .gen(function*() { const now = Date.now() - yield* Effect.promise(() => container.item(runnerDocId(address), "runner").read()).pipe( + yield* Effect.tryPromise(() => container.item(runnerDocId(address), "runner").read()).pipe( Effect.flatMap((resp) => { const doc = resp.resource if (!doc) return Effect.void doc.lastHeartbeat = now - return Effect.promise(() => container.item(doc.id, "runner").replace(doc)).pipe(Effect.tap(annotateItem)) + return Effect.tryPromise(() => container.item(doc.id, "runner").replace(doc)).pipe( + Effect.tap(annotateItem) + ) }), Effect.catchIf(isNotFound, () => Effect.void) ) @@ -877,7 +884,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { onNone: () => Effect.void, onSome: (doc) => doc.address === address - ? Effect.promise(() => container.item(doc.id, "lock").delete()).pipe( + ? Effect.tryPromise(() => container.item(doc.id, "lock").delete()).pipe( Effect.tap(annotateItem), Effect.catchIf(isNotFound, () => Effect.void), Effect.asVoid @@ -892,7 +899,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { releaseAll: (address) => Effect - .promise(() => + .tryPromise(() => container .items .query({ @@ -905,7 +912,7 @@ export const makeRunnerStorage = Effect.fnUntraced(function*(options?: { Effect.tap(annotateFeed), Effect.flatMap((resp) => Effect.forEach(resp.resources, (doc) => - Effect.promise(() => container.item(doc.id, "lock").delete()).pipe( + Effect.tryPromise(() => container.item(doc.id, "lock").delete()).pipe( Effect.tap(annotateItem), Effect.catchIf(isNotFound, () => Effect.void) ), { discard: true }) diff --git a/packages/infra/test/cluster-cosmos.test.ts b/packages/infra/test/cluster-cosmos.test.ts index 41b4558db..f2550dd56 100644 --- a/packages/infra/test/cluster-cosmos.test.ts +++ b/packages/infra/test/cluster-cosmos.test.ts @@ -8,12 +8,14 @@ import { layerCosmos } from "../src/ClusterCosmos.js" const cosmosUrl = process.env["COSMOS_TEST_URL"] const cosmosDb = process.env["COSMOS_TEST_DB"] ?? "cluster-test" +const testRunId = `${Date.now()}-${process.pid}` +const runnerPortBase = 10000 + Date.now() % 40000 -const layerFor = (label: string) => +const layerFor = () => layerCosmos({ url: Redacted.make(cosmosUrl ?? ""), dbName: cosmosDb, - prefix: `test-cluster-${Date.now()}-${label}-` + prefix: "test-cluster-" }) .pipe( Layer.provideMerge(Snowflake.layerGenerator), @@ -25,7 +27,8 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { Effect .gen(function*() { const storage = yield* MessageStorage.MessageStorage - const request = yield* makeStreamRequest(123) + const shardId = testShardId("message-duplicate") + const request = yield* makeStreamRequest(123, shardId) const saved = yield* storage.saveRequest(request) assert.strictEqual(saved._tag, "Success") @@ -34,7 +37,7 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { yield* storage.saveReply(chunk) const duplicateWithChunk = yield* storage.saveRequest( - yield* makeStreamRequest(123) + yield* makeStreamRequest(123, shardId) ) assert(duplicateWithChunk._tag === "Duplicate" && Option.isSome(duplicateWithChunk.lastReceivedReply)) assert.strictEqual(duplicateWithChunk.lastReceivedReply.value._tag, "Chunk") @@ -46,45 +49,46 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { yield* storage.saveReply(yield* makeStreamReply(request)) const duplicateWithExit = yield* storage.saveRequest( - yield* makeStreamRequest(123) + yield* makeStreamRequest(123, shardId) ) assert(duplicateWithExit._tag === "Duplicate" && Option.isSome(duplicateWithExit.lastReceivedReply)) assert.strictEqual(duplicateWithExit.lastReceivedReply.value._tag, "WithExit") }) - .pipe(Effect.provide(layerFor("message-duplicate")))) + .pipe(Effect.provide(layerFor()))) it.effect("marks reads, resets shards, and excludes completed requests", () => Effect .gen(function*() { const storage = yield* MessageStorage.MessageStorage - const request1 = yield* makeRequest({ payload: { id: 1 } }) - const request2 = yield* makeRequest({ payload: { id: 2 } }) + const shardId = testShardId("message-unprocessed") + const request1 = yield* makeRequest({ payload: { id: 1 }, shardId }) + const request2 = yield* makeRequest({ payload: { id: 2 }, shardId }) yield* storage.saveRequest(request1) yield* storage.saveRequest(request2) let messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) - assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)), [1, 2]) + assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)).sort(), [1, 2]) messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) assert.strictEqual(messages.length, 0) yield* storage.resetShards([request1.envelope.address.shardId]) messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) - assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)), [1, 2]) + assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)).sort(), [1, 2]) yield* storage.saveReply(yield* makeReply(request1)) yield* storage.resetShards([request1.envelope.address.shardId]) messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)), [2]) }) - .pipe(Effect.provide(layerFor("message-unprocessed")))) + .pipe(Effect.provide(layerFor()))) it.effect("notifies registered reply handlers", () => Effect .gen(function*() { const storage = yield* MessageStorage.MessageStorage const latch = yield* Latch.make() - const request = yield* makeRequest() + const request = yield* makeRequest({ shardId: testShardId("message-handler") }) yield* storage.saveRequest(request) const fiber = yield* storage @@ -101,7 +105,7 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { yield* latch.await yield* Fiber.await(fiber) }) - .pipe(Effect.provide(layerFor("message-handler")))) + .pipe(Effect.provide(layerFor()))) }) describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { @@ -109,8 +113,9 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { Effect .gen(function*() { const storage = yield* RunnerStorage.RunnerStorage + const runnerAddress = testRunnerAddress(1) const runner = Runner.make({ - address: runnerAddress1, + address: runnerAddress, groups: ["default"], weight: 1 }) @@ -118,24 +123,26 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { const machineId1 = yield* storage.register(runner, true) const machineId2 = yield* storage.register(runner, true) assert.deepStrictEqual(machineId2, machineId1) - expect(yield* storage.getRunners).toEqual([[runner, true]]) + expect(runnerStatus(yield* storage.getRunners, runnerAddress)).toEqual([runner, true]) - yield* storage.setRunnerHealth(runnerAddress1, false) - expect(yield* storage.getRunners).toEqual([[runner, false]]) + yield* storage.setRunnerHealth(runnerAddress, false) + expect(runnerStatus(yield* storage.getRunners, runnerAddress)).toEqual([runner, false]) - yield* storage.unregister(runnerAddress1) - expect(yield* storage.getRunners).toEqual([]) + yield* storage.unregister(runnerAddress) + expect(runnerStatus(yield* storage.getRunners, runnerAddress)).toBeUndefined() }) - .pipe(Effect.provide(layerFor("runner-register")))) + .pipe(Effect.provide(layerFor()))) it.effect("acquires, refreshes, releases, and re-acquires shard locks", () => Effect .gen(function*() { const storage = yield* RunnerStorage.RunnerStorage + const runnerAddress1 = testRunnerAddress(2) + const runnerAddress2 = testRunnerAddress(3) const shards = [ - ShardId.make("default", 1), - ShardId.make("default", 2), - ShardId.make("default", 3) + testShardId("runner-locks", 1), + testShardId("runner-locks", 2), + testShardId("runner-locks", 3) ] let acquired = yield* storage.acquire(runnerAddress1, shards) @@ -147,7 +154,7 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { const refreshed = yield* storage.refresh(runnerAddress1, shards) assert.deepStrictEqual(refreshed.map((shard) => shard.id), [1, 2, 3]) - yield* storage.release(runnerAddress1, ShardId.make("default", 2)) + yield* storage.release(runnerAddress1, testShardId("runner-locks", 2)) acquired = yield* storage.acquire(runnerAddress2, shards) assert.deepStrictEqual(acquired.map((shard) => shard.id), [2]) @@ -155,7 +162,7 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { acquired = yield* storage.acquire(runnerAddress2, shards) assert.deepStrictEqual(acquired.map((shard) => shard.id), [1, 2, 3]) }) - .pipe(Effect.provide(layerFor("runner-locks")))) + .pipe(Effect.provide(layerFor()))) }) const GetUserRpc = Rpc.make("GetUser", { @@ -170,13 +177,16 @@ class StreamRpc extends Rpc.make("StreamTest", { primaryKey: (value) => value.id.toString() }) {} -const makeRequest = Effect.fnUntraced(function*(options?: { readonly payload?: { readonly id: number } }) { +const makeRequest = Effect.fnUntraced(function*(options?: { + readonly payload?: { readonly id: number } + readonly shardId?: ShardId.ShardId +}) { const snowflake = yield* Snowflake.Generator return new Message.OutgoingRequest({ envelope: Envelope.makeRequest({ requestId: snowflake.nextUnsafe(), address: EntityAddress.make({ - shardId: ShardId.make("default", 1), + shardId: options?.shardId ?? testShardId("default"), entityType: EntityType.make("test"), entityId: EntityId.make("1") }), @@ -197,13 +207,13 @@ const makeRequest = Effect.fnUntraced(function*(options?: { readonly payload?: { }) }) -const makeStreamRequest = Effect.fnUntraced(function*(id: number) { +const makeStreamRequest = Effect.fnUntraced(function*(id: number, shardId = testShardId("stream")) { const snowflake = yield* Snowflake.Generator return new Message.OutgoingRequest({ envelope: Envelope.makeRequest({ requestId: snowflake.nextUnsafe(), address: EntityAddress.make({ - shardId: ShardId.make("default", 1), + shardId, entityType: EntityType.make("test"), entityId: EntityId.make("1") }), @@ -294,5 +304,11 @@ const requestPayloadId = (message: Message.Incoming) => { return envelope.payload.id } -const runnerAddress1 = RunnerAddress.make("localhost", 1234) -const runnerAddress2 = RunnerAddress.make("localhost", 5678) +const testShardId = (label: string, id = 1) => ShardId.make(`cluster-cosmos-${testRunId}-${label}`, id) + +const testRunnerAddress = (offset: number) => RunnerAddress.make("localhost", runnerPortBase + offset) + +const runnerStatus = ( + runners: ReadonlyArray, + address: RunnerAddress.RunnerAddress +) => runners.find(([runner]) => runner.address.host === address.host && runner.address.port === address.port) From 1fb479ca652b5e99ff843427fe19a82828d44cf5 Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sat, 30 May 2026 16:46:30 +0200 Subject: [PATCH 6/8] ignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 42195a6a4..47366c07d 100644 --- a/.gitignore +++ b/.gitignore @@ -53,4 +53,5 @@ fabric.properties .direnv -storybook-static/ \ No newline at end of file +storybook-static/ +run.sh From 308f215fa4a5a32a9776d1a580b876aa2c1aebef Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sat, 30 May 2026 16:57:58 +0200 Subject: [PATCH 7/8] test(infra): cover illegal Cosmos ids --- packages/infra/test/cluster-cosmos.test.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/infra/test/cluster-cosmos.test.ts b/packages/infra/test/cluster-cosmos.test.ts index f2550dd56..e90dac739 100644 --- a/packages/infra/test/cluster-cosmos.test.ts +++ b/packages/infra/test/cluster-cosmos.test.ts @@ -28,7 +28,8 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { .gen(function*() { const storage = yield* MessageStorage.MessageStorage const shardId = testShardId("message-duplicate") - const request = yield* makeStreamRequest(123, shardId) + const primaryKey = `primary/${testRunId}\\with?illegal#chars` + const request = yield* makeStreamRequest(primaryKey, shardId) const saved = yield* storage.saveRequest(request) assert.strictEqual(saved._tag, "Success") @@ -37,7 +38,7 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { yield* storage.saveReply(chunk) const duplicateWithChunk = yield* storage.saveRequest( - yield* makeStreamRequest(123, shardId) + yield* makeStreamRequest(primaryKey, shardId) ) assert(duplicateWithChunk._tag === "Duplicate" && Option.isSome(duplicateWithChunk.lastReceivedReply)) assert.strictEqual(duplicateWithChunk.lastReceivedReply.value._tag, "Chunk") @@ -49,7 +50,7 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { yield* storage.saveReply(yield* makeStreamReply(request)) const duplicateWithExit = yield* storage.saveRequest( - yield* makeStreamRequest(123, shardId) + yield* makeStreamRequest(primaryKey, shardId) ) assert(duplicateWithExit._tag === "Duplicate" && Option.isSome(duplicateWithExit.lastReceivedReply)) assert.strictEqual(duplicateWithExit.lastReceivedReply.value._tag, "WithExit") @@ -172,7 +173,7 @@ const GetUserRpc = Rpc.make("GetUser", { class StreamRpc extends Rpc.make("StreamTest", { success: RpcSchema.Stream(Schema.Void, Schema.Never), payload: { - id: Schema.Number + id: Schema.String }, primaryKey: (value) => value.id.toString() }) {} @@ -207,7 +208,7 @@ const makeRequest = Effect.fnUntraced(function*(options?: { }) }) -const makeStreamRequest = Effect.fnUntraced(function*(id: number, shardId = testShardId("stream")) { +const makeStreamRequest = Effect.fnUntraced(function*(id: string, shardId = testShardId("stream")) { const snowflake = yield* Snowflake.Generator return new Message.OutgoingRequest({ envelope: Envelope.makeRequest({ From 816adedd39643c2456db67d6327b7c3d9e23ae18 Mon Sep 17 00:00:00 2001 From: Patrick Roza Date: Sat, 30 May 2026 17:14:24 +0200 Subject: [PATCH 8/8] test(infra): cover Cosmos cluster RPC --- packages/infra/test/cluster-cosmos.test.ts | 99 +++++++++++++++++++++- 1 file changed, 95 insertions(+), 4 deletions(-) diff --git a/packages/infra/test/cluster-cosmos.test.ts b/packages/infra/test/cluster-cosmos.test.ts index e90dac739..078d9caf2 100644 --- a/packages/infra/test/cluster-cosmos.test.ts +++ b/packages/infra/test/cluster-cosmos.test.ts @@ -1,14 +1,14 @@ import { assert, describe, expect, it } from "@effect/vitest" import { Context, Effect, Exit, Fiber, Latch, Layer, Option, Redacted, Schema } from "effect" import { TestClock } from "effect/testing" -import { EntityAddress, EntityId, EntityType, Envelope, Message, MessageStorage, Reply, Runner, RunnerAddress, RunnerStorage, ShardId, ShardingConfig, Snowflake } from "effect/unstable/cluster" +import { ClusterSchema, Entity, EntityAddress, EntityId, EntityType, Envelope, Message, MessageStorage, Reply, Runner, RunnerAddress, RunnerHealth, Runners, RunnerStorage, ShardId, Sharding, ShardingConfig, Snowflake } from "effect/unstable/cluster" import { Headers } from "effect/unstable/http" import { Rpc, RpcSchema } from "effect/unstable/rpc" import { layerCosmos } from "../src/ClusterCosmos.js" const cosmosUrl = process.env["COSMOS_TEST_URL"] const cosmosDb = process.env["COSMOS_TEST_DB"] ?? "cluster-test" -const testRunId = `${Date.now()}-${process.pid}` +const testRunId = `${Date.now()}-${process.pid}-${Math.random().toString(16).slice(2)}` const runnerPortBase = 10000 + Date.now() % 40000 const layerFor = () => @@ -64,8 +64,8 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos MessageStorage", () => { const shardId = testShardId("message-unprocessed") const request1 = yield* makeRequest({ payload: { id: 1 }, shardId }) const request2 = yield* makeRequest({ payload: { id: 2 }, shardId }) - yield* storage.saveRequest(request1) - yield* storage.saveRequest(request2) + assert.strictEqual((yield* storage.saveRequest(request1))._tag, "Success") + assert.strictEqual((yield* storage.saveRequest(request2))._tag, "Success") let messages = yield* storage.unprocessedMessages([request1.envelope.address.shardId]) assert.deepStrictEqual(messages.map((message) => requestPayloadId(message)).sort(), [1, 2]) @@ -166,10 +166,66 @@ describe.skipIf(!cosmosUrl)("ClusterCosmos RunnerStorage", () => { .pipe(Effect.provide(layerFor()))) }) +describe.skipIf(!cosmosUrl)("ClusterCosmos Sharding RPC", () => { + it.effect("runs persisted entity RPCs through Cosmos-backed cluster storage", () => + Effect + .gen(function*() { + yield* TestClock.adjust(1) + const sharding = yield* Sharding.Sharding + const makeClient = yield* CosmosRpcEntity.client + const entityId = `entity/${testRunId}\\with?illegal#chars` + const shardId = sharding.getShardId(EntityId.make(entityId), testShardGroup("rpc")) + yield* waitForShard(sharding, shardId) + assert.isTrue(sharding.hasShardId(shardId)) + const client = makeClient(entityId) + + const user = yield* client.GetCosmosUser({ id: 42 }) + expect(user).toEqual(new CosmosRpcUser({ id: 42, name: "User 42" })) + + const primaryKey = `rpc/${testRunId}\\with?illegal#chars` + const first = yield* client.CosmosRequestWithKey({ key: primaryKey }) + const duplicate = yield* client.CosmosRequestWithKey({ key: primaryKey }) + + assert.strictEqual(first, primaryKey) + assert.strictEqual(duplicate, primaryKey) + }) + .pipe(Effect.provide(clusterRpcLayer("rpc"))), 20000) +}) + const GetUserRpc = Rpc.make("GetUser", { payload: { id: Schema.Number } }) +class CosmosRpcUser extends Schema.Class("CosmosRpcUser")({ + id: Schema.Number, + name: Schema.String +}) {} + +const CosmosRpcEntity = Entity + .make("CosmosRpcEntity", [ + Rpc.make("GetCosmosUser", { + success: CosmosRpcUser, + payload: { id: Schema.Number } + }), + Rpc.make("CosmosRequestWithKey", { + success: Schema.String, + payload: { key: Schema.String }, + primaryKey: ({ key }) => key + }) + ]) + .annotate(ClusterSchema.ShardGroup, () => testShardGroup("rpc")) + .annotateRpcs(ClusterSchema.Persisted, true) + +const CosmosRpcEntityLayer = CosmosRpcEntity.toLayer( + Effect.succeed( + CosmosRpcEntity.of({ + GetCosmosUser: (envelope) => + Effect.succeed(new CosmosRpcUser({ id: envelope.payload.id, name: `User ${envelope.payload.id}` })), + CosmosRequestWithKey: (envelope) => Effect.succeed(envelope.payload.key) + }) + ) +) + class StreamRpc extends Rpc.make("StreamTest", { success: RpcSchema.Stream(Schema.Void, Schema.Never), payload: { @@ -307,9 +363,44 @@ const requestPayloadId = (message: Message.Incoming) => { const testShardId = (label: string, id = 1) => ShardId.make(`cluster-cosmos-${testRunId}-${label}`, id) +const testShardGroup = (label: string) => `cluster-cosmos-${testRunId}-${label}` + const testRunnerAddress = (offset: number) => RunnerAddress.make("localhost", runnerPortBase + offset) const runnerStatus = ( runners: ReadonlyArray, address: RunnerAddress.RunnerAddress ) => runners.find(([runner]) => runner.address.host === address.host && runner.address.port === address.port) + +const clusterRpcLayer = (label: string) => { + const shardGroup = testShardGroup(label) + return CosmosRpcEntityLayer.pipe( + Layer.provideMerge(Sharding.layer), + Layer.provide(Runners.layerNoop), + Layer.provide(RunnerHealth.layerNoop), + Layer.provide(layerCosmos({ + url: Redacted.make(cosmosUrl ?? ""), + dbName: cosmosDb, + prefix: "test-cluster-" + })), + Layer.provide(ShardingConfig.layer({ + runnerAddress: Option.some(testRunnerAddress(10)), + shardsPerGroup: 1, + availableShardGroups: [shardGroup], + assignedShardGroups: [shardGroup], + entityTerminationTimeout: 0, + entityMessagePollInterval: 50, + entityReplyPollInterval: 50, + refreshAssignmentsInterval: 0, + sendRetryInterval: 50 + })) + ) +} + +const waitForShard = (sharding: Sharding.Sharding["Service"], shardId: ShardId.ShardId) => + Effect.gen(function*() { + for (let i = 0; i < 30; i++) { + if (sharding.hasShardId(shardId)) return + yield* Effect.promise(() => new Promise((resolve) => setTimeout(resolve, 100))) + } + })