Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 111 additions & 24 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ import (
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
capiv1beta1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -58,7 +60,8 @@ const (
taintsKey = "capacity.cluster-autoscaler.kubernetes.io/taints"
maxPodsKey = "capacity.cluster-autoscaler.kubernetes.io/maxPods"

machineAnnotation = "cluster.x-k8s.io/machine"
machineAnnotation = "cluster.x-k8s.io/machine"
machineDeploymentAnnotation = "cluster.x-k8s.io/machine-deployment"
)

func NewCloudProvider(ctx context.Context, kubeClient client.Client, machineProvider machine.Provider, machineDeploymentProvider machinedeployment.Provider) *CloudProvider {
Expand Down Expand Up @@ -130,6 +133,17 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)

machine, err = c.machineProvider.Get(ctx, machineName, machineNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
// Machine is already gone. Decrement the MD replicas using the MD annotation so
// the replica count stays accurate, then signal to karpenter that the NodeClaim
// is no longer backed by a real machine.
if mdAnno, ok := nodeClaim.Annotations[machineDeploymentAnnotation]; ok {
if mdNamespace, mdName, parseErr := parseMachineAnnotation(mdAnno); parseErr == nil {
c.tryDecrementMachineDeploymentReplicas(ctx, mdNamespace, mdName)
}
}
return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("Machine %q in namespace %q no longer exists for NodeClaim %q", machineName, machineNamespace, nodeClaim.Name))
}
return fmt.Errorf("error finding Machine %q in namespace %s to Delete NodeClaim %q: %w", machineName, machineNamespace, nodeClaim.Name, err)
}
} else {
Expand All @@ -142,7 +156,19 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)

// check if already deleting
if c.machineProvider.IsDeleting(machine) {
// Machine is already deleting, we do not need to annotate it or change the scalable resource replicas.
// The Machine is already being deleted (e.g. CAPI initiated a scale-down).
// We must still decrement the MachineDeployment replica count so that CAPI does not
// provision a replacement Machine to satisfy the current desired count.
machineDeployment, err := c.machineDeploymentFromMachine(ctx, machine)
if err != nil {
return fmt.Errorf("unable to delete NodeClaim %q, cannot find owner MachineDeployment for deleting Machine %q: %w", nodeClaim.Name, machine.Name, err)
}
if machineDeployment.Spec.Replicas != nil && *machineDeployment.Spec.Replicas > 0 {
machineDeployment.Spec.Replicas = ptr.To(*machineDeployment.Spec.Replicas - 1)
if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
return fmt.Errorf("unable to delete NodeClaim %q, cannot update MachineDeployment %q replicas for deleting Machine %q: %w", nodeClaim.Name, machineDeployment.Name, machine.Name, err)
}
}
return nil
}

Expand Down Expand Up @@ -296,6 +322,17 @@ func (c *CloudProvider) provisionMachine(ctx context.Context, nodeClaim *karpv1.

machine, err := c.machineProvider.Get(ctx, machineName, machineNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
// The Machine no longer exists. The previous createMachine call already incremented
// the MachineDeployment replicas for a Machine that is now gone. Decrement that
// phantom replica before starting a fresh provisioning attempt to prevent escalation.
if mdAnno, ok := nodeClaim.Annotations[machineDeploymentAnnotation]; ok {
if mdNamespace, mdName, parseErr := parseMachineAnnotation(mdAnno); parseErr == nil {
c.tryDecrementMachineDeploymentReplicas(ctx, mdNamespace, mdName)
}
}
return c.createMachine(ctx, nodeClaim)
}
return nil, nil, fmt.Errorf("failed to get NodeClaim's Machine %s : %w", machineName, err)
}

Expand Down Expand Up @@ -343,6 +380,29 @@ func (c *CloudProvider) createMachine(ctx context.Context, nodeClaim *karpv1.Nod
return nil, nil, fmt.Errorf("cannot satisfy create, unable to update MachineDeployment %q replicas: %w", machineDeployment.Name, err)
}

