diff --git a/cli/src/bundle/partial.ts b/cli/src/bundle/partial.ts index 403bddd8e7..a176c6cdae 100644 --- a/cli/src/bundle/partial.ts +++ b/cli/src/bundle/partial.ts @@ -187,6 +187,12 @@ interface PartialEncryptionOptions { ivSessionKey: string } +interface PartialUploadManifestEntry { + file_name: string + s3_path: string + file_hash: string +} + export async function uploadPartial( apikey: string, manifest: manifestType, @@ -195,7 +201,7 @@ export async function uploadPartial( orgId: string, encryptionOptions: PartialEncryptionOptions | undefined, options: OptionsUpload, -): Promise { +): Promise { const spinner = spinnerC() spinner.start('Preparing partial update with TUS protocol') const startTime = performance.now() @@ -238,7 +244,7 @@ export async function uploadPartial( spinner.message(`Uploading ${totalFiles} files using TUS protocol`) // Helper function to upload a single file - const uploadFile = async (file: manifestType[number]) => { + const uploadFile = async (file: manifestType[number]): Promise => { const finalFilePath = join(path, file.file) const filePathUnix = convertToUnixPath(file.file) @@ -285,7 +291,7 @@ export async function uploadPartial( }) } - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { spinner.message(`Prepare upload partial file: ${filePathUnix}`) // Get the MIME type for this file (based on original filename, not the R2 path) const filetype = getContentType(uploadPathUnix) @@ -341,9 +347,9 @@ export async function uploadPartial( }) } - // Process files in batches of 1000 to avoid overwhelming the server + // Process files in batches to avoid overwhelming the server const BATCH_SIZE = 500 - const results: any[] = [] + const results: PartialUploadManifestEntry[] = [] for (let i = 0; i < manifest.length; i += BATCH_SIZE) { const batch = manifest.slice(i, i + BATCH_SIZE) diff --git a/cli/src/types/supabase.types.ts b/cli/src/types/supabase.types.ts index 8de1d216d6..b964d2ccf6 100644 --- a/cli/src/types/supabase.types.ts +++ b/cli/src/types/supabase.types.ts @@ -2330,6 +2330,33 @@ export type Database = { }, ] } + uploaded_file_sizes: { + Row: { + app_id: string + created_at: string + file_size: number + owner_org: string + s3_path: string + updated_at: string + } + Insert: { + app_id: string + created_at?: string + file_size: number + owner_org: string + s3_path: string + updated_at?: string + } + Update: { + app_id?: string + created_at?: string + file_size?: number + owner_org?: string + s3_path?: string + updated_at?: string + } + Relationships: [] + } usage_credit_consumptions: { Row: { applied_at: string diff --git a/supabase/functions/_backend/files/files.ts b/supabase/functions/_backend/files/files.ts index 7374ecec74..c01d766f32 100644 --- a/supabase/functions/_backend/files/files.ts +++ b/supabase/functions/_backend/files/files.ts @@ -21,6 +21,7 @@ import { app as files_config } from './files_config.ts' import { parseUploadMetadata } from './parse.ts' import { DEFAULT_RETRY_PARAMS, RetryBucket } from './retry.ts' import { supabaseTusCreateHandler, supabaseTusHeadHandler, supabaseTusPatchHandler } from './supabaseTusProxy.ts' +import { getCompletedTusUploadSize, recordUploadedFileSize } from './uploadSize.ts' import { ALLOWED_HEADERS, ALLOWED_METHODS, buildFileHttpMetadata, EXPOSED_HEADERS, getSafeAttachmentReadCandidateKeys, headFirstExistingAttachmentCandidate, isRetryableDurableObjectResetError, MAX_UPLOAD_LENGTH_BYTES, parseAppScopedAttachmentPath, toBase64, TUS_EXTENSIONS, TUS_VERSION, withNoTransformCacheControl, X_CHECKSUM_SHA256, X_UPLOAD_HANDLER_RETRYABLE } from './util.ts' const DO_CALL_TIMEOUT = 1000 * 60 * 30 // 30 minutes @@ -619,6 +620,17 @@ function buildNormalizedUploadMetadataHeader(c: Context, filename: string): stri return metadata.join(',') } +async function recordCompletedTusUpload(c: Context, response: Response) { + if (response.status < 200 || response.status >= 300) + return + + const fileSize = getCompletedTusUploadSize(response.headers) + if (fileSize == null) + return + + await recordUploadedFileSize(c, c.get('fileId') as string, fileSize) +} + // TUS protocol requests (POST/PATCH/HEAD) that get forwarded to a durable object async function uploadHandler(c: Context) { const requestId = c.get('fileId') as string @@ -652,7 +664,9 @@ async function uploadHandler(c: Context) { requestInit.duplex = 'half' } const request = new Request(c.req.url, requestInit) - return await fetchUploadHandlerWithRetry(c, handler, request) + const response = await fetchUploadHandlerWithRetry(c, handler, request) + await recordCompletedTusUpload(c, response) + return response } async function setKeyFromMetadata(c: Context, next: Next) { diff --git a/supabase/functions/_backend/files/supabaseTusProxy.ts b/supabase/functions/_backend/files/supabaseTusProxy.ts index ab803c0e2a..69e5e755ad 100644 --- a/supabase/functions/_backend/files/supabaseTusProxy.ts +++ b/supabase/functions/_backend/files/supabaseTusProxy.ts @@ -1,7 +1,9 @@ import type { Context } from 'hono' -import { cloudlog } from '../utils/logging.ts' +import { cloudlog, cloudlogErr } from '../utils/logging.ts' +import { closeClient, getPgClient } from '../utils/pg.ts' import { getEnv } from '../utils/utils.ts' import { parseUploadMetadata } from './parse.ts' +import { getCompletedTusUploadSize, recordUploadedFileSize } from './uploadSize.ts' import { ALLOWED_HEADERS, ALLOWED_METHODS, EXPOSED_HEADERS, MAX_UPLOAD_LENGTH_BYTES, TUS_EXTENSIONS, TUS_VERSION } from './util.ts' const BUCKET_NAME = 'capgo' @@ -55,12 +57,9 @@ function transformMetadataForSupabase(c: Context, objectName: string): string { } /** - * Rewrite Supabase Location header to Capgo API URL + * Extract uploadId from Supabase URL in a robust way. */ -function rewriteLocationHeader(c: Context, supabaseLocation: string): string { - const requestId = c.get('requestId') - - // Extract uploadId from Supabase URL in a robust way +function extractUploadIdFromLocation(supabaseLocation: string): string | undefined { let uploadId: string | undefined try { const url = new URL(supabaseLocation) @@ -72,6 +71,15 @@ function rewriteLocationHeader(c: Context, supabaseLocation: string): string { const pathSegments = pathWithoutQuery.split('/').filter(Boolean) uploadId = pathSegments[pathSegments.length - 1] } + return uploadId +} + +/** + * Rewrite Supabase Location header to Capgo API URL + */ +function rewriteLocationHeader(c: Context, supabaseLocation: string): string { + const requestId = c.get('requestId') + const uploadId = extractUploadIdFromLocation(supabaseLocation) if (!uploadId) { cloudlog({ requestId, message: 'rewriteLocationHeader - failed to extract uploadId', supabaseLocation }) @@ -209,6 +217,91 @@ async function readErrorBody(response: Response): Promise { } } +async function readSupabaseTusProgressHeaders(c: Context, uploadId: string): Promise { + const requestId = c.get('requestId') + const result = await proxyToSupabase(requestId, 'supabaseTusProgressHead', buildSupabaseTusUrl(c, uploadId), { + method: 'HEAD', + headers: buildSupabaseAuthHeaders(c), + }) + + if ('error' in result || !result.ok) + return null + + return result.headers +} + +async function readSupabaseStorageObjectSize(c: Context, s3Path: string): Promise { + const pgClient = getPgClient(c, false) + try { + const result = await pgClient.query<{ file_size: string | number | null }>( + ` + SELECT + CASE + WHEN metadata ->> 'size' ~ '^[0-9]+$' THEN (metadata ->> 'size')::bigint + WHEN metadata ->> 'contentLength' ~ '^[0-9]+$' THEN (metadata ->> 'contentLength')::bigint + ELSE NULL + END AS file_size + FROM storage.objects + WHERE bucket_id = $1 + AND name = $2 + LIMIT 1 + `, + [BUCKET_NAME, s3Path], + ) + const fileSize = Number(result.rows[0]?.file_size) + return Number.isFinite(fileSize) && fileSize > 0 ? fileSize : null + } + finally { + await closeClient(c, pgClient) + } +} + +async function recordSupabaseCompletedTusUpload(c: Context, uploadId: string, responseHeaders: Headers): Promise { + const requestId = c.get('requestId') + const rawFileId = c.get('fileId') + if (typeof rawFileId !== 'string' || rawFileId.length === 0) { + cloudlog({ requestId, message: 'recordSupabaseCompletedTusUpload missing fileId', uploadId }) + return + } + + const s3Path = rawFileId + // Request Upload-Length is protocol input, not a trusted source for manifest sizes. + const hasCompleteProgressHeaders = responseHeaders.has('Upload-Offset') && responseHeaders.has('Upload-Length') + let fileSize = getCompletedTusUploadSize(responseHeaders) + if (hasCompleteProgressHeaders && fileSize == null) + return + + if (fileSize == null) { + const progressHeaders = await readSupabaseTusProgressHeaders(c, uploadId) + const hasProgressHeaders = progressHeaders?.has('Upload-Offset') && progressHeaders.has('Upload-Length') + fileSize = progressHeaders ? getCompletedTusUploadSize(progressHeaders) : null + if (hasProgressHeaders && fileSize == null) + return + } + + if (fileSize == null) + fileSize = await readSupabaseStorageObjectSize(c, s3Path) + + if (fileSize != null) + await recordUploadedFileSize(c, s3Path, fileSize) +} + +async function safeRecordSupabaseCompletedTusUpload(c: Context, uploadId: string, responseHeaders: Headers): Promise { + try { + await recordSupabaseCompletedTusUpload(c, uploadId, responseHeaders) + } + catch (error) { + cloudlogErr({ + requestId: c.get('requestId'), + message: 'recordSupabaseCompletedTusUpload failed', + uploadId, + uploadOffset: responseHeaders.get('Upload-Offset'), + uploadLength: responseHeaders.get('Upload-Length'), + error, + }) + } +} + /** * Handle TUS POST request - create a new upload */ @@ -248,7 +341,7 @@ export async function supabaseTusCreateHandler(c: Context): Promise { cloudlog({ requestId, message: 'supabaseTusCreateHandler response', status: response.status }) const responseHeaders = buildTusResponseHeaders() - copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Expires']) + copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Length', 'Upload-Expires']) const location = response.headers.get('Location') if (location) { @@ -267,6 +360,10 @@ export async function supabaseTusCreateHandler(c: Context): Promise { } } + const uploadId = location ? extractUploadIdFromLocation(location) : undefined + if (response.status >= 200 && response.status < 300 && uploadId) + await safeRecordSupabaseCompletedTusUpload(c, uploadId, responseHeaders) + return new Response(responseBody, { status: response.status, headers: responseHeaders }) } @@ -298,9 +395,12 @@ export async function supabaseTusPatchHandler(c: Context): Promise { cloudlog({ requestId, message: 'supabaseTusPatchHandler response', status: response.status }) const responseHeaders = buildTusResponseHeaders() - copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Expires']) + copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Length', 'Upload-Expires']) const body = await readErrorBody(response) + if (response.status >= 200 && response.status < 300 && uploadId) + await safeRecordSupabaseCompletedTusUpload(c, uploadId, responseHeaders) + return new Response(body, { status: response.status, headers: responseHeaders }) } diff --git a/supabase/functions/_backend/files/uploadHandler.ts b/supabase/functions/_backend/files/uploadHandler.ts index 4cb62394d1..faa4ccac66 100644 --- a/supabase/functions/_backend/files/uploadHandler.ts +++ b/supabase/functions/_backend/files/uploadHandler.ts @@ -367,18 +367,22 @@ export class UploadHandler extends DurableObject { const uploadOffset = hasContent ? await this.appendBody(c, r2Key, c.req.raw.body as ReadableStream, 0, uploadInfo) : 0 + const headers = new Headers({ + 'Location': uploadLocation.href, + 'Upload-Expires': expiration.toString(), + 'Upload-Offset': uploadOffset.toString(), + 'Tus-Resumable': TUS_VERSION, + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': ALLOWED_METHODS, + 'Access-Control-Allow-Headers': ALLOWED_HEADERS, + 'Access-Control-Expose-Headers': EXPOSED_HEADERS, + }) + if (uploadInfo.uploadLength != null) + headers.set('Upload-Length', uploadInfo.uploadLength.toString()) + return new Response(null, { status: 201, - headers: new Headers({ - 'Location': uploadLocation.href, - 'Upload-Expires': expiration.toString(), - 'Upload-Offset': uploadOffset.toString(), - 'Tus-Resumable': TUS_VERSION, - 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Methods': ALLOWED_METHODS, - 'Access-Control-Allow-Headers': ALLOWED_HEADERS, - 'Access-Control-Expose-Headers': EXPOSED_HEADERS, - }), + headers, }) } @@ -463,17 +467,21 @@ export class UploadHandler extends DurableObject { uploadOffset = await this.appendBody(c, r2Key, c.req.raw.body, currentUploadOffset, uploadInfo) + const headers = new Headers({ + 'Upload-Offset': uploadOffset.toString(), + 'Upload-Expires': (await this.expirationTime()).toString(), + 'Tus-Resumable': TUS_VERSION, + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': ALLOWED_METHODS, + 'Access-Control-Allow-Headers': ALLOWED_HEADERS, + 'Access-Control-Expose-Headers': EXPOSED_HEADERS, + }) + if (uploadInfo.uploadLength != null) + headers.set('Upload-Length', uploadInfo.uploadLength.toString()) + return new Response(null, { status: 204, - headers: new Headers({ - 'Upload-Offset': uploadOffset.toString(), - 'Upload-Expires': (await this.expirationTime()).toString(), - 'Tus-Resumable': TUS_VERSION, - 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Methods': ALLOWED_METHODS, - 'Access-Control-Allow-Headers': ALLOWED_HEADERS, - 'Access-Control-Expose-Headers': EXPOSED_HEADERS, - }), + headers, }) } diff --git a/supabase/functions/_backend/files/uploadSize.ts b/supabase/functions/_backend/files/uploadSize.ts new file mode 100644 index 0000000000..be62fc4332 --- /dev/null +++ b/supabase/functions/_backend/files/uploadSize.ts @@ -0,0 +1,67 @@ +import type { Context } from 'hono' +import { cloudlog, cloudlogErr } from '../utils/logging.ts' +import { closeClient, getPgClient } from '../utils/pg.ts' +import { parseAppScopedAttachmentPath } from './util.ts' + +function normalizeUploadedFileSize(fileSize: number): number | null { + if (!Number.isFinite(fileSize) || fileSize <= 0) + return null + return Math.trunc(fileSize) +} + +export async function recordUploadedFileSize(c: Context, s3Path: string, fileSize: number): Promise { + const normalizedSize = normalizeUploadedFileSize(fileSize) + if (normalizedSize == null) { + cloudlog({ requestId: c.get('requestId'), message: 'recordUploadedFileSize skipped invalid size', s3Path, fileSize }) + return + } + + const scopedPath = parseAppScopedAttachmentPath(s3Path) + if (scopedPath?.kind !== 'scoped') { + cloudlog({ requestId: c.get('requestId'), message: 'recordUploadedFileSize skipped invalid path', s3Path }) + return + } + + const pgClient = getPgClient(c, false) + try { + await pgClient.query( + ` + INSERT INTO public.uploaded_file_sizes ( + s3_path, + file_size, + owner_org, + app_id, + updated_at + ) + VALUES ($1, $2, $3, $4, now()) + ON CONFLICT (s3_path) + DO UPDATE SET + file_size = EXCLUDED.file_size, + owner_org = EXCLUDED.owner_org, + app_id = EXCLUDED.app_id, + updated_at = now() + `, + [s3Path, normalizedSize, scopedPath.owner_org, scopedPath.app_id], + ) + } + catch (error) { + cloudlogErr({ requestId: c.get('requestId'), message: 'recordUploadedFileSize failed', s3Path, fileSize: normalizedSize, error }) + } + finally { + await closeClient(c, pgClient) + } +} + +export function getCompletedTusUploadSize(headers: Headers): number | null { + const rawOffset = headers.get('Upload-Offset') + const rawLength = headers.get('Upload-Length') + if (rawOffset == null || rawLength == null) + return null + + const offset = Number.parseInt(rawOffset, 10) + const length = Number.parseInt(rawLength, 10) + if (!Number.isFinite(offset) || !Number.isFinite(length) || offset !== length) + return null + + return normalizeUploadedFileSize(offset) +} diff --git a/supabase/functions/_backend/triggers/on_manifest_create.ts b/supabase/functions/_backend/triggers/on_manifest_create.ts index 6ba5791a07..9ca1f1d97a 100644 --- a/supabase/functions/_backend/triggers/on_manifest_create.ts +++ b/supabase/functions/_backend/triggers/on_manifest_create.ts @@ -3,6 +3,7 @@ import type { MiddlewareKeyVariables } from '../utils/hono.ts' import type { RetryableResult } from '../utils/retry.ts' import type { Database } from '../utils/supabase.types.ts' import { Hono } from 'hono/tiny' +import { recordUploadedFileSize } from '../files/uploadSize.ts' import { BRES, middlewareAPISecret, simpleError, triggerValidator } from '../utils/hono.ts' import { cloudlog, cloudlogErr } from '../utils/logging.ts' import { isRetryablePostgrestResult, retryWithBackoff } from '../utils/retry.ts' @@ -67,6 +68,11 @@ async function runManifestUpdateWithRetry( } async function updateManifestSize(c: Context, record: Database['public']['Tables']['manifest']['Row']) { + if (record.file_size && record.file_size > 0) { + cloudlog({ requestId: c.get('requestId'), message: 'manifest file_size already set', id: record.id, file_size: record.file_size }) + return c.json(BRES) + } + if (!record.s3_path) { cloudlog({ requestId: c.get('requestId'), message: 'No s3 path', id: record.id }) throw simpleError('no_s3_path', 'No s3 path', { record }) @@ -90,6 +96,7 @@ async function updateManifestSize(c: Context, record: Database['public']['Tables .from('manifest') .update({ file_size: size }) .eq('id', record.id)) + await recordUploadedFileSize(c, record.s3_path, size) } catch (updateError) { cloudlog({ requestId: c.get('requestId'), message: 'error update manifest size', error: updateError }) diff --git a/supabase/functions/_backend/triggers/on_version_update.ts b/supabase/functions/_backend/triggers/on_version_update.ts index c9f16668c6..37c50b5e50 100644 --- a/supabase/functions/_backend/triggers/on_version_update.ts +++ b/supabase/functions/_backend/triggers/on_version_update.ts @@ -80,6 +80,33 @@ async function v2PathSize(c: Context, record: Database['public']['Tables']['app_ return true } +async function getUploadedFileSizes(c: Context, s3Paths: string[]): Promise> { + const uniquePaths = Array.from(new Set(s3Paths.filter(path => path.length > 0))) + if (uniquePaths.length === 0) + return new Map() + + const pgClient = getPgClient(c, false) + try { + const result = await pgClient.query<{ s3_path: string, file_size: string | number }>( + ` + SELECT s3_path, file_size + FROM public.uploaded_file_sizes + WHERE s3_path = ANY($1::text[]) + `, + [uniquePaths], + ) + + return new Map(result.rows.map(row => [row.s3_path, Number(row.file_size)])) + } + catch (error) { + cloudlog({ requestId: c.get('requestId'), message: 'error loading uploaded file sizes', error }) + return new Map() + } + finally { + await closeClient(c, pgClient) + } +} + /** * Persists manifest rows and updates aggregate counters when a version includes a manifest payload. */ @@ -96,6 +123,13 @@ async function handleManifest(c: Context, record: Database['public']['Tables'][' // Only create entries if none exist if (!existingEntries?.length && manifestEntries.length > 0) { + const uploadedFileSizes = await getUploadedFileSizes( + c, + manifestEntries + .map(entry => entry.s3_path) + .filter((s3Path): s3Path is string => typeof s3Path === 'string'), + ) + const validEntries = manifestEntries .filter(entry => entry.file_name && entry.file_hash && entry.s3_path) .map(entry => ({ @@ -103,7 +137,8 @@ async function handleManifest(c: Context, record: Database['public']['Tables'][' file_name: normalizeLegacyEncodedManifestFileName(entry.file_name, entry.s3_path)!, file_hash: entry.file_hash!, s3_path: entry.s3_path!, - file_size: 0, + // Never trust client-provided manifest sizes. Only use sizes observed by the upload backend. + file_size: uploadedFileSizes.get(entry.s3_path!) ?? 0, })) if (validEntries.length > 0) { @@ -361,3 +396,8 @@ app.post('/', middlewareAPISecret, triggerValidator('app_versions', 'UPDATE'), ( cloudlog({ requestId: c.get('requestId'), message: 'Update but not deleted' }) return updateIt(c, record) }) + +export const onVersionUpdateTestUtils = { + getUploadedFileSizes, + handleManifest, +} diff --git a/supabase/functions/_backend/triggers/queue_consumer.ts b/supabase/functions/_backend/triggers/queue_consumer.ts index 556ba8da02..8736b657de 100644 --- a/supabase/functions/_backend/triggers/queue_consumer.ts +++ b/supabase/functions/_backend/triggers/queue_consumer.ts @@ -38,6 +38,13 @@ interface Message { } } +interface QueueProcessingResult extends Message { + httpResponse: Response + errorDetails: Awaited> + cfId: string + payloadSize: number +} + export const messagesArraySchema = messageSchema.array() interface FailureDetail { @@ -163,6 +170,72 @@ function generateUUID(): string { return crypto.randomUUID() } +async function processQueueMessage( + c: Context, + queueName: string, + message: Message, + postHelper: typeof http_post_helper = http_post_helper, +): Promise { + const function_name = message.message?.function_name ?? 'unknown' + const function_type = message.message?.function_type ?? 'supabase' + const body = extractMessageBody(message) + if (message.message?.payload === undefined && Object.keys(body).length > 0) { + cloudlog({ + requestId: c.get('requestId'), + message: `[${queueName}] Using legacy queue message body shape for ${function_name}.`, + msgId: message.msg_id, + }) + } + + const cfId = generateUUID() + const payloadSize = JSON.stringify(body).length + + try { + const httpResponse = await postHelper(c, function_name, function_type, body, cfId) + const errorDetails = await extractErrorDetails(httpResponse) + + return { + httpResponse, + errorDetails, + cfId, + payloadSize, + ...message, + } + } + catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + const publicErrorMessage = 'Queue message processing failed' + cloudlogErr({ + requestId: c.get('requestId'), + message: `[${queueName}] Message processing failed before HTTP response`, + error, + function_name, + function_type, + msgId: message.msg_id, + cfId, + }) + + return { + httpResponse: new Response(JSON.stringify({ + error: 'queue_message_processing_failed', + message: publicErrorMessage, + }), { + status: 500, + statusText: publicErrorMessage, + headers: { 'content-type': 'application/json' }, + }), + errorDetails: { + errorCode: 'queue_message_processing_failed', + errorMessage, + bodyPreview: errorMessage.slice(0, 500), + }, + cfId, + payloadSize, + ...message, + } + } +} + async function processQueue(c: Context, db: ReturnType, queueName: string, batchSize: number = DEFAULT_BATCH_SIZE) { const messages = await readQueue(c, db, queueName, batchSize) @@ -185,29 +258,7 @@ async function processQueue(c: Context, db: ReturnType, queu } // Process messages that have been read less than 5 times - const results = await Promise.all(messagesToProcess.map(async (message) => { - const function_name = message.message?.function_name ?? 'unknown' - const function_type = message.message?.function_type ?? 'supabase' - const body = extractMessageBody(message) - if (message.message?.payload === undefined && Object.keys(body).length > 0) { - cloudlog({ - requestId: c.get('requestId'), - message: `[${queueName}] Using legacy queue message body shape for ${function_name}.`, - msgId: message.msg_id, - }) - } - const cfId = generateUUID() - const httpResponse = await http_post_helper(c, function_name, function_type, body, cfId) - const errorDetails = await extractErrorDetails(httpResponse) - - return { - httpResponse, - errorDetails, - cfId, - payloadSize: JSON.stringify(body).length, - ...message, - } - })) + const results = await Promise.all(messagesToProcess.map(message => processQueueMessage(c, queueName, message))) // Update all messages with their CF IDs const cfIdUpdates = results.map(result => ({ @@ -218,7 +269,12 @@ async function processQueue(c: Context, db: ReturnType, queu if (cfIdUpdates.length > 0) { cloudlog({ requestId: c.get('requestId'), message: `[${queueName}] Updating ${cfIdUpdates.length} messages with CF IDs.` }) - await mass_edit_queue_messages_cf_ids(c, db, cfIdUpdates) + try { + await mass_edit_queue_messages_cf_ids(c, db, cfIdUpdates) + } + catch (error) { + cloudlogErr({ requestId: c.get('requestId'), message: `[${queueName}] Failed to update CF IDs, continuing queue cleanup.`, error }) + } } // Batch remove all messages that have succeeded @@ -627,5 +683,6 @@ export const __queueConsumerTestUtils__ = { extractErrorDetails, extractMessageBody, getActionableQueueFailures, + processQueueMessage, sanitizeDiscordResponseBody, } diff --git a/supabase/functions/_backend/utils/supabase.types.ts b/supabase/functions/_backend/utils/supabase.types.ts index 2f13ee152a..0628e4d2be 100644 --- a/supabase/functions/_backend/utils/supabase.types.ts +++ b/supabase/functions/_backend/utils/supabase.types.ts @@ -2411,6 +2411,33 @@ export type Database = { }, ] } + uploaded_file_sizes: { + Row: { + app_id: string + created_at: string + file_size: number + owner_org: string + s3_path: string + updated_at: string + } + Insert: { + app_id: string + created_at?: string + file_size: number + owner_org: string + s3_path: string + updated_at?: string + } + Update: { + app_id?: string + created_at?: string + file_size?: number + owner_org?: string + s3_path?: string + updated_at?: string + } + Relationships: [] + } usage_credit_consumptions: { Row: { applied_at: string diff --git a/supabase/migrations/20260514102319_store_uploaded_file_sizes.sql b/supabase/migrations/20260514102319_store_uploaded_file_sizes.sql new file mode 100644 index 0000000000..e66fa2a2e5 --- /dev/null +++ b/supabase/migrations/20260514102319_store_uploaded_file_sizes.sql @@ -0,0 +1,108 @@ +-- Store object sizes observed by the upload backend. The manifest payload stays +-- client-provided, but manifest.file_size is hydrated only from this table. +CREATE TABLE IF NOT EXISTS public.uploaded_file_sizes ( + s3_path text PRIMARY KEY, + file_size bigint NOT NULL CHECK (file_size > 0), + owner_org uuid NOT NULL, + app_id character varying NOT NULL, + created_at timestamp with time zone DEFAULT NOW() NOT NULL, + updated_at timestamp with time zone DEFAULT NOW() NOT NULL +); + +ALTER TABLE public.uploaded_file_sizes OWNER TO postgres; +ALTER TABLE public.uploaded_file_sizes ENABLE ROW LEVEL SECURITY; + +DROP POLICY IF EXISTS "Deny select on uploaded_file_sizes" ON public.uploaded_file_sizes; +CREATE POLICY "Deny select on uploaded_file_sizes" +ON public.uploaded_file_sizes +AS RESTRICTIVE +FOR SELECT +TO anon, authenticated +USING (false); + +DROP POLICY IF EXISTS "Deny insert on uploaded_file_sizes" ON public.uploaded_file_sizes; +CREATE POLICY "Deny insert on uploaded_file_sizes" +ON public.uploaded_file_sizes +AS RESTRICTIVE +FOR INSERT +TO anon, authenticated +WITH CHECK (false); + +DROP POLICY IF EXISTS "Deny update on uploaded_file_sizes" ON public.uploaded_file_sizes; +CREATE POLICY "Deny update on uploaded_file_sizes" +ON public.uploaded_file_sizes +AS RESTRICTIVE +FOR UPDATE +TO anon, authenticated +USING (false) +WITH CHECK (false); + +DROP POLICY IF EXISTS "Deny delete on uploaded_file_sizes" ON public.uploaded_file_sizes; +CREATE POLICY "Deny delete on uploaded_file_sizes" +ON public.uploaded_file_sizes +AS RESTRICTIVE +FOR DELETE +TO anon, authenticated +USING (false); + +REVOKE ALL ON TABLE public.uploaded_file_sizes FROM PUBLIC; +REVOKE ALL ON TABLE public.uploaded_file_sizes FROM anon; +REVOKE ALL ON TABLE public.uploaded_file_sizes FROM authenticated; +GRANT ALL ON TABLE public.uploaded_file_sizes TO service_role; + +CREATE INDEX IF NOT EXISTS uploaded_file_sizes_owner_org_app_id_idx +ON public.uploaded_file_sizes (owner_org, app_id); + +-- Backend-observed uploads set manifest.file_size before insert, so avoid +-- queuing only rows that match the service-role-only observed size table. +-- Legacy/missing or forged records still keep the fallback R2 HEAD. +CREATE OR REPLACE FUNCTION public.trigger_verified_manifest_create() +RETURNS trigger +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = '' +AS $$ +DECLARE + payload jsonb; +BEGIN + IF NEW.file_size IS NOT NULL + AND NEW.file_size > 0 + AND EXISTS ( + SELECT 1 + FROM public.uploaded_file_sizes u + WHERE u.s3_path = NEW.s3_path + AND u.file_size = NEW.file_size + ) + THEN + RETURN NEW; + END IF; + + payload := jsonb_build_object( + 'function_name', 'on_manifest_create', + 'function_type', NULL, + 'payload', jsonb_build_object( + 'old_record', OLD, + 'record', NEW, + 'type', TG_OP, + 'table', TG_TABLE_NAME, + 'schema', TG_TABLE_SCHEMA + ) + ); + + PERFORM pgmq.send('on_manifest_create', payload); + RETURN NEW; +END; +$$; + +ALTER FUNCTION public.trigger_verified_manifest_create() OWNER TO postgres; +REVOKE ALL ON FUNCTION public.trigger_verified_manifest_create() FROM PUBLIC; +REVOKE ALL ON FUNCTION public.trigger_verified_manifest_create() FROM anon; +REVOKE ALL ON FUNCTION public.trigger_verified_manifest_create() FROM authenticated; +REVOKE ALL ON FUNCTION public.trigger_verified_manifest_create() FROM service_role; + +DROP TRIGGER IF EXISTS on_manifest_create ON public.manifest; + +CREATE TRIGGER on_manifest_create +AFTER INSERT ON public.manifest +FOR EACH ROW +EXECUTE FUNCTION public.trigger_verified_manifest_create(); diff --git a/tests/manifest-uploaded-size.unit.test.ts b/tests/manifest-uploaded-size.unit.test.ts new file mode 100644 index 0000000000..48a755bbc6 --- /dev/null +++ b/tests/manifest-uploaded-size.unit.test.ts @@ -0,0 +1,132 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const closeClientMock = vi.fn() +const getPgClientMock = vi.fn() +const supabaseAdminMock = vi.fn() + +vi.mock('../supabase/functions/_backend/utils/pg.ts', () => ({ + closeClient: closeClientMock, + getDrizzleClient: vi.fn(), + getPgClient: getPgClientMock, +})) + +vi.mock('../supabase/functions/_backend/utils/supabase.ts', () => ({ + supabaseAdmin: supabaseAdminMock, +})) + +vi.mock('../supabase/functions/_backend/utils/stats.ts', () => ({ + createStatsMeta: vi.fn(), +})) + +vi.mock('../supabase/functions/_backend/utils/s3.ts', () => ({ + getPath: vi.fn(), + s3: { + deleteObject: vi.fn(), + getSize: vi.fn(), + }, +})) + +function createContext() { + return { + get: (key: string) => key === 'requestId' ? 'req-test' : undefined, + json: (body: unknown, status = 200) => new Response(JSON.stringify(body), { status }), + } as any +} + +describe('manifest uploaded file sizes', () => { + beforeEach(() => { + vi.resetModules() + vi.clearAllMocks() + }) + + it.concurrent('hydrates manifest file_size from backend-observed upload rows only', async () => { + const uploadedSizesClient = { + query: vi.fn() + .mockResolvedValueOnce({ + rows: [ + { + file_size: '1234', + s3_path: 'orgs/org-1/apps/com.test.app/delta/trusted.js', + }, + ], + }) + .mockResolvedValueOnce({ rows: [] }), + } + getPgClientMock.mockReturnValue(uploadedSizesClient) + + const insertMock = vi.fn().mockResolvedValue({ error: null }) + const manifestLimitMock = vi.fn().mockResolvedValue({ data: [] }) + const appVersionEqMock = vi.fn().mockResolvedValue({ error: null }) + + supabaseAdminMock.mockReturnValue({ + from: vi.fn((table: string) => { + if (table === 'manifest') { + return { + insert: insertMock, + select: vi.fn(() => ({ + eq: vi.fn(() => ({ + limit: manifestLimitMock, + })), + })), + } + } + + if (table === 'app_versions') { + return { + update: vi.fn(() => ({ + eq: appVersionEqMock, + })), + } + } + + throw new Error(`unexpected table ${table}`) + }), + }) + + const { onVersionUpdateTestUtils } = await import('../supabase/functions/_backend/triggers/on_version_update.ts') + + await onVersionUpdateTestUtils.handleManifest(createContext(), { + app_id: 'com.test.app', + id: 42, + manifest: [ + { + file_hash: 'hash-trusted', + file_name: 'trusted.js', + file_size: 999999999, + s3_path: 'orgs/org-1/apps/com.test.app/delta/trusted.js', + }, + { + file_hash: 'hash-missing', + file_name: 'missing.js', + file_size: 888888888, + s3_path: 'orgs/org-1/apps/com.test.app/delta/missing.js', + }, + ], + } as any) + + expect(insertMock).toHaveBeenCalledWith([ + { + app_version_id: 42, + file_hash: 'hash-trusted', + file_name: 'trusted.js', + file_size: 1234, + s3_path: 'orgs/org-1/apps/com.test.app/delta/trusted.js', + }, + { + app_version_id: 42, + file_hash: 'hash-missing', + file_name: 'missing.js', + file_size: 0, + s3_path: 'orgs/org-1/apps/com.test.app/delta/missing.js', + }, + ]) + expect(uploadedSizesClient.query).toHaveBeenNthCalledWith( + 1, + expect.stringContaining('FROM public.uploaded_file_sizes'), + [[ + 'orgs/org-1/apps/com.test.app/delta/trusted.js', + 'orgs/org-1/apps/com.test.app/delta/missing.js', + ]], + ) + }) +}) diff --git a/tests/queue-consumer-message-shape.unit.test.ts b/tests/queue-consumer-message-shape.unit.test.ts index 816e14e95a..7dcf3da3e1 100644 --- a/tests/queue-consumer-message-shape.unit.test.ts +++ b/tests/queue-consumer-message-shape.unit.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from 'vitest' +import { describe, expect, it, vi } from 'vitest' import { __queueConsumerTestUtils__, MAX_QUEUE_READS, messagesArraySchema } from '../supabase/functions/_backend/triggers/queue_consumer.ts' import { parseSchema } from '../supabase/functions/_backend/utils/ark_validation.ts' @@ -139,4 +139,48 @@ describe('queue_consumer legacy message compatibility', () => { errorMessage: 'builder unavailable', }) }) + + it.concurrent('keeps one thrown message failure from failing the whole batch', async () => { + const messages = parseSchema(messagesArraySchema, [ + { + msg_id: 10, + read_ct: 1, + message: { + function_name: 'ok', + payload: { ok: true }, + }, + }, + { + msg_id: 11, + read_ct: 1, + message: { + function_name: 'bad', + payload: { ok: false }, + }, + }, + ]) + + const postHelper = vi.fn(async (_c, functionName: string) => { + if (functionName === 'bad') + throw new Error('network down') + return new Response(JSON.stringify({ status: 'ok' }), { + headers: { 'content-type': 'application/json' }, + status: 200, + }) + }) + + const context = { get: vi.fn(() => 'test-request') } as any + const results = await Promise.all(messages.map(message => + __queueConsumerTestUtils__.processQueueMessage(context, 'test_queue', message, postHelper as any), + )) + + expect(results.map(result => [result.msg_id, result.httpResponse.status])).toEqual([ + [10, 200], + [11, 500], + ]) + expect(results[1]?.errorDetails).toMatchObject({ + errorCode: 'queue_message_processing_failed', + errorMessage: 'network down', + }) + }) })