Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions billing/data/space-diff-v2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import * as Link from 'multiformats/link'
import { EncodeFailure, DecodeFailure, Schema } from './lib.js'

/**
* @typedef {import('../lib/api.js').SpaceDiff} SpaceDiff
* @typedef {import('../types.js').InferStoreRecord<SpaceDiff> & { pk: string, cause: string, ttlAt?: number }} SpaceDiffV2StoreRecord
* @typedef {import('../lib/api.js').SpaceDiffListKey} SpaceDiffListKey
* @typedef {{ pk: string, receiptAt: string }} SpaceDiffV2ListStoreRecord
* @typedef {import('../types.js').StoreRecord} StoreRecord
*/

// Storage-controlled TTL: items expire after this many seconds (default 365 days)
const SPACE_DIFF_TTL_SECONDS = 60 * 60 * 24 * 365

export const schema = Schema.struct({
space: Schema.did(),
provider: Schema.did({ method: 'web' }),
subscription: Schema.text(),
cause: Schema.link({ version: 1 }),
delta: Schema.integer(),
receiptAt: Schema.date(),
insertedAt: Schema.date()
})

/** @type {import('../lib/api.js').Validator<SpaceDiff>} */
export const validate = input => schema.read(input)

/** @type {import('../lib/api.js').Encoder<SpaceDiff, SpaceDiffV2StoreRecord>} */
export const encode = input => {
try {
return {
ok: {
pk: `${input.provider}#${input.space}`,
cause: input.cause.toString(),
space: input.space,
provider: input.provider,
subscription: input.subscription,
delta: input.delta,
receiptAt: input.receiptAt.toISOString(),
insertedAt: new Date().toISOString(),
// DynamoDB TTL (epoch seconds)
ttlAt: Math.floor(Date.now() / 1000) + SPACE_DIFF_TTL_SECONDS
}
}
} catch (/** @type {any} */ err) {
return {
error: new EncodeFailure(`encoding space diff v2 record: ${err.message}`, { cause: err })
}
}
}

/** @type {import('../lib/api.js').Decoder<StoreRecord, SpaceDiff>} */
export const decode = input => {
try {
return {
ok: {
space: Schema.did().from(input.space),
provider: Schema.did({ method: 'web' }).from(input.provider),
subscription: /** @type {string} */ (input.subscription),
cause: Link.parse(/** @type {string} */ (input.cause)),
delta: /** @type {number} */ (input.delta),
receiptAt: new Date(input.receiptAt),
insertedAt: new Date(input.insertedAt)
}
}
} catch (/** @type {any} */ err) {
return {
error: new DecodeFailure(`decoding space diff v2 record: ${err.message}`, { cause: err })
}
}
}

export const lister = {
/** @type {import('../lib/api.js').Encoder<SpaceDiffListKey, SpaceDiffV2ListStoreRecord>} */
encodeKey: input => ({
ok: {
pk: `${input.provider}#${input.space}`,
receiptAt: input.from.toISOString()
}
})
}
50 changes: 50 additions & 0 deletions billing/tables/space-diff-v2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { createStoreBatchPutterClient, createStoreListerClient } from './client.js'
import { validate, encode, lister, decode } from '../data/space-diff-v2.js'

/**
* Stores changes to total space size with structural uniqueness by cause.
*
* PK: provider#space
* SK: cause (Link V1 string)
*
* @type {import('sst/constructs').TableProps}
*/
export const spaceDiffV2TableProps = {
fields: {
/** Composite key with format: "provider#space" */
pk: 'string',
/** Sort key and unique identifier per invocation cause (bafy...) */
cause: 'string',
/** Space DID (did:key:...). */
space: 'string',
/** Storage provider for the space. */
provider: 'string',
/** Subscription in use when the size changed. */
subscription: 'string',
/** Number of bytes added to or removed from the space. */
delta: 'number',
/** ISO timestamp the receipt was issued. */
receiptAt: 'string',
/** ISO timestamp we recorded the change. */
insertedAt: 'string',
/** Optional: ttlAt (Number epoch seconds) for DynamoDB TTL */
ttlAt: 'number'
},
primaryIndex: { partitionKey: 'pk', sortKey: 'cause' },
globalIndexes: {
/** Time-ordered queries for billing/reporting */
byReceiptAt: { partitionKey: 'pk', sortKey: 'receiptAt' }
}
}

