diff --git a/aws-storage-s3/api/aws-storage-s3.api b/aws-storage-s3/api/aws-storage-s3.api index 8843367323..ea58e5252e 100644 --- a/aws-storage-s3/api/aws-storage-s3.api +++ b/aws-storage-s3/api/aws-storage-s3.api @@ -81,13 +81,16 @@ public final class com/amplifyframework/storage/s3/configuration/AWSS3StoragePlu public static final field Companion Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration$Companion; public synthetic fun (Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration$Builder;Lkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun getAWSS3PluginPrefixResolver (Lcom/amplifyframework/auth/AuthCredentialsProvider;)Lcom/amplifyframework/storage/s3/configuration/AWSS3PluginPrefixResolver; + public final fun getProgressStallTimeout ()Lcom/amplifyframework/storage/ProgressStallTimeout; } public final class com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration$Builder { public fun ()V public final fun build ()Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration; public final fun getAwsS3PluginPrefixResolver ()Lcom/amplifyframework/storage/s3/configuration/AWSS3PluginPrefixResolver; + public final fun getProgressStallTimeout ()Lcom/amplifyframework/storage/ProgressStallTimeout; public final fun setAwsS3PluginPrefixResolver (Lcom/amplifyframework/storage/s3/configuration/AWSS3PluginPrefixResolver;)V + public final fun setProgressStallTimeout (Lcom/amplifyframework/storage/ProgressStallTimeout;)V } public final class com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration$Companion { @@ -128,11 +131,13 @@ public final class com/amplifyframework/storage/s3/operation/AWSS3StorageRemoveO public final class com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperation : com/amplifyframework/storage/operation/StorageUploadFileOperation { public fun (Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;)V + public fun (Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;J)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;Lcom/amplifyframework/core/Consumer;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;)V + public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;)V public fun cancel ()V public fun getTransferState ()Lcom/amplifyframework/storage/TransferState; public fun pause ()V @@ -143,11 +148,13 @@ public final class com/amplifyframework/storage/s3/operation/AWSS3StorageUploadF public final class com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperation : com/amplifyframework/storage/operation/StorageUploadInputStreamOperation { public fun (Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;)V + public fun (Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;J)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;Lcom/amplifyframework/core/Consumer;)V public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;)V + public fun (Ljava/lang/String;Lcom/amplifyframework/storage/s3/service/StorageService;Ljava/util/concurrent/ExecutorService;Lcom/amplifyframework/auth/AuthCredentialsProvider;Lcom/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration;Lcom/amplifyframework/storage/s3/request/AWSS3StorageUploadRequest;Lcom/amplifyframework/storage/s3/transfer/TransferObserver;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;Lcom/amplifyframework/core/Consumer;)V public fun cancel ()V public fun getTransferState ()Lcom/amplifyframework/storage/TransferState; public fun pause ()V @@ -246,6 +253,7 @@ public final class com/amplifyframework/storage/s3/options/AWSS3StorageUploadFil public static fun defaultInstance ()Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions; public fun equals (Ljava/lang/Object;)Z public static fun from (Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions;)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions$Builder; + public fun getProgressStallTimeout ()Lcom/amplifyframework/storage/ProgressStallTimeout; public fun getServerSideEncryption ()Lcom/amplifyframework/storage/s3/ServerSideEncryption; public fun hashCode ()I public fun toString ()Ljava/lang/String; @@ -256,6 +264,7 @@ public final class com/amplifyframework/storage/s3/options/AWSS3StorageUploadFil public synthetic fun build ()Lcom/amplifyframework/storage/options/StorageOptions; public synthetic fun build ()Lcom/amplifyframework/storage/options/StorageUploadFileOptions; public fun build ()Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions; + public fun progressStallTimeout (Lcom/amplifyframework/storage/ProgressStallTimeout;)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions$Builder; public fun serverSideEncryption (Lcom/amplifyframework/storage/s3/ServerSideEncryption;)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions$Builder; public fun setUseAccelerateEndpoint (Z)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions$Builder; } @@ -265,6 +274,7 @@ public final class com/amplifyframework/storage/s3/options/AWSS3StorageUploadInp public static fun defaultInstance ()Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions; public fun equals (Ljava/lang/Object;)Z public static fun from (Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions;)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions$Builder; + public fun getProgressStallTimeout ()Lcom/amplifyframework/storage/ProgressStallTimeout; public fun getServerSideEncryption ()Lcom/amplifyframework/storage/s3/ServerSideEncryption; public fun hashCode ()I public fun toString ()Ljava/lang/String; @@ -275,6 +285,7 @@ public final class com/amplifyframework/storage/s3/options/AWSS3StorageUploadInp public synthetic fun build ()Lcom/amplifyframework/storage/options/StorageOptions; public synthetic fun build ()Lcom/amplifyframework/storage/options/StorageUploadInputStreamOptions; public fun build ()Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions; + public fun progressStallTimeout (Lcom/amplifyframework/storage/ProgressStallTimeout;)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions$Builder; public fun serverSideEncryption (Lcom/amplifyframework/storage/s3/ServerSideEncryption;)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions$Builder; public fun setUseAccelerateEndpoint (Z)Lcom/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions$Builder; } diff --git a/aws-storage-s3/src/androidTest/java/com/amplifyframework/storage/s3/AWSS3StoragePathUploadTest.kt b/aws-storage-s3/src/androidTest/java/com/amplifyframework/storage/s3/AWSS3StoragePathUploadTest.kt index a8c51f0792..97971b494c 100644 --- a/aws-storage-s3/src/androidTest/java/com/amplifyframework/storage/s3/AWSS3StoragePathUploadTest.kt +++ b/aws-storage-s3/src/androidTest/java/com/amplifyframework/storage/s3/AWSS3StoragePathUploadTest.kt @@ -23,6 +23,7 @@ import com.amplifyframework.core.async.Resumable import com.amplifyframework.hub.HubChannel import com.amplifyframework.hub.HubEvent import com.amplifyframework.hub.SubscriptionToken +import com.amplifyframework.storage.ProgressStallTimeout import com.amplifyframework.storage.StorageCategory import com.amplifyframework.storage.StorageChannelEventName import com.amplifyframework.storage.StorageException @@ -363,6 +364,38 @@ class AWSS3StoragePathUploadTest : DeviceFarmTestBase() { ) } + /** + * Given: A progress stall timeout configured on upload options + * When: A small file is uploaded over the network + * Then: The upload completes successfully (the stall timer must not break the happy path) + */ + @Test + fun testUploadSmallFileWithProgressStallTimeoutOptionCompletesSuccessfully() { + val uploadFile: File = RandomTempFile(SMALL_FILE_SIZE) + storagePath = StoragePath.fromString("public/${uploadFile.name}") + val options = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(120)) + .build() + + synchronousStorage.uploadFile(storagePath, uploadFile, options) + } + + /** + * Given: A progress stall timeout configured on upload options for a multipart upload + * When: A file larger than the multipart threshold is uploaded + * Then: The upload completes successfully across all parts + */ + @Test + fun testUploadLargeFileWithProgressStallTimeoutOptionCompletesSuccessfully() { + val uploadFile: File = RandomTempFile(LARGE_FILE_SIZE) + storagePath = StoragePath.fromString("public/${uploadFile.name}") + val options = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(120)) + .build() + + synchronousStorage.uploadFile(storagePath, uploadFile, options, EXTENDED_TIMEOUT_MS) + } + @Test(expected = StorageException::class) fun testUploadUnauthenticatedProtectedAccess() { val uploadFile: File = RandomTempFile(SMALL_FILE_SIZE) diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java index be5bfdd6fa..ae244e4eaa 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/AWSS3StoragePlugin.java @@ -31,6 +31,7 @@ import com.amplifyframework.storage.BucketInfo; import com.amplifyframework.storage.InvalidStorageBucketException; import com.amplifyframework.storage.OutputsStorageBucket; +import com.amplifyframework.storage.ProgressStallTimeout; import com.amplifyframework.storage.ResolvedStorageBucket; import com.amplifyframework.storage.StorageAccessLevel; import com.amplifyframework.storage.StorageBucket; @@ -175,7 +176,8 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf @VisibleForTesting AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider) { - this((context, region, bucket, clientProvider, transferStatusUpdater) -> + this( + (context, region, bucket, clientProvider, transferStatusUpdater, defaultProgressStallTimeoutSeconds) -> new AWSS3StorageService( context, region, @@ -183,7 +185,8 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf authCredentialsProvider, AWS_S3_STORAGE_PLUGIN_KEY, clientProvider, - transferStatusUpdater + transferStatusUpdater, + defaultProgressStallTimeoutSeconds ), authCredentialsProvider, new AWSS3StoragePluginConfiguration.Builder().build()); @@ -193,7 +196,8 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf AWSS3StoragePlugin(AuthCredentialsProvider authCredentialsProvider, AWSS3StoragePluginConfiguration awss3StoragePluginConfiguration) { - this((context, region, bucket, clientProvider, transferStatusUpdater) -> + this( + (context, region, bucket, clientProvider, transferStatusUpdater, defaultProgressStallTimeoutSeconds) -> new AWSS3StorageService( context, region, @@ -201,7 +205,8 @@ public AWSS3StoragePlugin(AWSS3StoragePluginConfiguration awsS3StoragePluginConf authCredentialsProvider, AWS_S3_STORAGE_PLUGIN_KEY, clientProvider, - transferStatusUpdater + transferStatusUpdater, + defaultProgressStallTimeoutSeconds ), authCredentialsProvider, awss3StoragePluginConfiguration); @@ -301,17 +306,22 @@ private void configure( try { this.transferStatusUpdater = new TransferStatusUpdater(TransferDB.Companion.getInstance(context)); + long defaultProgressStallTimeoutSeconds = awsS3StoragePluginConfiguration + .getProgressStallTimeout() + .getSecondsForStallTimer(); this.awss3StorageServiceContainer = new AWSS3StorageServiceContainer( context, storageServiceFactory, (S3StorageTransferClientProvider) clientProvider, - transferStatusUpdater + transferStatusUpdater, + defaultProgressStallTimeoutSeconds ); this.defaultStorageService = storageServiceFactory.create( context, region, bucket.getBucketInfo().getBucketName(), clientProvider, - transferStatusUpdater + transferStatusUpdater, + defaultProgressStallTimeoutSeconds ); this.awss3StorageServiceContainer.put(bucket.getBucketInfo().getBucketName(), this.defaultStorageService); } catch (RuntimeException exception) { @@ -657,6 +667,12 @@ public StorageUploadFileOperation uploadFile( GetStorageServiceResult result = getStorageServiceResult(options.getBucket()); + long stallTimeoutSeconds = resolveProgressStallTimeoutSeconds( + options instanceof AWSS3StorageUploadFileOptions + ? ((AWSS3StorageUploadFileOptions) options).getProgressStallTimeout() + : null + ); + AWSS3StorageUploadFileOperation operation = new AWSS3StorageUploadFileOperation( result.storageService, executorService, @@ -665,7 +681,8 @@ public StorageUploadFileOperation uploadFile( awsS3StoragePluginConfiguration, onProgress, onSuccess, - onError + onError, + stallTimeoutSeconds ); handleGetStorageServiceResult(onError, result, operation); @@ -685,6 +702,11 @@ public StorageUploadFileOperation uploadFile( ) { boolean useAccelerateEndpoint = options instanceof AWSS3StorageUploadFileOptions && ((AWSS3StorageUploadFileOptions) options).useAccelerateEndpoint(); + long stallTimeoutSeconds = resolveProgressStallTimeoutSeconds( + options instanceof AWSS3StorageUploadFileOptions + ? ((AWSS3StorageUploadFileOptions) options).getProgressStallTimeout() + : null + ); AWSS3StoragePathUploadRequest request = new AWSS3StoragePathUploadRequest<>( path, local, @@ -693,7 +715,8 @@ public StorageUploadFileOperation uploadFile( ? ((AWSS3StorageUploadFileOptions) options).getServerSideEncryption() : ServerSideEncryption.NONE, options.getMetadata(), - useAccelerateEndpoint + useAccelerateEndpoint, + stallTimeoutSeconds ); GetStorageServiceResult result = getStorageServiceResult(options.getBucket()); @@ -791,6 +814,12 @@ public StorageUploadInputStreamOperation uploadInputStream( GetStorageServiceResult result = getStorageServiceResult(options.getBucket()); + long stallTimeoutSeconds = resolveProgressStallTimeoutSeconds( + options instanceof AWSS3StorageUploadInputStreamOptions + ? ((AWSS3StorageUploadInputStreamOptions) options).getProgressStallTimeout() + : null + ); + AWSS3StorageUploadInputStreamOperation operation = new AWSS3StorageUploadInputStreamOperation( result.storageService, executorService, @@ -799,7 +828,8 @@ public StorageUploadInputStreamOperation uploadInputStream( request, onProgress, onSuccess, - onError + onError, + stallTimeoutSeconds ); handleGetStorageServiceResult(onError, result, operation); @@ -819,6 +849,11 @@ public StorageUploadInputStreamOperation uploadInputStream( ) { boolean useAccelerateEndpoint = options instanceof AWSS3StorageUploadInputStreamOptions && ((AWSS3StorageUploadInputStreamOptions) options).useAccelerateEndpoint(); + long stallTimeoutSeconds = resolveProgressStallTimeoutSeconds( + options instanceof AWSS3StorageUploadInputStreamOptions + ? ((AWSS3StorageUploadInputStreamOptions) options).getProgressStallTimeout() + : null + ); AWSS3StoragePathUploadRequest request = new AWSS3StoragePathUploadRequest<>( path, local, @@ -827,7 +862,8 @@ public StorageUploadInputStreamOperation uploadInputStream( ? ((AWSS3StorageUploadInputStreamOptions) options).getServerSideEncryption() : ServerSideEncryption.NONE, options.getMetadata(), - useAccelerateEndpoint + useAccelerateEndpoint, + stallTimeoutSeconds ); GetStorageServiceResult result = getStorageServiceResult(options.getBucket()); @@ -848,6 +884,26 @@ public StorageUploadInputStreamOperation uploadInputStream( return operation; } + /** + * Resolves the effective stall-timer interval in seconds for an upload. + * + *

If {@code override} is non-null, it takes precedence over the plugin default. This + * includes {@link ProgressStallTimeout.Disabled}, which lets callers explicitly opt a single + * upload out of stall detection even when the plugin enables it. If {@code override} is + * {@code null}, the plugin-wide default from + * {@link com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration} is + * used. Returns {@code 0} whenever the resolved timeout disables stall detection.

+ * + * @param override per-upload override or {@code null} to defer to the plugin default + * @return the stall interval in seconds, or {@code 0} if disabled + */ + private long resolveProgressStallTimeoutSeconds(@Nullable ProgressStallTimeout override) { + ProgressStallTimeout effective = override != null + ? override + : awsS3StoragePluginConfiguration.getProgressStallTimeout(); + return effective.getSecondsForStallTimer(); + } + @SuppressWarnings("deprecation") @NonNull @Override diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/TransferOperations.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/TransferOperations.kt index 7cb09f222c..e5167e4a07 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/TransferOperations.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/TransferOperations.kt @@ -49,7 +49,8 @@ internal object TransferOperations { workManager: WorkManager, workerObserver: TransferWorkerObserver, transferDB: TransferDB, - listener: TransferListener? + listener: TransferListener?, + progressStallTimeoutSeconds: Long = 0L ): TransferObserver { if (transferRecord.isMultipart == 1) { enqueueMultiPartUpload( @@ -58,14 +59,22 @@ internal object TransferOperations { workManager, workerObserver, transferStatusUpdater, - transferDB + transferDB, + progressStallTimeoutSeconds ) transferStatusUpdater.registerMultiPartTransferListener( transferRecord.id, MultiPartUploadTaskListener(transferRecord, transferDB, transferStatusUpdater) ) } else { - enqueueTransfer(transferRecord, pluginKey, workManager, workerObserver, transferStatusUpdater) + enqueueTransfer( + transferRecord, + pluginKey, + workManager, + workerObserver, + transferStatusUpdater, + progressStallTimeoutSeconds + ) } return TransferObserver( transferRecord.id, @@ -103,10 +112,20 @@ internal object TransferOperations { transferStatusUpdater: TransferStatusUpdater, workManager: WorkManager, workerObserver: TransferWorkerObserver, - transferDB: TransferDB + transferDB: TransferDB, + progressStallTimeoutSeconds: Long = 0L ): Boolean { if (!TransferState.isStarted(transferRecord.state) && !TransferState.isInTerminalState(transferRecord.state)) { - start(transferRecord, pluginKey, transferStatusUpdater, workManager, workerObserver, transferDB, null) + start( + transferRecord, + pluginKey, + transferStatusUpdater, + workManager, + workerObserver, + transferDB, + null, + progressStallTimeoutSeconds + ) if (transferRecord.isMultipart == 0) { transferStatusUpdater.updateTransferState(transferRecord.id, TransferState.RESUMED_WAITING) } @@ -164,7 +183,8 @@ internal object TransferOperations { pluginKey: String, workManager: WorkManager, transferWorkerObserver: TransferWorkerObserver, - transferStatusUpdater: TransferStatusUpdater + transferStatusUpdater: TransferStatusUpdater, + progressStallTimeoutSeconds: Long ) { val type = transferRecord.type ?: throw IllegalStateException("Transfer type missing") val workerClassName = @@ -179,7 +199,8 @@ internal object TransferOperations { workDataOf( BaseTransferWorker.TRANSFER_RECORD_ID to transferRecord.id, RouterWorker.WORKER_CLASS_NAME to workerClassName, - BaseTransferWorker.WORKER_ID to pluginKey + BaseTransferWorker.WORKER_ID to pluginKey, + BaseTransferWorker.PROGRESS_STALL_TIMEOUT_SECONDS to progressStallTimeoutSeconds ), listOf(pluginKey, transferRecord.id.toString()) ) @@ -197,10 +218,11 @@ internal object TransferOperations { workManager: WorkManager, transferWorkerObserver: TransferWorkerObserver, transferStatusUpdater: TransferStatusUpdater, - transferDB: TransferDB + transferDB: TransferDB, + progressStallTimeoutSeconds: Long ) { transferRecord.multipartId?.let { - val pendingParts = pendingParts(transferRecord, pluginKey, transferDB) + val pendingParts = pendingParts(transferRecord, pluginKey, transferDB, progressStallTimeoutSeconds) if (pendingParts.size > 0) { workManager .beginUniqueWork( @@ -225,7 +247,7 @@ internal object TransferOperations { ExistingWorkPolicy.KEEP, initiateRequest(transferRecord, pluginKey, transferStatusUpdater) ) - .then(pendingParts(transferRecord, pluginKey, transferDB)) + .then(pendingParts(transferRecord, pluginKey, transferDB, progressStallTimeoutSeconds)) .then(completeRequest(transferRecord, pluginKey, transferStatusUpdater)) .enqueue() transferStatusUpdater.updateTransferState(transferRecord.id, TransferState.WAITING) @@ -254,7 +276,12 @@ internal object TransferOperations { return request } - private fun pendingParts(transferRecord: TransferRecord, pluginKey: String, transferDB: TransferDB) = let { + private fun pendingParts( + transferRecord: TransferRecord, + pluginKey: String, + transferDB: TransferDB, + progressStallTimeoutSeconds: Long = 0L + ) = let { val listOfPendingParts = transferDB.getNonCompletedPartRequestsFromDB(transferRecord.id) val pendingPartRequest = mutableListOf() for (part in listOfPendingParts) { @@ -266,7 +293,8 @@ internal object TransferOperations { BaseTransferWorker.PART_RECORD_ID to part, BaseTransferWorker.MULTI_PART_UPLOAD_ID to transferRecord.multipartId, RouterWorker.WORKER_CLASS_NAME to PartUploadTransferWorker::class.java.name, - BaseTransferWorker.WORKER_ID to pluginKey + BaseTransferWorker.WORKER_ID to pluginKey, + BaseTransferWorker.PROGRESS_STALL_TIMEOUT_SECONDS to progressStallTimeoutSeconds ), listOf(transferRecord.id.toString(), pluginKey, "PartUploadRequest") ) diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration.kt index 4aa3a37130..89144493f5 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfiguration.kt @@ -16,11 +16,24 @@ package com.amplifyframework.storage.s3.configuration import com.amplifyframework.auth.AuthCredentialsProvider +import com.amplifyframework.storage.ProgressStallTimeout class AWSS3StoragePluginConfiguration private constructor(builder: Builder) { private val awsS3PluginPrefixResolver = builder.awsS3PluginPrefixResolver + /** + * Default progress stall timeout applied to S3 uploads. + * + * When an upload does not report any forward progress within this interval, the transfer is + * cancelled and the `onError` callback receives a [com.amplifyframework.storage.StorageException] + * whose `cause` is a [com.amplifyframework.storage.ProgressStallTimeoutException]. The default is + * [ProgressStallTimeout.Disabled], which preserves existing behavior. + * + * This value is used whenever a per-upload override has not been supplied on the upload options. + */ + val progressStallTimeout: ProgressStallTimeout = builder.progressStallTimeout + companion object { operator fun invoke(block: Builder.() -> Unit): AWSS3StoragePluginConfiguration = Builder() .apply(block) @@ -37,6 +50,13 @@ class AWSS3StoragePluginConfiguration private constructor(builder: Builder) { @Deprecated("Unused for operations using StoragePath") var awsS3PluginPrefixResolver: AWSS3PluginPrefixResolver? = null + /** + * Default [ProgressStallTimeout] applied to uploads created by this plugin. + * + * Defaults to [ProgressStallTimeout.Disabled]. + */ + var progressStallTimeout: ProgressStallTimeout = ProgressStallTimeout.Disabled + fun build(): AWSS3StoragePluginConfiguration = AWSS3StoragePluginConfiguration(this) } } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/extensions/StorageExceptionExtensions.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/extensions/StorageExceptionExtensions.kt index 3c6e0b87f4..8ee460d5e8 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/extensions/StorageExceptionExtensions.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/extensions/StorageExceptionExtensions.kt @@ -14,6 +14,7 @@ */ package com.amplifyframework.storage.s3.extensions +import com.amplifyframework.storage.ProgressStallTimeoutException import com.amplifyframework.storage.StorageException import com.amplifyframework.storage.StorageFilePermissionException import com.amplifyframework.storage.StoragePathValidationException @@ -41,3 +42,43 @@ internal fun StorageFilePermissionException.Companion.unableToOverwriteFileExcep ), "Acquire write permission for this file before attempting to overwrite it." ) + +internal fun ProgressStallTimeoutException.Companion.progressStallTimeoutException( + cause: ProgressStallTimeoutException? = null +) = StorageException( + "Upload cancelled due to progress stall timeout.", + cause ?: ProgressStallTimeoutException( + "Upload cancelled due to progress stall timeout.", + "Increase the configured progress stall timeout or verify the network conditions, " + + "then retry the upload." + ), + "Increase the configured progress stall timeout or verify the network conditions, " + + "then retry the upload." +) + +/** + * Wraps [this] into the appropriate [StorageException] for an upload callback. + * + * When the underlying failure is a [ProgressStallTimeoutException], the typed stall exception is + * surfaced verbatim via [ProgressStallTimeoutException.Companion.progressStallTimeoutException] so + * callers can branch on `storageException.cause is ProgressStallTimeoutException`. All other + * throwables are wrapped in a generic [StorageException] using [defaultMessage]. + */ +internal fun Throwable.toStorageUploadException(defaultMessage: String): StorageException { + val stall = findProgressStallTimeoutCause() + if (stall != null) { + return ProgressStallTimeoutException.progressStallTimeoutException(stall) + } + return StorageException( + defaultMessage, + this, + "See attached exception for more information and suggestions" + ) +} + +private tailrec fun Throwable.findProgressStallTimeoutCause(): ProgressStallTimeoutException? { + if (this is ProgressStallTimeoutException) return this + val next = cause ?: return null + if (next === this) return null + return next.findProgressStallTimeoutCause() +} diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperation.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperation.kt index 167723b8e2..98ee209154 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperation.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperation.kt @@ -28,6 +28,7 @@ import com.amplifyframework.storage.result.StorageTransferProgress import com.amplifyframework.storage.result.StorageUploadFileResult import com.amplifyframework.storage.s3.ServerSideEncryption import com.amplifyframework.storage.s3.extensions.toS3ServiceKey +import com.amplifyframework.storage.s3.extensions.toStorageUploadException import com.amplifyframework.storage.s3.request.AWSS3StoragePathUploadRequest import com.amplifyframework.storage.s3.service.StorageService import com.amplifyframework.storage.s3.transfer.TransferListener @@ -116,7 +117,8 @@ internal class AWSS3StoragePathUploadFileOperation internal constructor( serviceKey, file, objectMetadata, - uploadRequest.useAccelerateEndpoint + uploadRequest.useAccelerateEndpoint, + uploadRequest.progressStallTimeoutSeconds ) transferObserver?.setTransferListener(UploadTransferListener()) } catch (exception: Exception) { @@ -223,10 +225,8 @@ internal class AWSS3StoragePathUploadFileOperation internal constructor( HubEvent.create(StorageChannelEventName.UPLOAD_ERROR, ex) ) onError?.accept( - StorageException( - "Something went wrong with your AWS S3 Storage upload file operation", - ex, - "See attached exception for more information and suggestions" + ex.toStorageUploadException( + "Something went wrong with your AWS S3 Storage upload file operation" ) ) } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperation.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperation.kt index 36a6d63ac7..1602d8e512 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperation.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperation.kt @@ -28,6 +28,7 @@ import com.amplifyframework.storage.result.StorageTransferProgress import com.amplifyframework.storage.result.StorageUploadInputStreamResult import com.amplifyframework.storage.s3.ServerSideEncryption import com.amplifyframework.storage.s3.extensions.toS3ServiceKey +import com.amplifyframework.storage.s3.extensions.toStorageUploadException import com.amplifyframework.storage.s3.request.AWSS3StoragePathUploadRequest import com.amplifyframework.storage.s3.service.StorageService import com.amplifyframework.storage.s3.transfer.TransferListener @@ -115,7 +116,8 @@ internal class AWSS3StoragePathUploadInputStreamOperation internal constructor( serviceKey, inputStream, objectMetadata, - request.useAccelerateEndpoint + request.useAccelerateEndpoint, + request.progressStallTimeoutSeconds ) transferObserver?.setTransferListener(UploadTransferListener()) } catch (exception: Exception) { @@ -222,10 +224,8 @@ internal class AWSS3StoragePathUploadInputStreamOperation internal constructor( HubEvent.create(StorageChannelEventName.UPLOAD_ERROR, ex) ) onError?.accept( - StorageException( - "Something went wrong with your AWS S3 Storage upload InputStream operation", - ex, - "See attached exception for more information and suggestions" + ex.toStorageUploadException( + "Something went wrong with your AWS S3 Storage upload InputStream operation" ) ) } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperation.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperation.kt index cb28a8360c..9e3188a88c 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperation.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperation.kt @@ -28,6 +28,7 @@ import com.amplifyframework.storage.result.StorageTransferProgress import com.amplifyframework.storage.result.StorageUploadFileResult import com.amplifyframework.storage.s3.ServerSideEncryption import com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration +import com.amplifyframework.storage.s3.extensions.toStorageUploadException import com.amplifyframework.storage.s3.request.AWSS3StorageUploadRequest import com.amplifyframework.storage.s3.service.StorageService import com.amplifyframework.storage.s3.transfer.TransferListener @@ -53,7 +54,8 @@ class AWSS3StorageUploadFileOperation @JvmOverloads internal constructor( private var transferObserver: TransferObserver? = null, onProgress: Consumer? = null, onSuccess: Consumer? = null, - onError: Consumer? = null + onError: Consumer? = null, + private val progressStallTimeoutSeconds: Long = 0L ) : StorageUploadFileOperation>(request, transferId, onProgress, onSuccess, onError) { constructor( @@ -75,7 +77,41 @@ class AWSS3StorageUploadFileOperation @JvmOverloads internal constructor( null, onProgress, onSuccess, - onError + onError, + 0L + ) + + /** + * Java-friendly secondary constructor that accepts a resolved progress-stall timeout. + * + * The plugin resolves the effective seconds from the plugin configuration and any per-upload + * override before instantiating this operation, so downstream workers can arm a stall timer + * without re-reading the plugin configuration. + * + * @param progressStallTimeoutSeconds resolved stall interval in seconds; `0` disables detection. + */ + constructor( + storageService: StorageService, + executorService: ExecutorService, + authCredentialsProvider: AuthCredentialsProvider, + request: AWSS3StorageUploadRequest, + awsS3StoragePluginConfiguration: AWSS3StoragePluginConfiguration, + onProgress: Consumer, + onSuccess: Consumer, + onError: Consumer, + progressStallTimeoutSeconds: Long + ) : this( + UUID.randomUUID().toString(), + storageService, + executorService, + authCredentialsProvider, + awsS3StoragePluginConfiguration, + request, + null, + onProgress, + onSuccess, + onError, + progressStallTimeoutSeconds ) init { @@ -114,7 +150,8 @@ class AWSS3StorageUploadFileOperation @JvmOverloads internal constructor( serviceKey, file, objectMetadata, - uploadRequest.useAccelerateEndpoint() + uploadRequest.useAccelerateEndpoint(), + progressStallTimeoutSeconds ) transferObserver?.setTransferListener(UploadTransferListener()) } catch (exception: Exception) { @@ -226,10 +263,8 @@ class AWSS3StorageUploadFileOperation @JvmOverloads internal constructor( HubEvent.create(StorageChannelEventName.UPLOAD_ERROR, exception) ) onError?.accept( - StorageException( - "Something went wrong with your AWS S3 Storage upload file operation", - exception, - "See attached exception for more information and suggestions" + exception.toStorageUploadException( + "Something went wrong with your AWS S3 Storage upload file operation" ) ) } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperation.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperation.kt index 4a216bd3d4..2ca2645b48 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperation.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperation.kt @@ -28,6 +28,7 @@ import com.amplifyframework.storage.result.StorageTransferProgress import com.amplifyframework.storage.result.StorageUploadInputStreamResult import com.amplifyframework.storage.s3.ServerSideEncryption import com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration +import com.amplifyframework.storage.s3.extensions.toStorageUploadException import com.amplifyframework.storage.s3.request.AWSS3StorageUploadRequest import com.amplifyframework.storage.s3.service.StorageService import com.amplifyframework.storage.s3.transfer.TransferListener @@ -54,7 +55,8 @@ class AWSS3StorageUploadInputStreamOperation @JvmOverloads internal constructor( private var transferObserver: TransferObserver? = null, onProgress: Consumer? = null, onSuccess: Consumer? = null, - onError: Consumer? = null + onError: Consumer? = null, + private val progressStallTimeoutSeconds: Long = 0L ) : StorageUploadInputStreamOperation>( request, transferId, @@ -82,7 +84,41 @@ class AWSS3StorageUploadInputStreamOperation @JvmOverloads internal constructor( null, onProgress, onSuccess, - onError + onError, + 0L + ) + + /** + * Java-friendly secondary constructor that accepts a resolved progress-stall timeout. + * + * The plugin resolves the effective seconds from the plugin configuration and any per-upload + * override before instantiating this operation, so downstream workers can arm a stall timer + * without re-reading the plugin configuration. + * + * @param progressStallTimeoutSeconds resolved stall interval in seconds; `0` disables detection. + */ + constructor( + storageService: StorageService, + executorService: ExecutorService, + authCredentialsProvider: AuthCredentialsProvider, + awsS3StoragePluginConfiguration: AWSS3StoragePluginConfiguration, + request: AWSS3StorageUploadRequest, + onProgress: Consumer, + onSuccess: Consumer, + onError: Consumer, + progressStallTimeoutSeconds: Long + ) : this( + UUID.randomUUID().toString(), + storageService, + executorService, + authCredentialsProvider, + awsS3StoragePluginConfiguration, + request, + null, + onProgress, + onSuccess, + onError, + progressStallTimeoutSeconds ) init { @@ -121,7 +157,8 @@ class AWSS3StorageUploadInputStreamOperation @JvmOverloads internal constructor( serviceKey, inputStream, objectMetadata, - uploadRequest.useAccelerateEndpoint() + uploadRequest.useAccelerateEndpoint(), + progressStallTimeoutSeconds ) transferObserver?.setTransferListener(UploadTransferListener()) } catch (ioException: IOException) { @@ -232,10 +269,8 @@ class AWSS3StorageUploadInputStreamOperation @JvmOverloads internal constructor( HubEvent.create(StorageChannelEventName.UPLOAD_ERROR, exception) ) onError?.accept( - StorageException( - "Something went wrong with your AWS S3 Storage upload input stream operation", - exception, - "See attached exception for more information and suggestions" + exception.toStorageUploadException( + "Something went wrong with your AWS S3 Storage upload input stream operation" ) ) } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions.java b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions.java index 00ce8ff610..c802a35e71 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions.java +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptions.java @@ -16,8 +16,10 @@ package com.amplifyframework.storage.s3.options; import androidx.annotation.NonNull; +import androidx.annotation.Nullable; import androidx.core.util.ObjectsCompat; +import com.amplifyframework.storage.ProgressStallTimeout; import com.amplifyframework.storage.options.StorageUploadFileOptions; import com.amplifyframework.storage.s3.ServerSideEncryption; @@ -29,11 +31,13 @@ public final class AWSS3StorageUploadFileOptions extends StorageUploadFileOptions { private final ServerSideEncryption serverSideEncryption; private final boolean useAccelerationMode; + private final ProgressStallTimeout progressStallTimeout; private AWSS3StorageUploadFileOptions(final Builder builder) { super(builder); this.serverSideEncryption = builder.getServerSideEncryption(); this.useAccelerationMode = builder.useAccelerateEndpoint; + this.progressStallTimeout = builder.progressStallTimeout; } /** @@ -76,7 +80,8 @@ public static Builder from(@NonNull final AWSS3StorageUploadFileOptions options) .contentType(options.getContentType()) .serverSideEncryption(options.getServerSideEncryption()) .metadata(options.getMetadata()) - .bucket(options.getBucket()); + .bucket(options.getBucket()) + .progressStallTimeout(options.getProgressStallTimeout()); } /** @@ -97,6 +102,21 @@ public boolean useAccelerateEndpoint() { return useAccelerationMode; } + /** + * Per-upload override for the progress stall timeout. + * + *

When {@code null}, the value configured on + * {@link com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration} + * is applied instead. When non-null, this value takes precedence and disables plugin-level + * stall detection overrides for this single upload.

+ * + * @return a {@link ProgressStallTimeout} override, or {@code null} to defer to the plugin default + */ + @Nullable + public ProgressStallTimeout getProgressStallTimeout() { + return progressStallTimeout; + } + @Override @SuppressWarnings("deprecation") public boolean equals(Object obj) { @@ -111,7 +131,8 @@ public boolean equals(Object obj) { ObjectsCompat.equals(getContentType(), that.getContentType()) && ObjectsCompat.equals(getServerSideEncryption(), that.getServerSideEncryption()) && ObjectsCompat.equals(getMetadata(), that.getMetadata()) && - ObjectsCompat.equals(getBucket(), that.getBucket()); + ObjectsCompat.equals(getBucket(), that.getBucket()) && + ObjectsCompat.equals(getProgressStallTimeout(), that.getProgressStallTimeout()); } } @@ -124,7 +145,8 @@ public int hashCode() { getContentType(), getServerSideEncryption(), getMetadata(), - getBucket() + getBucket(), + getProgressStallTimeout() ); } @@ -139,6 +161,7 @@ public String toString() { ", serverSideEncryption=" + getServerSideEncryption().getName() + ", metadata=" + getMetadata() + ", bucket=" + getBucket() + + ", progressStallTimeout=" + getProgressStallTimeout() + '}'; } @@ -150,6 +173,8 @@ public String toString() { public static final class Builder extends StorageUploadFileOptions.Builder { private ServerSideEncryption serverSideEncryption; private boolean useAccelerateEndpoint; + @Nullable + private ProgressStallTimeout progressStallTimeout; private Builder() { super(); @@ -177,6 +202,22 @@ public Builder serverSideEncryption(@NonNull ServerSideEncryption serverSideEncr return this; } + /** + * Configures the {@link ProgressStallTimeout} override for a single upload. + * + *

When {@code null} (the default), the value configured on + * {@link com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration} is + * applied. When non-null, this value takes precedence for this upload.

+ * + * @param progressStallTimeout per-upload override, or {@code null} to defer to the plugin default + * @return Current Builder instance for fluent chaining + */ + @NonNull + public Builder progressStallTimeout(@Nullable ProgressStallTimeout progressStallTimeout) { + this.progressStallTimeout = progressStallTimeout; + return this; + } + @NonNull ServerSideEncryption getServerSideEncryption() { return serverSideEncryption; diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions.java b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions.java index 3bf4f6a368..123e2abe0c 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions.java +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptions.java @@ -16,8 +16,10 @@ package com.amplifyframework.storage.s3.options; import androidx.annotation.NonNull; +import androidx.annotation.Nullable; import androidx.core.util.ObjectsCompat; +import com.amplifyframework.storage.ProgressStallTimeout; import com.amplifyframework.storage.options.StorageUploadInputStreamOptions; import com.amplifyframework.storage.s3.ServerSideEncryption; @@ -29,11 +31,13 @@ public final class AWSS3StorageUploadInputStreamOptions extends StorageUploadInputStreamOptions { private final ServerSideEncryption serverSideEncryption; private final boolean useAccelerationMode; + private final ProgressStallTimeout progressStallTimeout; private AWSS3StorageUploadInputStreamOptions(final Builder builder) { super(builder); this.serverSideEncryption = builder.serverSideEncryption; this.useAccelerationMode = builder.useAccelerateEndpoint; + this.progressStallTimeout = builder.progressStallTimeout; } /** @@ -76,7 +80,8 @@ public static Builder from(@NonNull final AWSS3StorageUploadInputStreamOptions o .contentType(options.getContentType()) .serverSideEncryption(options.getServerSideEncryption()) .metadata(options.getMetadata()) - .bucket(options.getBucket()); + .bucket(options.getBucket()) + .progressStallTimeout(options.getProgressStallTimeout()); } /** @@ -97,6 +102,21 @@ public boolean useAccelerateEndpoint() { return useAccelerationMode; } + /** + * Per-upload override for the progress stall timeout. + * + *

When {@code null}, the value configured on + * {@link com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration} + * is applied instead. When non-null, this value takes precedence and disables plugin-level + * stall detection overrides for this single upload.

+ * + * @return a {@link ProgressStallTimeout} override, or {@code null} to defer to the plugin default + */ + @Nullable + public ProgressStallTimeout getProgressStallTimeout() { + return progressStallTimeout; + } + @Override @SuppressWarnings("deprecation") public boolean equals(Object obj) { @@ -111,7 +131,8 @@ public boolean equals(Object obj) { ObjectsCompat.equals(getContentType(), that.getContentType()) && ObjectsCompat.equals(getServerSideEncryption(), that.getServerSideEncryption()) && ObjectsCompat.equals(getMetadata(), that.getMetadata()) && - ObjectsCompat.equals(getBucket(), that.getBucket()); + ObjectsCompat.equals(getBucket(), that.getBucket()) && + ObjectsCompat.equals(getProgressStallTimeout(), that.getProgressStallTimeout()); } } @@ -124,7 +145,8 @@ public int hashCode() { getContentType(), getServerSideEncryption(), getMetadata(), - getBucket() + getBucket(), + getProgressStallTimeout() ); } @@ -139,6 +161,7 @@ public String toString() { ", serverSideEncryption=" + getServerSideEncryption().getName() + ", metadata=" + getMetadata() + ", bucket=" + getBucket() + + ", progressStallTimeout=" + getProgressStallTimeout() + '}'; } @@ -150,6 +173,8 @@ public String toString() { public static final class Builder extends StorageUploadInputStreamOptions.Builder { private ServerSideEncryption serverSideEncryption; private boolean useAccelerateEndpoint; + @Nullable + private ProgressStallTimeout progressStallTimeout; private Builder() { super(); @@ -177,6 +202,22 @@ public Builder serverSideEncryption(@NonNull ServerSideEncryption serverSideEncr return this; } + /** + * Configures the {@link ProgressStallTimeout} override for a single upload. + * + *

When {@code null} (the default), the value configured on + * {@link com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration} is + * applied. When non-null, this value takes precedence for this upload.

+ * + * @param progressStallTimeout per-upload override, or {@code null} to defer to the plugin default + * @return Current Builder instance for fluent chaining + */ + @NonNull + public Builder progressStallTimeout(@Nullable ProgressStallTimeout progressStallTimeout) { + this.progressStallTimeout = progressStallTimeout; + return this; + } + @Override @NonNull public AWSS3StorageUploadInputStreamOptions build() { diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/request/AWSS3StoragePathUploadRequest.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/request/AWSS3StoragePathUploadRequest.kt index 0c86b78db6..4911e8ef40 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/request/AWSS3StoragePathUploadRequest.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/request/AWSS3StoragePathUploadRequest.kt @@ -20,6 +20,11 @@ import com.amplifyframework.storage.s3.ServerSideEncryption /** * Parameters to provide to S3 that describe a request to upload a * file or input stream. + * + * @property progressStallTimeoutSeconds Resolved progress-stall interval in seconds. `0` disables + * stall detection. The value is the result of merging the plugin default with any per-upload + * override supplied on the upload options, so downstream code does not need the plugin + * configuration to decide whether to arm the stall timer. */ internal data class AWSS3StoragePathUploadRequest( val path: StoragePath, @@ -27,5 +32,6 @@ internal data class AWSS3StoragePathUploadRequest( val contentType: String?, val serverSideEncryption: ServerSideEncryption, val metadata: Map, - val useAccelerateEndpoint: Boolean + val useAccelerateEndpoint: Boolean, + val progressStallTimeoutSeconds: Long = 0L ) diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageService.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageService.kt index a8f1512e6a..70eb50225e 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageService.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageService.kt @@ -62,13 +62,20 @@ internal class AWSS3StorageService( private val authCredentialsProvider: AuthCredentialsProvider, private val awsS3StoragePluginKey: String, private val clientProvider: StorageTransferClientProvider, - private val transferStatusUpdater: TransferStatusUpdater + private val transferStatusUpdater: TransferStatusUpdater, + private val defaultProgressStallTimeoutSeconds: Long = 0L ) : StorageService { private var s3Client: S3Client = S3StorageTransferClientProvider.getS3Client(awsRegion, authCredentialsProvider) val transferManager: TransferManager = - TransferManager(context, clientProvider, awsS3StoragePluginKey, transferStatusUpdater) + TransferManager( + context, + clientProvider, + awsS3StoragePluginKey, + transferStatusUpdater, + defaultProgressStallTimeoutSeconds = defaultProgressStallTimeoutSeconds + ) /** * Generate pre-signed URL for an object. @@ -165,7 +172,8 @@ internal class AWSS3StorageService( serviceKey: String, file: File, metadata: ObjectMetadata, - useAccelerateEndpoint: Boolean + useAccelerateEndpoint: Boolean, + progressStallTimeoutSeconds: Long ): TransferObserver = transferManager.upload( transferId, s3BucketName, @@ -173,7 +181,8 @@ internal class AWSS3StorageService( serviceKey, file, metadata, - useAccelerateEndpoint = useAccelerateEndpoint + useAccelerateEndpoint = useAccelerateEndpoint, + progressStallTimeoutSeconds = progressStallTimeoutSeconds ) /** @@ -189,10 +198,18 @@ internal class AWSS3StorageService( serviceKey: String, inputStream: InputStream, metadata: ObjectMetadata, - useAccelerateEndpoint: Boolean + useAccelerateEndpoint: Boolean, + progressStallTimeoutSeconds: Long ): TransferObserver { val uploadOptions = UploadOptions(s3BucketName, awsRegion, metadata) - return transferManager.upload(transferId, serviceKey, inputStream, uploadOptions, useAccelerateEndpoint) + return transferManager.upload( + transferId, + serviceKey, + inputStream, + uploadOptions, + useAccelerateEndpoint, + progressStallTimeoutSeconds + ) } /** @@ -434,6 +451,10 @@ internal class AWSS3StorageService( * @param context Android context * @param region S3 bucket region * @param bucketName Name of the bucket where the items are stored + * @param defaultProgressStallTimeoutSeconds Plugin-level default progress-stall interval in + * seconds, used when a transfer is resumed (and any per-upload override has been lost + * because it was only passed through `WorkData` for the original enqueue). `0` disables + * stall detection on resume, preserving legacy behavior. * @return An instantiated storage service instance */ fun create( @@ -441,7 +462,8 @@ internal class AWSS3StorageService( region: String, bucketName: String, clientProvider: StorageTransferClientProvider, - transferStatusUpdater: TransferStatusUpdater + transferStatusUpdater: TransferStatusUpdater, + defaultProgressStallTimeoutSeconds: Long = 0L ): AWSS3StorageService } } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageServiceContainer.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageServiceContainer.kt index 859337ac57..7eb811ce7b 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageServiceContainer.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/AWSS3StorageServiceContainer.kt @@ -30,14 +30,23 @@ internal class AWSS3StorageServiceContainer( private val storageServiceFactory: AWSS3StorageService.Factory, private val clientProvider: StorageTransferClientProvider, private val awsS3StorageServicesByBucketName: ConcurrentHashMap, - private val transferStatusUpdater: TransferStatusUpdater + private val transferStatusUpdater: TransferStatusUpdater, + private val defaultProgressStallTimeoutSeconds: Long = 0L ) { constructor( context: Context, storageServiceFactory: AWSS3StorageService.Factory, clientProvider: S3StorageTransferClientProvider, - transferStatusUpdater: TransferStatusUpdater - ) : this(context, storageServiceFactory, clientProvider, ConcurrentHashMap(), transferStatusUpdater) + transferStatusUpdater: TransferStatusUpdater, + defaultProgressStallTimeoutSeconds: Long = 0L + ) : this( + context, + storageServiceFactory, + clientProvider, + ConcurrentHashMap(), + transferStatusUpdater, + defaultProgressStallTimeoutSeconds + ) private val lock = Any() @@ -64,8 +73,14 @@ internal class AWSS3StorageServiceContainer( var service = awsS3StorageServicesByBucketName.get(bucketName) if (service == null) { val region: String = resolvedStorageBucket.bucketInfo.region - service = - storageServiceFactory.create(context, region, bucketName, clientProvider, transferStatusUpdater) + service = storageServiceFactory.create( + context, + region, + bucketName, + clientProvider, + transferStatusUpdater, + defaultProgressStallTimeoutSeconds + ) awsS3StorageServicesByBucketName[bucketName] = service } return service @@ -82,8 +97,14 @@ internal class AWSS3StorageServiceContainer( synchronized(lock) { var service = awsS3StorageServicesByBucketName[bucketName] if (service == null) { - service = - storageServiceFactory.create(context, region, bucketName, clientProvider, transferStatusUpdater) + service = storageServiceFactory.create( + context, + region, + bucketName, + clientProvider, + transferStatusUpdater, + defaultProgressStallTimeoutSeconds + ) awsS3StorageServicesByBucketName[bucketName] = service } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/StorageService.java b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/StorageService.java index 491acce954..80fa68c1ac 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/StorageService.java +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/service/StorageService.java @@ -81,12 +81,15 @@ TransferObserver downloadToFile(@NonNull String transferId, @NonNull File file, boolean useAccelerateEndpoint); - /** * Begin uploading a file to a key in storage and return an observer * to monitor upload progress. This item will be stored with specified * metadata. * + *

Delegates to {@link #uploadFile(String, String, File, ObjectMetadata, boolean, long)} with + * a stall timeout of {@code 0}, preserving legacy behavior when no stall detection is + * configured.

+ * * @param transferId unique id for this transfer * @param serviceKey Key to uniquely label item in storage * @param file file to upload @@ -94,22 +97,77 @@ TransferObserver downloadToFile(@NonNull String transferId, * @param useAccelerateEndpoint flag to use accelerate endpoint * @return An instance of {@link TransferObserver} to monitor upload */ + default TransferObserver uploadFile(@NonNull String transferId, + @NonNull String serviceKey, + @NonNull File file, + @NonNull ObjectMetadata metadata, + boolean useAccelerateEndpoint) { + return uploadFile(transferId, serviceKey, file, metadata, useAccelerateEndpoint, 0L); + } + + /** + * Begin uploading a file with progress-stall detection applied. + * + *

A {@code progressStallTimeoutSeconds} of {@code 0} disables stall detection (identical to + * the legacy overload). Any positive value arms a timer that cancels the upload if no forward + * progress is observed within that many seconds.

+ * + * @param transferId unique id for this transfer + * @param serviceKey Key to uniquely label item in storage + * @param file file to upload + * @param metadata metadata to attach to uploaded item + * @param useAccelerateEndpoint flag to use accelerate endpoint + * @param progressStallTimeoutSeconds resolved stall interval in seconds; {@code 0} disables detection + * @return An instance of {@link TransferObserver} to monitor upload + */ TransferObserver uploadFile(@NonNull String transferId, @NonNull String serviceKey, @NonNull File file, @NonNull ObjectMetadata metadata, - boolean useAccelerateEndpoint); + boolean useAccelerateEndpoint, + long progressStallTimeoutSeconds); /** * Begin uploading an InputStream to a key in storage and return an observer * to monitor upload progress. This item will be stored with specified * metadata. * + *

Delegates to + * {@link #uploadInputStream(String, String, InputStream, ObjectMetadata, boolean, long)} with a + * stall timeout of {@code 0}, preserving legacy behavior when no stall detection is + * configured.

+ * + * @param transferId unique id for this transfer + * @param serviceKey key to uniquely label item in storage + * @param inputStream InputStream from which to read content + * @param metadata Metadata to attach to uploaded item + * @param useAccelerateEndpoint Flag to use accelerate endpoint + * @return An instance of {@link TransferObserver} to monitor upload + * @throws IOException on error reading the InputStream, or saving it to a temporary + * File before the upload begins. + */ + default TransferObserver uploadInputStream(@NonNull String transferId, + @NonNull String serviceKey, + @NonNull InputStream inputStream, + @NonNull ObjectMetadata metadata, + boolean useAccelerateEndpoint) + throws IOException { + return uploadInputStream(transferId, serviceKey, inputStream, metadata, useAccelerateEndpoint, 0L); + } + + /** + * Begin uploading an InputStream with progress-stall detection applied. + * + *

A {@code progressStallTimeoutSeconds} of {@code 0} disables stall detection (identical to + * the legacy overload). Any positive value arms a timer that cancels the upload if no forward + * progress is observed within that many seconds.

+ * * @param transferId unique id for this transfer * @param serviceKey key to uniquely label item in storage * @param inputStream InputStream from which to read content * @param metadata Metadata to attach to uploaded item * @param useAccelerateEndpoint Flag to use accelerate endpoint + * @param progressStallTimeoutSeconds resolved stall interval in seconds; {@code 0} disables detection * @return An instance of {@link TransferObserver} to monitor upload * @throws IOException on error reading the InputStream, or saving it to a temporary * File before the upload begins. @@ -118,7 +176,8 @@ TransferObserver uploadInputStream(@NonNull String transferId, @NonNull String serviceKey, @NonNull InputStream inputStream, @NonNull ObjectMetadata metadata, - boolean useAccelerateEndpoint) + boolean useAccelerateEndpoint, + long progressStallTimeoutSeconds) throws IOException; /** diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/StallDetectingProgressListener.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/StallDetectingProgressListener.kt new file mode 100644 index 0000000000..1149393716 --- /dev/null +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/StallDetectingProgressListener.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.storage.s3.transfer + +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.time.Duration.Companion.seconds +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * Decorates a [ProgressListener] with stall detection. + * + * When the wrapped listener has not observed any forward progress within [stallTimeoutSeconds], + * [onStall] is invoked exactly once. Callers are expected to cancel the associated transfer and + * surface a `ProgressStallTimeoutException` to the user in response to [onStall]. + * + * Timer lifecycle: + * - Call [start] once the transfer has been enqueued to arm the initial timer. + * - Every call to [progressChanged] with `bytesTransferred > 0` re-arms the timer. + * - Call [close] when the transfer reaches a terminal state (completed, cancelled, failed) to + * cancel any pending timer and suppress further stall detection. + * + * A [stallTimeoutSeconds] of `0` disables detection — [onStall] will never be invoked, but + * progress events are still forwarded to [delegate]. Callers typically avoid constructing this + * decorator at all when detection is disabled, but the guard is kept here to make the decorator + * safe to construct unconditionally. + */ +internal class StallDetectingProgressListener( + private val delegate: ProgressListener, + private val stallTimeoutSeconds: Long, + private val onStall: () -> Unit, + private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default), + private val ownsScope: Boolean = true +) : ProgressListener, AutoCloseable { + + private val timerJob = AtomicReference(null) + private val stalled = AtomicBoolean(false) + private val closed = AtomicBoolean(false) + + /** + * Arms the stall timer. Intended to be called once when the transfer begins so that uploads + * that never produce a single progress event still time out. + */ + fun start() { + if (!shouldMonitor()) return + armTimer() + } + + @Synchronized + override fun progressChanged(bytesTransferred: Long) { + delegate.progressChanged(bytesTransferred) + if (!shouldMonitor()) return + if (bytesTransferred > 0L) { + armTimer() + } + } + + /** + * Cancels the pending stall timer and prevents future stall detection. Safe to call multiple + * times. When the decorator owns its [scope] (the default), the scope is also cancelled. + */ + override fun close() { + if (!closed.compareAndSet(false, true)) return + timerJob.getAndSet(null)?.cancel() + if (ownsScope) { + scope.cancel() + } + } + + private fun shouldMonitor(): Boolean = !closed.get() && !stalled.get() && stallTimeoutSeconds > 0L + + private fun armTimer() { + val next = scope.launch { + delay(stallTimeoutSeconds.seconds) + if (stalled.compareAndSet(false, true)) { + onStall() + } + } + val previous = timerJob.getAndSet(next) + previous?.cancel() + } +} diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/TransferManager.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/TransferManager.kt index f27fd4f7f8..50953b9809 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/TransferManager.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/TransferManager.kt @@ -49,7 +49,8 @@ internal class TransferManager( clientProvider: StorageTransferClientProvider, private val pluginKey: String, private val transferStatusUpdater: TransferStatusUpdater, - private val workManager: WorkManager = WorkManager.getInstance(context) + private val workManager: WorkManager = WorkManager.getInstance(context), + private val defaultProgressStallTimeoutSeconds: Long = 0L ) { private val transferDB: TransferDB = TransferDB.getInstance(context) @@ -100,7 +101,8 @@ internal class TransferManager( metadata: ObjectMetadata, cannedAcl: ObjectCannedAcl? = null, listener: TransferListener? = null, - useAccelerateEndpoint: Boolean = false + useAccelerateEndpoint: Boolean = false, + progressStallTimeoutSeconds: Long = 0L ): TransferObserver { val transferRecordId = if (shouldUploadInMultipart(file)) { createMultipartUploadRecords( @@ -137,7 +139,8 @@ internal class TransferManager( workManager, transferWorkerObserver, transferDB, - listener + listener, + progressStallTimeoutSeconds ) mainHandler.post { workManager @@ -148,12 +151,14 @@ internal class TransferManager( } @Throws(IOException::class) + @JvmOverloads fun upload( transferId: String, key: String, inputStream: InputStream, options: UploadOptions, - useAccelerateEndpoint: Boolean + useAccelerateEndpoint: Boolean, + progressStallTimeoutSeconds: Long = 0L ): TransferObserver { val file = writeInputStreamToFile(inputStream) return upload( @@ -165,7 +170,8 @@ internal class TransferManager( options.objectMetadata, options.cannedAcl, options.transferListener, - useAccelerateEndpoint + useAccelerateEndpoint, + progressStallTimeoutSeconds ) } @@ -233,7 +239,8 @@ internal class TransferManager( transferStatusUpdater, workManager, transferWorkerObserver, - transferDB + transferDB, + defaultProgressStallTimeoutSeconds ) mainHandler.post { workManager diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BaseTransferWorker.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BaseTransferWorker.kt index e28360a0b1..b65bcf09ad 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BaseTransferWorker.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BaseTransferWorker.kt @@ -49,6 +49,15 @@ internal interface BaseTransferWorker { internal const val COMPLETION_REQUEST_TAG: String = "COMPLETION_REQUEST_TAG_%s" internal const val INITIATION_REQUEST_TAG: String = "INITIATION_REQUEST_TAG_%s" internal const val MULTIPART_UPLOAD: String = "MULTIPART_UPLOAD" + + /** + * WorkData key carrying the resolved progress-stall interval (seconds) for this transfer. + * + * A value of `0` (or absence of the key) disables stall detection. Upload workers consume + * this value to decide whether to wrap their [ProgressListener] in a stall-detecting + * decorator. + */ + internal const val PROGRESS_STALL_TIMEOUT_SECONDS = "PROGRESS_STALL_TIMEOUT_SECONDS" } fun isNetworkAvailable(context: Context): Boolean { diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BlockingTransferWorker.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BlockingTransferWorker.kt index 37fb5f53d0..8b39f51592 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BlockingTransferWorker.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/BlockingTransferWorker.kt @@ -23,6 +23,7 @@ import androidx.work.WorkerParameters import androidx.work.workDataOf import com.amplifyframework.core.Amplify import com.amplifyframework.core.category.CategoryType +import com.amplifyframework.storage.ProgressStallTimeoutException import com.amplifyframework.storage.TransferState import com.amplifyframework.storage.s3.AWSS3StoragePlugin import com.amplifyframework.storage.s3.transfer.TransferDB @@ -76,7 +77,7 @@ internal abstract class BlockingTransferWorker( if (isRetryableError(ex)) { Result.retry() } else { - transferStatusUpdater.updateOnError(transferRecord.id, Exception(ex)) + transferStatusUpdater.updateOnError(transferRecord.id, ex.asReportableException()) transferStatusUpdater.updateTransferState( transferRecord.id, TransferState.FAILED @@ -91,8 +92,19 @@ internal abstract class BlockingTransferWorker( internal open var maxRetryCount = 0 - private fun isRetryableError(e: Throwable?): Boolean = !isNetworkAvailable(applicationContext) || - runAttemptCount < maxRetryCount || - // SocketException is thrown when download is terminated due to network disconnection. - e is SocketException + private fun isRetryableError(e: Throwable?): Boolean { + // Progress-stall cancellations are terminal; retrying will almost certainly hit the same + // stall again and keeps the user waiting. Report the typed failure instead. + if (e is ProgressStallTimeoutException) return false + return !isNetworkAvailable(applicationContext) || + runAttemptCount < maxRetryCount || + // SocketException is thrown when download is terminated due to network disconnection. + e is SocketException + } + + private fun Throwable?.asReportableException(): Exception = when (this) { + null -> Exception() + is Exception -> this + else -> Exception(this) + } } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/PartUploadTransferWorker.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/PartUploadTransferWorker.kt index 106e27c97d..4449576a70 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/PartUploadTransferWorker.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/PartUploadTransferWorker.kt @@ -20,14 +20,21 @@ import aws.sdk.kotlin.services.s3.S3Client import aws.sdk.kotlin.services.s3.uploadPart import aws.sdk.kotlin.services.s3.withConfig import aws.smithy.kotlin.runtime.content.asByteStream +import com.amplifyframework.storage.ProgressStallTimeoutException import com.amplifyframework.storage.TransferState import com.amplifyframework.storage.s3.transfer.PartUploadProgressListener +import com.amplifyframework.storage.s3.transfer.ProgressListener +import com.amplifyframework.storage.s3.transfer.StallDetectingProgressListener import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider import com.amplifyframework.storage.s3.transfer.TransferDB import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater import com.amplifyframework.storage.s3.transfer.UploadProgressListenerInterceptor import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.MULTI_PART_UPLOAD_ID +import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.PROGRESS_STALL_TIMEOUT_SECONDS import java.io.File +import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Job import kotlinx.coroutines.runBlocking /** @@ -49,27 +56,62 @@ internal class PartUploadTransferWorker( transferStatusUpdater.updateTransferState(transferRecord.mainUploadId, TransferState.IN_PROGRESS) multiPartUploadId = inputData.keyValueMap[MULTI_PART_UPLOAD_ID] as String partUploadProgressListener = PartUploadProgressListener(transferRecord, transferStatusUpdater) + val stallTimeoutSeconds = (inputData.keyValueMap[PROGRESS_STALL_TIMEOUT_SECONDS] as? Long) ?: 0L + val stallDetected = AtomicBoolean(false) val s3: S3Client = clientProvider.getStorageTransferClient(transferRecord.region, transferRecord.bucketName) - return runBlocking { - s3.withConfig { - interceptors += UploadProgressListenerInterceptor(partUploadProgressListener) - enableAccelerate = transferRecord.useAccelerateEndpoint == 1 - }.uploadPart { - bucket = transferRecord.bucketName - key = transferRecord.key - uploadId = multiPartUploadId - body = File(transferRecord.file).asByteStream( - start = transferRecord.fileOffset, - transferRecord.fileOffset + transferRecord.bytesTotal - 1 + val uploadPartResponse = try { + runBlocking { + val uploadJob = coroutineContext[Job] + val stallDecorator: StallDetectingProgressListener? = if (stallTimeoutSeconds > 0L) { + StallDetectingProgressListener( + delegate = partUploadProgressListener, + stallTimeoutSeconds = stallTimeoutSeconds, + onStall = { + if (stallDetected.compareAndSet(false, true)) { + uploadJob?.cancel(CancellationException("Progress stall timeout")) + } + } + ) + } else { + null + } + val effectiveListener: ProgressListener = stallDecorator ?: partUploadProgressListener + try { + stallDecorator?.start() + s3.withConfig { + interceptors += UploadProgressListenerInterceptor(effectiveListener) + enableAccelerate = transferRecord.useAccelerateEndpoint == 1 + }.uploadPart { + bucket = transferRecord.bucketName + key = transferRecord.key + uploadId = multiPartUploadId + body = File(transferRecord.file).asByteStream( + start = transferRecord.fileOffset, + transferRecord.fileOffset + transferRecord.bytesTotal - 1 + ) + partNumber = transferRecord.partNumber + } + } finally { + stallDecorator?.close() + } + } + } catch (cancellation: CancellationException) { + if (stallDetected.get()) { + throw ProgressStallTimeoutException( + "Upload cancelled due to progress stall timeout.", + "Increase the configured progress stall timeout or verify the network conditions, " + + "then retry the upload." ) - partNumber = transferRecord.partNumber } - }.eTag?.let { tag -> + throw cancellation + } + + return uploadPartResponse.eTag?.let { tag -> transferDB.updateETag(transferRecord.id, tag) transferDB.updateState(transferRecord.id, TransferState.PART_COMPLETED) updateProgress() - return Result.success(outputData) + Result.success(outputData) } ?: run { throw IllegalStateException("Etag is empty") } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SinglePartUploadWorker.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SinglePartUploadWorker.kt index 2502422b4d..6bf0706375 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SinglePartUploadWorker.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SinglePartUploadWorker.kt @@ -22,11 +22,19 @@ import android.content.Context import androidx.work.WorkerParameters import aws.sdk.kotlin.services.s3.S3Client import aws.sdk.kotlin.services.s3.withConfig +import com.amplifyframework.storage.ProgressStallTimeoutException +import com.amplifyframework.storage.s3.transfer.ProgressListener +import com.amplifyframework.storage.s3.transfer.StallDetectingProgressListener import com.amplifyframework.storage.s3.transfer.StorageTransferClientProvider import com.amplifyframework.storage.s3.transfer.TransferDB import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater import com.amplifyframework.storage.s3.transfer.UploadProgressListener import com.amplifyframework.storage.s3.transfer.UploadProgressListenerInterceptor +import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker.Companion.PROGRESS_STALL_TIMEOUT_SECONDS +import java.util.concurrent.atomic.AtomicBoolean +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Job +import kotlinx.coroutines.coroutineScope internal class SinglePartUploadWorker( private val clientProvider: StorageTransferClientProvider, @@ -40,13 +48,51 @@ internal class SinglePartUploadWorker( override suspend fun performWork(): Result { uploadProgressListener = UploadProgressListener(transferRecord, transferStatusUpdater) - val putObjectRequest = createPutObjectRequest(transferRecord, uploadProgressListener) - val s3: S3Client = clientProvider.getStorageTransferClient(transferRecord.region, transferRecord.bucketName) - return s3.withConfig { - interceptors += UploadProgressListenerInterceptor(uploadProgressListener) - enableAccelerate = transferRecord.useAccelerateEndpoint == 1 - }.putObject(putObjectRequest).let { - Result.success(outputData) + val stallTimeoutSeconds = (inputData.keyValueMap[PROGRESS_STALL_TIMEOUT_SECONDS] as? Long) ?: 0L + val stallDetected = AtomicBoolean(false) + + return try { + coroutineScope { + val uploadJob = coroutineContext[Job] + val stallDecorator: StallDetectingProgressListener? = if (stallTimeoutSeconds > 0L) { + StallDetectingProgressListener( + delegate = uploadProgressListener, + stallTimeoutSeconds = stallTimeoutSeconds, + onStall = { + if (stallDetected.compareAndSet(false, true)) { + uploadJob?.cancel(CancellationException("Progress stall timeout")) + } + } + ) + } else { + null + } + val effectiveListener: ProgressListener = stallDecorator ?: uploadProgressListener + val putObjectRequest = createPutObjectRequest(transferRecord, effectiveListener) + val s3: S3Client = clientProvider.getStorageTransferClient( + transferRecord.region, + transferRecord.bucketName + ) + try { + stallDecorator?.start() + s3.withConfig { + interceptors += UploadProgressListenerInterceptor(effectiveListener) + enableAccelerate = transferRecord.useAccelerateEndpoint == 1 + }.putObject(putObjectRequest) + } finally { + stallDecorator?.close() + } + Result.success(outputData) + } + } catch (cancellation: CancellationException) { + if (stallDetected.get()) { + throw ProgressStallTimeoutException( + "Upload cancelled due to progress stall timeout.", + "Increase the configured progress stall timeout or verify the network conditions, " + + "then retry the upload." + ) + } + throw cancellation } } } diff --git a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SuspendingTransferWorker.kt b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SuspendingTransferWorker.kt index bb75109ec1..b390511c73 100644 --- a/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SuspendingTransferWorker.kt +++ b/aws-storage-s3/src/main/java/com/amplifyframework/storage/s3/transfer/worker/SuspendingTransferWorker.kt @@ -29,6 +29,7 @@ import androidx.work.WorkerParameters import androidx.work.workDataOf import com.amplifyframework.core.Amplify import com.amplifyframework.core.category.CategoryType +import com.amplifyframework.storage.ProgressStallTimeoutException import com.amplifyframework.storage.TransferState import com.amplifyframework.storage.s3.AWSS3StoragePlugin import com.amplifyframework.storage.s3.R @@ -95,7 +96,7 @@ internal abstract class SuspendingTransferWorker( if (!currentCoroutineContext().isActive && isRetryableError(ex)) { Result.retry() } else { - transferStatusUpdater.updateOnError(transferRecord.id, Exception(ex)) + transferStatusUpdater.updateOnError(transferRecord.id, ex.asReportableException()) transferStatusUpdater.updateTransferState( transferRecord.id, TransferState.FAILED @@ -127,11 +128,22 @@ internal abstract class SuspendingTransferWorker( ) } - private fun isRetryableError(e: Throwable?): Boolean = !isNetworkAvailable(applicationContext) || - runAttemptCount < maxRetryCount || - e is CancellationException || - // SocketException is thrown when download is terminated due to network disconnection. - e is SocketException + private fun isRetryableError(e: Throwable?): Boolean { + // Progress-stall cancellations are terminal; retrying will almost certainly hit the same + // stall again and keeps the user waiting. Report the typed failure instead. + if (e is ProgressStallTimeoutException) return false + return !isNetworkAvailable(applicationContext) || + runAttemptCount < maxRetryCount || + e is CancellationException || + // SocketException is thrown when download is terminated due to network disconnection. + e is SocketException + } + + private fun Throwable?.asReportableException(): Exception = when (this) { + null -> Exception() + is Exception -> this + else -> Exception(this) + } @RequiresApi(Build.VERSION_CODES.O) private fun createChannel() { diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/AWSS3StorageServiceContainerTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/AWSS3StorageServiceContainerTest.kt index d0441b0445..38656b39dd 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/AWSS3StorageServiceContainerTest.kt +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/AWSS3StorageServiceContainerTest.kt @@ -33,7 +33,7 @@ import org.junit.Test class AWSS3StorageServiceContainerTest { private val storageServiceFactory = mockk { - every { create(any(), any(), any(), any(), any()) } returns mockk() + every { create(any(), any(), any(), any(), any(), any()) } returns mockk() } private val context = mockk() private val clientProvider = mockk() @@ -51,13 +51,14 @@ class AWSS3StorageServiceContainerTest { storageServiceFactory, clientProvider, serviceContainerHashMap, - mockk() + mockk(), + 0L ) } @Test fun `put default AWSS3Service in container`() { - val service = storageServiceFactory.create(context, region, bucketName, clientProvider, mockk()) + val service = storageServiceFactory.create(context, region, bucketName, clientProvider, mockk(), 0L) serviceContainer.put(bucketName, service) serviceContainerHashMap.size shouldBe 1 diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/AWSS3StoragePluginProgressStallTimeoutTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/AWSS3StoragePluginProgressStallTimeoutTest.kt new file mode 100644 index 0000000000..99c5af80fa --- /dev/null +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/AWSS3StoragePluginProgressStallTimeoutTest.kt @@ -0,0 +1,299 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amplifyframework.storage.s3 + +import androidx.test.core.app.ApplicationProvider +import com.amplifyframework.AmplifyException +import com.amplifyframework.auth.AuthCredentialsProvider +import com.amplifyframework.core.Consumer +import com.amplifyframework.core.NoOpConsumer +import com.amplifyframework.storage.ProgressStallTimeout +import com.amplifyframework.storage.StorageCategory +import com.amplifyframework.storage.StorageCategoryConfiguration +import com.amplifyframework.storage.StorageException +import com.amplifyframework.storage.StoragePath +import com.amplifyframework.storage.options.StorageUploadFileOptions +import com.amplifyframework.storage.options.StorageUploadInputStreamOptions +import com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration +import com.amplifyframework.storage.s3.options.AWSS3StorageUploadFileOptions +import com.amplifyframework.storage.s3.options.AWSS3StorageUploadInputStreamOptions +import com.amplifyframework.storage.s3.service.AWSS3StorageService +import com.amplifyframework.storage.s3.transfer.TransferObserver +import io.kotest.matchers.shouldBe +import io.mockk.coEvery +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.verify +import java.io.ByteArrayInputStream +import org.json.JSONObject +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.robolectric.RobolectricTestRunner + +@RunWith(RobolectricTestRunner::class) +class AWSS3StoragePluginProgressStallTimeoutTest { + + private lateinit var storageService: AWSS3StorageService + private lateinit var authCredentialsProvider: AuthCredentialsProvider + private lateinit var transferObserver: TransferObserver + + @Before + fun setup() { + transferObserver = mockk(relaxed = true) + storageService = mockk(relaxed = true) { + every { + uploadFile(any(), any(), any(), any(), any(), any()) + } returns transferObserver + every { + uploadInputStream(any(), any(), any(), any(), any(), any()) + } returns transferObserver + } + authCredentialsProvider = mockk(relaxed = true) + coEvery { authCredentialsProvider.getIdentityId() } returns "test-identity" + } + + private fun configuredPlugin(pluginConfiguration: AWSS3StoragePluginConfiguration): AWSS3StoragePlugin { + val factory = mockk { + every { create(any(), any(), any(), any(), any(), any()) } returns storageService + } + val plugin = AWSS3StoragePlugin(factory, authCredentialsProvider, pluginConfiguration) + // Wire the plugin into a configured StorageCategory so executors and config are initialized. + val category = StorageCategory() + category.addPlugin(plugin) + category.configure(buildConfiguration(), ApplicationProvider.getApplicationContext()) + category.initialize(ApplicationProvider.getApplicationContext()) + return plugin + } + + private fun buildConfiguration(): StorageCategoryConfiguration { + val configuration = StorageCategoryConfiguration() + try { + configuration.populateFromJSON( + JSONObject().put( + "plugins", + JSONObject().put( + "awsS3StoragePlugin", + JSONObject() + .put("region", "us-east-1") + .put("bucket", "test-bucket") + ) + ) + ) + } catch (jsonException: org.json.JSONException) { + throw AssertionError(jsonException) + } catch (amplifyException: AmplifyException) { + throw AssertionError(amplifyException) + } + return configuration + } + + private fun captureUploadFileStallSeconds(): Long { + val slot = slot() + // The plugin schedules the actual `storageService.uploadFile` call on its internal + // executor, so verification needs to wait for the asynchronous work to land. + verify(timeout = 5_000L) { + storageService.uploadFile(any(), any(), any(), any(), any(), capture(slot)) + } + return slot.captured + } + + private fun captureUploadInputStreamStallSeconds(): Long { + val slot = slot() + verify(timeout = 5_000L) { + storageService.uploadInputStream(any(), any(), any(), any(), any(), capture(slot)) + } + return slot.captured + } + + /** + * Test that the plugin-level default of [ProgressStallTimeout.Disabled] resolves to `0` when + * the upload options do not supply a per-upload override. This is the legacy behavior — no + * stall detection is armed for the upload. + * + * - Given: a plugin configured with `ProgressStallTimeout.Disabled` and a default upload options + * - When: `uploadFile` is invoked with a string key + * - Then: `StorageService.uploadFile` is called with `progressStallTimeoutSeconds = 0` + */ + @Test + fun `string key uploadFile with no override and Disabled plugin default uses zero seconds`() { + val plugin = configuredPlugin(AWSS3StoragePluginConfiguration {}) + + plugin.uploadFile( + "test-key", + java.io.File.createTempFile("any", ".tmp"), + StorageUploadFileOptions.defaultInstance(), + NoOpConsumer.create(), + NoOpConsumer.create(), + Consumer { /* no-op */ } + ) + + captureUploadFileStallSeconds() shouldBe 0L + } + + /** + * Test that the plugin-level default `Interval` flows through to the storage service when no + * per-upload override is supplied. This covers the "non-Disabled" branch of the plugin + * configuration without any options-side overrides. + * + * - Given: a plugin configured with `ProgressStallTimeout.Interval(seconds = 25)` + * and a default upload options + * - When: `uploadFile` is invoked with a string key + * - Then: `StorageService.uploadFile` is called with `progressStallTimeoutSeconds = 25` + */ + @Test + fun `string key uploadFile uses plugin Interval default when override is null`() { + val plugin = configuredPlugin( + AWSS3StoragePluginConfiguration { + progressStallTimeout = ProgressStallTimeout.Interval(seconds = 25L) + } + ) + + plugin.uploadFile( + "test-key", + java.io.File.createTempFile("any", ".tmp"), + StorageUploadFileOptions.defaultInstance(), + NoOpConsumer.create(), + NoOpConsumer.create(), + Consumer { /* no-op */ } + ) + + captureUploadFileStallSeconds() shouldBe 25L + } + + /** + * Test that a per-upload [ProgressStallTimeout.Interval] override wins over the plugin-level + * default. This is the primary feature path: upload-level options must take precedence so + * apps can opt individual uploads in or out of stall detection without changing the plugin + * configuration. + * + * - Given: a plugin configured with `ProgressStallTimeout.Disabled` and AWS S3 upload options + * carrying a per-upload `Interval(60)` override + * - When: `uploadFile` is invoked with a string key + * - Then: `StorageService.uploadFile` is called with `progressStallTimeoutSeconds = 60` + */ + @Test + fun `string key uploadFile uses per-upload Interval override over Disabled plugin default`() { + val plugin = configuredPlugin(AWSS3StoragePluginConfiguration {}) + val options = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(seconds = 60L)) + .build() + + plugin.uploadFile( + "test-key", + java.io.File.createTempFile("any", ".tmp"), + options, + NoOpConsumer.create(), + NoOpConsumer.create(), + Consumer { /* no-op */ } + ) + + captureUploadFileStallSeconds() shouldBe 60L + } + + /** + * Test that a per-upload `Disabled` override wins over a plugin-level `Interval` default, + * letting an individual upload opt out of detection even when the plugin enables it. + * + * - Given: a plugin configured with `ProgressStallTimeout.Interval(seconds = 30)` and AWS S3 + * upload options carrying a per-upload `Disabled` override + * - When: `uploadFile` is invoked with a `StoragePath` + * - Then: `StorageService.uploadFile` is called with `progressStallTimeoutSeconds = 0` + */ + @Test + fun `path uploadFile uses per-upload Disabled override over Interval plugin default`() { + val plugin = configuredPlugin( + AWSS3StoragePluginConfiguration { + progressStallTimeout = ProgressStallTimeout.Interval(seconds = 30L) + } + ) + val options = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Disabled) + .build() + + plugin.uploadFile( + StoragePath.fromString("public/test-key"), + java.io.File.createTempFile("any", ".tmp"), + options, + NoOpConsumer.create(), + NoOpConsumer.create(), + Consumer { /* no-op */ } + ) + + captureUploadFileStallSeconds() shouldBe 0L + } + + /** + * Test the `instanceof` fallback branch: when a caller supplies a plain + * [StorageUploadInputStreamOptions] (not an AWS-specific subclass), the plugin must fall + * back to its configured default rather than treating the upload as an explicit override. + * + * - Given: a plugin configured with `ProgressStallTimeout.Interval(seconds = 15)` and a + * non-AWS `StorageUploadInputStreamOptions` instance + * - When: `uploadInputStream` is invoked with a string key + * - Then: `StorageService.uploadInputStream` is called with `progressStallTimeoutSeconds = 15` + */ + @Test + fun `string key uploadInputStream falls back to plugin default for generic options`() { + val plugin = configuredPlugin( + AWSS3StoragePluginConfiguration { + progressStallTimeout = ProgressStallTimeout.Interval(seconds = 15L) + } + ) + val options = StorageUploadInputStreamOptions.defaultInstance() + + plugin.uploadInputStream( + "test-key", + ByteArrayInputStream(byteArrayOf(1, 2, 3)), + options, + NoOpConsumer.create(), + NoOpConsumer.create(), + Consumer { /* no-op */ } + ) + + captureUploadInputStreamStallSeconds() shouldBe 15L + } + + /** + * Test that a per-upload [ProgressStallTimeout.Interval] override is honored on the + * `StoragePath` overload of `uploadInputStream`. This mirrors the same precedence rules + * proven for `uploadFile` but exercises the input-stream code path. + * + * - Given: a plugin configured with `ProgressStallTimeout.Disabled` and AWS S3 upload options + * carrying a per-upload `Interval(90)` override + * - When: `uploadInputStream` is invoked with a `StoragePath` + * - Then: `StorageService.uploadInputStream` is called with `progressStallTimeoutSeconds = 90` + */ + @Test + fun `path uploadInputStream uses per-upload Interval override over Disabled plugin default`() { + val plugin = configuredPlugin(AWSS3StoragePluginConfiguration {}) + val options = AWSS3StorageUploadInputStreamOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(seconds = 90L)) + .build() + + plugin.uploadInputStream( + StoragePath.fromString("public/test-key"), + ByteArrayInputStream(byteArrayOf(1, 2, 3)), + options, + NoOpConsumer.create(), + NoOpConsumer.create(), + Consumer { /* no-op */ } + ) + + captureUploadInputStreamStallSeconds() shouldBe 90L + } +} diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/AWSS3StoragePluginTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/AWSS3StoragePluginTest.kt index 9540b95091..786346092d 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/AWSS3StoragePluginTest.kt +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/AWSS3StoragePluginTest.kt @@ -21,6 +21,7 @@ import com.amplifyframework.storage.BucketInfo import com.amplifyframework.storage.InvalidStorageBucketException import com.amplifyframework.storage.StorageBucket import com.amplifyframework.storage.StorageException +import com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration import com.amplifyframework.storage.s3.service.AWSS3StorageService import com.amplifyframework.testutils.configuration.amplifyOutputsData import io.kotest.assertions.throwables.shouldThrow @@ -40,13 +41,13 @@ class AWSS3StoragePluginTest { var context: Context = ApplicationProvider.getApplicationContext() private val storageServiceFactory = mockk { - every { create(any(), any(), any(), any(), any()) } returns mockk() + every { create(any(), any(), any(), any(), any(), any()) } returns mockk() } private val plugin = AWSS3StoragePlugin( storageServiceFactory, mockk(), - mockk() + AWSS3StoragePluginConfiguration.Builder().build() ) @Test @@ -61,7 +62,7 @@ class AWSS3StoragePluginTest { plugin.configure(data, context) verify { - storageServiceFactory.create(any(), "test-region", "test-bucket", any(), any()) + storageServiceFactory.create(any(), "test-region", "test-bucket", any(), any(), any()) } } diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/StorageComponentTest.java b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/StorageComponentTest.java index bd5d6f668b..9002ffcdb4 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/StorageComponentTest.java +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/StorageComponentTest.java @@ -59,6 +59,7 @@ import static androidx.test.core.app.ApplicationProvider.getApplicationContext; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; @@ -95,7 +96,8 @@ public void setup() throws AmplifyException { region, bucket, clientProvider, - transferStatusUpdater + transferStatusUpdater, + defaultProgressStallTimeoutSeconds ) -> (AWSS3StorageService) storageService; AuthCredentialsProvider cognitoAuthProvider = mock(AuthCredentialsProvider.class); doReturn(RandomString.string()).when(cognitoAuthProvider).getIdentityId(null); @@ -269,7 +271,8 @@ public void testUploadFileGetsKey() throws Exception { anyString(), any(File.class), any(ObjectMetadata.class), - anyBoolean()) + anyBoolean(), + anyLong()) ) .thenReturn(observer); @@ -312,7 +315,8 @@ public void testUploadInputStreamGetsKey() throws Exception { anyString(), any(InputStream.class), any(ObjectMetadata.class), - anyBoolean()) + anyBoolean(), + anyLong()) ) .thenReturn(observer); @@ -361,7 +365,8 @@ public void testUploadFileError() throws IOException { anyString(), any(File.class), any(ObjectMetadata.class), - anyBoolean()) + anyBoolean(), + anyLong()) ).thenReturn(observer); doAnswer(invocation -> { @@ -407,7 +412,8 @@ public void testInputStreamError() throws IOException { anyString(), any(InputStream.class), any(ObjectMetadata.class), - anyBoolean()) + anyBoolean(), + anyLong()) ) .thenReturn(observer); diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/TransferOperationsResumeTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/TransferOperationsResumeTest.kt new file mode 100644 index 0000000000..c14cfe7bf4 --- /dev/null +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/TransferOperationsResumeTest.kt @@ -0,0 +1,139 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amplifyframework.storage.s3 + +import androidx.work.ExistingWorkPolicy +import androidx.work.OneTimeWorkRequest +import androidx.work.WorkManager +import com.amplifyframework.storage.TransferState +import com.amplifyframework.storage.s3.transfer.TransferDB +import com.amplifyframework.storage.s3.transfer.TransferRecord +import com.amplifyframework.storage.s3.transfer.TransferStatusUpdater +import com.amplifyframework.storage.s3.transfer.TransferType +import com.amplifyframework.storage.s3.transfer.TransferWorkerObserver +import com.amplifyframework.storage.s3.transfer.worker.BaseTransferWorker +import io.kotest.matchers.shouldBe +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.verify +import org.junit.Test + +class TransferOperationsResumeTest { + + /** + * Single-part upload resume must propagate the plugin-level default progress-stall timeout + * into the enqueued [androidx.work.WorkRequest] so the worker can re-arm stall detection. + * Without this, resumed uploads silently lose the stall-cancel behavior the developer + * configured on the plugin (parity with the iOS Amplify Storage plugin). + * + * - Given: a paused single-part upload `TransferRecord` and a `progressStallTimeoutSeconds` + * value of 42 carried from the plugin configuration + * - When: `TransferOperations.resume(...)` is invoked + * - Then: the work data enqueued for the worker contains + * `PROGRESS_STALL_TIMEOUT_SECONDS = 42` + */ + @Test + fun `resume single-part upload propagates plugin default progressStallTimeoutSeconds`() { + val transferRecord = TransferRecord( + id = 1, + transferId = "transfer-id", + isMultipart = 0, + type = TransferType.UPLOAD, + state = TransferState.PAUSED, + bucketName = "bucket", + region = "us-east-1", + key = "key", + file = "file" + ) + val workManager = mockk(relaxed = true) { + every { enqueueUniqueWork(any(), any(), any()) } returns + mockk(relaxed = true) + } + val transferStatusUpdater = mockk(relaxed = true) + val transferDB = mockk(relaxed = true) + val workerObserver = mockk(relaxed = true) + + val workRequestSlot = slot() + every { + workManager.enqueueUniqueWork(any(), any(), capture(workRequestSlot)) + } returns mockk(relaxed = true) + + val resumed = TransferOperations.resume( + transferRecord, + "pluginKey", + transferStatusUpdater, + workManager, + workerObserver, + transferDB, + 42L + ) + + resumed shouldBe true + verify { + workManager.enqueueUniqueWork( + transferRecord.id.toString(), + ExistingWorkPolicy.KEEP, + any() + ) + } + val workData = workRequestSlot.captured.workSpec.input + workData.getLong(BaseTransferWorker.PROGRESS_STALL_TIMEOUT_SECONDS, -1L) shouldBe 42L + } + + /** + * Validates the legacy/backward-compatible call path where no plugin default is supplied. The + * default `0L` must continue to flow through the resume path so existing callers (and + * downloads, which never opted into stall detection) keep their pre-feature behavior. + * + * - Given: a paused single-part upload `TransferRecord` and the default + * `progressStallTimeoutSeconds` parameter (i.e. `0L`) + * - When: `TransferOperations.resume(...)` is invoked without a stall-timeout argument + * - Then: the enqueued work data carries `PROGRESS_STALL_TIMEOUT_SECONDS = 0` + */ + @Test + fun `resume without plugin default keeps zero seconds for backward compatibility`() { + val transferRecord = TransferRecord( + id = 7, + transferId = "transfer-id-7", + isMultipart = 0, + type = TransferType.UPLOAD, + state = TransferState.PAUSED, + bucketName = "bucket", + region = "us-east-1", + key = "key", + file = "file" + ) + val workManager = mockk(relaxed = true) + val workRequestSlot = slot() + every { + workManager.enqueueUniqueWork(any(), any(), capture(workRequestSlot)) + } returns mockk(relaxed = true) + + val resumed = TransferOperations.resume( + transferRecord, + "pluginKey", + mockk(relaxed = true), + workManager, + mockk(relaxed = true), + mockk(relaxed = true) + ) + + resumed shouldBe true + val workData = workRequestSlot.captured.workSpec.input + workData.getLong(BaseTransferWorker.PROGRESS_STALL_TIMEOUT_SECONDS, -1L) shouldBe 0L + } +} diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfigurationTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfigurationTest.kt index 73551b326a..e4c9ebdedd 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfigurationTest.kt +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/configuration/AWSS3StoragePluginConfigurationTest.kt @@ -17,6 +17,7 @@ package com.amplifyframework.storage.s3.configuration import com.amplifyframework.auth.AuthCredentialsProvider import com.amplifyframework.core.Consumer +import com.amplifyframework.storage.ProgressStallTimeout import com.amplifyframework.storage.StorageAccessLevel import com.amplifyframework.storage.StorageException import io.mockk.mockk @@ -51,4 +52,49 @@ class AWSS3StoragePluginConfigurationTest : TestCase() { awsS3StoragePluginConfiguration.getAWSS3PluginPrefixResolver(authCredentialsProvider) assert(awsS3PluginPrefixResolver is StorageAccessLevelAwarePrefixResolver) } + + /** + * When no [ProgressStallTimeout] is supplied via the builder, the plugin configuration + * should default to [ProgressStallTimeout.Disabled] to preserve existing upload behavior. + * + * - Given: a plugin configuration built without overriding `progressStallTimeout` + * - When: the configuration is constructed + * - Then: `progressStallTimeout` equals [ProgressStallTimeout.Disabled] + */ + fun testProgressStallTimeoutDefaultsToDisabled() { + val configuration = AWSS3StoragePluginConfiguration {} + assertEquals(ProgressStallTimeout.Disabled, configuration.progressStallTimeout) + } + + /** + * A custom [ProgressStallTimeout.Interval] provided on the builder must be honored by the + * resulting configuration so that it can be propagated to uploads that do not override it. + * + * - Given: a builder that sets `progressStallTimeout` to an [ProgressStallTimeout.Interval] + * - When: the configuration is built + * - Then: the same [ProgressStallTimeout.Interval] is exposed on the configuration + */ + fun testProgressStallTimeoutIntervalPropagatesFromBuilder() { + val interval = ProgressStallTimeout.Interval(seconds = 15L) + val configuration = AWSS3StoragePluginConfiguration { + progressStallTimeout = interval + } + assertEquals(interval, configuration.progressStallTimeout) + } + + /** + * Non-positive intervals must not attempt to schedule a stall timer. The [ProgressStallTimeout] + * sealed class reports `secondsForStallTimer = 0` for such values, effectively disabling + * detection while still preserving the original user-provided configuration object. + * + * - Given: a configuration with an [ProgressStallTimeout.Interval] of `0` seconds + * - When: `secondsForStallTimer` is read from the configured timeout + * - Then: the returned value is `0`, disabling the stall timer + */ + fun testProgressStallTimeoutZeroIntervalDisablesStallTimer() { + val configuration = AWSS3StoragePluginConfiguration { + progressStallTimeout = ProgressStallTimeout.Interval(seconds = 0L) + } + assertEquals(0L, configuration.progressStallTimeout.secondsForStallTimer) + } } diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/extensions/StorageExceptionExtensionsTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/extensions/StorageExceptionExtensionsTest.kt new file mode 100644 index 0000000000..91117c146a --- /dev/null +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/extensions/StorageExceptionExtensionsTest.kt @@ -0,0 +1,75 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.storage.s3.extensions + +import com.amplifyframework.storage.ProgressStallTimeoutException +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import org.junit.Test + +internal class StorageExceptionExtensionsTest { + + // / Wrapping a direct ProgressStallTimeoutException preserves the typed cause. + // / + // / - Given: A ProgressStallTimeoutException thrown from the worker layer. + // / - When: toStorageUploadException is invoked with a fallback message. + // / - Then: The returned StorageException's cause is the original typed exception, and the + // / stall-specific message is used instead of the fallback. + @Test + fun progressStallTimeoutExceptionPreservesTypedCause() { + val fallbackMessage = "generic upload failure" + val stall = ProgressStallTimeoutException("stall", "retry") + + val wrapped = stall.toStorageUploadException(fallbackMessage) + + wrapped.cause.shouldBeInstanceOf() + wrapped.cause shouldBe stall + wrapped.message shouldNotBe fallbackMessage + wrapped.message!!.shouldContain("progress stall") + } + + // / A ProgressStallTimeoutException nested inside another throwable is still surfaced. + // / + // / - Given: A generic Exception whose cause is a ProgressStallTimeoutException. + // / - When: toStorageUploadException is invoked. + // / - Then: The returned StorageException surfaces the typed cause, not the outer wrapper. + @Test + fun nestedProgressStallTimeoutExceptionIsDetected() { + val stall = ProgressStallTimeoutException("stall", "retry") + val outer = RuntimeException("wrapper", stall) + + val wrapped = outer.toStorageUploadException("generic upload failure") + + wrapped.cause shouldBe stall + } + + // / Non-stall throwables are wrapped with the fallback message and preserve the original cause. + // / + // / - Given: A generic RuntimeException unrelated to stall detection. + // / - When: toStorageUploadException is invoked. + // / - Then: The returned StorageException uses the fallback message and keeps the original as cause. + @Test + fun genericThrowableUsesFallbackMessage() { + val fallbackMessage = "something went wrong" + val original = RuntimeException("boom") + + val wrapped = original.toStorageUploadException(fallbackMessage) + + wrapped.message shouldBe fallbackMessage + wrapped.cause shouldBe original + } +} diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperationTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperationTest.kt index 3c3125cc64..a8e9abcbfc 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperationTest.kt +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadFileOperationTest.kt @@ -81,7 +81,8 @@ class AWSS3StoragePathUploadFileOperationTest { expectedServiceKey, tempFile, any(), - false + false, + 0L ) } } @@ -123,7 +124,8 @@ class AWSS3StoragePathUploadFileOperationTest { expectedServiceKey, tempFile, any(), - false + false, + 0L ) } } @@ -159,7 +161,7 @@ class AWSS3StoragePathUploadFileOperationTest { // THEN verify { onError.accept(StoragePathValidationException.invalidStoragePathException()) } verify(exactly = 0) { - storageService.uploadFile(any(), any(), any(), any(), any()) + storageService.uploadFile(any(), any(), any(), any(), any(), any()) } } @@ -203,7 +205,60 @@ class AWSS3StoragePathUploadFileOperationTest { ) } verify(exactly = 0) { - storageService.uploadFile(any(), any(), any(), any(), any()) + storageService.uploadFile(any(), any(), any(), any(), any(), any()) + } + } + + /** + * Test that the resolved progress-stall timeout from the upload request is forwarded to + * `StorageService.uploadFile`. The plugin resolves the effective seconds when it builds the + * request so the operation only needs to plumb the value through. + * + * - Given: a [AWSS3StoragePathUploadRequest] with `progressStallTimeoutSeconds = 30` + * - When: the operation starts and the path resolves successfully + * - Then: `StorageService.uploadFile` is invoked with `progressStallTimeoutSeconds = 30` + */ + @Test + fun `progressStallTimeoutSeconds from request is forwarded to storage service`() { + // GIVEN + val path = StoragePath.fromString("public/123") + val tempFile = File.createTempFile("new", "file.tmp") + val expectedServiceKey = "public/123" + val expectedStallTimeout = 30L + val request = AWSS3StoragePathUploadRequest( + path, + tempFile, + "/image", + ServerSideEncryption.NONE, + emptyMap(), + false, + expectedStallTimeout + ) + val onError = mockk>(relaxed = true) + awsS3StorageUploadFileOperation = AWSS3StoragePathUploadFileOperation( + request = request, + storageService = storageService, + executorService = MoreExecutors.newDirectExecutorService(), + authCredentialsProvider = authCredentialsProvider, + {}, + {}, + onError + ) + + // WHEN + awsS3StorageUploadFileOperation.start() + + // THEN + verify(exactly = 0) { onError.accept(any()) } + verify { + storageService.uploadFile( + awsS3StorageUploadFileOperation.transferId, + expectedServiceKey, + tempFile, + any(), + false, + expectedStallTimeout + ) } } @@ -237,7 +292,7 @@ class AWSS3StoragePathUploadFileOperationTest { // THEN verify { onError.accept(StoragePathValidationException.unsupportedStoragePathException()) } verify(exactly = 0) { - storageService.uploadFile(any(), any(), any(), any(), any()) + storageService.uploadFile(any(), any(), any(), any(), any(), any()) } } diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperationTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperationTest.kt index 1b6bad2fdb..52651a9498 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperationTest.kt +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StoragePathUploadInputStreamOperationTest.kt @@ -82,7 +82,8 @@ class AWSS3StoragePathUploadInputStreamOperationTest { expectedServiceKey, inputStream, any(), - false + false, + 0L ) } } @@ -124,7 +125,8 @@ class AWSS3StoragePathUploadInputStreamOperationTest { expectedServiceKey, inputStream, any(), - false + false, + 0L ) } } @@ -160,7 +162,7 @@ class AWSS3StoragePathUploadInputStreamOperationTest { // THEN verify { onError.accept(StoragePathValidationException.invalidStoragePathException()) } verify(exactly = 0) { - storageService.uploadInputStream(any(), any(), any(), any(), any()) + storageService.uploadInputStream(any(), any(), any(), any(), any(), any()) } } @@ -204,7 +206,60 @@ class AWSS3StoragePathUploadInputStreamOperationTest { ) } verify(exactly = 0) { - storageService.uploadInputStream(any(), any(), any(), any(), any()) + storageService.uploadInputStream(any(), any(), any(), any(), any(), any()) + } + } + + /** + * Test that the resolved progress-stall timeout from the upload request is forwarded to + * `StorageService.uploadInputStream`. The plugin resolves the effective seconds when it + * builds the request so the operation only needs to plumb the value through. + * + * - Given: a [AWSS3StoragePathUploadRequest] with `progressStallTimeoutSeconds = 45` + * - When: the operation starts and the path resolves successfully + * - Then: `StorageService.uploadInputStream` is invoked with `progressStallTimeoutSeconds = 45` + */ + @Test + fun `progressStallTimeoutSeconds from request is forwarded to storage service`() { + // GIVEN + val path = StoragePath.fromString("public/123") + val inputStream = File.createTempFile("new", "file.tmp").inputStream() + val expectedServiceKey = "public/123" + val expectedStallTimeout = 45L + val request = AWSS3StoragePathUploadRequest( + path, + inputStream, + "/image", + ServerSideEncryption.NONE, + emptyMap(), + false, + expectedStallTimeout + ) + val onError = mockk>(relaxed = true) + awsS3StorageUploadInputStreamOperation = AWSS3StoragePathUploadInputStreamOperation( + request = request, + storageService = storageService, + executorService = MoreExecutors.newDirectExecutorService(), + authCredentialsProvider = authCredentialsProvider, + {}, + {}, + onError + ) + + // WHEN + awsS3StorageUploadInputStreamOperation.start() + + // THEN + verify(exactly = 0) { onError.accept(any()) } + verify { + storageService.uploadInputStream( + awsS3StorageUploadInputStreamOperation.transferId, + expectedServiceKey, + inputStream, + any(), + false, + expectedStallTimeout + ) } } @@ -238,7 +293,7 @@ class AWSS3StoragePathUploadInputStreamOperationTest { // THEN verify { onError.accept(StoragePathValidationException.unsupportedStoragePathException()) } verify(exactly = 0) { - storageService.uploadInputStream(any(), any(), any(), any(), any()) + storageService.uploadInputStream(any(), any(), any(), any(), any(), any()) } } diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperationTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperationTest.kt index fce3093071..1719e61193 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperationTest.kt +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadFileOperationTest.kt @@ -24,6 +24,7 @@ import com.amplifyframework.storage.s3.configuration.AWSS3PluginPrefixResolver import com.amplifyframework.storage.s3.configuration.AWSS3StoragePluginConfiguration import com.amplifyframework.storage.s3.request.AWSS3StorageUploadRequest import com.amplifyframework.storage.s3.service.StorageService +import com.amplifyframework.storage.s3.transfer.TransferObserver import com.google.common.util.concurrent.MoreExecutors import java.io.File import org.junit.Before @@ -31,6 +32,8 @@ import org.junit.Test import org.junit.runner.RunWith import org.mockito.Mockito import org.mockito.Mockito.any +import org.mockito.Mockito.anyLong +import org.mockito.Mockito.anyString import org.mockito.Mockito.eq import org.robolectric.RobolectricTestRunner @@ -78,7 +81,8 @@ class AWSS3StorageUploadFileOperationTest { eq(expectedKey), eq(tempFile), any(ObjectMetadata::class.java), - eq(false) + eq(false), + eq(0L) ) } @@ -124,7 +128,8 @@ class AWSS3StorageUploadFileOperationTest { eq(expectedKey), eq(tempFile), any(ObjectMetadata::class.java), - eq(false) + eq(false), + eq(0L) ) } @@ -170,7 +175,70 @@ class AWSS3StorageUploadFileOperationTest { eq(expectedKey), eq(tempFile), any(ObjectMetadata::class.java), - eq(false) + eq(false), + eq(0L) + ) + } + + /** + * Test that the Java-friendly constructor that accepts a resolved progress-stall timeout + * propagates the value all the way to `StorageService.uploadFile`. The plugin resolves the + * effective value before instantiating the operation, so the operation only needs to hand it + * to the service. + * + * - Given: a [AWSS3StorageUploadFileOperation] built via the new long-arg constructor + * with a positive `progressStallTimeoutSeconds` + * - When: the operation starts + * - Then: `StorageService.uploadFile` receives the same `progressStallTimeoutSeconds` + */ + @Test + fun `progressStallTimeoutSeconds is forwarded to storage service uploadFile`() { + val key = "123" + val expectedKey = "public/123" + val expectedStallTimeout = 30L + val tempFile = File.createTempFile("new", "file.tmp") + Mockito.`when`( + storageService.uploadFile( + anyString(), + anyString(), + any(), + any(), + eq(false), + anyLong() + ) + ).thenReturn(Mockito.mock(TransferObserver::class.java)) + val request = AWSS3StorageUploadRequest( + key, + tempFile, + StorageAccessLevel.PUBLIC, + "", + "/image", + ServerSideEncryption.NONE, + mutableMapOf(), + false + ) + + awsS3StorageUploadFileOperation = AWSS3StorageUploadFileOperation( + storageService, + MoreExecutors.newDirectExecutorService(), + authCredentialsProvider, + request, + AWSS3StoragePluginConfiguration {}, + {}, + {}, + {}, + expectedStallTimeout + ) + + awsS3StorageUploadFileOperation.start() + + Mockito.verify(storageService).uploadFile( + eq(awsS3StorageUploadFileOperation.transferId), + eq(expectedKey), + eq(tempFile), + any(ObjectMetadata::class.java), + eq(false), + eq(expectedStallTimeout) ) } } diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperationTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperationTest.kt index f73d467a6a..0e9da76927 100644 --- a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperationTest.kt +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/operation/AWSS3StorageUploadInputStreamOperationTest.kt @@ -35,6 +35,7 @@ import org.junit.Test import org.junit.runner.RunWith import org.mockito.Mockito import org.mockito.Mockito.any +import org.mockito.Mockito.anyLong import org.mockito.Mockito.eq import org.robolectric.RobolectricTestRunner @@ -63,7 +64,8 @@ class AWSS3StorageUploadInputStreamOperationTest { any(), any(), any(), - eq(false) + eq(false), + anyLong() ) ).thenReturn(Mockito.mock(TransferObserver::class.java)) val request = AWSS3StorageUploadRequest( @@ -92,7 +94,8 @@ class AWSS3StorageUploadInputStreamOperationTest { eq(expectedKey), eq(tempInputStream), any(ObjectMetadata::class.java), - eq(false) + eq(false), + eq(0L) ) } @@ -108,7 +111,8 @@ class AWSS3StorageUploadInputStreamOperationTest { any(), any(), any(), - eq(false) + eq(false), + anyLong() ) ) .thenReturn(Mockito.mock(TransferObserver::class.java)) @@ -149,7 +153,8 @@ class AWSS3StorageUploadInputStreamOperationTest { eq(expectedKey), eq(tempInputStream), any(ObjectMetadata::class.java), - eq(false) + eq(false), + eq(0L) ) } @@ -165,7 +170,8 @@ class AWSS3StorageUploadInputStreamOperationTest { any(), any(), any(), - eq(false) + eq(false), + anyLong() ) ).thenReturn(Mockito.mock(TransferObserver::class.java)) val request = AWSS3StorageUploadRequest( @@ -205,7 +211,71 @@ class AWSS3StorageUploadInputStreamOperationTest { eq(expectedKey), eq(tempInputStream), any(ObjectMetadata::class.java), - eq(false) + eq(false), + eq(0L) + ) + } + + /** + * Test that the Java-friendly constructor that accepts a resolved progress-stall timeout + * propagates the value all the way to `StorageService.uploadInputStream`. The plugin + * resolves the effective value before instantiating the operation, so the operation only + * needs to hand it to the service. + * + * - Given: a [AWSS3StorageUploadInputStreamOperation] built via the new long-arg constructor + * with a positive `progressStallTimeoutSeconds` + * - When: the operation starts + * - Then: `StorageService.uploadInputStream` receives the same `progressStallTimeoutSeconds` + */ + @Test + fun `progressStallTimeoutSeconds is forwarded to storage service uploadInputStream`() { + val key = "123" + val expectedKey = "public/123" + val expectedStallTimeout = 45L + val tempInputStream = File.createTempFile("new", "file.tmp").inputStream() + coEvery { authCredentialsProvider.getIdentityId() } returns "abc" + Mockito.`when`( + storageService.uploadInputStream( + any(), + any(), + any(), + any(), + eq(false), + anyLong() + ) + ).thenReturn(Mockito.mock(TransferObserver::class.java)) + val request = AWSS3StorageUploadRequest( + key, + tempInputStream, + StorageAccessLevel.PUBLIC, + "", + "/image", + ServerSideEncryption.NONE, + mutableMapOf(), + false + ) + + inputStreamOperation = AWSS3StorageUploadInputStreamOperation( + storageService, + MoreExecutors.newDirectExecutorService(), + authCredentialsProvider, + AWSS3StoragePluginConfiguration {}, + request, + {}, + {}, + {}, + expectedStallTimeout + ) + + inputStreamOperation.start() + + Mockito.verify(storageService).uploadInputStream( + eq(inputStreamOperation.transferId), + eq(expectedKey), + eq(tempInputStream), + any(ObjectMetadata::class.java), + eq(false), + eq(expectedStallTimeout) ) } } diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptionsTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptionsTest.kt new file mode 100644 index 0000000000..64dba82cb0 --- /dev/null +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadFileOptionsTest.kt @@ -0,0 +1,111 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.storage.s3.options + +import com.amplifyframework.storage.ProgressStallTimeout +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import org.junit.Test + +class AWSS3StorageUploadFileOptionsTest { + + /** + * The default instance must leave `progressStallTimeout` as `null` so that the plugin-level + * default from `AWSS3StoragePluginConfiguration` is used unless a caller explicitly overrides it. + * + * - Given: the default instance of [AWSS3StorageUploadFileOptions] + * - When: `progressStallTimeout` is read + * - Then: the value is `null`, meaning "defer to plugin default" + */ + @Test + fun `default progressStallTimeout is null so plugin default applies`() { + val options = AWSS3StorageUploadFileOptions.defaultInstance() + options.progressStallTimeout shouldBe null + } + + /** + * Supplying a per-upload [ProgressStallTimeout] via the builder must make that value + * reachable from the built options object so that downstream code can prefer the override + * over the plugin default. + * + * - Given: a builder with a non-null [ProgressStallTimeout.Interval] + * - When: the options are built + * - Then: `progressStallTimeout` equals the configured override + */ + @Test + fun `builder propagates per-upload progressStallTimeout override`() { + val override = ProgressStallTimeout.Interval(seconds = 20L) + val options = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(override) + .build() + options.progressStallTimeout shouldBe override + } + + /** + * [ProgressStallTimeout.Disabled] is a legitimate per-upload override that opts the upload + * out of stall detection even if the plugin default enables it. It must round-trip through + * the builder without being treated as "defer to plugin default". + * + * - Given: a builder with `progressStallTimeout = ProgressStallTimeout.Disabled` + * - When: the options are built + * - Then: `progressStallTimeout` is `Disabled` (not `null`) + */ + @Test + fun `builder preserves Disabled as explicit per-upload opt out`() { + val options = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Disabled) + .build() + options.progressStallTimeout shouldBe ProgressStallTimeout.Disabled + options.progressStallTimeout shouldNotBe null + } + + /** + * `from(options)` is used to clone or customize an existing options object. The resulting + * builder must carry over the previously configured [ProgressStallTimeout] so that callers + * do not accidentally lose the override when creating derived options. + * + * - Given: an options object with a non-null [ProgressStallTimeout.Interval] override + * - When: `from(options)` is used to produce a new options object + * - Then: the clone exposes the same `progressStallTimeout` + */ + @Test + fun `from copies progressStallTimeout`() { + val original = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(seconds = 5L)) + .build() + val clone = AWSS3StorageUploadFileOptions.from(original).build() + clone.progressStallTimeout shouldBe original.progressStallTimeout + } + + /** + * Equality and hashing must include `progressStallTimeout` so that two options objects + * that only differ on the override are not considered equal. + * + * - Given: two options objects with different [ProgressStallTimeout] overrides + * - When: `equals`/`hashCode` are evaluated + * - Then: the objects are not equal + */ + @Test + fun `equals and hashCode account for progressStallTimeout`() { + val a = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(seconds = 10L)) + .build() + val b = AWSS3StorageUploadFileOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Disabled) + .build() + (a == b) shouldBe false + (a.hashCode() == b.hashCode()) shouldBe false + } +} diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptionsTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptionsTest.kt new file mode 100644 index 0000000000..ef2431a7f5 --- /dev/null +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/options/AWSS3StorageUploadInputStreamOptionsTest.kt @@ -0,0 +1,111 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.storage.s3.options + +import com.amplifyframework.storage.ProgressStallTimeout +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import org.junit.Test + +class AWSS3StorageUploadInputStreamOptionsTest { + + /** + * The default instance must leave `progressStallTimeout` as `null` so that the plugin-level + * default from `AWSS3StoragePluginConfiguration` is used unless a caller explicitly overrides it. + * + * - Given: the default instance of [AWSS3StorageUploadInputStreamOptions] + * - When: `progressStallTimeout` is read + * - Then: the value is `null`, meaning "defer to plugin default" + */ + @Test + fun `default progressStallTimeout is null so plugin default applies`() { + val options = AWSS3StorageUploadInputStreamOptions.defaultInstance() + options.progressStallTimeout shouldBe null + } + + /** + * Supplying a per-upload [ProgressStallTimeout] via the builder must make that value + * reachable from the built options object so that downstream code can prefer the override + * over the plugin default. + * + * - Given: a builder with a non-null [ProgressStallTimeout.Interval] + * - When: the options are built + * - Then: `progressStallTimeout` equals the configured override + */ + @Test + fun `builder propagates per-upload progressStallTimeout override`() { + val override = ProgressStallTimeout.Interval(seconds = 20L) + val options = AWSS3StorageUploadInputStreamOptions.builder() + .progressStallTimeout(override) + .build() + options.progressStallTimeout shouldBe override + } + + /** + * [ProgressStallTimeout.Disabled] is a legitimate per-upload override that opts the upload + * out of stall detection even if the plugin default enables it. It must round-trip through + * the builder without being treated as "defer to plugin default". + * + * - Given: a builder with `progressStallTimeout = ProgressStallTimeout.Disabled` + * - When: the options are built + * - Then: `progressStallTimeout` is `Disabled` (not `null`) + */ + @Test + fun `builder preserves Disabled as explicit per-upload opt out`() { + val options = AWSS3StorageUploadInputStreamOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Disabled) + .build() + options.progressStallTimeout shouldBe ProgressStallTimeout.Disabled + options.progressStallTimeout shouldNotBe null + } + + /** + * `from(options)` is used to clone or customize an existing options object. The resulting + * builder must carry over the previously configured [ProgressStallTimeout] so that callers + * do not accidentally lose the override when creating derived options. + * + * - Given: an options object with a non-null [ProgressStallTimeout.Interval] override + * - When: `from(options)` is used to produce a new options object + * - Then: the clone exposes the same `progressStallTimeout` + */ + @Test + fun `from copies progressStallTimeout`() { + val original = AWSS3StorageUploadInputStreamOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(seconds = 5L)) + .build() + val clone = AWSS3StorageUploadInputStreamOptions.from(original).build() + clone.progressStallTimeout shouldBe original.progressStallTimeout + } + + /** + * Equality and hashing must include `progressStallTimeout` so that two options objects + * that only differ on the override are not considered equal. + * + * - Given: two options objects with different [ProgressStallTimeout] overrides + * - When: `equals`/`hashCode` are evaluated + * - Then: the objects are not equal + */ + @Test + fun `equals and hashCode account for progressStallTimeout`() { + val a = AWSS3StorageUploadInputStreamOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Interval(seconds = 10L)) + .build() + val b = AWSS3StorageUploadInputStreamOptions.builder() + .progressStallTimeout(ProgressStallTimeout.Disabled) + .build() + (a == b) shouldBe false + (a.hashCode() == b.hashCode()) shouldBe false + } +} diff --git a/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/transfer/StallDetectingProgressListenerTest.kt b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/transfer/StallDetectingProgressListenerTest.kt new file mode 100644 index 0000000000..adf1cc00c1 --- /dev/null +++ b/aws-storage-s3/src/test/java/com/amplifyframework/storage/s3/transfer/StallDetectingProgressListenerTest.kt @@ -0,0 +1,247 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.storage.s3.transfer + +import io.kotest.matchers.shouldBe +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.Duration.Companion.seconds +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runCurrent +import kotlinx.coroutines.test.runTest +import org.junit.Test + +@OptIn(ExperimentalCoroutinesApi::class) +internal class StallDetectingProgressListenerTest { + + private class RecordingListener : ProgressListener { + val totalBytes = AtomicLong(0L) + val callCount = AtomicInteger(0) + override fun progressChanged(bytesTransferred: Long) { + totalBytes.addAndGet(bytesTransferred) + callCount.incrementAndGet() + } + } + + /** + * Test that a timeout of 0 suppresses stall detection. + * + * - Given: a listener constructed with `stallTimeoutSeconds = 0` + * - When: time passes beyond any plausible timeout without progress events + * - Then: `onStall` is never invoked + */ + @Test + fun `stall timeout of 0 never fires onStall`() = runTest { + val stallCount = AtomicInteger(0) + val listener = StallDetectingProgressListener( + delegate = RecordingListener(), + stallTimeoutSeconds = 0L, + onStall = { stallCount.incrementAndGet() }, + scope = CoroutineScope(StandardTestDispatcher(testScheduler)), + ownsScope = false + ) + + listener.start() + advanceTimeBy(10.seconds) + runCurrent() + + stallCount.get() shouldBe 0 + } + + /** + * Test that stall fires once after the configured interval with no progress. + * + * - Given: a listener started with `stallTimeoutSeconds = 5` + * - When: 5 seconds of virtual time pass with no progress events + * - Then: `onStall` is invoked exactly once + */ + @Test + fun `fires onStall exactly once after timeout when no progress`() = runTest { + val stallCount = AtomicInteger(0) + val delegate = RecordingListener() + val listener = StallDetectingProgressListener( + delegate = delegate, + stallTimeoutSeconds = 5L, + onStall = { stallCount.incrementAndGet() }, + scope = CoroutineScope(StandardTestDispatcher(testScheduler)), + ownsScope = false + ) + + listener.start() + advanceTimeBy(5.seconds) + runCurrent() + + stallCount.get() shouldBe 1 + delegate.callCount.get() shouldBe 0 + } + + /** + * Test that progress events reset the stall timer. + * + * - Given: a listener with a 5 second timeout + * - When: a progress event fires every 3 seconds + * - Then: `onStall` is never invoked and each event is forwarded to the delegate + */ + @Test + fun `progress before timeout resets timer and prevents stall`() = runTest { + val stallCount = AtomicInteger(0) + val delegate = RecordingListener() + val listener = StallDetectingProgressListener( + delegate = delegate, + stallTimeoutSeconds = 5L, + onStall = { stallCount.incrementAndGet() }, + scope = CoroutineScope(StandardTestDispatcher(testScheduler)), + ownsScope = false + ) + + listener.start() + repeat(4) { + advanceTimeBy(3.seconds) + listener.progressChanged(1024L) + runCurrent() + } + + stallCount.get() shouldBe 0 + delegate.callCount.get() shouldBe 4 + delegate.totalBytes.get() shouldBe 4096L + } + + /** + * Test that a zero-byte progress event does not reset the stall timer. + * + * `progressChanged(0)` is used in multipart flows to signal a part reset without transferring + * bytes; it should not count as "forward progress" for stall detection. + * + * - Given: a listener with a 5 second timeout + * - When: zero-byte progress events are dispatched for the full timeout window + * - Then: `onStall` still fires exactly once and the delegate sees every event + */ + @Test + fun `zero byte progress events do not reset stall timer`() = runTest { + val stallCount = AtomicInteger(0) + val delegate = RecordingListener() + val listener = StallDetectingProgressListener( + delegate = delegate, + stallTimeoutSeconds = 5L, + onStall = { stallCount.incrementAndGet() }, + scope = CoroutineScope(StandardTestDispatcher(testScheduler)), + ownsScope = false + ) + + listener.start() + repeat(4) { + advanceTimeBy(1.seconds) + listener.progressChanged(0L) + } + advanceTimeBy(2.seconds) + runCurrent() + + stallCount.get() shouldBe 1 + delegate.callCount.get() shouldBe 4 + delegate.totalBytes.get() shouldBe 0L + } + + /** + * Test that closing the listener prevents a pending stall fire. + * + * - Given: a listener that has armed its stall timer + * - When: `close()` is called before the timeout elapses + * - Then: `onStall` is never invoked even if virtual time advances past the timeout + */ + @Test + fun `close before timeout prevents stall`() = runTest { + val stallCount = AtomicInteger(0) + val listener = StallDetectingProgressListener( + delegate = RecordingListener(), + stallTimeoutSeconds = 5L, + onStall = { stallCount.incrementAndGet() }, + scope = CoroutineScope(StandardTestDispatcher(testScheduler)), + ownsScope = false + ) + + listener.start() + advanceTimeBy(3.seconds) + listener.close() + advanceTimeBy(10.seconds) + runCurrent() + + stallCount.get() shouldBe 0 + } + + /** + * Test that close is idempotent and subsequent progress events still reach the delegate. + * + * - Given: a listener that has been closed + * - When: `close()` is called again and a progress event is dispatched + * - Then: neither call throws; `onStall` never fires; and the delegate continues to receive + * progress events (progress forwarding is independent of stall detection) + */ + @Test + fun `close is idempotent and delegate still receives progress after close`() = runTest { + val stallCount = AtomicInteger(0) + val delegate = RecordingListener() + val listener = StallDetectingProgressListener( + delegate = delegate, + stallTimeoutSeconds = 5L, + onStall = { stallCount.incrementAndGet() }, + scope = CoroutineScope(StandardTestDispatcher(testScheduler)), + ownsScope = false + ) + + listener.start() + listener.close() + listener.close() + listener.progressChanged(2048L) + + stallCount.get() shouldBe 0 + delegate.callCount.get() shouldBe 1 + delegate.totalBytes.get() shouldBe 2048L + } + + /** + * Test that a rapid burst of progress events does not accumulate concurrent timers. + * + * This is a regression guard: before rearming, the listener must cancel the previous job. + * If it did not, many timers would be in flight and the first one to fire would trigger + * [onStall] even though progress was still actively arriving. + * + * - Given: a listener with a 5 second timeout + * - When: a flurry of progress events is dispatched and then progress stops for one interval + * - Then: exactly one stall fires, not N + */ + @Test + fun `rapid progress events do not accumulate timers`() = runTest { + val stallCount = AtomicInteger(0) + val listener = StallDetectingProgressListener( + delegate = RecordingListener(), + stallTimeoutSeconds = 5L, + onStall = { stallCount.incrementAndGet() }, + scope = CoroutineScope(StandardTestDispatcher(testScheduler)), + ownsScope = false + ) + + listener.start() + repeat(20) { + listener.progressChanged(512L) + } + advanceTimeBy(5.seconds) + runCurrent() + + stallCount.get() shouldBe 1 + } +} diff --git a/core/api/core.api b/core/api/core.api index db391e81d3..850ce67329 100644 --- a/core/api/core.api +++ b/core/api/core.api @@ -4682,6 +4682,38 @@ public final class com/amplifyframework/storage/ObjectMetadata { public final class com/amplifyframework/storage/ObjectMetadata$Companion { } +public abstract class com/amplifyframework/storage/ProgressStallTimeout { + public static final field Companion Lcom/amplifyframework/storage/ProgressStallTimeout$Companion; + public static final fun disabled ()Lcom/amplifyframework/storage/ProgressStallTimeout; + public abstract fun getSecondsForStallTimer ()J + public static final fun interval (J)Lcom/amplifyframework/storage/ProgressStallTimeout; +} + +public final class com/amplifyframework/storage/ProgressStallTimeout$Companion { + public final fun disabled ()Lcom/amplifyframework/storage/ProgressStallTimeout; + public final fun interval (J)Lcom/amplifyframework/storage/ProgressStallTimeout; +} + +public final class com/amplifyframework/storage/ProgressStallTimeout$Disabled : com/amplifyframework/storage/ProgressStallTimeout { + public static final field INSTANCE Lcom/amplifyframework/storage/ProgressStallTimeout$Disabled; + public fun getSecondsForStallTimer ()J +} + +public final class com/amplifyframework/storage/ProgressStallTimeout$Interval : com/amplifyframework/storage/ProgressStallTimeout { + public fun (J)V + public final fun component1 ()J + public final fun copy (J)Lcom/amplifyframework/storage/ProgressStallTimeout$Interval; + public static synthetic fun copy$default (Lcom/amplifyframework/storage/ProgressStallTimeout$Interval;JILjava/lang/Object;)Lcom/amplifyframework/storage/ProgressStallTimeout$Interval; + public fun equals (Ljava/lang/Object;)Z + public final fun getSeconds ()J + public fun getSecondsForStallTimer ()J + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class com/amplifyframework/storage/ProgressStallTimeoutException : com/amplifyframework/AmplifyException { +} + public final class com/amplifyframework/storage/StorageAccessLevel : java/lang/Enum { public static final field PRIVATE Lcom/amplifyframework/storage/StorageAccessLevel; public static final field PROTECTED Lcom/amplifyframework/storage/StorageAccessLevel; diff --git a/core/src/main/java/com/amplifyframework/storage/ProgressStallTimeout.kt b/core/src/main/java/com/amplifyframework/storage/ProgressStallTimeout.kt new file mode 100644 index 0000000000..512e3fd70e --- /dev/null +++ b/core/src/main/java/com/amplifyframework/storage/ProgressStallTimeout.kt @@ -0,0 +1,70 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.storage + +/** + * Strategy for cancelling uploads when progress stops advancing. + * + * Configure a default on the storage plugin configuration and optionally override per upload via the + * upload options (`StorageUploadFileOptions` / `StorageUploadInputStreamOptions` implementations). + * + * When enabled, the upload is cancelled and the `onError` callback receives a [StorageException] + * whose `cause` is a [ProgressStallTimeoutException] if progress does not advance within the + * configured interval. The default is [Disabled], which preserves existing behavior. + */ +sealed class ProgressStallTimeout { + + /** + * Duration in seconds used by the stall timer, or `0` when disabled. + */ + abstract val secondsForStallTimer: Long + + /** + * Do not cancel uploads when progress stalls. + * + * Named `Disabled` (not `None`) so that, when used as a nullable option, it does not collide + * with "option not supplied, defer to plugin default" semantics. + */ + object Disabled : ProgressStallTimeout() { + override val secondsForStallTimer: Long = 0 + } + + /** + * Cancel the upload if progress does not advance within this interval. + * + * Values of `0` or less are treated the same as [Disabled] — the stall timer is not started. + * + * @property seconds Stall interval in seconds. + */ + data class Interval(val seconds: Long) : ProgressStallTimeout() { + override val secondsForStallTimer: Long = if (seconds > 0) seconds else 0 + } + + companion object { + /** + * Factory for [Disabled], convenient for Java consumers. + */ + @JvmStatic + fun disabled(): ProgressStallTimeout = Disabled + + /** + * Factory for [Interval], convenient for Java consumers. + * + * @param seconds Stall interval in seconds. + */ + @JvmStatic + fun interval(seconds: Long): ProgressStallTimeout = Interval(seconds) + } +} diff --git a/core/src/main/java/com/amplifyframework/storage/ProgressStallTimeoutException.kt b/core/src/main/java/com/amplifyframework/storage/ProgressStallTimeoutException.kt new file mode 100644 index 0000000000..962a6c557b --- /dev/null +++ b/core/src/main/java/com/amplifyframework/storage/ProgressStallTimeoutException.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amplifyframework.storage + +import com.amplifyframework.AmplifyException +import com.amplifyframework.annotations.InternalAmplifyApi + +/** + * Exception thrown when an upload is cancelled because progress did not advance within the + * configured [ProgressStallTimeout]. + * + * This exception is surfaced as the `cause` of a [StorageException] delivered to the upload + * operation's `onError` consumer: + * + * ```kotlin + * // Detect a stall error from an upload callback + * if (storageException.cause is ProgressStallTimeoutException) { + * // Show "weak connection" UI, retry, etc. + * } + * ``` + */ +class ProgressStallTimeoutException @InternalAmplifyApi constructor( + message: String, + recoverySuggestion: String +) : AmplifyException(message, recoverySuggestion) { + @InternalAmplifyApi companion object +} diff --git a/core/src/test/java/com/amplifyframework/storage/ProgressStallTimeoutExceptionTest.kt b/core/src/test/java/com/amplifyframework/storage/ProgressStallTimeoutExceptionTest.kt new file mode 100644 index 0000000000..e4b86c4f02 --- /dev/null +++ b/core/src/test/java/com/amplifyframework/storage/ProgressStallTimeoutExceptionTest.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amplifyframework.storage + +import com.amplifyframework.AmplifyException +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import org.junit.Test + +class ProgressStallTimeoutExceptionTest { + + /** + * The exception must surface its message and recovery suggestion verbatim so callers using + * `storageException.cause as ProgressStallTimeoutException` can render both fields without + * any unwrapping. + * + * - Given: a [ProgressStallTimeoutException] constructed with explicit text + * - When: `message` and `recoverySuggestion` are read + * - Then: both fields equal the constructor inputs + */ + @Test + fun `message and recovery suggestion round trip through constructor`() { + val exception = ProgressStallTimeoutException( + "Upload cancelled due to progress stall timeout.", + "Increase the configured progress stall timeout or verify the network conditions, then retry the upload." + ) + + exception.message shouldBe "Upload cancelled due to progress stall timeout." + exception.recoverySuggestion shouldBe + "Increase the configured progress stall timeout or verify the network conditions, then retry the upload." + } + + /** + * The exception must inherit from [AmplifyException] so that callers handling generic Amplify + * failures will also catch a stall failure when they walk the cause chain. + * + * - Given: a [ProgressStallTimeoutException] + * - When: it is checked against [AmplifyException] + * - Then: it is a subtype of [AmplifyException] + */ + @Test + fun `extends AmplifyException so generic catches still match`() { + val exception = ProgressStallTimeoutException("stall", "retry") + + exception.shouldBeInstanceOf() + } + + /** + * The companion object is annotated `@InternalAmplifyApi` and is the anchor for plugin-side + * factory extensions defined in `aws-storage-s3`. Confirming that it is accessible (and not + * private) keeps the contract for those extension declarations. + * + * - Given: the [ProgressStallTimeoutException.Companion] + * - When: it is referenced + * - Then: the reference is non-null + */ + @Test + fun `companion object is exposed for plugin extensions`() { + val companion = ProgressStallTimeoutException.Companion + + @Suppress("USELESS_IS_CHECK") // safety net if the companion type ever changes + (companion is ProgressStallTimeoutException.Companion) shouldBe true + } +} diff --git a/core/src/test/java/com/amplifyframework/storage/ProgressStallTimeoutTest.kt b/core/src/test/java/com/amplifyframework/storage/ProgressStallTimeoutTest.kt new file mode 100644 index 0000000000..a9c960d9d2 --- /dev/null +++ b/core/src/test/java/com/amplifyframework/storage/ProgressStallTimeoutTest.kt @@ -0,0 +1,153 @@ +/* + * Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amplifyframework.storage + +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeSameInstanceAs +import org.junit.Test + +class ProgressStallTimeoutTest { + + /** + * `ProgressStallTimeout.Disabled` represents the legacy "no stall detection" mode and must + * report a `secondsForStallTimer` of `0` so downstream code skips arming the timer. + * + * - Given: the singleton [ProgressStallTimeout.Disabled] + * - When: `secondsForStallTimer` is read + * - Then: the value is `0` + */ + @Test + fun `Disabled secondsForStallTimer is zero`() { + ProgressStallTimeout.Disabled.secondsForStallTimer shouldBe 0L + } + + /** + * A positive `Interval` must surface its configured seconds so the worker layer arms a timer + * with that exact duration. + * + * - Given: an [ProgressStallTimeout.Interval] of 30 seconds + * - When: `secondsForStallTimer` is read + * - Then: the value is `30` + */ + @Test + fun `Interval with positive seconds returns same value`() { + val timeout = ProgressStallTimeout.Interval(seconds = 30L) + timeout.secondsForStallTimer shouldBe 30L + timeout.seconds shouldBe 30L + } + + /** + * An `Interval` of zero must be normalized to `0` so it behaves as if stall detection were + * disabled. This guards callers that pass `0` as a "no override" sentinel. + * + * - Given: an [ProgressStallTimeout.Interval] of 0 seconds + * - When: `secondsForStallTimer` is read + * - Then: the value is `0` + */ + @Test + fun `Interval with zero seconds disables the stall timer`() { + val timeout = ProgressStallTimeout.Interval(seconds = 0L) + timeout.secondsForStallTimer shouldBe 0L + } + + /** + * A negative `Interval` is treated the same as `Disabled` so a misconfigured value never + * cancels uploads with a near-immediate timer. The original `seconds` is still preserved on + * the value object for diagnostics. + * + * - Given: an [ProgressStallTimeout.Interval] of -5 seconds + * - When: `secondsForStallTimer` is read + * - Then: the value is `0` even though `seconds` itself is negative + */ + @Test + fun `Interval with negative seconds disables the stall timer but preserves raw seconds`() { + val timeout = ProgressStallTimeout.Interval(seconds = -5L) + timeout.secondsForStallTimer shouldBe 0L + timeout.seconds shouldBe -5L + } + + /** + * The Java-friendly `disabled()` factory must return the singleton [ProgressStallTimeout.Disabled] + * so Java callers and Kotlin callers compare as identical references. + * + * - Given: a call to [ProgressStallTimeout.disabled] + * - When: the returned value is compared to [ProgressStallTimeout.Disabled] + * - Then: it is the same singleton instance + */ + @Test + fun `disabled factory returns the Disabled singleton`() { + val factoryValue = ProgressStallTimeout.disabled() + factoryValue shouldBeSameInstanceAs ProgressStallTimeout.Disabled + factoryValue.shouldBeInstanceOf() + } + + /** + * The Java-friendly `interval(seconds)` factory must produce an [ProgressStallTimeout.Interval] + * with the supplied value so Java callers do not need to import the Kotlin data class + * constructor directly. + * + * - Given: a call to [ProgressStallTimeout.interval] with 45 seconds + * - When: the returned value is downcast + * - Then: it is an [ProgressStallTimeout.Interval] of 45 seconds + */ + @Test + fun `interval factory wraps seconds in Interval`() { + val factoryValue = ProgressStallTimeout.interval(45L) + factoryValue.shouldBeInstanceOf() + factoryValue.seconds shouldBe 45L + factoryValue.secondsForStallTimer shouldBe 45L + } + + /** + * `Interval` is a data class, so two intervals with the same `seconds` must compare as equal + * and share a hash code. This matters when diffing equivalent options objects. + * + * - Given: two [ProgressStallTimeout.Interval] values with identical `seconds` + * - When: `equals`/`hashCode` are evaluated + * - Then: the values are equal and have the same hash + */ + @Test + fun `Interval data class equality and hashCode use seconds`() { + val a = ProgressStallTimeout.Interval(seconds = 12L) + val b = ProgressStallTimeout.Interval(seconds = 12L) + val c = ProgressStallTimeout.Interval(seconds = 13L) + + (a == b) shouldBe true + a.hashCode() shouldBe b.hashCode() + (a == c) shouldBe false + } + + /** + * `Disabled` is a distinct subtype from `Interval` and the two must never compare as equal, + * even when `secondsForStallTimer` happens to coincide (both are `0` for `Interval(0)`). + * This prevents callers that branch on the type from being misled by a numerically equal + * value. + * + * - Given: [ProgressStallTimeout.Disabled] and [ProgressStallTimeout.Interval] of 0 seconds + * - When: they are compared with `==` + * - Then: they are not equal even though their `secondsForStallTimer` matches + */ + @Test + fun `Disabled is distinct from Interval of zero seconds`() { + val disabled: ProgressStallTimeout = ProgressStallTimeout.Disabled + val zeroInterval: ProgressStallTimeout = ProgressStallTimeout.Interval(seconds = 0L) + + disabled.secondsForStallTimer shouldBe zeroInterval.secondsForStallTimer + disabled shouldNotBe zeroInterval + } +}