Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 51 additions & 35 deletions pkg/controller/ipam/multi_cidr_range_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func NewMultiCIDRRangeAllocator(

// testCIDRMap is only set for testing purposes.
if len(testCIDRMap) > 0 {
ra.lock.Lock()
ra.cidrMap = testCIDRMap
ra.lock.Unlock()
logger.Info("TestCIDRMap should only be set for testing purposes, if this is seen in production logs, it might be a misconfiguration or a bug")
}

Expand Down Expand Up @@ -256,26 +258,29 @@ func NewMultiCIDRRangeAllocator(
logger.Info("failed to add event handler to clusterCIDRInformer", "err", err)
}

ra.lock.Lock()
if allocatorParams.ServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.ServiceCIDR)
ra.filterOutServiceRange(logger, allocatorParams.ServiceCIDR, ra.cidrMap)
} else {
logger.Info("No Service CIDR provided. Skipping filtering out service addresses")
}

if allocatorParams.SecondaryServiceCIDR != nil {
ra.filterOutServiceRange(logger, allocatorParams.SecondaryServiceCIDR)
ra.filterOutServiceRange(logger, allocatorParams.SecondaryServiceCIDR, ra.cidrMap)
} else {
logger.Info("No Secondary Service CIDR provided. Skipping filtering out secondary service addresses")
}
ra.lock.Unlock()