/**
* v2 store clients (reuse existing validators/encoders)
*
* @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf
* @param {{ tableName: string }} context
* @returns {import('../lib/api.js').SpaceDiffStore}
*/
export const createSpaceDiffV2Store = (conf, { tableName }) => ({
...createStoreBatchPutterClient(conf, { tableName, validate, encode }),
...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode, indexName: 'byReceiptAt' })
})
5 changes: 4 additions & 1 deletion stacks/billing-db-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Table, Config } from 'sst/constructs'
import { customerTableProps } from '../billing/tables/customer.js'
import { spaceDiffTableProps } from '../billing/tables/space-diff.js'
import { spaceDiffArchiveTableProps } from '../billing/tables/space-diff-archive.js'
import { spaceDiffV2TableProps } from '../billing/tables/space-diff-v2.js'
import { spaceSnapshotTableProps } from '../billing/tables/space-snapshot.js'
import { usageTableProps } from '../billing/tables/usage.js'
import { egressTrafficTableProps } from '../billing/tables/egress-traffic.js'
Expand All @@ -13,6 +14,7 @@ export const BillingDbStack = ({ stack }) => {
const customerTable = new Table(stack, 'customer', customerTableProps)
const spaceSnapshotTable = new Table(stack, 'space-snapshot', spaceSnapshotTableProps)
const spaceDiffTable = new Table(stack, 'space-diff', spaceDiffTableProps)
const spaceDiffV2Table = new Table(stack, 'space-diff-v2', spaceDiffV2TableProps)
const spaceDiffArchiveTable = new Table(stack, 'space-diff-archive', spaceDiffArchiveTableProps)
const usageTable = new Table(stack, 'usage', {
...usageTableProps,
Expand All @@ -24,12 +26,13 @@ export const BillingDbStack = ({ stack }) => {
customerTableName: customerTable.tableName,
spaceSnapshotTableName: spaceSnapshotTable.tableName,
spaceDiffTableName: spaceDiffTable.tableName,
spaceDiffV2TableName: spaceDiffV2Table.tableName,
spaceDiffArchiveTableName: spaceDiffArchiveTable.tableName,
usageTable: usageTable.tableName,
egressTrafficTableName: egressTrafficTable.tableName
})

const stripeSecretKey = new Config.Secret(stack, 'STRIPE_SECRET_KEY')

return { customerTable, spaceSnapshotTable, spaceDiffTable, spaceDiffArchiveTable, usageTable, egressTrafficTable, stripeSecretKey }
return { customerTable, spaceSnapshotTable, spaceDiffTable, spaceDiffV2Table, spaceDiffArchiveTable, usageTable, egressTrafficTable, stripeSecretKey }
}
5 changes: 5 additions & 0 deletions stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export function UploadApiStack({ stack, app }) {
const {
customerTable,
spaceDiffTable,
spaceDiffV2Table,
spaceSnapshotTable,
egressTrafficTable,
stripeSecretKey,
Expand Down Expand Up @@ -167,6 +168,7 @@ export function UploadApiStack({ stack, app }) {
replicaTable,
revocationTable,
spaceDiffTable,
spaceDiffV2Table,
spaceMetricsTable,
spaceSnapshotTable,
storeTable, // legacy
Expand Down Expand Up @@ -218,6 +220,7 @@ export function UploadApiStack({ stack, app }) {
REQUIRE_PAYMENT_PLAN: process.env.REQUIRE_PAYMENT_PLAN ?? '',
REVOCATION_TABLE: revocationTable.tableName,
SPACE_DIFF_TABLE: spaceDiffTable.tableName,
SPACE_DIFF_V2_TABLE: spaceDiffV2Table.tableName,
SPACE_METRICS_TABLE: spaceMetricsTable.tableName,
SPACE_SNAPSHOT_TABLE: spaceSnapshotTable.tableName,
STORAGE_PROVIDER_TABLE: storageProviderTable.tableName,
Expand Down Expand Up @@ -301,6 +304,7 @@ export function UploadApiStack({ stack, app }) {
revocationTable,
spaceMetricsTable,
spaceDiffTable,
spaceDiffV2Table,
spaceSnapshotTable,
subscriptionTable,
ucanStream,
Expand Down Expand Up @@ -328,6 +332,7 @@ export function UploadApiStack({ stack, app }) {
REFERRALS_ENDPOINT: process.env.REFERRALS_ENDPOINT ?? '',
REVOCATION_TABLE_NAME: revocationTable.tableName,
SPACE_DIFF_TABLE_NAME: spaceDiffTable.tableName,
SPACE_DIFF_V2_TABLE_NAME: spaceDiffV2Table.tableName,
SPACE_METRICS_TABLE_NAME: spaceMetricsTable.tableName,
SPACE_SNAPSHOT_TABLE_NAME: spaceSnapshotTable.tableName,
SUBSCRIPTION_TABLE_NAME: subscriptionTable.tableName,
Expand Down
17 changes: 13 additions & 4 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ export async function ucanInvocationRouter(request) {
rateLimitTableName,
pieceTableName,
spaceDiffTableName,
spaceDiffV2TableName,
spaceSnapshotTableName,
storageProviderTableName,
replicaTableName,
Expand Down Expand Up @@ -302,6 +303,7 @@ export async function ucanInvocationRouter(request) {
AWS_REGION,
blobRegistryTableName,
spaceDiffTableName,
spaceDiffV2TableName,
consumerTableName,
metrics,
options
Expand Down Expand Up @@ -353,10 +355,7 @@ export async function ucanInvocationRouter(request) {
)
)
const rateLimitsStorage = createRateLimitTable(AWS_REGION, rateLimitTableName)
const spaceDiffStore = createSpaceDiffStore(
{ region: AWS_REGION },
{ tableName: spaceDiffTableName }
)

const spaceSnapshotStore = createSpaceSnapshotStore(
{ region: AWS_REGION },
{ tableName: spaceSnapshotTableName }
Expand All @@ -365,6 +364,15 @@ export async function ucanInvocationRouter(request) {
{ region: AWS_REGION },
{ url: new URL(egressTrafficQueueUrl) }
)
// NOTE: We keep using the v1 space-diff store for reads during the dual-write phase.
// After removing dual-write space diff and cutting over readers to v2:
// - switch to createSpaceDiffV2Store({ region: AWS_REGION }, { tableName: spaceDiffV2TableName })
// - update callers that rely on time ordering to use the v2 GSI (byReceiptAt).
// - ensure environment provides SPACE_DIFF_V2_TABLE and deprecate SPACE_DIFF_TABLE for reads.
const spaceDiffStore = createSpaceDiffStore(
{ region: AWS_REGION },
{ tableName: spaceDiffTableName }
)

const usageStorage = useUsageStore({
spaceDiffStore,
Expand Down Expand Up @@ -735,6 +743,7 @@ function getLambdaEnv() {
rateLimitTableName: mustGetEnv('RATE_LIMIT_TABLE'),
pieceTableName: mustGetEnv('PIECE_TABLE'),
spaceDiffTableName: mustGetEnv('SPACE_DIFF_TABLE'),
spaceDiffV2TableName: mustGetEnv('SPACE_DIFF_V2_TABLE'),
spaceSnapshotTableName: mustGetEnv('SPACE_SNAPSHOT_TABLE'),
storageProviderTableName: mustGetEnv('STORAGE_PROVIDER_TABLE'),
replicaTableName: mustGetEnv('REPLICA_TABLE'),
Expand Down
46 changes: 46 additions & 0 deletions upload-api/stores/blob-registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const tracer = trace.getTracer('upload-api')
* @param {string} region
* @param {string} blobRegistryTableName
* @param {string} spaceDiffTableName
* @param {string} spaceDiffV2TableName
* @param {string} consumerTableName
* @param {{
* space: import('../types.js').SpaceMetricsStore
Expand All @@ -39,6 +40,7 @@ export const createBlobRegistry = (
region,
blobRegistryTableName,
spaceDiffTableName,
spaceDiffV2TableName,
consumerTableName,
metrics,
options = {}
Expand All @@ -48,6 +50,7 @@ export const createBlobRegistry = (
dynamoDb,
blobRegistryTableName,
spaceDiffTableName,
spaceDiffV2TableName,
consumerTableName,
metrics
)
Expand All @@ -57,6 +60,7 @@ export const createBlobRegistry = (
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoDb
* @param {string} blobRegistryTableName
* @param {string} spaceDiffTableName
* @param {string} spaceDiffV2TableName
* @param {string} consumerTableName
* @param {{
* space: import('../types.js').SpaceMetricsStore
Expand All @@ -68,6 +72,7 @@ export const useBlobRegistry = (
dynamoDb,
blobRegistryTableName,
spaceDiffTableName,
spaceDiffV2TableName,
consumerTableName,
metrics
) => {
Expand Down Expand Up @@ -113,6 +118,7 @@ export const useBlobRegistry = (
return { ok: diffs, error: undefined }
}


return instrumentMethods(tracer, 'BlobRegistry', {
/** @type {BlobAPI.Registry['find']} */
async find(space, digest) {
Expand Down Expand Up @@ -184,12 +190,24 @@ export const useBlobRegistry = (
}

for (const diffItem of spaceDiffResults.ok ?? []) {
// Write to existing v1 table (receiptAt#cause sort key)
transactWriteItems.push({
Put: {
TableName: spaceDiffTableName,
Item: marshall(diffItem, { removeUndefinedValues: true })
}
})
// Write to v2 table (cause as sort key)
const v2Item = toSpaceDiffV2Item(diffItem)
transactWriteItems.push({
Put: {
TableName: spaceDiffV2TableName,
Item: marshall(v2Item, { removeUndefinedValues: true }),
// ensure we only create new items; prevents overwriting existing (pk,cause)
ConditionExpression: 'attribute_not_exists(#SK)',
ExpressionAttributeNames: { '#SK': 'cause' }
}
})
}

const transactWriteCommand = new TransactWriteItemsCommand({
Expand Down Expand Up @@ -263,12 +281,23 @@ export const useBlobRegistry = (
}

for (const diffItem of spaceDiffResults.ok ?? []) {
// V1 write
transactWriteItems.push({
Put: {
TableName: spaceDiffTableName,
Item: marshall(diffItem, { removeUndefinedValues: true }),
},
})
// V2 write with uniqueness guard
const v2Item = toSpaceDiffV2Item(diffItem)
transactWriteItems.push({
Put: {
TableName: spaceDiffV2TableName,
Item: marshall(v2Item, { removeUndefinedValues: true }),
ConditionExpression: 'attribute_not_exists(#SK)',
ExpressionAttributeNames: { '#SK': 'cause' }
}
})
}

const transactWriteCommand = new TransactWriteItemsCommand({
Expand Down Expand Up @@ -340,6 +369,23 @@ export const useBlobRegistry = (
})
}

/**
* Map a v1 diff record to v2 schema (cause as sort key).
*
* @param {Record<string, any>} diffItem
*/
const toSpaceDiffV2Item = (diffItem) => ({
pk: diffItem.pk,
// Sort key is 'cause' in v2
cause: diffItem.cause,
space: diffItem.space,
provider: diffItem.provider,
subscription: diffItem.subscription,
delta: diffItem.delta,
receiptAt: diffItem.receiptAt,
insertedAt: diffItem.insertedAt
})

/**
* Upgrade from the db representation
*
Expand Down
Loading