diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go
index 67c78cf..0e55ace 100644
--- a/pkg/cloudprovider/cloudprovider.go
+++ b/pkg/cloudprovider/cloudprovider.go
@@ -30,9 +30,11 @@ import (
 	"github.com/awslabs/operatorpkg/status"
 	"github.com/samber/lo"
 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/retry"
 	"k8s.io/utils/ptr"
 	capiv1beta1 "sigs.k8s.io/cluster-api/api/v1beta1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -58,7 +60,8 @@
 	taintsKey  = "capacity.cluster-autoscaler.kubernetes.io/taints"
 	maxPodsKey = "capacity.cluster-autoscaler.kubernetes.io/maxPods"
 
-	machineAnnotation = "cluster.x-k8s.io/machine"
+	machineAnnotation           = "cluster.x-k8s.io/machine"
+	machineDeploymentAnnotation = "cluster.x-k8s.io/machine-deployment"
 )
 
 func NewCloudProvider(ctx context.Context, kubeClient client.Client, machineProvider machine.Provider, machineDeploymentProvider machinedeployment.Provider) *CloudProvider {
@@ -130,6 +133,17 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)
 
 		machine, err = c.machineProvider.Get(ctx, machineName, machineNamespace)
 		if err != nil {
+			if apierrors.IsNotFound(err) {
+				// Machine is already gone. Decrement the MD replicas using the MD annotation so
+				// the replica count stays accurate, then signal to karpenter that the NodeClaim
+				// is no longer backed by a real machine.
+				if mdAnno, ok := nodeClaim.Annotations[machineDeploymentAnnotation]; ok {
+					if mdNamespace, mdName, parseErr := parseMachineAnnotation(mdAnno); parseErr == nil {
+						c.tryDecrementMachineDeploymentReplicas(ctx, mdNamespace, mdName)
+					}
+				}
+				return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("Machine %q in namespace %q no longer exists for NodeClaim %q", machineName, machineNamespace, nodeClaim.Name))
+			}
 			return fmt.Errorf("error finding Machine %q in namespace %s to Delete NodeClaim %q: %w", machineName, machineNamespace, nodeClaim.Name, err)
 		}
 	} else {
@@ -142,7 +156,19 @@
 
 	// check if already deleting
 	if c.machineProvider.IsDeleting(machine) {
-		// Machine is already deleting, we do not need to annotate it or change the scalable resource replicas.
+		// The Machine is already being deleted (e.g. CAPI initiated a scale-down).
+		// We must still decrement the MachineDeployment replica count so that CAPI does not
+		// provision a replacement Machine to satisfy the current desired count.
+		machineDeployment, err := c.machineDeploymentFromMachine(ctx, machine)
+		if err != nil {
+			return fmt.Errorf("unable to delete NodeClaim %q, cannot find owner MachineDeployment for deleting Machine %q: %w", nodeClaim.Name, machine.Name, err)
+		}
+		if machineDeployment.Spec.Replicas != nil && *machineDeployment.Spec.Replicas > 0 {
+			machineDeployment.Spec.Replicas = ptr.To(*machineDeployment.Spec.Replicas - 1)
+			if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
+				return fmt.Errorf("unable to delete NodeClaim %q, cannot update MachineDeployment %q replicas for deleting Machine %q: %w", nodeClaim.Name, machineDeployment.Name, machine.Name, err)
+			}
+		}
 		return nil
 	}
 
@@ -296,6 +322,17 @@ func (c *CloudProvider) provisionMachine(ctx context.Context, nodeClaim *karpv1.
 
 	machine, err := c.machineProvider.Get(ctx, machineName, machineNamespace)
 	if err != nil {
+		if apierrors.IsNotFound(err) {
+			// The Machine no longer exists. The previous createMachine call already incremented
+			// the MachineDeployment replicas for a Machine that is now gone. Decrement that
+			// phantom replica before starting a fresh provisioning attempt to prevent escalation.
+			if mdAnno, ok := nodeClaim.Annotations[machineDeploymentAnnotation]; ok {
+				if mdNamespace, mdName, parseErr := parseMachineAnnotation(mdAnno); parseErr == nil {
+					c.tryDecrementMachineDeploymentReplicas(ctx, mdNamespace, mdName)
+				}
+			}
+			return c.createMachine(ctx, nodeClaim)
+		}
 		return nil, nil, fmt.Errorf("failed to get NodeClaim's Machine %s : %w", machineName, err)
 	}
 
@@ -343,6 +380,29 @@
 		return nil, nil, fmt.Errorf("cannot satisfy create, unable to update MachineDeployment %q replicas: %w", machineDeployment.Name, err)
 	}
 
