From e76608b39d09387fdc2f60219436f6fbc1f09d14 Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Thu, 19 Feb 2026 11:17:16 +0100 Subject: [PATCH] WIP: device manager Signed-off-by: Francesco Romani --- pkg/device/attributes.go | 52 ++++++ pkg/device/consts.go | 32 ++++ pkg/device/grouped_numa.go | 133 +++++++++++++++ pkg/device/grouped_socket.go | 124 ++++++++++++++ pkg/device/individual.go | 140 ++++++++++++++++ pkg/driver/dra_hooks.go | 317 +---------------------------------- pkg/driver/driver.go | 60 ++++--- 7 files changed, 524 insertions(+), 334 deletions(-) create mode 100644 pkg/device/attributes.go create mode 100644 pkg/device/consts.go create mode 100644 pkg/device/grouped_numa.go create mode 100644 pkg/device/grouped_socket.go create mode 100644 pkg/device/individual.go diff --git a/pkg/device/attributes.go b/pkg/device/attributes.go new file mode 100644 index 0000000..6c9a924 --- /dev/null +++ b/pkg/device/attributes.go @@ -0,0 +1,52 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package device + +import ( + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" + resourceapi "k8s.io/api/resource/v1" + "k8s.io/utils/cpuset" +) + +func MakeIndividualAttributes(cpu cpuinfo.CPUInfo) map[resourceapi.QualifiedName]resourceapi.DeviceAttribute { + numaNode := int64(cpu.NUMANodeID) + cacheL3ID := int64(cpu.UncoreCacheID) + socketID := int64(cpu.SocketID) + coreID := int64(cpu.CoreID) + cpuID := int64(cpu.CpuID) + coreType := cpu.CoreType.String() + return map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "dra.cpu/numaNodeID": {IntValue: &numaNode}, + "dra.cpu/cacheL3ID": {IntValue: &cacheL3ID}, + "dra.cpu/coreType": {StringValue: &coreType}, + "dra.cpu/socketID": {IntValue: &socketID}, + "dra.cpu/coreID": {IntValue: &coreID}, + "dra.cpu/cpuID": {IntValue: &cpuID}, + // TODO(pravk03): Remove. Hack to align with NIC (DRANet). We need some standard attribute to align other resources with CPU. + "dra.net/numaNode": {IntValue: &numaNode}, + } +} + +func MakeGroupedAttributes(topo *cpuinfo.CPUTopology, socketID int64, allocatableCPUs cpuset.CPUSet) map[resourceapi.QualifiedName]resourceapi.DeviceAttribute { + smtEnabled := topo.SMTEnabled + availableCPUs := int64(allocatableCPUs.Size()) + return map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ + "dra.cpu/socketID": {IntValue: &socketID}, + "dra.cpu/numCPUs": {IntValue: &availableCPUs}, + "dra.cpu/smtEnabled": {BoolValue: &smtEnabled}, + } +} diff --git a/pkg/device/consts.go b/pkg/device/consts.go new file mode 100644 index 0000000..a5d7dee --- /dev/null +++ b/pkg/device/consts.go @@ -0,0 +1,32 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package device + +const ( + // maxDevicesPerResourceSlice is the maximum number of devices that can be packed into a single + // ResourceSlice object. This is a hard limit defined in the Kubernetes API at + // https://github.com/kubernetes/kubernetes/blob/8e6d788887034b799f6c2a86991a68a080bb0576/pkg/apis/resource/types.go#L245 + maxDevicesPerResourceSlice = 128 + cpuDevicePrefix = "cpudev" + + // Grouped Mode + // cpuResourceQualifiedName is the qualified name for the CPU resource capacity. + cpuResourceQualifiedName = "dra.cpu/cpu" + + cpuDeviceSocketGroupedPrefix = "cpudevsocket" + cpuDeviceNUMAGroupedPrefix = "cpudevnuma" +) diff --git a/pkg/device/grouped_numa.go b/pkg/device/grouped_numa.go new file mode 100644 index 0000000..57a5583 --- /dev/null +++ b/pkg/device/grouped_numa.go @@ -0,0 +1,133 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package device + +import ( + "fmt" + + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpumanager" + resourceapi "k8s.io/api/resource/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" + "k8s.io/utils/cpuset" + "k8s.io/utils/ptr" +) + +type NUMAGroupedManager struct { + driverName string + cpuTopology *cpuinfo.CPUTopology + reservedCPUs cpuset.CPUSet + getSharedCPUs func() cpuset.CPUSet + deviceNameToNUMANodeID map[string]int +} + +func NewNUMAGroupedManager(name string, topo *cpuinfo.CPUTopology, resv cpuset.CPUSet, getSharedCPUs func() cpuset.CPUSet) *NUMAGroupedManager { + return &NUMAGroupedManager{ + driverName: name, + cpuTopology: topo, + reservedCPUs: resv, + getSharedCPUs: getSharedCPUs, + deviceNameToNUMANodeID: make(map[string]int), + } +} + +func (mgr *NUMAGroupedManager) CreateSlices(_ klog.Logger) [][]resourceapi.Device { + klog.Info("Creating grouped CPU devices", "groupBy", "NUMANode") + var devices []resourceapi.Device + + numaNodeIDs := mgr.cpuTopology.CPUDetails.NUMANodes().List() + for _, numaIDInt := range numaNodeIDs { + numaID := int64(numaIDInt) + deviceName := fmt.Sprintf("%s%03d", cpuDeviceNUMAGroupedPrefix, numaIDInt) + numaNodeCPUSet := mgr.cpuTopology.CPUDetails.CPUsInNUMANodes(numaIDInt) + allocatableCPUs := numaNodeCPUSet.Difference(mgr.reservedCPUs) + availableCPUsInNUMANode := int64(allocatableCPUs.Size()) + + if allocatableCPUs.Size() == 0 { + continue + } + + // All CPUs in a NUMA node belong to the same socket. + anyCPU := allocatableCPUs.UnsortedList()[0] + socketID := int64(mgr.cpuTopology.CPUDetails[anyCPU].SocketID) + + deviceCapacity := map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + cpuResourceQualifiedName: {Value: *resource.NewQuantity(availableCPUsInNUMANode, resource.DecimalSI)}, + } + + mgr.deviceNameToNUMANodeID[deviceName] = numaIDInt + + deviceAttributes := MakeGroupedAttributes(mgr.cpuTopology, socketID, allocatableCPUs) + deviceAttributes["dra.cpu/numaNodeID"] = resourceapi.DeviceAttribute{IntValue: &numaID} + // TODO(pravk03): Remove. Hack to align with NIC (DRANet). We need some standard attribute to align other resources with CPU. + deviceAttributes["dra.net/numaNode"] = resourceapi.DeviceAttribute{IntValue: &numaID} + + devices = append(devices, resourceapi.Device{ + Name: deviceName, + Attributes: deviceAttributes, + Capacity: deviceCapacity, + AllowMultipleAllocations: ptr.To(true), + }) + } + + if len(devices) == 0 { + return nil + } + return [][]resourceapi.Device{devices} +} + +func (mgr *NUMAGroupedManager) AllocateCPUs(logger klog.Logger, claim *resourceapi.ResourceClaim) (cpuset.CPUSet, error) { + logger = klog.LoggerWithValues(logger, "claim", claim.Namespace+"/"+claim.Name) + + var cpuAssignment cpuset.CPUSet + + for _, alloc := range claim.Status.Allocation.Devices.Results { + claimCPUCount := int64(0) + if alloc.Driver != mgr.driverName { + continue + } + if quantity, ok := alloc.ConsumedCapacity[cpuResourceQualifiedName]; ok { + count := quantity.Value() + claimCPUCount = count + logger.Info("Found CPUs request", "CPUCount", count, "device", alloc.Device) + } + + var availableCPUsForDevice cpuset.CPUSet + numaNodeID, ok := mgr.deviceNameToNUMANodeID[alloc.Device] + if !ok { + return cpuset.CPUSet{}, fmt.Errorf("no valid NUMA node ID found for device %s", alloc.Device) + } + numaCPUs := mgr.cpuTopology.CPUDetails.CPUsInNUMANodes(numaNodeID) + availableCPUsForDevice = mgr.getSharedCPUs().Intersection(numaCPUs) + logger.Info("available CPUs", "NUMANode", numaNodeID, "totalCPUs", numaCPUs.String(), "availableCPUs", availableCPUsForDevice.String()) + + cur, err := cpumanager.TakeByTopologyNUMAPacked(logger, mgr.cpuTopology, availableCPUsForDevice, int(claimCPUCount), cpumanager.CPUSortingStrategyPacked, true) + if err != nil { + return cpuset.CPUSet{}, err + } + cpuAssignment = cpuAssignment.Union(cur) + logger.Info("CPU assignment", "device", alloc.Device, "partialCPUs", cur.String(), "totalCPUs", cpuAssignment.String()) + } + + if cpuAssignment.Size() == 0 { + logger.V(5).Info("AllocateCPUs no CPU allocations for this driver") + return cpuset.CPUSet{}, nil + } + + return cpuAssignment, nil +} diff --git a/pkg/device/grouped_socket.go b/pkg/device/grouped_socket.go new file mode 100644 index 0000000..6f7e793 --- /dev/null +++ b/pkg/device/grouped_socket.go @@ -0,0 +1,124 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package device + +import ( + "fmt" + + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpumanager" + resourceapi "k8s.io/api/resource/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" + "k8s.io/utils/cpuset" + "k8s.io/utils/ptr" +) + +type SocketGroupedManager struct { + driverName string + cpuTopology *cpuinfo.CPUTopology + reservedCPUs cpuset.CPUSet + getSharedCPUs func() cpuset.CPUSet + deviceNameToSocketID map[string]int +} + +func NewSocketGroupedManager(name string, topo *cpuinfo.CPUTopology, resv cpuset.CPUSet, getSharedCPUs func() cpuset.CPUSet) *SocketGroupedManager { + return &SocketGroupedManager{ + driverName: name, + cpuTopology: topo, + reservedCPUs: resv, + getSharedCPUs: getSharedCPUs, + deviceNameToSocketID: make(map[string]int), + } +} + +func (mgr *SocketGroupedManager) CreateSlices(logger klog.Logger) [][]resourceapi.Device { + logger.Info("Creating grouped CPU devices", "groupBy", "Socket") + var devices []resourceapi.Device + + socketIDs := mgr.cpuTopology.CPUDetails.Sockets().List() + for _, socketIDInt := range socketIDs { + socketID := int64(socketIDInt) + deviceName := fmt.Sprintf("%s%03d", cpuDeviceSocketGroupedPrefix, socketIDInt) + socketCPUSet := mgr.cpuTopology.CPUDetails.CPUsInSockets(socketIDInt) + allocatableCPUs := socketCPUSet.Difference(mgr.reservedCPUs) + availableCPUsInSocket := int64(allocatableCPUs.Size()) + + if allocatableCPUs.Size() == 0 { + continue + } + + deviceCapacity := map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ + cpuResourceQualifiedName: {Value: *resource.NewQuantity(availableCPUsInSocket, resource.DecimalSI)}, + } + + mgr.deviceNameToSocketID[deviceName] = socketIDInt + + devices = append(devices, resourceapi.Device{ + Name: deviceName, + Attributes: MakeGroupedAttributes(mgr.cpuTopology, socketID, allocatableCPUs), + Capacity: deviceCapacity, + AllowMultipleAllocations: ptr.To(true), + }) + } + + if len(devices) == 0 { + return nil + } + return [][]resourceapi.Device{devices} +} + +func (mgr *SocketGroupedManager) AllocateCPUs(logger klog.Logger, claim *resourceapi.ResourceClaim) (cpuset.CPUSet, error) { + logger = klog.LoggerWithValues(logger, "claim", claim.Namespace+"/"+claim.Name) + + var cpuAssignment cpuset.CPUSet + + for _, alloc := range claim.Status.Allocation.Devices.Results { + claimCPUCount := int64(0) + if alloc.Driver != mgr.driverName { + continue + } + if quantity, ok := alloc.ConsumedCapacity[cpuResourceQualifiedName]; ok { + count := quantity.Value() + claimCPUCount = count + logger.Info("Found CPUs request", "CPUCount", count, "device", alloc.Device) + } + + var availableCPUsForDevice cpuset.CPUSet + socketID, ok := mgr.deviceNameToSocketID[alloc.Device] + if !ok { + return cpuset.CPUSet{}, fmt.Errorf("no valid socket ID found for device %s", alloc.Device) + } + socketCPUs := mgr.cpuTopology.CPUDetails.CPUsInSockets(socketID) + availableCPUsForDevice = mgr.getSharedCPUs().Intersection(socketCPUs) + logger.Info("available CPUs", "Socket", socketID, "totalCPUs", socketCPUs.String(), "availableCPUs", availableCPUsForDevice.String()) + + cur, err := cpumanager.TakeByTopologyNUMAPacked(logger, mgr.cpuTopology, availableCPUsForDevice, int(claimCPUCount), cpumanager.CPUSortingStrategyPacked, true) + if err != nil { + return cpuset.CPUSet{}, err + } + cpuAssignment = cpuAssignment.Union(cur) + logger.Info("CPU assignment", "device", alloc.Device, "partialCPUs", cur.String(), "totalCPUs", cpuAssignment.String()) + } + + if cpuAssignment.Size() == 0 { + logger.V(5).Info("AllocateCPUs no CPU allocations for this driver") + return cpuset.CPUSet{}, nil + } + + return cpuAssignment, nil +} diff --git a/pkg/device/individual.go b/pkg/device/individual.go new file mode 100644 index 0000000..c8450bf --- /dev/null +++ b/pkg/device/individual.go @@ -0,0 +1,140 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package device + +import ( + "fmt" + "slices" + "sort" + + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" + resourceapi "k8s.io/api/resource/v1" + "k8s.io/klog/v2" + "k8s.io/utils/cpuset" +) + +// IndividualCoreManager manages Device objects based on the CPU topology. +// It groups CPUs by physical core to assign consecutive device IDs to hyperthreads. +// This allows the DRA scheduler, which requests resources in contiguous blocks, +// to co-locate workloads on hyperthreads of the same core. +type IndividualCoreManager struct { + driverName string + cpuTopology *cpuinfo.CPUTopology + reservedCPUs cpuset.CPUSet + deviceNameToCPUID map[string]int +} + +func NewIndividualCoreManager(name string, topo *cpuinfo.CPUTopology, resv cpuset.CPUSet) *IndividualCoreManager { + return &IndividualCoreManager{ + driverName: name, + cpuTopology: topo, + reservedCPUs: resv, + deviceNameToCPUID: make(map[string]int), + } +} + +func (mgr *IndividualCoreManager) CreateSlices(_ klog.Logger) [][]resourceapi.Device { + reservedCPUs := make(map[int]bool) + for _, cpuID := range mgr.reservedCPUs.List() { + reservedCPUs[cpuID] = true + } + + var availableCPUs []cpuinfo.CPUInfo + topo := mgr.cpuTopology + allCPUs := make([]cpuinfo.CPUInfo, 0, len(topo.CPUDetails)) + for _, cpu := range topo.CPUDetails { + allCPUs = append(allCPUs, cpu) + if !reservedCPUs[cpu.CpuID] { + availableCPUs = append(availableCPUs, cpu) + } + } + sort.Slice(availableCPUs, func(i, j int) bool { + return availableCPUs[i].CpuID < availableCPUs[j].CpuID + }) + + processedCpus := make(map[int]bool) + var coreGroups [][]cpuinfo.CPUInfo + cpuInfoMap := make(map[int]cpuinfo.CPUInfo) + for _, info := range allCPUs { + cpuInfoMap[info.CpuID] = info + } + + for _, cpu := range availableCPUs { + if processedCpus[cpu.CpuID] { + continue + } + if cpu.SiblingCpuID == -1 || reservedCPUs[cpu.SiblingCpuID] { + coreGroups = append(coreGroups, []cpuinfo.CPUInfo{cpu}) + processedCpus[cpu.CpuID] = true + } else { + coreGroups = append(coreGroups, []cpuinfo.CPUInfo{cpu, cpuInfoMap[cpu.SiblingCpuID]}) + processedCpus[cpu.CpuID] = true + processedCpus[cpu.SiblingCpuID] = true + } + } + + sort.Slice(coreGroups, func(i, j int) bool { + return coreGroups[i][0].CpuID < coreGroups[j][0].CpuID + }) + + devId := 0 + var allDevices []resourceapi.Device + for _, group := range coreGroups { + for _, cpu := range group { + deviceName := fmt.Sprintf("%s%03d", cpuDevicePrefix, devId) + devId++ + mgr.deviceNameToCPUID[deviceName] = cpu.CpuID + cpuDevice := resourceapi.Device{ + Name: deviceName, + Attributes: MakeIndividualAttributes(cpu), + Capacity: make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity), + } + allDevices = append(allDevices, cpuDevice) + } + } + + if len(allDevices) == 0 { + return nil + } + + // Chunk devices into slices of at most maxDevicesPerResourceSlice + return slices.Collect(slices.Chunk(allDevices, maxDevicesPerResourceSlice)) +} + +func (mgr *IndividualCoreManager) AllocateCPUs(logger klog.Logger, claim *resourceapi.ResourceClaim) (cpuset.CPUSet, error) { + logger = klog.LoggerWithValues(logger, "claim", claim.Namespace+"/"+claim.Name) + + claimCPUIDs := []int{} + + for _, alloc := range claim.Status.Allocation.Devices.Results { + if alloc.Driver != mgr.driverName { + continue + } + cpuID, ok := mgr.deviceNameToCPUID[alloc.Device] + if !ok { + return cpuset.CPUSet{}, fmt.Errorf("device %q not found in device to CPU ID map", alloc.Device) + } + claimCPUIDs = append(claimCPUIDs, cpuID) + } + + if len(claimCPUIDs) == 0 { + logger.V(5).Info("AllocateCPUs no CPU allocations for this driver") + return cpuset.CPUSet{}, nil + } + + return cpuset.New(claimCPUIDs...), nil +} diff --git a/pkg/driver/dra_hooks.go b/pkg/driver/dra_hooks.go index 76b33ed..9ac919b 100644 --- a/pkg/driver/dra_hooks.go +++ b/pkg/driver/dra_hooks.go @@ -19,219 +19,20 @@ package driver import ( "context" "fmt" - "slices" - "sort" - "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" - "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpumanager" resourceapi "k8s.io/api/resource/v1" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" "k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/dynamic-resource-allocation/resourceslice" "k8s.io/klog/v2" - "k8s.io/utils/cpuset" - "k8s.io/utils/ptr" cdiparser "tags.cncf.io/container-device-interface/pkg/parser" ) -const ( - // maxDevicesPerResourceSlice is the maximum number of devices that can be packed into a single - // ResourceSlice object. This is a hard limit defined in the Kubernetes API at - // https://github.com/kubernetes/kubernetes/blob/8e6d788887034b799f6c2a86991a68a080bb0576/pkg/apis/resource/types.go#L245 - maxDevicesPerResourceSlice = 128 - cpuDevicePrefix = "cpudev" - - // Grouped Mode - // cpuResourceQualifiedName is the qualified name for the CPU resource capacity. - cpuResourceQualifiedName = "dra.cpu/cpu" - - cpuDeviceSocketGroupedPrefix = "cpudevsocket" - cpuDeviceNUMAGroupedPrefix = "cpudevnuma" -) - -// createGroupedCPUDeviceSlices creates Device objects based on the CPU topology, grouped by a specific criteria. -func (cp *CPUDriver) createGroupedCPUDeviceSlices() [][]resourceapi.Device { - klog.Info("Creating grouped CPU devices", "groupBy", cp.cpuDeviceGroupBy) - var devices []resourceapi.Device - - topo := cp.cpuTopology - smtEnabled := topo.SMTEnabled - - switch cp.cpuDeviceGroupBy { - case GROUP_BY_SOCKET: - socketIDs := topo.CPUDetails.Sockets().List() - for _, socketIDInt := range socketIDs { - socketID := int64(socketIDInt) - deviceName := fmt.Sprintf("%s%03d", cpuDeviceSocketGroupedPrefix, socketIDInt) - socketCPUSet := topo.CPUDetails.CPUsInSockets(socketIDInt) - allocatableCPUs := socketCPUSet.Difference(cp.reservedCPUs) - availableCPUsInSocket := int64(allocatableCPUs.Size()) - - if allocatableCPUs.Size() == 0 { - continue - } - - deviceCapacity := map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ - cpuResourceQualifiedName: {Value: *resource.NewQuantity(availableCPUsInSocket, resource.DecimalSI)}, - } - - cp.deviceNameToSocketID[deviceName] = socketIDInt - - devices = append(devices, resourceapi.Device{ - Name: deviceName, - Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ - "dra.cpu/socketID": {IntValue: &socketID}, - "dra.cpu/numCPUs": {IntValue: &availableCPUsInSocket}, - "dra.cpu/smtEnabled": {BoolValue: &smtEnabled}, - }, - Capacity: deviceCapacity, - AllowMultipleAllocations: ptr.To(true), - }) - } - case GROUP_BY_NUMA_NODE: - numaNodeIDs := topo.CPUDetails.NUMANodes().List() - for _, numaIDInt := range numaNodeIDs { - numaID := int64(numaIDInt) - deviceName := fmt.Sprintf("%s%03d", cpuDeviceNUMAGroupedPrefix, numaIDInt) - numaNodeCPUSet := topo.CPUDetails.CPUsInNUMANodes(numaIDInt) - allocatableCPUs := numaNodeCPUSet.Difference(cp.reservedCPUs) - availableCPUsInNUMANode := int64(allocatableCPUs.Size()) - - if allocatableCPUs.Size() == 0 { - continue - } - - // All CPUs in a NUMA node belong to the same socket. - anyCPU := allocatableCPUs.UnsortedList()[0] - socketID := int64(topo.CPUDetails[anyCPU].SocketID) - - deviceCapacity := map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{ - cpuResourceQualifiedName: {Value: *resource.NewQuantity(availableCPUsInNUMANode, resource.DecimalSI)}, - } - - cp.deviceNameToNUMANodeID[deviceName] = numaIDInt - - devices = append(devices, resourceapi.Device{ - Name: deviceName, - Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ - "dra.cpu/numaNodeID": {IntValue: &numaID}, - "dra.cpu/socketID": {IntValue: &socketID}, - "dra.cpu/numCPUs": {IntValue: &availableCPUsInNUMANode}, - "dra.cpu/smtEnabled": {BoolValue: &smtEnabled}, - // TODO(pravk03): Remove. Hack to align with NIC (DRANet). We need some standard attribute to align other resources with CPU. - "dra.net/numaNode": {IntValue: &numaID}, - }, - Capacity: deviceCapacity, - AllowMultipleAllocations: ptr.To(true), - }) - } - } - - if len(devices) == 0 { - return nil - } - return [][]resourceapi.Device{devices} -} - -// CreateCPUDeviceSlices creates Device objects based on the CPU topology. -// It groups CPUs by physical core to assign consecutive device IDs to hyperthreads. -// This allows the DRA scheduler, which requests resources in contiguous blocks, -// to co-locate workloads on hyperthreads of the same core. -func (cp *CPUDriver) createCPUDeviceSlices() [][]resourceapi.Device { - reservedCPUs := make(map[int]bool) - for _, cpuID := range cp.reservedCPUs.List() { - reservedCPUs[cpuID] = true - } - - var availableCPUs []cpuinfo.CPUInfo - topo := cp.cpuTopology - allCPUs := make([]cpuinfo.CPUInfo, 0, len(topo.CPUDetails)) - for _, cpu := range topo.CPUDetails { - allCPUs = append(allCPUs, cpu) - if !reservedCPUs[cpu.CpuID] { - availableCPUs = append(availableCPUs, cpu) - } - } - sort.Slice(availableCPUs, func(i, j int) bool { - return availableCPUs[i].CpuID < availableCPUs[j].CpuID - }) - - processedCpus := make(map[int]bool) - var coreGroups [][]cpuinfo.CPUInfo - cpuInfoMap := make(map[int]cpuinfo.CPUInfo) - for _, info := range allCPUs { - cpuInfoMap[info.CpuID] = info - } - - for _, cpu := range availableCPUs { - if processedCpus[cpu.CpuID] { - continue - } - if cpu.SiblingCpuID == -1 || reservedCPUs[cpu.SiblingCpuID] { - coreGroups = append(coreGroups, []cpuinfo.CPUInfo{cpu}) - processedCpus[cpu.CpuID] = true - } else { - coreGroups = append(coreGroups, []cpuinfo.CPUInfo{cpu, cpuInfoMap[cpu.SiblingCpuID]}) - processedCpus[cpu.CpuID] = true - processedCpus[cpu.SiblingCpuID] = true - } - } - - sort.Slice(coreGroups, func(i, j int) bool { - return coreGroups[i][0].CpuID < coreGroups[j][0].CpuID - }) - - devId := 0 - var allDevices []resourceapi.Device - for _, group := range coreGroups { - for _, cpu := range group { - numaNode := int64(cpu.NUMANodeID) - cacheL3ID := int64(cpu.UncoreCacheID) - socketID := int64(cpu.SocketID) - coreID := int64(cpu.CoreID) - cpuID := int64(cpu.CpuID) - coreType := cpu.CoreType.String() - deviceName := fmt.Sprintf("%s%03d", cpuDevicePrefix, devId) - devId++ - cp.deviceNameToCPUID[deviceName] = cpu.CpuID - cpuDevice := resourceapi.Device{ - Name: deviceName, - Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{ - "dra.cpu/numaNodeID": {IntValue: &numaNode}, - "dra.cpu/cacheL3ID": {IntValue: &cacheL3ID}, - "dra.cpu/coreType": {StringValue: &coreType}, - "dra.cpu/socketID": {IntValue: &socketID}, - "dra.cpu/coreID": {IntValue: &coreID}, - "dra.cpu/cpuID": {IntValue: &cpuID}, - // TODO(pravk03): Remove. Hack to align with NIC (DRANet). We need some standard attribute to align other resources with CPU. - "dra.net/numaNode": {IntValue: &numaNode}, - }, - Capacity: make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity), - } - allDevices = append(allDevices, cpuDevice) - } - } - - if len(allDevices) == 0 { - return nil - } - - // Chunk devices into slices of at most maxDevicesPerResourceSlice - return slices.Collect(slices.Chunk(allDevices, maxDevicesPerResourceSlice)) -} - // PublishResources publishes ResourceSlice for CPU resources. func (cp *CPUDriver) PublishResources(ctx context.Context) { klog.Infof("Publishing resources") - var deviceChunks [][]resourceapi.Device - if cp.cpuDeviceMode == CPU_DEVICE_MODE_GROUPED { - deviceChunks = cp.createGroupedCPUDeviceSlices() - } else { - deviceChunks = cp.createCPUDeviceSlices() - } - + deviceChunks := cp.devMgr.CreateSlices(klog.FromContext(ctx)) if deviceChunks == nil { klog.Infof("No devices to publish or error occurred.") return @@ -266,14 +67,9 @@ func (cp *CPUDriver) PrepareResourceClaims(ctx context.Context, claims []*resour } for _, claim := range claims { - if cp.cpuDeviceMode == CPU_DEVICE_MODE_GROUPED { - klog.Infof("Claim %s/%s is for a grouped resource", claim.Namespace, claim.Name) - result[claim.UID] = cp.prepareGroupedResourceClaim(ctx, claim) - } else { - klog.Infof("Claim %s/%s is for an individual resource", claim.Namespace, claim.Name) - result[claim.UID] = cp.prepareResourceClaim(ctx, claim) - } + result[claim.UID] = cp.prepareResourceClaim(ctx, claim) } + return result, nil } @@ -281,7 +77,7 @@ func getCDIDeviceName(uid types.UID) string { return fmt.Sprintf("claim-%s", uid) } -func (cp *CPUDriver) prepareGroupedResourceClaim(ctx context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult { +func (cp *CPUDriver) prepareResourceClaim(ctx context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult { klog.Infof("prepareResourceClaim claim:%s/%s", claim.Namespace, claim.Name) if claim.Status.Allocation == nil { @@ -290,110 +86,13 @@ func (cp *CPUDriver) prepareGroupedResourceClaim(ctx context.Context, claim *res } } - var cpuAssignment cpuset.CPUSet - for _, alloc := range claim.Status.Allocation.Devices.Results { - claimCPUCount := int64(0) - if alloc.Driver != cp.driverName { - continue - } - if quantity, ok := alloc.ConsumedCapacity[cpuResourceQualifiedName]; ok { - count := quantity.Value() - claimCPUCount = count - klog.Infof("Found request for %d CPUs in device %s for claim %s", count, alloc.Device, claim.Name) - } - - topo := cp.cpuTopology - - var availableCPUsForDevice cpuset.CPUSet - if cp.cpuDeviceGroupBy == GROUP_BY_SOCKET { - socketID, ok := cp.deviceNameToSocketID[alloc.Device] - if !ok { - return kubeletplugin.PrepareResult{Err: fmt.Errorf("no valid socket ID found for device %s", alloc.Device)} - } - socketCPUs := topo.CPUDetails.CPUsInSockets(socketID) - availableCPUsForDevice = cp.cpuAllocationStore.GetSharedCPUs().Intersection(socketCPUs) - klog.Infof("Socket %d CPUs:%s available CPUs: %s", socketID, socketCPUs.String(), availableCPUsForDevice.String()) - } else { // numanode - numaNodeID, ok := cp.deviceNameToNUMANodeID[alloc.Device] - if !ok { - return kubeletplugin.PrepareResult{Err: fmt.Errorf("no valid NUMA node ID found for device %s", alloc.Device)} - } - numaCPUs := topo.CPUDetails.CPUsInNUMANodes(numaNodeID) - availableCPUsForDevice = cp.cpuAllocationStore.GetSharedCPUs().Intersection(numaCPUs) - klog.Infof("NUMA node %d CPUs:%s available CPUs: %s", numaNodeID, numaCPUs.String(), availableCPUsForDevice.String()) - } - - logger := klog.FromContext(ctx) - cur, err := cpumanager.TakeByTopologyNUMAPacked(logger, topo, availableCPUsForDevice, int(claimCPUCount), cpumanager.CPUSortingStrategyPacked, true) - if err != nil { - return kubeletplugin.PrepareResult{Err: err} - } - cpuAssignment = cpuAssignment.Union(cur) - klog.Infof("CPU assignment for device %s: %s. All cpus assigned:%s", alloc.Device, cur.String(), cpuAssignment.String()) - } - - if cpuAssignment.Size() == 0 { - klog.V(5).Infof("prepareResourceClaim claim:%s/%s has no CPU allocations for this driver", claim.Namespace, claim.Name) - return kubeletplugin.PrepareResult{} - } - - cp.cpuAllocationStore.AddResourceClaimAllocation(claim.UID, cpuAssignment) - - deviceName := getCDIDeviceName(claim.UID) - envVar := fmt.Sprintf("%s_%s=%s", cdiEnvVarPrefix, claim.UID, cpuAssignment.String()) - if err := cp.cdiMgr.AddDevice(deviceName, envVar); err != nil { + claimCPUSet, err := cp.devMgr.AllocateCPUs(klog.FromContext(ctx), claim) + if err != nil { return kubeletplugin.PrepareResult{Err: err} } - qualifiedName := cdiparser.QualifiedName(cdiVendor, cdiClass, deviceName) - klog.Infof("prepareResourceClaim CDIDeviceName:%s envVar:%s qualifiedName:%v", deviceName, envVar, qualifiedName) - preparedDevices := []kubeletplugin.Device{} - for _, allocResult := range claim.Status.Allocation.Devices.Results { - preparedDevice := kubeletplugin.Device{ - PoolName: allocResult.Pool, - DeviceName: allocResult.Device, - CDIDeviceIDs: []string{qualifiedName}, - Requests: []string{allocResult.Request}, - } - preparedDevices = append(preparedDevices, preparedDevice) - } - - klog.Infof("prepareResourceClaim preparedDevices:%+v", preparedDevices) - return kubeletplugin.PrepareResult{ - Devices: preparedDevices, - } -} - -func (cp *CPUDriver) prepareResourceClaim(_ context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult { - klog.Infof("prepareResourceClaim claim:%s/%s", claim.Namespace, claim.Name) - - if claim.Status.Allocation == nil { - return kubeletplugin.PrepareResult{ - Err: fmt.Errorf("claim %s/%s has no allocation", claim.Namespace, claim.Name), - } - } - - claimCPUIDs := []int{} - for _, alloc := range claim.Status.Allocation.Devices.Results { - if alloc.Driver != cp.driverName { - continue - } - cpuID, ok := cp.deviceNameToCPUID[alloc.Device] - if !ok { - return kubeletplugin.PrepareResult{ - Err: fmt.Errorf("device %q not found in device to CPU ID map", alloc.Device), - } - } - claimCPUIDs = append(claimCPUIDs, cpuID) - } - - if len(claimCPUIDs) == 0 { - klog.V(5).Infof("prepareResourceClaim claim:%s/%s has no CPU allocations for this driver", claim.Namespace, claim.Name) - return kubeletplugin.PrepareResult{} - } - - claimCPUSet := cpuset.New(claimCPUIDs...) cp.cpuAllocationStore.AddResourceClaimAllocation(claim.UID, claimCPUSet) + deviceName := getCDIDeviceName(claim.UID) envVar := fmt.Sprintf("%s_%s=%s", cdiEnvVarPrefix, claim.UID, claimCPUSet.String()) if err := cp.cdiMgr.AddDevice(deviceName, envVar); err != nil { @@ -408,10 +107,12 @@ func (cp *CPUDriver) prepareResourceClaim(_ context.Context, claim *resourceapi. PoolName: allocResult.Pool, DeviceName: allocResult.Device, CDIDeviceIDs: []string{qualifiedName}, + Requests: []string{allocResult.Request}, } preparedDevices = append(preparedDevices, preparedDevice) } + klog.Infof("prepareResourceClaim preparedDevices:%+v", preparedDevices) return kubeletplugin.PrepareResult{ Devices: preparedDevices, } diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 769cbb0..31a84a1 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -25,7 +25,9 @@ import ( "github.com/containerd/nri/pkg/stub" "github.com/kubernetes-sigs/dra-driver-cpu/pkg/cpuinfo" + "github.com/kubernetes-sigs/dra-driver-cpu/pkg/device" "github.com/kubernetes-sigs/dra-driver-cpu/pkg/store" + resourceapi "k8s.io/api/resource/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/dynamic-resource-allocation/kubeletplugin" @@ -72,24 +74,25 @@ type CPUInfoProvider interface { GetCPUTopology() (*cpuinfo.CPUTopology, error) } +type deviceManager interface { + CreateSlices(klog.Logger) [][]resourceapi.Device + AllocateCPUs(klog.Logger, *resourceapi.ResourceClaim) (cpuset.CPUSet, error) +} + // CPUDriver is the structure that holds all the driver runtime information. type CPUDriver struct { - driverName string - nodeName string - kubeClient kubernetes.Interface - draPlugin KubeletPlugin - nriPlugin stub.Stub - podConfigStore *store.PodConfig - cpuAllocationStore *store.CPUAllocation - cdiMgr cdiManager - cpuTopology *cpuinfo.CPUTopology - deviceNameToCPUID map[string]int - deviceNameToSocketID map[string]int - deviceNameToNUMANodeID map[string]int - reservedCPUs cpuset.CPUSet - cpuDeviceMode string - cpuDeviceGroupBy string - claimTracker *store.ClaimTracker + driverName string + nodeName string + kubeClient kubernetes.Interface + draPlugin KubeletPlugin + nriPlugin stub.Stub + podConfigStore *store.PodConfig + cpuAllocationStore *store.CPUAllocation + cdiMgr cdiManager + cpuTopology *cpuinfo.CPUTopology + reservedCPUs cpuset.CPUSet + claimTracker *store.ClaimTracker + devMgr deviceManager } // Config is the configuration for the CPUDriver. @@ -104,16 +107,11 @@ type Config struct { // Start creates and starts a new CPUDriver. func Start(ctx context.Context, clientset kubernetes.Interface, config *Config) (*CPUDriver, error) { plugin := &CPUDriver{ - driverName: config.DriverName, - nodeName: config.NodeName, - kubeClient: clientset, - deviceNameToCPUID: make(map[string]int), - deviceNameToSocketID: make(map[string]int), - deviceNameToNUMANodeID: make(map[string]int), - reservedCPUs: config.ReservedCPUs, - cpuDeviceMode: config.CpuDeviceMode, - cpuDeviceGroupBy: config.CPUDeviceGroupBy, - claimTracker: store.NewClaimTracker(), + driverName: config.DriverName, + nodeName: config.NodeName, + kubeClient: clientset, + reservedCPUs: config.ReservedCPUs, + claimTracker: store.NewClaimTracker(), } cpuInfoProvider := cpuinfo.NewSystemCPUInfo() topo, err := cpuInfoProvider.GetCPUTopology() @@ -127,6 +125,16 @@ func Start(ctx context.Context, clientset kubernetes.Interface, config *Config) plugin.cpuAllocationStore = store.NewCPUAllocation(plugin.cpuTopology, config.ReservedCPUs) plugin.podConfigStore = store.NewPodConfig() + if config.CpuDeviceMode == CPU_DEVICE_MODE_INDIVIDUAL { + plugin.devMgr = device.NewIndividualCoreManager(config.DriverName, topo, config.ReservedCPUs) + } else { + if config.CPUDeviceGroupBy == GROUP_BY_SOCKET { + plugin.devMgr = device.NewSocketGroupedManager(config.DriverName, topo, config.ReservedCPUs, plugin.cpuAllocationStore.GetSharedCPUs) + } else { + plugin.devMgr = device.NewNUMAGroupedManager(config.DriverName, topo, config.ReservedCPUs, plugin.cpuAllocationStore.GetSharedCPUs) + } + } + driverPluginPath := filepath.Join(kubeletPluginPath, config.DriverName) if err := os.MkdirAll(driverPluginPath, 0750); err != nil { return nil, fmt.Errorf("failed to create plugin path %s: %w", driverPluginPath, err)