diff --git a/pkg/common/cns-lib/volume/manager.go b/pkg/common/cns-lib/volume/manager.go index b6c4018b73..963fb09d1a 100644 --- a/pkg/common/cns-lib/volume/manager.go +++ b/pkg/common/cns-lib/volume/manager.go @@ -312,6 +312,48 @@ type defaultManager struct { clusterDistribution string } +// volumeOperationTimeout is the duration used for the fresh context when +// persisting final operation details. Defined once to avoid repeated +// int-to-Duration conversions at every defer site. +var volumeOperationTimeout = VolumeOperationTimeoutInSeconds * time.Second + +// persistVolumeOperationDetails persists the final state of a volume +// operation to the CnsVolumeOperationRequest CR. It uses a fresh context +// so the persist succeeds even if the original CSI operation context has +// expired. When quotaFSSEnabled is true and the operation completed with +// Success (or Error on a retry), the quota reservation is released. +func (m *defaultManager) persistVolumeOperationDetails( + details *cnsvolumeoperationrequest.VolumeOperationRequestDetails, + isRetry, quotaFSSEnabled bool, + operationName string, +) error { + if details == nil || details.OperationDetails == nil || + details.OperationDetails.TaskStatus == taskInvocationStatusInProgress { + return nil + } + + persistCtx, persistCancel := context.WithTimeout(context.Background(), volumeOperationTimeout) + defer persistCancel() + persistCtx = logger.NewContextWithLogger(persistCtx) + log := logger.GetLogger(persistCtx) + + if quotaFSSEnabled && m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload { + taskStatus := details.OperationDetails.TaskStatus + if details.QuotaDetails != nil && + (taskStatus == taskInvocationStatusSuccess || (taskStatus == taskInvocationStatusError && isRetry)) { + details.QuotaDetails.Reserved = resource.NewQuantity(0, resource.BinarySI) + log.Infof("Setting the reserved field for VolumeOperationDetails instance %s to 0", + details.Name) + } + } + + if err := 
m.operationStore.StoreRequestDetails(persistCtx, details); err != nil { + log.Warnf("failed to store %s details with error: %v", operationName, err) + return err + } + return nil +} + // ClearTaskInfoObjects is a go routine which runs in the background to clean // up expired taskInfo objects from volumeTaskMap. func ClearTaskInfoObjects() { @@ -427,6 +469,11 @@ func (m *defaultManager) MonitorCreateVolumeTask(ctx context.Context, log.Errorf("failed to query CNS for volume %s with error: %v. Cannot "+ "determine if CreateVolume task %s was successful.", volNameFromInputSpec, queryAllVolumeErr, task.Reference().Value) + *volumeOperationDetails = createRequestDetails(volNameFromInputSpec, "", "", 0, + (*volumeOperationDetails).QuotaDetails, + (*volumeOperationDetails).OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, vCenterServerForVolumeOperationCR, + (*volumeOperationDetails).OperationDetails.TaskID, taskInvocationStatusError, err.Error()) return nil, ExtractFaultTypeFromErr(ctx, err), err } if len(queryResult.Volumes) > 0 { @@ -476,14 +523,24 @@ func (m *defaultManager) MonitorCreateVolumeTask(ctx context.Context, log.Infof("CreateVolume: VolumeName: %q, opId: %q", volNameFromInputSpec, taskInfo.ActivationId) taskResult, err := getTaskResultFromTaskInfo(ctx, taskInfo) if taskResult == nil { - return nil, csifault.CSITaskResultEmptyFault, - logger.LogNewErrorf(log, "taskResult is empty for CreateVolume task: %q, opID: %q", - taskInfo.Task.Value, taskInfo.ActivationId) + errMsg := fmt.Sprintf("taskResult is empty for CreateVolume task: %q, opID: %q", + taskInfo.Task.Value, taskInfo.ActivationId) + *volumeOperationDetails = createRequestDetails(volNameFromInputSpec, "", "", 0, + (*volumeOperationDetails).QuotaDetails, + (*volumeOperationDetails).OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, vCenterServerForVolumeOperationCR, taskInfo.ActivationId, + taskInvocationStatusError, errMsg) + return nil, 
csifault.CSITaskResultEmptyFault, logger.LogNewError(log, errMsg) } if err != nil { log.Errorf("failed to get task result for task %s and volume name %s with error: %v", task.Reference().Value, volNameFromInputSpec, err) faultType = ExtractFaultTypeFromErr(ctx, err) + *volumeOperationDetails = createRequestDetails(volNameFromInputSpec, "", "", 0, + (*volumeOperationDetails).QuotaDetails, + (*volumeOperationDetails).OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, vCenterServerForVolumeOperationCR, taskInfo.ActivationId, + taskInvocationStatusError, err.Error()) return nil, faultType, err } volumeOperationRes := taskResult.GetCnsVolumeOperationResult() @@ -589,6 +646,7 @@ func (m *defaultManager) createVolumeWithImprovedIdempotency(ctx context.Context // Determine if CNS CreateVolume needs to be invoked. volumeOperationDetails, finalErr = m.operationStore.GetRequestDetails(ctx, volNameFromInputSpec) + isRetry := cnsvolumeoperationrequest.IsRetryAttempt(volumeOperationDetails, finalErr) switch { case finalErr == nil: if volumeOperationDetails.OperationDetails != nil { @@ -622,33 +680,10 @@ func (m *defaultManager) createVolumeWithImprovedIdempotency(ctx context.Context return nil, csifault.CSIInternalFault, finalErr } defer func() { - // Persist the operation details before returning. Only success or error - // needs to be stored as InProgress details are stored when the task is - // created on CNS. - if volumeOperationDetails != nil && volumeOperationDetails.OperationDetails != nil && - volumeOperationDetails.OperationDetails.TaskStatus != taskInvocationStatusInProgress { - - if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && isPodVMOnStretchSupervisorFSSEnabled { - // Decrease the reserved field in QuotaDetails when the CreateVolume task is - // successful or has errored out. 
- taskStatus := volumeOperationDetails.OperationDetails.TaskStatus - if (taskStatus == taskInvocationStatusSuccess || taskStatus == taskInvocationStatusError) && - volumeOperationDetails.QuotaDetails != nil { - volumeOperationDetails.QuotaDetails.Reserved = resource.NewQuantity(0, - resource.BinarySI) - log.Infof("Setting the reserved field for VolumeOperationDetails instance %s to 0", - volumeOperationDetails.Name) - } - tempErr := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails) - if finalErr == nil && tempErr != nil { - log.Errorf("failed to store CreateVolume details with error: %v", tempErr) - finalErr = tempErr - } - } else { - err := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails) - if err != nil { - log.Warnf("failed to store CreateVolume details with error: %v", err) - } + if tempErr := m.persistVolumeOperationDetails( + volumeOperationDetails, isRetry, isPodVMOnStretchSupervisorFSSEnabled, "CreateVolume"); tempErr != nil { + if finalErr == nil { + finalErr = tempErr } } }() @@ -744,37 +779,15 @@ func (m *defaultManager) createVolumeWithTransaction(ctx context.Context, spec * } volumeOperationDetails, finalErr = m.operationStore.GetRequestDetails(ctx, volNameFromInputSpec) + isRetry := cnsvolumeoperationrequest.IsRetryAttempt(volumeOperationDetails, finalErr) if finalErr != nil && !apierrors.IsNotFound(finalErr) { return nil, csifault.CSIInternalFault, finalErr } defer func() { - // Persist the operation details before returning. Only success or error - // needs to be stored as InProgress details are stored when the task is - // created on CNS. 
- if volumeOperationDetails != nil && volumeOperationDetails.OperationDetails != nil && - volumeOperationDetails.OperationDetails.TaskStatus != taskInvocationStatusInProgress { - - if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && isPodVMOnStretchSupervisorFSSEnabled { - // Decrease the reserved field in QuotaDetails when the CreateVolume task is - // successful or has errored out. - taskStatus := volumeOperationDetails.OperationDetails.TaskStatus - if (taskStatus == taskInvocationStatusSuccess || taskStatus == taskInvocationStatusError) && - volumeOperationDetails.QuotaDetails != nil { - volumeOperationDetails.QuotaDetails.Reserved = resource.NewQuantity(0, - resource.BinarySI) - log.Infof("Setting the reserved field for VolumeOperationDetails instance %s to 0", - volumeOperationDetails.Name) - } - tempErr := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails) - if finalErr == nil && tempErr != nil { - log.Errorf("failed to store CreateVolume details with error: %v", tempErr) - finalErr = tempErr - } - } else { - err := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails) - if err != nil { - log.Warnf("failed to store CreateVolume details with error: %v", err) - } + if tempErr := m.persistVolumeOperationDetails( + volumeOperationDetails, isRetry, isPodVMOnStretchSupervisorFSSEnabled, "CreateVolume"); tempErr != nil { + if finalErr == nil { + finalErr = tempErr } } }() @@ -1541,15 +1554,8 @@ func (m *defaultManager) deleteVolumeWithImprovedIdempotency(ctx context.Context } defer func() { - // Persist the operation details before returning. Only success or error - // needs to be stored as InProgress details are stored when the task is - // created on CNS. 
- if volumeOperationDetails != nil && volumeOperationDetails.OperationDetails != nil && - volumeOperationDetails.OperationDetails.TaskStatus != taskInvocationStatusInProgress { - err := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails) - if err != nil { - log.Warnf("failed to store DeleteVolume operation details with error: %v", err) - } + if tempErr := m.persistVolumeOperationDetails(volumeOperationDetails, false, false, "DeleteVolume"); tempErr != nil { + log.Warnf("failed to persist DeleteVolume operation details: %v", tempErr) } }() @@ -1612,8 +1618,14 @@ func (m *defaultManager) deleteVolumeWithImprovedIdempotency(ctx context.Context nil, metav1.Now(), task.Reference().Value, "", "", taskInvocationStatusError, msg) return faultType, logger.LogNewError(log, msg) } + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, + nil, volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", "", taskInvocationStatusError, err.Error()) } else { faultType = csifault.CSITaskInfoEmptyFault + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, + nil, volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", "", taskInvocationStatusError, "taskInfo is nil") } return faultType, err } @@ -1622,14 +1634,22 @@ func (m *defaultManager) deleteVolumeWithImprovedIdempotency(ctx context.Context // Get the task results for the given task. 
taskResult, err := getTaskResultFromTaskInfo(ctx, taskInfo) if taskResult == nil { - return csifault.CSITaskResultEmptyFault, - logger.LogNewErrorf(log, "taskResult is empty for DeleteVolume task: %q, opID: %q", - taskInfo.Task.Value, taskInfo.ActivationId) + errMsg := fmt.Sprintf("taskResult is empty for DeleteVolume task: %q, opID: %q", + taskInfo.Task.Value, taskInfo.ActivationId) + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, + nil, volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, + taskInvocationStatusError, errMsg) + return csifault.CSITaskResultEmptyFault, logger.LogNewError(log, errMsg) } if err != nil { log.Errorf("unable to find DeleteVolume task result from vCenter %q with taskID %s and deleteResults %v", m.virtualCenter.Config.Host, taskInfo.Task.Value, taskResult) faultType = ExtractFaultTypeFromErr(ctx, err) + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, + nil, volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, + taskInvocationStatusError, err.Error()) return faultType, err } @@ -2036,6 +2056,7 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context } volumeOperationDetails, finalErr = m.operationStore.GetRequestDetails(ctx, instanceName) + isRetry := cnsvolumeoperationrequest.IsRetryAttempt(volumeOperationDetails, finalErr) switch { case finalErr == nil: if volumeOperationDetails.OperationDetails != nil { @@ -2064,31 +2085,10 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context return csifault.CSIInternalFault, finalErr } defer func() { - // Persist the operation details before returning. 
- if volumeOperationDetails != nil && volumeOperationDetails.OperationDetails != nil && - volumeOperationDetails.OperationDetails.TaskStatus != taskInvocationStatusInProgress { - - if m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload && isPodVMOnStretchSupervisorFSSEnabled { - taskStatus := volumeOperationDetails.OperationDetails.TaskStatus - // Decrease the reserved field in QuotaDetails when the ExpandVolume task is - // successful or has errored out. - if (taskStatus == taskInvocationStatusSuccess || taskStatus == taskInvocationStatusError) && - volumeOperationDetails.QuotaDetails != nil { - volumeOperationDetails.QuotaDetails.Reserved = resource.NewQuantity(0, - resource.BinarySI) - log.Infof("Setting the reserved field for VolumeOperationDetails instance %s to 0", - volumeOperationDetails.Name) - } - tempErr := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails) - if finalErr == nil && tempErr != nil { - log.Errorf("failed to store ExpandVolume details with error: %v", tempErr) - finalErr = tempErr - } - } else { - err := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails) - if err != nil { - log.Warnf("failed to store ExpandVolume details with error: %v", err) - } + if tempErr := m.persistVolumeOperationDetails( + volumeOperationDetails, isRetry, isPodVMOnStretchSupervisorFSSEnabled, "ExpandVolume"); tempErr != nil { + if finalErr == nil { + finalErr = tempErr } } }() @@ -2173,14 +2173,24 @@ func (m *defaultManager) expandVolumeWithImprovedIdempotency(ctx context.Context var taskResult cnstypes.BaseCnsVolumeOperationResult taskResult, finalErr = getTaskResultFromTaskInfo(ctx, taskInfo) if taskResult == nil { - return csifault.CSITaskResultEmptyFault, - logger.LogNewErrorf(log, "taskResult is empty for ExpandVolume task: %q, opID: %q", - taskInfo.Task.Value, taskInfo.ActivationId) + errMsg := fmt.Sprintf("taskResult is empty for ExpandVolume task: %q, opID: %q", + taskInfo.Task.Value, taskInfo.ActivationId) + 
volumeOperationDetails = createRequestDetails(instanceName, "", "", + volumeOperationDetails.Capacity, quotaInfo, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, + taskInvocationStatusError, errMsg) + return csifault.CSITaskResultEmptyFault, logger.LogNewError(log, errMsg) } if finalErr != nil { log.Errorf("failed to get task result for task %s and volume ID %s with error: %v", task.Reference().Value, volumeID, finalErr) faultType = ExtractFaultTypeFromErr(ctx, finalErr) + volumeOperationDetails = createRequestDetails(instanceName, "", "", + volumeOperationDetails.Capacity, quotaInfo, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + task.Reference().Value, "", taskInfo.ActivationId, + taskInvocationStatusError, finalErr.Error()) return faultType, finalErr } @@ -2698,6 +2708,7 @@ func (m *defaultManager) createSnapshotWithImprovedIdempotencyCheck(ctx context. err error quotaInfo *cnsvolumeoperationrequest.QuotaDetails isStorageQuotaM2FSSEnabled bool + isRetry bool ) if extraParams != nil { createSnapParams, ok := extraParams.(*CreateSnapshotExtraParams) @@ -2723,6 +2734,7 @@ func (m *defaultManager) createSnapshotWithImprovedIdempotencyCheck(ctx context. } volumeOperationDetails, err = m.operationStore.GetRequestDetails(ctx, instanceName) + isRetry = cnsvolumeoperationrequest.IsRetryAttempt(volumeOperationDetails, err) switch { case err == nil: // Validate if previous operation was successful. @@ -2783,23 +2795,10 @@ func (m *defaultManager) createSnapshotWithImprovedIdempotencyCheck(ctx context. } defer func() { - // Persist the operation details before returning if the improved idempotency is enabled. Only success or error - // needs to be stored as InProgress details are stored when the task is created on CNS. 
- if m.idempotencyHandlingEnabled && - volumeOperationDetails != nil && volumeOperationDetails.OperationDetails != nil && - volumeOperationDetails.OperationDetails.TaskStatus != taskInvocationStatusInProgress { - taskStatus := volumeOperationDetails.OperationDetails.TaskStatus - if isStorageQuotaM2FSSEnabled && m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload { - if (taskStatus == taskInvocationStatusSuccess || taskStatus == taskInvocationStatusError) && - volumeOperationDetails.QuotaDetails != nil { - volumeOperationDetails.QuotaDetails.Reserved = resource.NewQuantity(0, - resource.BinarySI) - log.Infof("Setting the reserved field for VolumeOperationDetails instance %s to 0", - volumeOperationDetails.Name) - } - } - if err := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails); err != nil { - log.Warnf("failed to store CreateSnapshot details with error: %v", err) + if m.idempotencyHandlingEnabled { + if tempErr := m.persistVolumeOperationDetails( + volumeOperationDetails, isRetry, isStorageQuotaM2FSSEnabled, "CreateSnapshot"); tempErr != nil { + log.Warnf("failed to persist CreateSnapshot operation details: %v", tempErr) } } }() @@ -2896,6 +2895,11 @@ func (m *defaultManager) createSnapshotWithImprovedIdempotencyCheck(ctx context. return nil, logger.LogNewError(log, errMsg) } } + if m.idempotencyHandlingEnabled { + volumeOperationDetails = createRequestDetails(instanceName, volumeID, "", 0, quotaInfo, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + createSnapshotsTask.Reference().Value, "", "", taskInvocationStatusError, err.Error()) + } return nil, logger.LogNewErrorf(log, "Failed to get taskInfo for CreateSnapshots task "+ "from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) } @@ -2904,6 +2908,19 @@ func (m *defaultManager) createSnapshotWithImprovedIdempotencyCheck(ctx context. 
// Get the taskResult createSnapshotsTaskResult, err := cns.GetTaskResult(ctx, createSnapshotsTaskInfo) if err != nil || createSnapshotsTaskResult == nil { + if m.idempotencyHandlingEnabled { + errMsg := fmt.Sprintf("unable to find the task result for CreateSnapshots task "+ + "from vCenter %q. taskID: %q, opId: %q createResults: %+v", + m.virtualCenter.Config.Host, createSnapshotsTaskInfo.Task.Value, + createSnapshotsTaskInfo.ActivationId, createSnapshotsTaskResult) + if err != nil { + errMsg = err.Error() + } + volumeOperationDetails = createRequestDetails(instanceName, volumeID, "", 0, quotaInfo, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + createSnapshotsTask.Reference().Value, "", createSnapshotsTaskInfo.ActivationId, + taskInvocationStatusError, errMsg) + } return nil, logger.LogNewErrorf(log, "unable to find the task result for CreateSnapshots task "+ "from vCenter %q. taskID: %q, opId: %q createResults: %+v", m.virtualCenter.Config.Host, createSnapshotsTaskInfo.Task.Value, createSnapshotsTaskInfo.ActivationId, @@ -3055,27 +3072,14 @@ func (m *defaultManager) createSnapshotWithTransaction(ctx context.Context, volu return nil, csifault.CSIInternalFault, logger.LogNewError(log, "operation store cannot be nil") } volumeOperationDetails, err = m.operationStore.GetRequestDetails(ctx, instanceName) + isRetry := cnsvolumeoperationrequest.IsRetryAttempt(volumeOperationDetails, err) if err != nil && !apierrors.IsNotFound(err) { return nil, csifault.CSIInternalFault, err } defer func() { - // Persist the operation details before returning if the improved idempotency is enabled. Only success or error - // needs to be stored as InProgress details are stored when the task is created on CNS. 
- if volumeOperationDetails != nil && volumeOperationDetails.OperationDetails != nil && - volumeOperationDetails.OperationDetails.TaskStatus != taskInvocationStatusInProgress { - taskStatus := volumeOperationDetails.OperationDetails.TaskStatus - if isStorageQuotaM2FSSEnabled && m.clusterFlavor == cnstypes.CnsClusterFlavorWorkload { - if (taskStatus == taskInvocationStatusSuccess || taskStatus == taskInvocationStatusError) && - volumeOperationDetails.QuotaDetails != nil { - volumeOperationDetails.QuotaDetails.Reserved = resource.NewQuantity(0, - resource.BinarySI) - log.Infof("Setting the reserved field for VolumeOperationDetails instance %s to 0", - volumeOperationDetails.Name) - } - } - if err := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails); err != nil { - log.Warnf("failed to store CreateSnapshot details with error: %v", err) - } + if tempErr := m.persistVolumeOperationDetails( + volumeOperationDetails, isRetry, isStorageQuotaM2FSSEnabled, "CreateSnapshot"); tempErr != nil { + log.Warnf("failed to persist CreateSnapshot operation details: %v", tempErr) } }() volumeOperationDetails = createRequestDetails(instanceName, volumeID, "", 0, quotaInfo, @@ -3121,13 +3125,27 @@ func (m *defaultManager) createSnapshotWithTransaction(ctx context.Context, volu log.Infof("CreateSnapshots: VolumeID: %q, opId: %q", volumeID, createSnapshotsTaskInfo.ActivationId) createSnapshotsTaskResult, err := cns.GetTaskResult(ctx, createSnapshotsTaskInfo) if err != nil || createSnapshotsTaskResult == nil { + errMsg := fmt.Sprintf("unable to find the task result for CreateSnapshots task: %q "+ + "from vCenter %q with err: %v", createSnapshotsTaskInfo.Task.Value, m.virtualCenter.Config.Host, err) + if err != nil { + errMsg = err.Error() + } + volumeOperationDetails = createRequestDetails(instanceName, volumeID, "", 0, quotaInfo, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + createSnapshotsTask.Reference().Value, "", 
createSnapshotsTaskInfo.ActivationId, + taskInvocationStatusError, errMsg) return nil, "", logger.LogNewErrorf(log, "unable to find the task result for CreateSnapshots task: %q "+ "from vCenter %q with err: %v", createSnapshotsTaskInfo.Task.Value, m.virtualCenter.Config.Host, err) } snapshotCreateResult, ok := createSnapshotsTaskResult.(*cnstypes.CnsSnapshotCreateResult) if !ok || snapshotCreateResult == nil { - return nil, "", logger.LogNewErrorf(log, - "invalid task result: got %T with value %+v", createSnapshotsTaskResult, createSnapshotsTaskResult) + errMsg := fmt.Sprintf("invalid task result: got %T with value %+v", + createSnapshotsTaskResult, createSnapshotsTaskResult) + volumeOperationDetails = createRequestDetails(instanceName, volumeID, "", 0, quotaInfo, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + createSnapshotsTask.Reference().Value, "", createSnapshotsTaskInfo.ActivationId, + taskInvocationStatusError, errMsg) + return nil, "", logger.LogNewError(log, errMsg) } if snapshotCreateResult.Fault != nil { // Check for CnsNotRegisteredFault and attempt to re-register the volume @@ -3325,14 +3343,10 @@ func (m *defaultManager) deleteSnapshotWithImprovedIdempotencyCheck( } defer func() { - // Persist the operation details before returning. Only success or error - // needs to be stored as InProgress details are stored when the task is - // created on CNS. 
- if m.idempotencyHandlingEnabled && - volumeOperationDetails != nil && volumeOperationDetails.OperationDetails != nil && - volumeOperationDetails.OperationDetails.TaskStatus != taskInvocationStatusInProgress { - if err := m.operationStore.StoreRequestDetails(ctx, volumeOperationDetails); err != nil { - log.Warnf("failed to store DeleteSnapshot operation details with error: %v", err) + if m.idempotencyHandlingEnabled { + if tempErr := m.persistVolumeOperationDetails(volumeOperationDetails, false, false, + "DeleteSnapshot"); tempErr != nil { + log.Warnf("failed to persist DeleteSnapshot operation details: %v", tempErr) } } }() @@ -3452,11 +3466,25 @@ func (m *defaultManager) deleteSnapshotWithImprovedIdempotencyCheck( // Get the taskResult deleteSnapshotsTaskResult, err := getTaskResultFromTaskInfo(ctx, deleteSnapshotsTaskInfo) if err != nil { + if m.idempotencyHandlingEnabled { + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, nil, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + deleteSnapshotTask.Reference().Value, "", deleteSnapshotsTaskInfo.ActivationId, + taskInvocationStatusError, err.Error()) + } return nil, logger.LogNewErrorf(log, "failed to get the task result for DeleteSnapshots task "+ "from vCenter %q. 
taskID: %q, opId: %q createResults: %+v", m.virtualCenter.Config.Host, deleteSnapshotsTaskInfo.Task.Value, deleteSnapshotsTaskInfo.ActivationId, deleteSnapshotsTaskResult) } if deleteSnapshotsTaskResult == nil { + if m.idempotencyHandlingEnabled { + errMsg := fmt.Sprintf("task result is empty for DeleteSnapshot task: %q, opID: %q", + deleteSnapshotsTaskInfo.Task.Value, deleteSnapshotsTaskInfo.ActivationId) + volumeOperationDetails = createRequestDetails(instanceName, "", "", 0, nil, + volumeOperationDetails.OperationDetails.TaskInvocationTimestamp, + deleteSnapshotTask.Reference().Value, "", deleteSnapshotsTaskInfo.ActivationId, + taskInvocationStatusError, errMsg) + } return nil, logger.LogNewErrorf(log, "task result is empty for DeleteSnapshot task: %q, opID: %q", deleteSnapshotsTaskInfo.Task.Value, deleteSnapshotsTaskInfo.ActivationId) } diff --git a/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest.go b/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest.go index adaba7c8ae..129bb3fc6e 100644 --- a/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest.go +++ b/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest.go @@ -26,6 +26,7 @@ import ( "github.com/davecgh/go-spew/spew" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -381,14 +382,24 @@ func (or *operationRequestStore) DeleteRequestDetails(ctx context.Context, name return nil } +// staleInProgressTaskThreshold is the duration after which an InProgress task +// is considered orphaned and will be force-transitioned to Error. This handles +// cases where the CSI operation context expired before the final status could +// be persisted, leaving the CR stuck with an InProgress entry that prevents +// cleanup and leaks quota reservations. 
+const staleInProgressTaskThreshold = 48 * time.Hour + // cleanupStaleInstances cleans up CnsVolumeOperationRequest instances -// with latest TaskInvocationTimestamp older than 15 minutes +// with latest TaskInvocationTimestamp older than 15 minutes. +// It also force-transitions InProgress tasks older than staleInProgressTaskThreshold +// to Error so they become eligible for cleanup on the next cycle. func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int) { ticker := time.NewTicker(time.Duration(cleanupInterval) * time.Minute) ctx, log := logger.GetNewContextWithLogger() log.Infof("CnsVolumeOperationRequest clean up interval is set to %d minutes", cleanupInterval) for ; true; <-ticker.C { cutoffTime := time.Now().Add(-15 * time.Minute) + staleInProgressCutoff := time.Now().Add(-staleInProgressTaskThreshold) continueToken := "" log.Infof("Cleaning up stale CnsVolumeOperationRequest instances.") for { @@ -405,16 +416,23 @@ func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int) { } for _, instance := range cnsVolumeOperationRequestList.Items { latestOperationDetailsLength := len(instance.Status.LatestOperationDetails) - // Skip if task is still in progress - if latestOperationDetailsLength != 0 && - instance.Status.LatestOperationDetails[latestOperationDetailsLength-1].TaskStatus == - TaskInvocationStatusInProgress { + if latestOperationDetailsLength == 0 { + continue + } + latestOp := &instance.Status.LatestOperationDetails[latestOperationDetailsLength-1] + + if latestOp.TaskStatus == TaskInvocationStatusInProgress { + if latestOp.TaskInvocationTimestamp.Time.Before(staleInProgressCutoff) { + log.Infof("CnsVolumeOperationRequest instance %q has stale InProgress task %q "+ + "(invoked at %v, older than %v). 
Transitioning to Error.", + instance.Name, latestOp.TaskID, + latestOp.TaskInvocationTimestamp.Time, staleInProgressTaskThreshold) + or.forceTransitionStaleInProgressToError(ctx, &instance) + } continue } - // Delete instance if TaskInvocationTimestamp is older than 15 minutes - if latestOperationDetailsLength != 0 && - instance.Status.LatestOperationDetails[latestOperationDetailsLength-1]. - TaskInvocationTimestamp.Time.After(cutoffTime) { + + if latestOp.TaskInvocationTimestamp.Time.After(cutoffTime) { log.Debugf("CnsVolumeOperationRequest instance %q is skipped for deletion", instance.Name) continue } @@ -439,6 +457,52 @@ func (or *operationRequestStore) cleanupStaleInstances(cleanupInterval int) { } } +// forceTransitionStaleInProgressToError marks all InProgress entries in the +// given CnsVolumeOperationRequest as Error and proactively zeroes the quota +// reservation. This allows the normal cleanup cycle to delete the CR on its +// next pass without leaving leaked reservations. +func (or *operationRequestStore) forceTransitionStaleInProgressToError( + ctx context.Context, + instance *cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest, +) { + log := logger.GetLogger(ctx) + updated := instance.DeepCopy() + modified := false + orphanMsg := fmt.Sprintf("task orphaned: still InProgress after %s", staleInProgressTaskThreshold) + + for i := range updated.Status.LatestOperationDetails { + if updated.Status.LatestOperationDetails[i].TaskStatus == TaskInvocationStatusInProgress { + updated.Status.LatestOperationDetails[i].TaskStatus = TaskInvocationStatusError + updated.Status.LatestOperationDetails[i].Error = orphanMsg + modified = true + } + } + if updated.Status.FirstOperationDetails.TaskStatus == TaskInvocationStatusInProgress { + updated.Status.FirstOperationDetails.TaskStatus = TaskInvocationStatusError + updated.Status.FirstOperationDetails.Error = orphanMsg + modified = true + } + + if !modified { + return + } + + if isPodVMOnStretchSupervisorFSSEnabled && 
updated.Status.StorageQuotaDetails != nil && + updated.Status.StorageQuotaDetails.Reserved != nil { + log.Infof("Zeroing quota reservation for orphaned CnsVolumeOperationRequest %s/%s "+ + "(was %s)", instance.Namespace, instance.Name, updated.Status.StorageQuotaDetails.Reserved.String()) + updated.Status.StorageQuotaDetails.Reserved = resource.NewQuantity(0, resource.BinarySI) + } + + if err := or.k8sclient.Update(ctx, updated); err != nil { + log.Errorf("failed to force-transition stale InProgress tasks to Error for "+ + "CnsVolumeOperationRequest %s/%s: %v", instance.Namespace, instance.Name, err) + } else { + log.Infof("Successfully transitioned stale InProgress tasks to Error for "+ + "CnsVolumeOperationRequest %s/%s", instance.Namespace, instance.Name) + } +} + // SetCSITransactionSupport sets the CSI Transaction Support feature flag. // This function allows runtime modification of the isCSITransactionSupportEnabled variable. func SetCSITransactionSupport(enabled bool) { diff --git a/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest_test.go b/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest_test.go index 7ac676b0e5..0ed04f33ef 100644 --- a/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest_test.go +++ b/pkg/internalapis/cnsvolumeoperationrequest/cnsvolumeoperationrequest_test.go @@ -2,7 +2,9 @@ package cnsvolumeoperationrequest import ( "context" + "fmt" "testing" + "time" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,6 +17,7 @@ import ( // setupTestEnvironment creates a test environment with fake clients func setupTestEnvironment(t *testing.T, csiTransactionEnabled bool) (*operationRequestStore, context.Context) { + t.Helper() ctx := context.Background() scheme := runtime.NewScheme() @@ -71,6 +74,70 @@ func getCRDDirectly(ctx context.Context, store *operationRequestStore, return instance, err } +// mustStoreRequestDetails stores operation details and fails 
the test on error +func mustStoreRequestDetails(t *testing.T, store *operationRequestStore, ctx context.Context, + details *VolumeOperationRequestDetails) { + t.Helper() + if err := store.StoreRequestDetails(ctx, details); err != nil { + t.Fatalf("Failed to store request details: %v", err) + } +} + +// mustGetRequestDetails retrieves operation details and fails the test on error +func mustGetRequestDetails(t *testing.T, store *operationRequestStore, ctx context.Context, + name string) *VolumeOperationRequestDetails { + t.Helper() + details, err := store.GetRequestDetails(ctx, name) + if err != nil { + t.Fatalf("Failed to get request details for %s: %v", name, err) + } + return details +} + +// mustGetCRDDirectly retrieves CRD and fails the test on error +func mustGetCRDDirectly(t *testing.T, store *operationRequestStore, ctx context.Context, + instanceName string) *cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest { + t.Helper() + crd, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD %s: %v", instanceName, err) + } + return crd +} + +// assertTaskStatus verifies the task status matches expected value +func assertTaskStatus(t *testing.T, details *VolumeOperationRequestDetails, expected string) { + t.Helper() + if details.OperationDetails.TaskStatus != expected { + t.Errorf("Expected TaskStatus %s, got %s", expected, details.OperationDetails.TaskStatus) + } +} + +// assertQuotaReservation verifies the quota reservation value +func assertQuotaReservation(t *testing.T, crd *cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest, + expectedBytes int64) { + t.Helper() + if crd.Status.StorageQuotaDetails == nil { + if expectedBytes != 0 { + t.Fatal("Expected StorageQuotaDetails to exist") + } + return + } + actualBytes := crd.Status.StorageQuotaDetails.Reserved.Value() + if actualBytes != expectedBytes { + t.Errorf("Expected reservation %d bytes, got %d bytes", expectedBytes, actualBytes) + } +} + +// 
createStaleOperationDetails creates operation details with a stale timestamp +func createStaleOperationDetails(taskID, taskStatus string, hoursAgo int) cnsvolumeoprequestv1alpha1.OperationDetails { + return cnsvolumeoprequestv1alpha1.OperationDetails{ + TaskInvocationTimestamp: metav1.NewTime(time.Now().Add(-time.Duration(hoursAgo) * time.Hour)), + TaskID: taskID, + TaskStatus: taskStatus, + } +} + // TestStoreRequestDetails_CreateSnapshotWithImprovedIdempotencyCheck tests the pattern used in // createSnapshotWithImprovedIdempotencyCheck // Pattern: Single StoreRequestDetails call at the end with final status (Success/Error) @@ -665,3 +732,1055 @@ func TestStoreRequestDetails_RealWorkflowTransitions(t *testing.T) { } }) } + +// createTestCRDInstance creates a CnsVolumeOperationRequest CRD directly on the fake client +// for testing cleanup logic that operates on raw CRDs rather than the store interface. +func createTestCRDInstance( + ctx context.Context, + t *testing.T, + k8sclient client.Client, + name string, + latestOps []cnsvolumeoprequestv1alpha1.OperationDetails, + firstOp cnsvolumeoprequestv1alpha1.OperationDetails, + quotaDetails *cnsvolumeoprequestv1alpha1.QuotaDetails, +) { + t.Helper() + instance := &cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: csiNamespace, + }, + Spec: cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequestSpec{ + Name: name, + }, + Status: cnsvolumeoprequestv1alpha1.CnsVolumeOperationRequestStatus{ + FirstOperationDetails: firstOp, + LatestOperationDetails: latestOps, + StorageQuotaDetails: quotaDetails, + }, + } + if err := k8sclient.Create(ctx, instance); err != nil { + t.Fatalf("Failed to create test CRD instance %s: %v", name, err) + } +} + +// TestForceTransitionStaleInProgressToError tests that stale InProgress entries +// are correctly transitioned to Error status. 
+func TestForceTransitionStaleInProgressToError(t *testing.T) { + store, ctx := setupTestEnvironment(t, true) + + t.Run("transitions single stale InProgress entry to Error", func(t *testing.T) { + instanceName := "stale-single-inprogress" + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + createStaleOperationDetails("task-stale-1", TaskInvocationStatusInProgress, 72), + } + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + instance := mustGetCRDDirectly(t, store, ctx, instanceName) + store.forceTransitionStaleInProgressToError(ctx, instance) + + updated := mustGetCRDDirectly(t, store, ctx, instanceName) + if updated.Status.LatestOperationDetails[0].TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected LatestOperationDetails[0] TaskStatus %s, got %s", + TaskInvocationStatusError, updated.Status.LatestOperationDetails[0].TaskStatus) + } + if updated.Status.LatestOperationDetails[0].Error == "" { + t.Error("Expected error message to be set on transitioned entry") + } + if updated.Status.FirstOperationDetails.TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected FirstOperationDetails TaskStatus %s, got %s", + TaskInvocationStatusError, updated.Status.FirstOperationDetails.TaskStatus) + } + }) + + t.Run("transitions mixed entries - only InProgress to Error", func(t *testing.T) { + instanceName := "stale-mixed-entries" + errorOp := createStaleOperationDetails("task-error-1", TaskInvocationStatusError, 72) + errorOp.Error = "original error from session timeout" + inProgressOp := createStaleOperationDetails("task-stuck-2", TaskInvocationStatusInProgress, 72) + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{errorOp, inProgressOp} + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, errorOp, nil) + + instance := mustGetCRDDirectly(t, store, ctx, instanceName) + store.forceTransitionStaleInProgressToError(ctx, instance) + + updated := mustGetCRDDirectly(t, store, ctx, instanceName) + if 
updated.Status.LatestOperationDetails[0].TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected first entry to remain Error, got %s", + updated.Status.LatestOperationDetails[0].TaskStatus) + } + if updated.Status.LatestOperationDetails[0].Error != "original error from session timeout" { + t.Errorf("Expected original error message to be preserved, got %s", + updated.Status.LatestOperationDetails[0].Error) + } + if updated.Status.LatestOperationDetails[1].TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected second (InProgress) entry to become Error, got %s", + updated.Status.LatestOperationDetails[1].TaskStatus) + } + if updated.Status.LatestOperationDetails[1].Error == "" { + t.Error("Expected error message on transitioned InProgress entry") + } + }) + + t.Run("does not modify instance with no InProgress entries", func(t *testing.T) { + instanceName := "stale-no-inprogress" + staleTime := metav1.NewTime(time.Now().Add(-72 * time.Hour)) + errorOp := cnsvolumeoprequestv1alpha1.OperationDetails{ + TaskInvocationTimestamp: staleTime, + TaskID: "task-already-error", + TaskStatus: TaskInvocationStatusError, + Error: "already errored", + } + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{errorOp} + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, errorOp, nil) + + instance, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + originalRV := instance.ResourceVersion + + store.forceTransitionStaleInProgressToError(ctx, instance) + + updated, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + if updated.ResourceVersion != originalRV { + t.Error("Expected instance not to be updated when no InProgress entries exist") + } + }) + + t.Run("transitions InProgress in FirstOperationDetails even when LatestOps differ", func(t *testing.T) { + instanceName := "stale-first-op-inprogress" + staleTime := 
metav1.NewTime(time.Now().Add(-72 * time.Hour)) + firstOp := cnsvolumeoprequestv1alpha1.OperationDetails{ + TaskInvocationTimestamp: staleTime, + TaskID: "task-first-stuck", + TaskStatus: TaskInvocationStatusInProgress, + } + latestOp := cnsvolumeoprequestv1alpha1.OperationDetails{ + TaskInvocationTimestamp: staleTime, + TaskID: "task-latest-error", + TaskStatus: TaskInvocationStatusError, + Error: "some error", + } + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{latestOp} + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, firstOp, nil) + + instance, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + store.forceTransitionStaleInProgressToError(ctx, instance) + + updated, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get updated CRD: %v", err) + } + + if updated.Status.FirstOperationDetails.TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected FirstOperationDetails TaskStatus %s, got %s", + TaskInvocationStatusError, updated.Status.FirstOperationDetails.TaskStatus) + } + if updated.Status.LatestOperationDetails[0].TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected LatestOperationDetails[0] to remain Error, got %s", + updated.Status.LatestOperationDetails[0].TaskStatus) + } + if updated.Status.LatestOperationDetails[0].Error != "some error" { + t.Errorf("Expected original error preserved, got %s", + updated.Status.LatestOperationDetails[0].Error) + } + }) +} + +// TestCleanupStaleInstances_StaleInProgressHandling tests the cleanupStaleInstances +// behavior with respect to stale InProgress tasks. 
+func TestCleanupStaleInstances_StaleInProgressHandling(t *testing.T) { + store, ctx := setupTestEnvironment(t, true) + + t.Run("skips recent InProgress tasks", func(t *testing.T) { + instanceName := "recent-inprogress" + recentTime := metav1.NewTime(time.Now().Add(-1 * time.Hour)) + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + { + TaskInvocationTimestamp: recentTime, + TaskID: "task-recent", + TaskStatus: TaskInvocationStatusInProgress, + }, + } + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + staleInProgressCutoff := time.Now().Add(-staleInProgressTaskThreshold) + latestOp := ops[len(ops)-1] + + shouldTransition := latestOp.TaskStatus == TaskInvocationStatusInProgress && + latestOp.TaskInvocationTimestamp.Time.Before(staleInProgressCutoff) + + if shouldTransition { + t.Error("Recent InProgress task should NOT be considered stale") + } + + instance, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + if instance.Status.LatestOperationDetails[0].TaskStatus != TaskInvocationStatusInProgress { + t.Errorf("Expected task to remain InProgress, got %s", + instance.Status.LatestOperationDetails[0].TaskStatus) + } + }) + + t.Run("transitions InProgress tasks older than 48 hours", func(t *testing.T) { + instanceName := "old-inprogress" + oldTime := metav1.NewTime(time.Now().Add(-72 * time.Hour)) + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + { + TaskInvocationTimestamp: oldTime, + TaskID: "task-old", + TaskStatus: TaskInvocationStatusInProgress, + }, + } + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + staleInProgressCutoff := time.Now().Add(-staleInProgressTaskThreshold) + latestOp := ops[len(ops)-1] + + shouldTransition := latestOp.TaskStatus == TaskInvocationStatusInProgress && + latestOp.TaskInvocationTimestamp.Time.Before(staleInProgressCutoff) + + if !shouldTransition { + t.Error("72-hour-old InProgress task SHOULD 
be considered stale") + } + + instance, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + store.forceTransitionStaleInProgressToError(ctx, instance) + + updated, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get updated CRD: %v", err) + } + if updated.Status.LatestOperationDetails[0].TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected stale InProgress to be transitioned to Error, got %s", + updated.Status.LatestOperationDetails[0].TaskStatus) + } + }) + + t.Run("deletes Error instances older than 15 minutes", func(t *testing.T) { + instanceName := "old-error-instance" + oldTime := metav1.NewTime(time.Now().Add(-30 * time.Minute)) + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + { + TaskInvocationTimestamp: oldTime, + TaskID: "task-old-error", + TaskStatus: TaskInvocationStatusError, + Error: "some error", + }, + } + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + cutoffTime := time.Now().Add(-15 * time.Minute) + latestOp := ops[len(ops)-1] + + shouldDelete := latestOp.TaskStatus != TaskInvocationStatusInProgress && + !latestOp.TaskInvocationTimestamp.Time.After(cutoffTime) + + if !shouldDelete { + t.Error("30-minute-old Error instance SHOULD be eligible for deletion") + } + + err := store.DeleteRequestDetails(ctx, instanceName) + if err != nil { + t.Fatalf("Failed to delete instance: %v", err) + } + + _, err = getCRDDirectly(ctx, store, instanceName) + if err == nil { + t.Error("Expected instance to be deleted") + } + }) + + t.Run("does not delete recent Error instances", func(t *testing.T) { + instanceName := "recent-error-instance" + recentTime := metav1.NewTime(time.Now().Add(-5 * time.Minute)) + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + { + TaskInvocationTimestamp: recentTime, + TaskID: "task-recent-error", + TaskStatus: TaskInvocationStatusError, + Error: "recent error", + }, + } + 
createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + cutoffTime := time.Now().Add(-15 * time.Minute) + latestOp := ops[len(ops)-1] + + shouldDelete := latestOp.TaskStatus != TaskInvocationStatusInProgress && + !latestOp.TaskInvocationTimestamp.Time.After(cutoffTime) + + if shouldDelete { + t.Error("5-minute-old Error instance should NOT be eligible for deletion") + } + + instance, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + if instance == nil { + t.Error("Instance should still exist") + } + }) +} + +// TestCleanupStaleInstances_EndToEndScenario simulates the exact scenario +// from the bug report: session expiry during CreateVolume leaves CRs stuck +// with InProgress tasks and leaked quota reservations. +func TestCleanupStaleInstances_EndToEndScenario(t *testing.T) { + store, ctx := setupTestEnvironment(t, true) + + t.Run("session expiry leaves stuck InProgress - cleanup transitions and deletes", func(t *testing.T) { + instanceName := "pvc-stuck-session-expiry" + reservedQty := resource.NewQuantity(400*1024*1024*1024, resource.BinarySI) + quota := &cnsvolumeoprequestv1alpha1.QuotaDetails{ + Reserved: reservedQty, + StoragePolicyId: "policy-123", + StorageClassName: "storage-class-1", + Namespace: "test-ns", + } + + // Simulate the exact CR state from the bug report: + // First task errored with session auth failure, second task stuck InProgress + firstTaskTime := metav1.NewTime(time.Now().Add(-72 * time.Hour)) + secondTaskTime := metav1.NewTime(time.Now().Add(-71*time.Hour - 45*time.Minute)) + + errorOp := cnsvolumeoprequestv1alpha1.OperationDetails{ + TaskInvocationTimestamp: firstTaskTime, + TaskID: "task-484682", + TaskStatus: TaskInvocationStatusError, + Error: "destroy property filter failed with ServerFaultCode: " + + "The session is not authenticated.", + } + stuckOp := cnsvolumeoprequestv1alpha1.OperationDetails{ + TaskInvocationTimestamp: secondTaskTime, + 
TaskID: "task-485876", + TaskStatus: TaskInvocationStatusInProgress, + } + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{errorOp, stuckOp} + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, errorOp, quota) + + instance, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + + // Verify the stuck state matches what we expect + latestOps := instance.Status.LatestOperationDetails + if latestOps[len(latestOps)-1].TaskStatus != TaskInvocationStatusInProgress { + t.Fatalf("Test setup error: expected last entry to be InProgress") + } + + // Step 1: forceTransitionStaleInProgressToError should fix the stuck entries + store.forceTransitionStaleInProgressToError(ctx, instance) + + updated, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get updated CRD: %v", err) + } + + // Verify all InProgress entries are now Error + for i, op := range updated.Status.LatestOperationDetails { + if op.TaskStatus == TaskInvocationStatusInProgress { + t.Errorf("Entry %d should no longer be InProgress after force transition", i) + } + } + + // The last entry should now be Error (was InProgress) + lastOp := updated.Status.LatestOperationDetails[len(updated.Status.LatestOperationDetails)-1] + if lastOp.TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected last entry to be Error, got %s", lastOp.TaskStatus) + } + if lastOp.TaskID != "task-485876" { + t.Errorf("Expected TaskID task-485876, got %s", lastOp.TaskID) + } + + // Verify quota details are still present (they are released on CR deletion) + if updated.Status.StorageQuotaDetails == nil { + t.Error("StorageQuotaDetails should still be present before deletion") + } + if updated.Status.StorageQuotaDetails.Reserved.Value() != 400*1024*1024*1024 { + t.Errorf("Expected reserved 400Gi, got %v", updated.Status.StorageQuotaDetails.Reserved) + } + + // Step 2: Now that the entry is Error, cleanup should delete the CR + // 
(the entry is older than 15 minutes) + cutoffTime := time.Now().Add(-15 * time.Minute) + lastOpAfterTransition := updated.Status.LatestOperationDetails[len(updated.Status.LatestOperationDetails)-1] + shouldDelete := lastOpAfterTransition.TaskStatus != TaskInvocationStatusInProgress && + !lastOpAfterTransition.TaskInvocationTimestamp.Time.After(cutoffTime) + + if !shouldDelete { + t.Error("After force transition, the instance should be eligible for deletion") + } + + err = store.DeleteRequestDetails(ctx, instanceName) + if err != nil { + t.Fatalf("Failed to delete instance: %v", err) + } + + _, err = getCRDDirectly(ctx, store, instanceName) + if err == nil { + t.Error("Expected instance to be deleted after cleanup") + } + }) + + t.Run("InProgress task just under 48 hours is not transitioned", func(t *testing.T) { + instanceName := "pvc-boundary-48h" + // Use 47h59m to ensure we're safely under the threshold and avoid clock precision issues + justUnderBoundaryTime := metav1.NewTime(time.Now().Add(-47*time.Hour - 59*time.Minute)) + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + { + TaskInvocationTimestamp: justUnderBoundaryTime, + TaskID: "task-boundary", + TaskStatus: TaskInvocationStatusInProgress, + }, + } + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + staleInProgressCutoff := time.Now().Add(-staleInProgressTaskThreshold) + latestOp := ops[len(ops)-1] + + shouldTransition := latestOp.TaskStatus == TaskInvocationStatusInProgress && + latestOp.TaskInvocationTimestamp.Time.Before(staleInProgressCutoff) + + if shouldTransition { + t.Error("Task just under 48h should NOT be transitioned") + } + }) + + t.Run("InProgress task at 47 hours is not transitioned", func(t *testing.T) { + instanceName := "pvc-47h-inprogress" + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + { + TaskInvocationTimestamp: metav1.NewTime(time.Now().Add(-47 * time.Hour)), + TaskID: "task-47h", + TaskStatus: TaskInvocationStatusInProgress, + }, + 
} + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + staleInProgressCutoff := time.Now().Add(-staleInProgressTaskThreshold) + + shouldTransition := ops[0].TaskStatus == TaskInvocationStatusInProgress && + ops[0].TaskInvocationTimestamp.Time.Before(staleInProgressCutoff) + + if shouldTransition { + t.Error("47-hour InProgress task should NOT be transitioned") + } + }) + + t.Run("InProgress task at 49 hours is transitioned", func(t *testing.T) { + instanceName := "pvc-49h-inprogress" + ops := []cnsvolumeoprequestv1alpha1.OperationDetails{ + { + TaskInvocationTimestamp: metav1.NewTime(time.Now().Add(-49 * time.Hour)), + TaskID: "task-49h", + TaskStatus: TaskInvocationStatusInProgress, + }, + } + createTestCRDInstance(ctx, t, store.k8sclient, instanceName, ops, ops[0], nil) + + staleInProgressCutoff := time.Now().Add(-staleInProgressTaskThreshold) + + shouldTransition := ops[0].TaskStatus == TaskInvocationStatusInProgress && + ops[0].TaskInvocationTimestamp.Time.Before(staleInProgressCutoff) + + if !shouldTransition { + t.Error("49-hour InProgress task SHOULD be transitioned") + } + + instance, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get CRD: %v", err) + } + store.forceTransitionStaleInProgressToError(ctx, instance) + + updated, err := getCRDDirectly(ctx, store, instanceName) + if err != nil { + t.Fatalf("Failed to get updated CRD: %v", err) + } + if updated.Status.LatestOperationDetails[0].TaskStatus != TaskInvocationStatusError { + t.Errorf("Expected Error, got %s", updated.Status.LatestOperationDetails[0].TaskStatus) + } + }) +} + +// TestStaleInProgressTaskThreshold verifies the threshold constant value. 
+func TestStaleInProgressTaskThreshold(t *testing.T) { + if staleInProgressTaskThreshold != 48*time.Hour { + t.Errorf("Expected staleInProgressTaskThreshold to be 48h, got %v", staleInProgressTaskThreshold) + } +} + +// testErrorPathTransition is a helper that tests the InProgress->Error transition pattern +func testErrorPathTransition(t *testing.T, store *operationRequestStore, ctx context.Context, + instanceName, taskID, errorMsg string) { + t.Helper() + + // Setup: Create InProgress operation + inProgress := createTestVolumeOperationDetails(instanceName, "", "", taskID, + TaskInvocationStatusInProgress, "", nil) + mustStoreRequestDetails(t, store, ctx, inProgress) + + // Simulate error path: transition to Error + errorDetails := createTestVolumeOperationDetails(instanceName, "", "", taskID, + TaskInvocationStatusError, errorMsg, nil) + mustStoreRequestDetails(t, store, ctx, errorDetails) + + // Verify: Final state is Error + result := mustGetRequestDetails(t, store, ctx, instanceName) + assertTaskStatus(t, result, TaskInvocationStatusError) +} + +// TestErrorPathTransitionsPreventStuckInProgress verifies that when an operation +// encounters an error after task creation (e.g., waitOnTask failure, nil taskResult, +// getTaskResultFromTaskInfo error), the CR is correctly transitioned to Error status. +// This validates the fixes across all operation types: create, delete, expand, snapshot. 
+func TestErrorPathTransitionsPreventStuckInProgress(t *testing.T) { + store, ctx := setupTestEnvironment(t, false) + + tests := []struct { + name string + instanceName string + taskID string + errorMsg string + }{ + { + name: "CreateVolume: waitOnTask general error transitions InProgress to Error", + instanceName: "pvc-create-waitontask-err", + taskID: "task-create-100", + errorMsg: "destroy property filter failed with ServerFaultCode: The session is not authenticated", + }, + { + name: "CreateVolume: nil taskResult transitions InProgress to Error", + instanceName: "pvc-create-nil-result", + taskID: "task-create-200", + errorMsg: "taskResult is empty for CreateVolume task", + }, + { + name: "CreateVolume: QueryAllVolume failure transitions InProgress to Error", + instanceName: "pvc-create-query-fail", + taskID: "task-create-300", + errorMsg: "failed to query CNS for volume", + }, + { + name: "DeleteVolume: waitOnTask general error transitions InProgress to Error", + instanceName: "pvc-delete-waitontask-err", + taskID: "task-delete-100", + errorMsg: "context deadline exceeded", + }, + { + name: "DeleteVolume: nil taskInfo transitions InProgress to Error", + instanceName: "pvc-delete-nil-taskinfo", + taskID: "task-delete-200", + errorMsg: "taskInfo is nil", + }, + { + name: "DeleteVolume: nil taskResult transitions InProgress to Error", + instanceName: "pvc-delete-nil-result", + taskID: "task-delete-300", + errorMsg: "taskResult is empty for DeleteVolume task", + }, + { + name: "ExpandVolume: nil taskResult transitions InProgress to Error", + instanceName: "pvc-expand-nil-result", + taskID: "task-expand-100", + errorMsg: "taskResult is empty for ExpandVolume task", + }, + { + name: "ExpandVolume: getTaskResultFromTaskInfo error transitions InProgress to Error", + instanceName: "pvc-expand-taskresult-err", + taskID: "task-expand-200", + errorMsg: "failed to get task result", + }, + { + name: "CreateSnapshot: waitOnTask general error transitions InProgress to Error", 
+ instanceName: "snap-create-waitontask-err", + taskID: "task-snap-100", + errorMsg: "Failed to get taskInfo for CreateSnapshots task", + }, + { + name: "CreateSnapshot: GetTaskResult nil transitions InProgress to Error", + instanceName: "snap-create-nil-result", + taskID: "task-snap-200", + errorMsg: "unable to find the task result for CreateSnapshots task", + }, + { + name: "DeleteSnapshot: getTaskResultFromTaskInfo error transitions InProgress to Error", + instanceName: "snap-delete-taskresult-err", + taskID: "task-snapdel-100", + errorMsg: "failed to get the task result for DeleteSnapshots task", + }, + { + name: "DeleteSnapshot: nil taskResult transitions InProgress to Error", + instanceName: "snap-delete-nil-result", + taskID: "task-snapdel-200", + errorMsg: "task result is empty for DeleteSnapshot task", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + testErrorPathTransition(t, store, ctx, tc.instanceName, tc.taskID, tc.errorMsg) + }) + } +} + +// TestDeferSkipsInProgressAndPersistsError verifies the defer guard logic: +// the defer in volume operation functions only persists when status != InProgress. +// This confirms that once we set Error on volumeOperationDetails, the defer will persist it. 
+func TestDeferSkipsInProgressAndPersistsError(t *testing.T) { + store, ctx := setupTestEnvironment(t, false) + + t.Run("InProgress status is NOT persisted by defer (simulates old bug)", func(t *testing.T) { + name := "pvc-defer-skip-inprogress" + // Store initial InProgress + details := createTestVolumeOperationDetails( + name, "", "", "task-defer-1", TaskInvocationStatusInProgress, "", nil) + if err := store.StoreRequestDetails(ctx, details); err != nil { + t.Fatalf("Setup: %v", err) + } + + // Simulate what the defer guard checks: status == InProgress -> skip persist + got, err := store.GetRequestDetails(ctx, name) + if err != nil { + t.Fatalf("Get: %v", err) + } + if got.OperationDetails.TaskStatus != TaskInvocationStatusInProgress { + t.Fatalf("Expected InProgress, got %s", got.OperationDetails.TaskStatus) + } + // The defer would NOT call StoreRequestDetails because status is InProgress + // This is the condition that previously caused stuck CRs + }) + + t.Run("Error status IS persisted by defer (simulates fix)", func(t *testing.T) { + name := "pvc-defer-persist-error" + // Store initial InProgress + inProgressDetails := createTestVolumeOperationDetails( + name, "", "", "task-defer-2", TaskInvocationStatusInProgress, "", nil) + if err := store.StoreRequestDetails(ctx, inProgressDetails); err != nil { + t.Fatalf("Setup: %v", err) + } + + // Simulate the fix: error path sets Error BEFORE the defer runs + errorDetails := createTestVolumeOperationDetails( + name, "", "", "task-defer-2", + TaskInvocationStatusError, "session expired", nil) + // The defer guard checks: status != InProgress -> persist + if errorDetails.OperationDetails.TaskStatus != TaskInvocationStatusInProgress { + // This is what the defer does + if err := store.StoreRequestDetails(ctx, errorDetails); err != nil { + t.Fatalf("StoreError: %v", err) + } + } + + got, err := store.GetRequestDetails(ctx, name) + if err != nil { + t.Fatalf("Get: %v", err) + } + if got.OperationDetails.TaskStatus != 
TaskInvocationStatusError { + t.Errorf("want Error, got %s", got.OperationDetails.TaskStatus) + } + }) +} + +// TestEndToEndStuckInProgressWithImprovedIdempotency simulates the complete lifecycle +// with improved idempotency (CSI Transaction Support disabled): task created, waitOnTask +// fails, error path now correctly marks Error, defer persists it, and cleanup can delete it. +func TestEndToEndStuckInProgressWithImprovedIdempotency(t *testing.T) { + store, ctx := setupTestEnvironment(t, false) + + t.Run("improved idempotency: error path marks Error and cleanup deletes CR", func(t *testing.T) { + name := "pvc-e2e-improved-idempotency" + + // Step 1: Initial InProgress (before CNS task) + initial := createTestVolumeOperationDetails( + name, "", "", "", TaskInvocationStatusInProgress, "", nil) + if err := store.StoreRequestDetails(ctx, initial); err != nil { + t.Fatalf("Step1: %v", err) + } + + // Step 2: Task created, update with TaskID (still InProgress) + withTask := createTestVolumeOperationDetails( + name, "", "", "task-e2e-100", TaskInvocationStatusInProgress, "", nil) + withTask.OperationDetails.TaskInvocationTimestamp = initial.OperationDetails.TaskInvocationTimestamp + if err := store.StoreRequestDetails(ctx, withTask); err != nil { + t.Fatalf("Step2: %v", err) + } + + // Step 3: waitOnTask fails (session error) -> new code marks Error + errorDetails := createTestVolumeOperationDetails( + name, "", "", "task-e2e-100", TaskInvocationStatusError, + "destroy property filter failed with ServerFaultCode: The session is not authenticated", nil) + errorDetails.OperationDetails.TaskInvocationTimestamp = initial.OperationDetails.TaskInvocationTimestamp + if err := store.StoreRequestDetails(ctx, errorDetails); err != nil { + t.Fatalf("Step3: %v", err) + } + + // Verify Error state + got, err := store.GetRequestDetails(ctx, name) + if err != nil { + t.Fatalf("Get: %v", err) + } + if got.OperationDetails.TaskStatus != TaskInvocationStatusError { + t.Fatalf("want 
Error, got %s", got.OperationDetails.TaskStatus) + } + + // Step 4: Cleanup can now delete this CR (it's Error + old) + if err := store.DeleteRequestDetails(ctx, name); err != nil { + t.Fatalf("Delete: %v", err) + } + _, err = store.GetRequestDetails(ctx, name) + if err == nil { + t.Error("Expected CR to be deleted") + } + }) + + t.Run("improved idempotency: quota released when Error persisted on retry", func(t *testing.T) { + name := "pvc-e2e-quota-release" + quota := createTestQuotaDetails(400 * 1024 * 1024 * 1024) + + initial := createTestVolumeOperationDetails( + name, "", "", "task-quota-100", TaskInvocationStatusInProgress, "", quota) + if err := store.StoreRequestDetails(ctx, initial); err != nil { + t.Fatalf("Setup: %v", err) + } + + // On retry error, quota Reserved is zeroed + quota.Reserved = resource.NewQuantity(0, resource.BinarySI) + errorDetails := createTestVolumeOperationDetails( + name, "", "", "task-quota-100", TaskInvocationStatusError, + "session expired", quota) + errorDetails.OperationDetails.TaskInvocationTimestamp = initial.OperationDetails.TaskInvocationTimestamp + if err := store.StoreRequestDetails(ctx, errorDetails); err != nil { + t.Fatalf("StoreError: %v", err) + } + + got, err := store.GetRequestDetails(ctx, name) + if err != nil { + t.Fatalf("Get: %v", err) + } + if got.OperationDetails.TaskStatus != TaskInvocationStatusError { + t.Errorf("want Error, got %s", got.OperationDetails.TaskStatus) + } + }) +} + +// TestReservationRetainedOnFirstAttemptError verifies that on the first attempt +// (no prior CVOR exists), an error should NOT zero the reservation. The quota +// stays held so the sidecar retry can use it. On retries (CVOR already exists), +// an error should zero the reservation to release it. +// These tests use the exported IsRetryAttempt helper — the same function that +// the production code in manager.go calls — so they validate the actual logic +// path rather than reimplementing the defer condition inline. 
+func TestReservationRetainedOnFirstAttemptError(t *testing.T) {
+	store, ctx := setupTestEnvironment(t, true)
+
+	// Turn the FSS on for this test and put the previous value back afterwards;
+	// resetting to a hard-coded false would clobber a value set by earlier setup.
+	prevFSS := isPodVMOnStretchSupervisorFSSEnabled
+	isPodVMOnStretchSupervisorFSSEnabled = true
+	defer func() { isPodVMOnStretchSupervisorFSSEnabled = prevFSS }()
+
+	// Every subtest starts from the same 400 GiB reservation.
+	const reservedBytes = int64(400 * 1024 * 1024 * 1024)
+
+	// shouldReleaseReservation mirrors the condition in persistVolumeOperationDetails:
+	// the reservation is released on Success, or on Error when the attempt is a retry.
+	// Like the production helper, it never touches details without OperationDetails,
+	// so a malformed fixture yields "do not release" instead of a nil dereference.
+	shouldReleaseReservation := func(details *VolumeOperationRequestDetails, isRetry bool) bool {
+		if details == nil || details.OperationDetails == nil || details.QuotaDetails == nil {
+			return false
+		}
+		switch details.OperationDetails.TaskStatus {
+		case TaskInvocationStatusSuccess:
+			return true
+		case TaskInvocationStatusError:
+			return isRetry
+		default:
+			return false
+		}
+	}
+
+	t.Run("first attempt error keeps reservation non-zero", func(t *testing.T) {
+		name := "pvc-first-attempt-keep-reservation"
+		quota := createTestQuotaDetails(reservedBytes)
+
+		// Nothing has been stored under this name yet, so the lookup is expected
+		// to fail (NotFound) and IsRetryAttempt must report a first attempt.
+		existing, err := store.GetRequestDetails(ctx, name)
+		isRetry := IsRetryAttempt(existing, err)
+		if isRetry {
+			t.Fatal("Expected IsRetryAttempt to be false on first attempt")
+		}
+
+		mustStoreRequestDetails(t, store, ctx, createTestVolumeOperationDetails(
+			name, "", "", "task-first-1", TaskInvocationStatusInProgress, "", quota))
+
+		errorDetails := createTestVolumeOperationDetails(
+			name, "", "", "task-first-1", TaskInvocationStatusError, "session expired", quota)
+		if shouldReleaseReservation(errorDetails, isRetry) {
+			errorDetails.QuotaDetails.Reserved = resource.NewQuantity(0, resource.BinarySI)
+		}
+		mustStoreRequestDetails(t, store, ctx, errorDetails)
+
+		// Error on a first attempt must leave the reservation in place.
+		crd := mustGetCRDDirectly(t, store, ctx, name)
+		assertQuotaReservation(t, crd, reservedBytes)
+	})
+
+	t.Run("retry attempt error zeros reservation", func(t *testing.T) {
+		name := "pvc-retry-attempt-zero-reservation"
+
+		// First attempt: InProgress followed by Error leaves a CR behind.
+		quota := createTestQuotaDetails(reservedBytes)
+		mustStoreRequestDetails(t, store, ctx, createTestVolumeOperationDetails(
+			name, "", "", "task-retry-1", TaskInvocationStatusInProgress, "", quota))
+		mustStoreRequestDetails(t, store, ctx, createTestVolumeOperationDetails(
+			name, "", "", "task-retry-1", TaskInvocationStatusError, "first error", quota))
+
+		// The CR exists now, so the lookup succeeds and this counts as a retry.
+		existing, err := store.GetRequestDetails(ctx, name)
+		isRetry := IsRetryAttempt(existing, err)
+		if !isRetry {
+			t.Fatal("Expected IsRetryAttempt to be true on retry")
+		}
+
+		// The second attempt gets its own quota object because the Reserved field
+		// of its error details is overwritten below.
+		quota2 := createTestQuotaDetails(reservedBytes)
+		mustStoreRequestDetails(t, store, ctx, createTestVolumeOperationDetails(
+			name, "", "", "task-retry-2", TaskInvocationStatusInProgress, "", quota2))
+
+		error2 := createTestVolumeOperationDetails(
+			name, "", "", "task-retry-2", TaskInvocationStatusError, "second error", quota2)
+		if shouldReleaseReservation(error2, isRetry) {
+			error2.QuotaDetails.Reserved = resource.NewQuantity(0, resource.BinarySI)
+		}
+		mustStoreRequestDetails(t, store, ctx, error2)
+
+		crd := mustGetCRDDirectly(t, store, ctx, name)
+		assertQuotaReservation(t, crd, 0)
+	})
+
+	t.Run("success always zeros reservation regardless of retry", func(t *testing.T) {
+		name := "pvc-success-always-zeros"
+		quota := createTestQuotaDetails(reservedBytes)
+
+		// Nothing stored under this name yet -> first attempt.
+		existing, err := store.GetRequestDetails(ctx, name)
+		isRetry := IsRetryAttempt(existing, err)
+
+		mustStoreRequestDetails(t, store, ctx, createTestVolumeOperationDetails(
+			name, "", "", "task-success-1", TaskInvocationStatusInProgress, "", quota))
+
+		success := createTestVolumeOperationDetails(
+			name, "vol-123", "", "task-success-1", TaskInvocationStatusSuccess, "", quota)
+
+		// "Regardless of retry" is checked literally: the release decision for a
+		// successful operation must be the same for both values of isRetry.
+		for _, retry := range []bool{isRetry, !isRetry} {
+			if !shouldReleaseReservation(success, retry) {
+				t.Fatalf("Success should release the reservation when isRetry=%v", retry)
+			}
+		}
+		success.QuotaDetails.Reserved = resource.NewQuantity(0, resource.BinarySI)
+		mustStoreRequestDetails(t, store, ctx, success)
+
+		crd := mustGetCRDDirectly(t, store, ctx, name)
+		if crd.Status.StorageQuotaDetails == nil {
+			t.Fatal("Expected StorageQuotaDetails to exist")
+		}
+		if got := crd.Status.StorageQuotaDetails.Reserved.Value(); got != 0 {
+			t.Errorf("Success should always zero reservation, got %d", got)
+		}
+	})
+}
+
+// TestIsRetryAttempt verifies the exported IsRetryAttempt helper function
+// that the production code in manager.go relies on.
+func TestIsRetryAttempt(t *testing.T) {
+	tests := []struct {
+		name    string
+		details *VolumeOperationRequestDetails
+		err     error
+		want    bool
+	}{
+		{
+			// A plain error stands in for the NotFound returned by a real lookup.
+			name:    "nil details with NotFound error is not a retry",
+			details: nil,
+			err:     fmt.Errorf("not found"),
+			want:    false,
+		},
+		{
+			name:    "nil details with nil error is not a retry",
+			details: nil,
+			err:     nil,
+			want:    false,
+		},
+		{
+			name: "non-nil details with nil error is a retry",
+			details: &VolumeOperationRequestDetails{
+				Name: "test",
+				OperationDetails: &OperationDetails{
+					TaskStatus: TaskInvocationStatusError,
+				},
+			},
+			err:  nil,
+			want: true,
+		},
+		{
+			name: "non-nil details with non-nil error is not a retry",
+			details: &VolumeOperationRequestDetails{
+				Name: "test",
+			},
+			err:  fmt.Errorf("some error"),
+			want: false,
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			if got := IsRetryAttempt(tc.details, tc.err); got != tc.want {
+				t.Errorf("IsRetryAttempt() = %v, want %v", got, tc.want)
+			}
+		})
+	}
+}
+
+// TestForceTransitionZerosQuotaReservation verifies that forceTransitionStaleInProgressToError
+// proactively zeroes the quota reservation when isPodVMOnStretchSupervisorFSSEnabled is
+// true.
+// It also checks that the reservation is left untouched when the FSS is disabled, and
+// that a CR without StorageQuotaDetails is still moved to Error without gaining any.
+func TestForceTransitionZerosQuotaReservation(t *testing.T) {
+	store, ctx := setupTestEnvironment(t, true)
+
+	// Reservation seeded on every CR that carries quota details (400 GiB).
+	const reservedBytes = int64(400 * 1024 * 1024 * 1024)
+
+	tests := []struct {
+		name         string
+		instanceName string
+		taskID       string
+		fssEnabled   bool  // value of isPodVMOnStretchSupervisorFSSEnabled during the subtest
+		withQuota    bool  // seed the CR with StorageQuotaDetails
+		wantReserved int64 // expected reservation afterwards; only checked when withQuota is set
+	}{
+		{
+			name:         "zeroes reservation when FSS enabled and quota exists",
+			instanceName: "stale-with-quota",
+			taskID:       "task-stale-quota",
+			fssEnabled:   true,
+			withQuota:    true,
+			wantReserved: 0,
+		},
+		{
+			name:         "does not zero reservation when FSS disabled",
+			instanceName: "stale-with-quota-fss-off",
+			taskID:       "task-stale-quota-fss-off",
+			fssEnabled:   false,
+			withQuota:    true,
+			wantReserved: reservedBytes,
+		},
+		{
+			name:         "handles nil StorageQuotaDetails gracefully",
+			instanceName: "stale-no-quota",
+			taskID:       "task-stale-no-quota",
+			fssEnabled:   true,
+			withQuota:    false,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			// Set the FSS for this subtest only and restore whatever was there before,
+			// so subtests do not leak state into each other or into later tests.
+			prevFSS := isPodVMOnStretchSupervisorFSSEnabled
+			isPodVMOnStretchSupervisorFSSEnabled = tc.fssEnabled
+			defer func() { isPodVMOnStretchSupervisorFSSEnabled = prevFSS }()
+
+			var quota *cnsvolumeoprequestv1alpha1.QuotaDetails
+			if tc.withQuota {
+				quota = &cnsvolumeoprequestv1alpha1.QuotaDetails{
+					Reserved:         resource.NewQuantity(reservedBytes, resource.BinarySI),
+					StoragePolicyId:  "policy-123",
+					StorageClassName: "sc-1",
+					Namespace:        "ns-1",
+				}
+			}
+
+			// A single operation that has been InProgress for 72 hours; every subtest
+			// expects the store to force it to Error.
+			ops := []cnsvolumeoprequestv1alpha1.OperationDetails{
+				{
+					TaskInvocationTimestamp: metav1.NewTime(time.Now().Add(-72 * time.Hour)),
+					TaskID:                  tc.taskID,
+					TaskStatus:              TaskInvocationStatusInProgress,
+				},
+			}
+			createTestCRDInstance(ctx, t, store.k8sclient, tc.instanceName, ops, ops[0], quota)
+
+			instance, err := getCRDDirectly(ctx, store, tc.instanceName)
+			if err != nil {
+				t.Fatalf("Failed to get CRD: %v", err)
+			}
+			store.forceTransitionStaleInProgressToError(ctx, instance)
+
+			// Re-read the CR so the assertions see what was persisted.
+			updated, err := getCRDDirectly(ctx, store, tc.instanceName)
+			if err != nil {
+				t.Fatalf("Failed to get updated CRD: %v", err)
+			}
+
+			// Guard the index below: an empty list should be a test failure, not a panic.
+			latest := updated.Status.LatestOperationDetails
+			if len(latest) == 0 {
+				t.Fatal("Expected LatestOperationDetails to contain the transitioned operation")
+			}
+			if latest[0].TaskStatus != TaskInvocationStatusError {
+				t.Errorf("Expected Error, got %s", latest[0].TaskStatus)
+			}
+
+			quotaStatus := updated.Status.StorageQuotaDetails
+			if !tc.withQuota {
+				if quotaStatus != nil {
+					t.Error("Expected StorageQuotaDetails to remain nil")
+				}
+				return
+			}
+			if quotaStatus == nil {
+				t.Fatal("Expected StorageQuotaDetails to still exist")
+			}
+			if got := quotaStatus.Reserved.Value(); got != tc.wantReserved {
+				t.Errorf("Expected reservation to be %d (FSS enabled: %v), got %d",
+					tc.wantReserved, tc.fssEnabled, got)
+			}
+		})
+	}
+}
diff --git a/pkg/internalapis/cnsvolumeoperationrequest/util.go b/pkg/internalapis/cnsvolumeoperationrequest/util.go
index ea10d8630c..2d2fcc5a03 100644
--- a/pkg/internalapis/cnsvolumeoperationrequest/util.go
+++ b/pkg/internalapis/cnsvolumeoperationrequest/util.go
@@ -97,6 +97,15 @@ func CreateVolumeOperationRequestDetails(name, volumeID, snapshotID string, capa
 	}
 }
 
+// IsRetryAttempt reports whether the current operation is a retry rather than
+// a first attempt. details and err are expected to be the result of looking up
+// the CnsVolumeOperationRequest CR for the volume: the operation counts as a
+// retry only when the lookup succeeded (err == nil) and returned non-nil
+// details. Any lookup error, NotFound included, is treated as a first attempt.
+func IsRetryAttempt(details *VolumeOperationRequestDetails, err error) bool {
+	return err == nil && details != nil
+}
+
 // convertToCnsVolumeOperationRequestDetails converts an object of type
 // OperationDetails to the OperationDetails type defined by the
 // CnsVolumeOperationRequest Custom Resource.