// rollbackReplicas resets the MachineDeployment replica count to originalReplicas.
// context.WithoutCancel preserves tracing/logging values from ctx while detaching its cancellation,
// so the rollback succeeds even if the outer ctx has already expired.
// We use machineDeployment directly on the first attempt since it already carries the fresh
// ResourceVersion from the successful increment update, avoiding a stale informer cache read.
// Only on conflict do we re-fetch to get the latest ResourceVersion before retrying.
rollbackCtx := context.WithoutCancel(ctx)
rollbackReplicas := func() {
md := machineDeployment
if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
md.Spec.Replicas = ptr.To(originalReplicas)
if err := c.machineDeploymentProvider.Update(rollbackCtx, md); err != nil {
if latest, getErr := c.machineDeploymentProvider.Get(rollbackCtx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace); getErr == nil {
md = latest
}
return err
}
return nil
}); retryErr != nil {
log.Println(fmt.Errorf("error while recovering replicas for MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, retryErr))
}
}

// TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
// Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
// then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
Expand All @@ -352,19 +412,7 @@ func (c *CloudProvider) createMachine(ctx context.Context, nodeClaim *karpv1.Nod
machine, err := c.pollForUnclaimedMachineInMachineDeploymentWithTimeout(ctx, machineDeployment, time.Minute)
if err != nil {
// unable to find a Machine for the NodeClaim, this could be due to timeout or error, but the replica count needs to be reset.
// TODO (elmiko) this could probably use improvement to make it more resilient to errors.
defer func() {
machineDeployment, err = c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace)
if err != nil {
log.Println(fmt.Errorf("error while recovering from failure to find an unclaimed Machine, unable to find MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, err))
}

machineDeployment.Spec.Replicas = ptr.To(originalReplicas)
if err = c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
log.Println(fmt.Errorf("error while recovering from failure to find an unclaimed Machine: %w", err))
}
}()

rollbackReplicas()
return nil, nil, fmt.Errorf("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w", machineDeployment.Name, err)
}

Expand All @@ -373,21 +421,56 @@ func (c *CloudProvider) createMachine(ctx context.Context, nodeClaim *karpv1.Nod
labels[providers.NodePoolMemberLabel] = ""
machine.SetLabels(labels)
if err := c.machineProvider.Update(ctx, machine); err != nil {
// if we can't update the Machine with the member label, we need to unwind the addition
// TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
// since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
rollbackReplicas()
return nil, nil, fmt.Errorf("cannot satisfy create, unable to label Machine %q as a member: %w", machine.Name, err)
}

// Bind the NodeClaim with this machine.
// Bind the NodeClaim with this machine and its owning MachineDeployment.
// The MD annotation is used to decrement replicas if the Machine disappears before
// the NodeClaim is deleted, preventing replica escalation on re-provision.
nodeClaim.Annotations[machineAnnotation] = fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)
nodeClaim.Annotations[machineDeploymentAnnotation] = fmt.Sprintf("%s/%s", machineDeployment.Namespace, machineDeployment.Name)
if err = c.kubeClient.Update(ctx, nodeClaim); err != nil {
// The Machine was already labeled as a member; remove that label before rolling back replicas
// so future polls can still find unclaimed Machines correctly.
// RetryOnConflict handles potential 409 Conflicts from a stale cached ResourceVersion.
if unlabelErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
m, err := c.machineProvider.Get(rollbackCtx, machine.Name, machine.Namespace)
if err != nil {
return err
}
delete(m.Labels, providers.NodePoolMemberLabel)
return c.machineProvider.Update(rollbackCtx, m)
}); unlabelErr != nil {
log.Println(fmt.Errorf("cannot satisfy create, unable to remove member label from Machine %q during cleanup: %w", machine.Name, unlabelErr))
}
rollbackReplicas()
return nil, nil, fmt.Errorf("cannot satisfy create, unable to update NodeClaim annotations %q: %w", nodeClaim.Name, err)
}

return machineDeployment, machine, nil
}

// tryDecrementMachineDeploymentReplicas lowers the desired replica count of the given
// MachineDeployment by one, retrying on update conflicts with a fresh read each attempt.
// It is best-effort: any failure is logged rather than returned, because callers invoke
// it on cleanup paths where a returned error could not be meaningfully acted upon.
func (c *CloudProvider) tryDecrementMachineDeploymentReplicas(ctx context.Context, mdNamespace, mdName string) {
	// Detach cancellation so the cleanup can still complete after the caller's ctx expires,
	// while preserving any logging/tracing values it carries.
	detachedCtx := context.WithoutCancel(ctx)
	decrement := func() error {
		deployment, getErr := c.machineDeploymentProvider.Get(detachedCtx, mdName, mdNamespace)
		if getErr != nil {
			return getErr
		}
		replicas := deployment.Spec.Replicas
		if replicas == nil || *replicas <= 0 {
			// Nothing to decrement; never drive the desired count negative.
			return nil
		}
		deployment.Spec.Replicas = ptr.To(*replicas - 1)
		return c.machineDeploymentProvider.Update(detachedCtx, deployment)
	}
	if err := retry.RetryOnConflict(retry.DefaultRetry, decrement); err != nil {
		log.Println(fmt.Errorf("error decrementing replicas for MachineDeployment %q/%q: %w", mdNamespace, mdName, err))
	}
}

func (c *CloudProvider) machineDeploymentFromMachine(ctx context.Context, machine *capiv1beta1.Machine) (*capiv1beta1.MachineDeployment, error) {
mdName, found := machine.GetLabels()[capiv1beta1.MachineDeploymentNameLabel]
if !found {
Expand Down Expand Up @@ -487,12 +570,16 @@ func (c *CloudProvider) pollForUnclaimedMachineInMachineDeploymentWithTimeout(ct
// this might need to ignore the error for the sake of the timeout
return false, fmt.Errorf("error listing unclaimed Machines for MachineDeployment %q: %w", machineDeployment.Name, err)
}
if len(machineList) == 0 {
return false, nil
for _, m := range machineList {
// Skip Machines that are already being deleted; claiming one would bind the NodeClaim
// to a Machine that will never become a Node, and the subsequent Delete call would
// not reduce the replica count (IsDeleting guard), leaving MD permanently elevated.
if m.GetDeletionTimestamp().IsZero() {
machine = m
return true, nil
}
}

machine = machineList[0]
return true, nil
return false, nil
})
if err != nil {
return nil, fmt.Errorf("error polling for an unclaimed Machine in MachineDeployment %q: %w", machineDeployment.Name, err)
Expand Down
Loading