Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions cli/src/bundle/partial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ interface PartialEncryptionOptions {
ivSessionKey: string
}

// One record per uploaded file, as returned by uploadPartial.
// Snake_case keys suggest this mirrors the server-side manifest payload — confirm against the API.
interface PartialUploadManifestEntry {
  // Original (unix-style) file name from the local manifest.
  file_name: string
  // Destination object-storage path the file was uploaded to.
  s3_path: string
  // Content hash of the file — presumably used for change detection/verification; verify against server.
  file_hash: string
}

export async function uploadPartial(
apikey: string,
manifest: manifestType,
Expand All @@ -195,7 +201,7 @@ export async function uploadPartial(
orgId: string,
encryptionOptions: PartialEncryptionOptions | undefined,
options: OptionsUpload,
): Promise<any[] | null> {
): Promise<PartialUploadManifestEntry[] | null> {
const spinner = spinnerC()
spinner.start('Preparing partial update with TUS protocol')
const startTime = performance.now()
Expand Down Expand Up @@ -238,7 +244,7 @@ export async function uploadPartial(
spinner.message(`Uploading ${totalFiles} files using TUS protocol`)

// Helper function to upload a single file
const uploadFile = async (file: manifestType[number]) => {
const uploadFile = async (file: manifestType[number]): Promise<PartialUploadManifestEntry> => {
const finalFilePath = join(path, file.file)
const filePathUnix = convertToUnixPath(file.file)

Expand Down Expand Up @@ -285,7 +291,7 @@ export async function uploadPartial(
})
}

return new Promise((resolve, reject) => {
return new Promise<PartialUploadManifestEntry>((resolve, reject) => {
spinner.message(`Prepare upload partial file: ${filePathUnix}`)
// Get the MIME type for this file (based on original filename, not the R2 path)
const filetype = getContentType(uploadPathUnix)
Expand Down Expand Up @@ -341,9 +347,9 @@ export async function uploadPartial(
})
}

// Process files in batches of 1000 to avoid overwhelming the server
// Process files in batches to avoid overwhelming the server
const BATCH_SIZE = 500
const results: any[] = []
const results: PartialUploadManifestEntry[] = []

for (let i = 0; i < manifest.length; i += BATCH_SIZE) {
const batch = manifest.slice(i, i + BATCH_SIZE)
Expand Down
27 changes: 27 additions & 0 deletions cli/src/types/supabase.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2330,6 +2330,33 @@ export type Database = {
},
]
}
uploaded_file_sizes: {
Row: {
app_id: string
created_at: string
file_size: number
owner_org: string
s3_path: string
updated_at: string
}
Insert: {
app_id: string
created_at?: string
file_size: number
owner_org: string
s3_path: string
updated_at?: string
}
Update: {
app_id?: string
created_at?: string
file_size?: number
owner_org?: string
s3_path?: string
updated_at?: string
}
Relationships: []
}
usage_credit_consumptions: {
Row: {
applied_at: string
Expand Down
16 changes: 15 additions & 1 deletion supabase/functions/_backend/files/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { app as files_config } from './files_config.ts'
import { parseUploadMetadata } from './parse.ts'
import { DEFAULT_RETRY_PARAMS, RetryBucket } from './retry.ts'
import { supabaseTusCreateHandler, supabaseTusHeadHandler, supabaseTusPatchHandler } from './supabaseTusProxy.ts'
import { getCompletedTusUploadSize, recordUploadedFileSize } from './uploadSize.ts'
import { ALLOWED_HEADERS, ALLOWED_METHODS, buildFileHttpMetadata, EXPOSED_HEADERS, getSafeAttachmentReadCandidateKeys, headFirstExistingAttachmentCandidate, isRetryableDurableObjectResetError, MAX_UPLOAD_LENGTH_BYTES, parseAppScopedAttachmentPath, toBase64, TUS_EXTENSIONS, TUS_VERSION, withNoTransformCacheControl, X_CHECKSUM_SHA256, X_UPLOAD_HANDLER_RETRYABLE } from './util.ts'

const DO_CALL_TIMEOUT = 1000 * 60 * 30 // 30 minutes
Expand Down Expand Up @@ -619,6 +620,17 @@ function buildNormalizedUploadMetadataHeader(c: Context, filename: string): stri
return metadata.join(',')
}

/**
 * After a proxied TUS request, persist the uploaded file's size when the
 * response shows a successfully completed upload.
 *
 * No-ops when the response is not 2xx or when the completion size cannot
 * be derived from the response headers.
 */
async function recordCompletedTusUpload(c: Context, response: Response) {
  const isSuccess = response.status >= 200 && response.status < 300
  if (!isSuccess)
    return

  const size = getCompletedTusUploadSize(response.headers)
  // Only record once the headers prove the upload finished.
  if (size != null)
    await recordUploadedFileSize(c, c.get('fileId') as string, size)
}

// TUS protocol requests (POST/PATCH/HEAD) that get forwarded to a durable object
async function uploadHandler(c: Context) {
const requestId = c.get('fileId') as string
Expand Down Expand Up @@ -652,7 +664,9 @@ async function uploadHandler(c: Context) {
requestInit.duplex = 'half'
}
const request = new Request(c.req.url, requestInit)
return await fetchUploadHandlerWithRetry(c, handler, request)
const response = await fetchUploadHandlerWithRetry(c, handler, request)
await recordCompletedTusUpload(c, response)
return response
}

async function setKeyFromMetadata(c: Context, next: Next) {
Expand Down
116 changes: 108 additions & 8 deletions supabase/functions/_backend/files/supabaseTusProxy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { Context } from 'hono'
import { cloudlog } from '../utils/logging.ts'
import { cloudlog, cloudlogErr } from '../utils/logging.ts'
import { closeClient, getPgClient } from '../utils/pg.ts'
import { getEnv } from '../utils/utils.ts'
import { parseUploadMetadata } from './parse.ts'
import { getCompletedTusUploadSize, recordUploadedFileSize } from './uploadSize.ts'
import { ALLOWED_HEADERS, ALLOWED_METHODS, EXPOSED_HEADERS, MAX_UPLOAD_LENGTH_BYTES, TUS_EXTENSIONS, TUS_VERSION } from './util.ts'

const BUCKET_NAME = 'capgo'
Expand Down Expand Up @@ -55,12 +57,9 @@
}

/**
* Rewrite Supabase Location header to Capgo API URL
* Extract uploadId from Supabase URL in a robust way.
*/
function rewriteLocationHeader(c: Context, supabaseLocation: string): string {
const requestId = c.get('requestId')

// Extract uploadId from Supabase URL in a robust way
function extractUploadIdFromLocation(supabaseLocation: string): string | undefined {
let uploadId: string | undefined
try {
const url = new URL(supabaseLocation)
Expand All @@ -72,6 +71,15 @@
const pathSegments = pathWithoutQuery.split('/').filter(Boolean)
uploadId = pathSegments[pathSegments.length - 1]
}
return uploadId
}

/**
* Rewrite Supabase Location header to Capgo API URL
*/
function rewriteLocationHeader(c: Context, supabaseLocation: string): string {
const requestId = c.get('requestId')
const uploadId = extractUploadIdFromLocation(supabaseLocation)

if (!uploadId) {
cloudlog({ requestId, message: 'rewriteLocationHeader - failed to extract uploadId', supabaseLocation })
Expand Down Expand Up @@ -209,6 +217,91 @@
}
}

/**
 * Issue a HEAD request against the Supabase TUS endpoint to read the
 * current progress headers (Upload-Offset / Upload-Length) for an upload.
 *
 * Returns null when the proxy call fails or the response is not ok.
 */
async function readSupabaseTusProgressHeaders(c: Context, uploadId: string): Promise<Headers | null> {
  const requestId = c.get('requestId')
  const headResult = await proxyToSupabase(requestId, 'supabaseTusProgressHead', buildSupabaseTusUrl(c, uploadId), {
    method: 'HEAD',
    headers: buildSupabaseAuthHeaders(c),
  })

  // Treat proxy-level errors and non-2xx responses the same way: no progress info.
  return ('error' in headResult || !headResult.ok) ? null : headResult.headers
}

/**
 * Read the stored object's size (bytes) from storage.objects metadata as a
 * fallback when TUS headers did not provide it.
 *
 * Returns null when the object is missing or has no positive numeric size.
 */
async function readSupabaseStorageObjectSize(c: Context, s3Path: string): Promise<number | null> {
  const pgClient = getPgClient(c, false)
  try {
    const queryResult = await pgClient.query<{ file_size: string | number | null }>(
      `
      SELECT
        CASE
          WHEN metadata ->> 'size' ~ '^[0-9]+$' THEN (metadata ->> 'size')::bigint
          WHEN metadata ->> 'contentLength' ~ '^[0-9]+$' THEN (metadata ->> 'contentLength')::bigint
          ELSE NULL
        END AS file_size
      FROM storage.objects
      WHERE bucket_id = $1
        AND name = $2
      LIMIT 1
      `,
      [BUCKET_NAME, s3Path],
    )
    // Postgres returns bigint as text; coerce and accept only positive finite values.
    const parsedSize = Number(queryResult.rows[0]?.file_size)
    if (Number.isFinite(parsedSize) && parsedSize > 0)
      return parsedSize
    return null
  }
  finally {
    // Always release the pg client, even when the query throws.
    await closeClient(c, pgClient)
  }
}

async function recordSupabaseCompletedTusUpload(c: Context, uploadId: string, responseHeaders: Headers): Promise<void> {
const requestId = c.get('requestId')
const rawFileId = c.get('fileId')
if (typeof rawFileId !== 'string' || rawFileId.length === 0) {
cloudlog({ requestId, message: 'recordSupabaseCompletedTusUpload missing fileId', uploadId })
return
}

const s3Path = rawFileId
// Request Upload-Length is protocol input, not a trusted source for manifest sizes.
const hasCompleteProgressHeaders = responseHeaders.has('Upload-Offset') && responseHeaders.has('Upload-Length')
Comment thread
coderabbitai[bot] marked this conversation as resolved.
let fileSize = getCompletedTusUploadSize(responseHeaders)
if (hasCompleteProgressHeaders && fileSize == null)
return

if (fileSize == null) {
const progressHeaders = await readSupabaseTusProgressHeaders(c, uploadId)
const hasProgressHeaders = progressHeaders?.has('Upload-Offset') && progressHeaders.has('Upload-Length')
fileSize = progressHeaders ? getCompletedTusUploadSize(progressHeaders) : null
if (hasProgressHeaders && fileSize == null)
return
}

if (fileSize == null)
fileSize = await readSupabaseStorageObjectSize(c, s3Path)

Check warning on line 283 in supabase/functions/_backend/files/supabaseTusProxy.ts

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Prefer using nullish coalescing operator (`??=`) instead of an assignment expression, as it is simpler to read.

See more on https://sonarcloud.io/project/issues?id=Cap-go_capgo&issues=AZ4jWC0Tq8NyrMDDXQnm&open=AZ4jWC0Tq8NyrMDDXQnm&pullRequest=2265

if (fileSize != null)
await recordUploadedFileSize(c, s3Path, fileSize)
}

/**
 * Never-throwing wrapper around recordSupabaseCompletedTusUpload: any
 * failure is logged with upload context and swallowed, so size accounting
 * problems cannot break the TUS response returned to the client.
 */
async function safeRecordSupabaseCompletedTusUpload(c: Context, uploadId: string, responseHeaders: Headers): Promise<void> {
  try {
    await recordSupabaseCompletedTusUpload(c, uploadId, responseHeaders)
  }
  catch (err) {
    cloudlogErr({
      requestId: c.get('requestId'),
      message: 'recordSupabaseCompletedTusUpload failed',
      uploadId,
      uploadOffset: responseHeaders.get('Upload-Offset'),
      uploadLength: responseHeaders.get('Upload-Length'),
      error: err,
    })
  }
}

/**
* Handle TUS POST request - create a new upload
*/
Expand Down Expand Up @@ -248,7 +341,7 @@
cloudlog({ requestId, message: 'supabaseTusCreateHandler response', status: response.status })

const responseHeaders = buildTusResponseHeaders()
copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Expires'])
copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Length', 'Upload-Expires'])

const location = response.headers.get('Location')
if (location) {
Expand All @@ -267,6 +360,10 @@
}
}

const uploadId = location ? extractUploadIdFromLocation(location) : undefined
if (response.status >= 200 && response.status < 300 && uploadId)
await safeRecordSupabaseCompletedTusUpload(c, uploadId, responseHeaders)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
return new Response(responseBody, { status: response.status, headers: responseHeaders })
}

Expand Down Expand Up @@ -298,9 +395,12 @@
cloudlog({ requestId, message: 'supabaseTusPatchHandler response', status: response.status })

const responseHeaders = buildTusResponseHeaders()
copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Expires'])
copyResponseHeaders(response.headers, responseHeaders, ['Upload-Offset', 'Upload-Length', 'Upload-Expires'])

const body = await readErrorBody(response)
if (response.status >= 200 && response.status < 300 && uploadId)
await safeRecordSupabaseCompletedTusUpload(c, uploadId, responseHeaders)

return new Response(body, { status: response.status, headers: responseHeaders })
}

Expand Down
46 changes: 27 additions & 19 deletions supabase/functions/_backend/files/uploadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,18 +367,22 @@ export class UploadHandler extends DurableObject {
const uploadOffset = hasContent
? await this.appendBody(c, r2Key, c.req.raw.body as ReadableStream<Uint8Array>, 0, uploadInfo)
: 0
const headers = new Headers({
'Location': uploadLocation.href,
'Upload-Expires': expiration.toString(),
'Upload-Offset': uploadOffset.toString(),
'Tus-Resumable': TUS_VERSION,
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': ALLOWED_METHODS,
'Access-Control-Allow-Headers': ALLOWED_HEADERS,
'Access-Control-Expose-Headers': EXPOSED_HEADERS,
})
if (uploadInfo.uploadLength != null)
headers.set('Upload-Length', uploadInfo.uploadLength.toString())

return new Response(null, {
status: 201,
headers: new Headers({
'Location': uploadLocation.href,
'Upload-Expires': expiration.toString(),
'Upload-Offset': uploadOffset.toString(),
'Tus-Resumable': TUS_VERSION,
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': ALLOWED_METHODS,
'Access-Control-Allow-Headers': ALLOWED_HEADERS,
'Access-Control-Expose-Headers': EXPOSED_HEADERS,
}),
headers,
})
}

Expand Down Expand Up @@ -463,17 +467,21 @@ export class UploadHandler extends DurableObject {

uploadOffset = await this.appendBody(c, r2Key, c.req.raw.body, currentUploadOffset, uploadInfo)

const headers = new Headers({
'Upload-Offset': uploadOffset.toString(),
'Upload-Expires': (await this.expirationTime()).toString(),
'Tus-Resumable': TUS_VERSION,
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': ALLOWED_METHODS,
'Access-Control-Allow-Headers': ALLOWED_HEADERS,
'Access-Control-Expose-Headers': EXPOSED_HEADERS,
})
if (uploadInfo.uploadLength != null)
headers.set('Upload-Length', uploadInfo.uploadLength.toString())

return new Response(null, {
status: 204,
headers: new Headers({
'Upload-Offset': uploadOffset.toString(),
'Upload-Expires': (await this.expirationTime()).toString(),
'Tus-Resumable': TUS_VERSION,
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': ALLOWED_METHODS,
'Access-Control-Allow-Headers': ALLOWED_HEADERS,
'Access-Control-Expose-Headers': EXPOSED_HEADERS,
}),
headers,
})
}

Expand Down
Loading
Loading