+	// rollbackReplicas resets the MachineDeployment replica count to originalReplicas.
+	// context.WithoutCancel preserves tracing/logging values from ctx while detaching its cancellation,
+	// so the rollback succeeds even if the outer ctx has already expired.
+	// We use machineDeployment directly on the first attempt since it already carries the fresh
+	// ResourceVersion from the successful increment update, avoiding a stale informer cache read.
+	// Only on conflict do we re-fetch to get the latest ResourceVersion before retrying.
+	rollbackCtx := context.WithoutCancel(ctx)
+	rollbackReplicas := func() {
+		md := machineDeployment
+		if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			md.Spec.Replicas = ptr.To(originalReplicas)
+			if err := c.machineDeploymentProvider.Update(rollbackCtx, md); err != nil {
+				if latest, getErr := c.machineDeploymentProvider.Get(rollbackCtx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace); getErr == nil {
+					md = latest
+				}
+				return err
+			}
+			return nil
+		}); retryErr != nil {
+			log.Println(fmt.Errorf("error while recovering replicas for MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, retryErr))
+		}
+	}
+
 	// TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
 	// Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
 	// then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
@@ -352,19 +412,7 @@
 
 	machine, err := c.pollForUnclaimedMachineInMachineDeploymentWithTimeout(ctx, machineDeployment, time.Minute)
 	if err != nil {
 		// unable to find a Machine for the NodeClaim, this could be due to timeout or error, but the replica count needs to be reset.
-		// TODO (elmiko) this could probably use improvement to make it more resilient to errors.
-		defer func() {
-			machineDeployment, err = c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace)
-			if err != nil {
-				log.Println(fmt.Errorf("error while recovering from failure to find an unclaimed Machine, unable to find MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, err))
-			}
-
-			machineDeployment.Spec.Replicas = ptr.To(originalReplicas)
-			if err = c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
-				log.Println(fmt.Errorf("error while recovering from failure to find an unclaimed Machine: %w", err))
-			}
-		}()
-
+		rollbackReplicas()
 		return nil, nil, fmt.Errorf("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w", machineDeployment.Name, err)
 	}
@@ -373,21 +421,56 @@
 	labels[providers.NodePoolMemberLabel] = ""
 	machine.SetLabels(labels)
 	if err := c.machineProvider.Update(ctx, machine); err != nil {
-		// if we can't update the Machine with the member label, we need to unwind the addition
-		// TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
-		// since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
+		rollbackReplicas()
 		return nil, nil, fmt.Errorf("cannot satisfy create, unable to label Machine %q as a member: %w", machine.Name, err)
 	}
 
-	// Bind the NodeClaim with this machine.
+	// Bind the NodeClaim with this machine and its owning MachineDeployment.
+	// The MD annotation is used to decrement replicas if the Machine disappears before
+	// the NodeClaim is deleted, preventing replica escalation on re-provision.
 	nodeClaim.Annotations[machineAnnotation] = fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)
+	nodeClaim.Annotations[machineDeploymentAnnotation] = fmt.Sprintf("%s/%s", machineDeployment.Namespace, machineDeployment.Name)
 	if err = c.kubeClient.Update(ctx, nodeClaim); err != nil {
+		// The Machine was already labeled as a member; remove that label before rolling back replicas
+		// so future polls can still find unclaimed Machines correctly.
+		// RetryOnConflict handles potential 409 Conflicts from a stale cached ResourceVersion.
+		if unlabelErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			m, err := c.machineProvider.Get(rollbackCtx, machine.Name, machine.Namespace)
+			if err != nil {
+				return err
+			}
+			delete(m.Labels, providers.NodePoolMemberLabel)
+			return c.machineProvider.Update(rollbackCtx, m)
+		}); unlabelErr != nil {
+			log.Println(fmt.Errorf("cannot satisfy create, unable to remove member label from Machine %q during cleanup: %w", machine.Name, unlabelErr))
+		}
+		rollbackReplicas()
 		return nil, nil, fmt.Errorf("cannot satisfy create, unable to update NodeClaim annotations %q: %w", nodeClaim.Name, err)
 	}
 
 	return machineDeployment, machine, nil
 }
 
+// tryDecrementMachineDeploymentReplicas decrements the replica count of the named MachineDeployment
+// by one. It is a best-effort operation: errors are logged but not returned, because it is used in
+// cleanup paths where the caller cannot meaningfully handle a failure.
+func (c *CloudProvider) tryDecrementMachineDeploymentReplicas(ctx context.Context, mdNamespace, mdName string) {
+	cleanupCtx := context.WithoutCancel(ctx)
+	if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		md, err := c.machineDeploymentProvider.Get(cleanupCtx, mdName, mdNamespace)
+		if err != nil {
+			return err
+		}
+		if md.Spec.Replicas != nil && *md.Spec.Replicas > 0 {
+			md.Spec.Replicas = ptr.To(*md.Spec.Replicas - 1)
+			return c.machineDeploymentProvider.Update(cleanupCtx, md)
+		}
+		return nil
+	}); retryErr != nil {
+		log.Println(fmt.Errorf("error decrementing replicas for MachineDeployment %q/%q: %w", mdNamespace, mdName, retryErr))
+	}
+}
+
 func (c *CloudProvider) machineDeploymentFromMachine(ctx context.Context, machine *capiv1beta1.Machine) (*capiv1beta1.MachineDeployment, error) {
 	mdName, found := machine.GetLabels()[capiv1beta1.MachineDeploymentNameLabel]
 	if !found {
@@ -487,12 +570,16 @@
 			// this might need to ignore the error for the sake of the timeout
 			return false, fmt.Errorf("error listing unclaimed Machines for MachineDeployment %q: %w", machineDeployment.Name, err)
 		}
-		if len(machineList) == 0 {
-			return false, nil
+		for _, m := range machineList {
+			// Skip Machines that are already being deleted; claiming one would bind the NodeClaim
+			// to a Machine that will never become a Node, and the subsequent Delete call would
+			// not reduce the replica count (IsDeleting guard), leaving MD permanently elevated.
+			if m.GetDeletionTimestamp().IsZero() {
+				machine = m
+				return true, nil
+			}
 		}
-
-		machine = machineList[0]
-		return true, nil
+		return false, nil
 	})
 	if err != nil {
 		return nil, fmt.Errorf("error polling for an unclaimed Machine in MachineDeployment %q: %w", machineDeployment.Name, err)