if nodeList != nil {
ra.lock.Lock()
for _, node := range nodeList.Items {
if len(node.Spec.PodCIDRs) == 0 {
logger.V(4).Info("Node has no CIDR, ignoring", "node", klog.KObj(&node))
continue
}
logger.Info("Node has CIDR, occupying it in CIDR map", "node", klog.KObj(&node), "podCIDRs", node.Spec.PodCIDRs)
if err := ra.occupyCIDRs(logger, &node); err != nil {
if err := ra.occupyCIDRs(logger, &node, ra.cidrMap); err != nil {
// This will happen if:
// 1. We find garbage in the podCIDRs field. Retrying is useless.
// 2. CIDR out of range: This means ClusterCIDR is not yet created
Expand All @@ -284,6 +289,7 @@ func NewMultiCIDRRangeAllocator(
logger.Info("Node CIDR has no associated ClusterCIDR, skipping", "node", klog.KObj(&node), "error", err)
}
}
ra.lock.Unlock()
}

_, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -510,11 +516,12 @@ func (r *multiCIDRRangeAllocator) syncClusterCIDR(ctx context.Context, key strin
}

// occupyCIDRs marks node.PodCIDRs[...] as used in allocator's tracked cidrSet.
func (r *multiCIDRRangeAllocator) occupyCIDRs(logger klog.Logger, node *corev1.Node) error {
// Requires the caller to hold r.lock.
func (r *multiCIDRRangeAllocator) occupyCIDRs(logger klog.Logger, node *corev1.Node, cidrMap map[string][]*cidrset.ClusterCIDR) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment on these functions would be more useful, to point out that access to cidrMap should be protected by a lock before passing in for this and the similar functions below.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so too. i will make a comment on the functions

if len(node.Spec.PodCIDRs) == 0 {
return nil
}
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true)
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true, cidrMap)
if err != nil {
return err
}
Expand Down Expand Up @@ -617,10 +624,10 @@ func (r *multiCIDRRangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node
}

if len(node.Spec.PodCIDRs) > 0 {
return r.occupyCIDRs(logger, node)
return r.occupyCIDRs(logger, node, r.cidrMap)
}

cidrs, clusterCIDR, err := r.prioritizedCIDRs(logger, node)
cidrs, clusterCIDR, err := r.prioritizedCIDRs(logger, node, r.cidrMap)
if err != nil {
controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to get cidrs for node %s: %w", node.Name, err)
Expand Down Expand Up @@ -652,7 +659,7 @@ func (r *multiCIDRRangeAllocator) ReleaseCIDR(logger klog.Logger, node *corev1.N
return nil
}

clusterCIDR, err := r.allocatedClusterCIDR(node)
clusterCIDR, err := r.allocatedClusterCIDR(node, r.cidrMap)
if err != nil {
return err
}
Expand All @@ -677,13 +684,14 @@ func (r *multiCIDRRangeAllocator) ReleaseCIDR(logger klog.Logger, node *corev1.N

// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used across all cidrs
// so that they won't be assignable.
func (r *multiCIDRRangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *net.IPNet) {
// filterOutServiceRange requires the caller to hold r.lock.
func (r *multiCIDRRangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *net.IPNet, cidrMap map[string][]*cidrset.ClusterCIDR) {
// Checks if service CIDR has a nonempty intersection with cluster
// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
// clusterCIDR's Mask applied (this means that clusterCIDR contains
// serviceCIDR) or vice versa (which means that serviceCIDR contains
// clusterCIDR).
for _, clusterCIDRList := range r.cidrMap {
for _, clusterCIDRList := range cidrMap {
for _, clusterCIDR := range clusterCIDRList {
if err := r.occupyServiceCIDR(clusterCIDR, serviceCIDR); err != nil {
logger.Error(err, "Unable to occupy service CIDR")
Expand Down Expand Up @@ -795,21 +803,22 @@ func defaultNodeSelector() *corev1.NodeSelector {
}
}

// prioritizedCIDRs requires the caller to hold r.lock.
// prioritizedCIDRs returns a list of CIDRs to be allocated to the node.
// Returns 1 CIDR if single stack.
// Returns 2 CIDRs , 1 from each ip family if dual stack.
func (r *multiCIDRRangeAllocator) prioritizedCIDRs(
logger klog.Logger, node *corev1.Node,
logger klog.Logger, node *corev1.Node, cidrMap map[string][]*cidrset.ClusterCIDR,
) ([]*net.IPNet, *cidrset.ClusterCIDR, error) {
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true)
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, true, cidrMap)
if err != nil {
return nil, nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
}

for _, clusterCIDR := range clusterCIDRList {
cidrs := make([]*net.IPNet, 0)
if clusterCIDR.IPv4CIDRSet != nil {
cidr, err := r.allocateCIDR(logger, clusterCIDR, clusterCIDR.IPv4CIDRSet)
cidr, err := r.allocateCIDR(logger, clusterCIDR, clusterCIDR.IPv4CIDRSet, cidrMap)
if err != nil {
logger.V(3).Info("Unable to allocate IPv4 CIDR, trying next range", "err", err)
continue
Expand All @@ -818,7 +827,7 @@ func (r *multiCIDRRangeAllocator) prioritizedCIDRs(
}

if clusterCIDR.IPv6CIDRSet != nil {
cidr, err := r.allocateCIDR(logger, clusterCIDR, clusterCIDR.IPv6CIDRSet)
cidr, err := r.allocateCIDR(logger, clusterCIDR, clusterCIDR.IPv6CIDRSet, cidrMap)
if err != nil {
logger.V(3).Info("Unable to allocate IPv6 CIDR, trying next range", "err", err)
continue
Expand All @@ -831,8 +840,9 @@ func (r *multiCIDRRangeAllocator) prioritizedCIDRs(
return nil, nil, fmt.Errorf("unable to get a clusterCIDR for node %s, no available CIDRs", node.Name)
}

// allocateCIDR requires the caller to hold r.lock.
func (r *multiCIDRRangeAllocator) allocateCIDR(
logger klog.Logger, clusterCIDR *cidrset.ClusterCIDR, cidrSet *cidrset.MultiCIDRSet,
logger klog.Logger, clusterCIDR *cidrset.ClusterCIDR, cidrSet *cidrset.MultiCIDRSet, cidrMap map[string][]*cidrset.ClusterCIDR,
) (*net.IPNet, error) {
for evaluated := 0; evaluated < cidrSet.MaxCIDRs; evaluated++ {
candidate, lastEvaluated, err := cidrSet.NextCandidate()
Expand All @@ -842,12 +852,12 @@ func (r *multiCIDRRangeAllocator) allocateCIDR(

evaluated += lastEvaluated

if r.cidrInAllocatedList(logger, candidate) {
if r.cidrInAllocatedList(logger, candidate, cidrMap) {
continue
}

// Deep Check.
if r.cidrOverlapWithAllocatedList(logger, candidate) {
if r.cidrOverlapWithAllocatedList(logger, candidate, cidrMap) {
continue
}

Expand All @@ -864,8 +874,9 @@ func (r *multiCIDRRangeAllocator) allocateCIDR(
}
}

func (r *multiCIDRRangeAllocator) cidrInAllocatedList(logger klog.Logger, cidr *net.IPNet) bool {
for _, clusterCIDRList := range r.cidrMap {
// cidrInAllocatedList requires the caller to hold r.lock.
func (r *multiCIDRRangeAllocator) cidrInAllocatedList(logger klog.Logger, cidr *net.IPNet, cidrMap map[string][]*cidrset.ClusterCIDR) bool {
for _, clusterCIDRList := range cidrMap {
for _, clusterCIDR := range clusterCIDRList {
cidrSet, err := r.associatedCIDRSet(clusterCIDR, cidr)
if err != nil {
Expand All @@ -882,8 +893,9 @@ func (r *multiCIDRRangeAllocator) cidrInAllocatedList(logger klog.Logger, cidr *
return false
}

func (r *multiCIDRRangeAllocator) cidrOverlapWithAllocatedList(logger klog.Logger, cidr *net.IPNet) bool {
for _, clusterCIDRList := range r.cidrMap {
// cidrOverlapWithAllocatedList requires the caller to hold r.lock.
func (r *multiCIDRRangeAllocator) cidrOverlapWithAllocatedList(logger klog.Logger, cidr *net.IPNet, cidrMap map[string][]*cidrset.ClusterCIDR) bool {
for _, clusterCIDRList := range cidrMap {
for _, clusterCIDR := range clusterCIDRList {
cidrSet, err := r.associatedCIDRSet(clusterCIDR, cidr)
if err != nil {
Expand All @@ -900,9 +912,10 @@ func (r *multiCIDRRangeAllocator) cidrOverlapWithAllocatedList(logger klog.Logge
return false
}

// allocatedClusterCIDR requires the caller to hold r.lock.
// allocatedClusterCIDR returns the ClusterCIDR from which the node CIDRs were allocated.
func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *corev1.Node) (*cidrset.ClusterCIDR, error) {
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, false)
func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *corev1.Node, cidrMap map[string][]*cidrset.ClusterCIDR) (*cidrset.ClusterCIDR, error) {
clusterCIDRList, err := r.orderedMatchingClusterCIDRs(node, false, cidrMap)
if err != nil {
return nil, fmt.Errorf("unable to get a clusterCIDR for node %s: %w", node.Name, err)
}
Expand All @@ -915,6 +928,7 @@ func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *corev1.Node) (*cidr
return nil, fmt.Errorf("no clusterCIDR found associated with node: %s", node.Name)
}

// orderedMatchingClusterCIDRs requires the caller to hold r.lock.
// orderedMatchingClusterCIDRs returns a list of all the ClusterCIDRs matching the node labels.
// The list is ordered with the following priority, which act as tie-breakers.
// P0: ClusterCIDR with higher number of matching labels has the highest priority.
Expand All @@ -926,11 +940,11 @@ func (r *multiCIDRRangeAllocator) allocatedClusterCIDR(node *corev1.Node) (*cidr
// orderedMatchingClusterCIDRs takes `occupy` as an argument, it determines whether the function
// is called during an occupy or a release operation. For a release operation, a ClusterCIDR must
// be added to the matching ClusterCIDRs list, irrespective of whether the ClusterCIDR is terminating.
func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *corev1.Node, occupy bool) ([]*cidrset.ClusterCIDR, error) {
func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *corev1.Node, occupy bool, cidrMap map[string][]*cidrset.ClusterCIDR) ([]*cidrset.ClusterCIDR, error) {
matchingCIDRs := make([]*cidrset.ClusterCIDR, 0)
pq := make(PriorityQueue, 0)

for label, clusterCIDRList := range r.cidrMap {
for label, clusterCIDRList := range cidrMap {
labelsMatch, matchCnt, err := r.matchCIDRLabels(node, label)
if err != nil {
return nil, err
Expand Down Expand Up @@ -968,7 +982,7 @@ func (r *multiCIDRRangeAllocator) orderedMatchingClusterCIDRs(node *corev1.Node,
if err != nil {
return nil, err
}
if clusterCIDRList, ok := r.cidrMap[defaultSelector.String()]; ok {
if clusterCIDRList, ok := cidrMap[defaultSelector.String()]; ok {
matchingCIDRs = append(matchingCIDRs, clusterCIDRList...)
}
return matchingCIDRs, nil
Expand Down Expand Up @@ -1081,7 +1095,7 @@ func (r *multiCIDRRangeAllocator) reconcileCreate(ctx context.Context, clusterCI

logger := klog.FromContext(ctx)
logger.V(3).Info("Reconciling ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
if err := r.createClusterCIDR(ctx, clusterCIDR, false); err != nil {
if err := r.createClusterCIDR(ctx, clusterCIDR, false, r.cidrMap); err != nil {
logger.Error(err, "failed to reconcile ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
return err
}
Expand All @@ -1104,16 +1118,17 @@ func (r *multiCIDRRangeAllocator) reconcileBootstrap(ctx context.Context, cluste
}

logger.V(2).Info("Creating ClusterCIDR during bootstrap", "clusterCIDR", clusterCIDR.Name)
if err := r.createClusterCIDR(ctx, clusterCIDR, terminating); err != nil {
if err := r.createClusterCIDR(ctx, clusterCIDR, terminating, r.cidrMap); err != nil {
logger.Error(err, "Unable to create ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
return err
}

return nil
}

// createClusterCIDR requires the caller to hold r.lock.
// createClusterCIDR creates and maps the cidrSets in the cidrMap.
func (r *multiCIDRRangeAllocator) createClusterCIDR(ctx context.Context, clusterCIDR *v1.ClusterCIDR, terminating bool) error {
func (r *multiCIDRRangeAllocator) createClusterCIDR(ctx context.Context, clusterCIDR *v1.ClusterCIDR, terminating bool, cidrMap map[string][]*cidrset.ClusterCIDR) error {
nodeSelector, err := r.nodeSelectorKey(clusterCIDR)
if err != nil {
return fmt.Errorf("unable to get labelSelector key: %w", err)
Expand All @@ -1128,7 +1143,7 @@ func (r *multiCIDRRangeAllocator) createClusterCIDR(ctx context.Context, cluster
return errors.New("invalid ClusterCIDR: must provide IPv4 and/or IPv6 config")
}

if err := r.mapClusterCIDRSet(r.cidrMap, nodeSelector, clusterCIDRSet); err != nil {
if err := r.mapClusterCIDRSet(cidrMap, nodeSelector, clusterCIDRSet); err != nil {
return fmt.Errorf("unable to map clusterCIDRSet: %w", err)
}

Expand Down Expand Up @@ -1222,7 +1237,7 @@ func (r *multiCIDRRangeAllocator) reconcileDelete(ctx context.Context, clusterCI
logger := klog.FromContext(ctx)
if slices.Contains(clusterCIDR.GetFinalizers(), clusterCIDRFinalizer) {
logger.V(2).Info("Releasing ClusterCIDR", "clusterCIDR", clusterCIDR.Name)
if err := r.deleteClusterCIDR(logger, clusterCIDR); err != nil {
if err := r.deleteClusterCIDR(logger, clusterCIDR, r.cidrMap); err != nil {
logger.V(2).Info("Error while deleting ClusterCIDR", "err", err)
return err
}
Expand All @@ -1241,14 +1256,15 @@ func (r *multiCIDRRangeAllocator) reconcileDelete(ctx context.Context, clusterCI
return nil
}

// deleteClusterCIDR requires the caller to hold r.lock.
// deleteClusterCIDR Deletes and unmaps the ClusterCIDRs from the cidrMap.
func (r *multiCIDRRangeAllocator) deleteClusterCIDR(logger klog.Logger, clusterCIDR *v1.ClusterCIDR) error {
func (r *multiCIDRRangeAllocator) deleteClusterCIDR(logger klog.Logger, clusterCIDR *v1.ClusterCIDR, cidrMap map[string][]*cidrset.ClusterCIDR) error {
labelSelector, err := r.nodeSelectorKey(clusterCIDR)
if err != nil {
return fmt.Errorf("unable to delete cidr: %w", err)
}

clusterCIDRSetList, ok := r.cidrMap[labelSelector]
clusterCIDRSetList, ok := cidrMap[labelSelector]
if !ok {
logger.Info("Label not found in CIDRMap, proceeding with delete", "labelSelector", labelSelector)
return nil
Expand All @@ -1270,12 +1286,12 @@ func (r *multiCIDRRangeAllocator) deleteClusterCIDR(logger klog.Logger, clusterC
// Remove the label from the map if this was the only clusterCIDR associated
// with it.
if len(clusterCIDRSetList) == 1 {
delete(r.cidrMap, labelSelector)
delete(cidrMap, labelSelector)
return nil
}

clusterCIDRSetList = append(clusterCIDRSetList[:i], clusterCIDRSetList[i+1:]...)
r.cidrMap[labelSelector] = clusterCIDRSetList
cidrMap[labelSelector] = clusterCIDRSetList
return nil
}
logger.V(2).Info("clusterCIDR not found, proceeding with delete", "clusterCIDR", clusterCIDR.Name, "label", labelSelector)
Expand Down
22 changes: 20 additions & 2 deletions pkg/controller/ipam/multi_cidr_range_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,9 @@ func TestMultiCIDRAllocateOrOccupyCIDRSuccess(t *testing.T) {
t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
}

rangeAllocator.lock.Lock()
clusterCIDRList, err := getClusterCIDRList("node0", rangeAllocator.cidrMap)
rangeAllocator.lock.Unlock()
if err != nil {
t.Fatalf("%v: unexpected error when getting associated clusterCIDR for node %v %v", tc.description, "node0", err)
}
Expand Down Expand Up @@ -1349,7 +1351,9 @@ func TestMultiCIDRAllocateOrOccupyCIDRFailure(t *testing.T) {
t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
}

rangeAllocator.lock.Lock()
clusterCIDRList, err := getClusterCIDRList("node0", rangeAllocator.cidrMap)
rangeAllocator.lock.Unlock()
if err != nil {
t.Fatalf("%v: unexpected error when getting associated clusterCIDR for node %v %v", tc.description, "node0", err)
}
Expand Down Expand Up @@ -1538,7 +1542,9 @@ func TestMultiCIDRReleaseCIDRSuccess(t *testing.T) {
t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
}

rangeAllocator.lock.Lock()
clusterCIDRList, err := getClusterCIDRList("node0", rangeAllocator.cidrMap)
rangeAllocator.lock.Unlock()
if err != nil {
t.Fatalf("%v: unexpected error when getting associated clusterCIDR for node %v %v", tc.description, "node0", err)
}
Expand Down Expand Up @@ -1826,8 +1832,10 @@ func TestSyncClusterCIDRDeleteWithNodesAssociated(t *testing.T) {

// Mock the IPAM controller behavior associating node with ClusterCIDR.
nodeSelectorKey, _ := cccController.nodeSelectorKey(testCCC)
cccController.lock.Lock()
clusterCIDRs := cccController.cidrMap[nodeSelectorKey]
clusterCIDRs[0].AssociatedNodes["test-node"] = true
cccController.lock.Unlock()

createdCCC, err := client.NetworkingV1().ClusterCIDRs().Get(context.TODO(), testCCC.Name, metav1.GetOptions{})
assert.Nil(t, err, "Expected no error getting clustercidr object")
Expand Down Expand Up @@ -1866,8 +1874,18 @@ func TestMultiCIDRSetDataRace(t *testing.T) {
wg.Add(4)
go func() { defer wg.Done(); cidrSet.Occupy(lookupCIDR) }()
go func() { defer wg.Done(); cidrSet.Release(lookupCIDR) }()
go func() { defer wg.Done(); ra.cidrInAllocatedList(logger, lookupCIDR) }()
go func() { defer wg.Done(); ra.cidrOverlapWithAllocatedList(logger, lookupCIDR) }()
go func() {
defer wg.Done()
ra.lock.Lock()
ra.cidrInAllocatedList(logger, lookupCIDR, ra.cidrMap)
ra.lock.Unlock()
}()
go func() {
defer wg.Done()
ra.lock.Lock()
ra.cidrOverlapWithAllocatedList(logger, lookupCIDR, ra.cidrMap)
ra.lock.Unlock()
}()
wg.Wait()
}

Expand Down