diff --git a/manifests/supervisorcluster/1.32/cns-csi.yaml b/manifests/supervisorcluster/1.32/cns-csi.yaml index cd2ce0bac9..ba1bae8463 100644 --- a/manifests/supervisorcluster/1.32/cns-csi.yaml +++ b/manifests/supervisorcluster/1.32/cns-csi.yaml @@ -126,6 +126,15 @@ rules: - apiGroups: ["crd.nsx.vmware.com"] resources: ["networkinfos"] verbs: ["get", "watch", "list"] + - apiGroups: ["crd.nsx.vmware.com"] + resources: ["vpcnetworkconfigurations"] + verbs: ["get", "list", "watch"] + - apiGroups: ["fvs.vcf.broadcom.com"] + resources: ["filevolumes"] + verbs: ["get", "list", "watch", "create", "delete", "patch", "update"] + - apiGroups: ["fvs.vcf.broadcom.com"] + resources: ["filevolumeservices"] + verbs: ["get", "list", "watch"] - apiGroups: ["encryption.vmware.com"] resources: ["encryptionclasses"] verbs: ["get", "list", "watch"] diff --git a/manifests/supervisorcluster/1.33/cns-csi.yaml b/manifests/supervisorcluster/1.33/cns-csi.yaml index cd2ce0bac9..ba1bae8463 100644 --- a/manifests/supervisorcluster/1.33/cns-csi.yaml +++ b/manifests/supervisorcluster/1.33/cns-csi.yaml @@ -126,6 +126,15 @@ rules: - apiGroups: ["crd.nsx.vmware.com"] resources: ["networkinfos"] verbs: ["get", "watch", "list"] + - apiGroups: ["crd.nsx.vmware.com"] + resources: ["vpcnetworkconfigurations"] + verbs: ["get", "list", "watch"] + - apiGroups: ["fvs.vcf.broadcom.com"] + resources: ["filevolumes"] + verbs: ["get", "list", "watch", "create", "delete", "patch", "update"] + - apiGroups: ["fvs.vcf.broadcom.com"] + resources: ["filevolumeservices"] + verbs: ["get", "list", "watch"] - apiGroups: ["encryption.vmware.com"] resources: ["encryptionclasses"] verbs: ["get", "list", "watch"] diff --git a/manifests/supervisorcluster/1.34/cns-csi.yaml b/manifests/supervisorcluster/1.34/cns-csi.yaml index cd2ce0bac9..ba1bae8463 100644 --- a/manifests/supervisorcluster/1.34/cns-csi.yaml +++ b/manifests/supervisorcluster/1.34/cns-csi.yaml @@ -126,6 +126,15 @@ rules: - apiGroups: ["crd.nsx.vmware.com"] resources: ["networkinfos"] verbs: ["get", "watch", "list"] + - apiGroups: ["crd.nsx.vmware.com"] + resources: ["vpcnetworkconfigurations"] + verbs: ["get", "list", "watch"] + - apiGroups: ["fvs.vcf.broadcom.com"] + resources: ["filevolumes"] + verbs: ["get", "list", "watch", "create", "delete", "patch", "update"] + - apiGroups: ["fvs.vcf.broadcom.com"] + resources: ["filevolumeservices"] + verbs: ["get", "list", "watch"] - apiGroups: ["encryption.vmware.com"] resources: ["encryptionclasses"] verbs: ["get", "list", "watch"] diff --git a/pkg/csi/service/common/constants.go b/pkg/csi/service/common/constants.go index 4807a15e94..e869024cb4 100644 --- a/pkg/csi/service/common/constants.go +++ b/pkg/csi/service/common/constants.go @@ -90,6 +90,15 @@ const ( // AttributeStorageClassName represents name of the Storage Class. AttributeStorageClassName = "csi.storage.k8s.io/sc/name" + // StorageClassVsanFileServicePolicy is the supervisor StorageClass for vSAN file service (immediate binding). + StorageClassVsanFileServicePolicy = "vsan-file-service-policy" + // StorageClassVsanFileServicePolicyLateBinding is the supervisor StorageClass for vSAN file service (late binding). + StorageClassVsanFileServicePolicyLateBinding = "vsan-file-service-policy-latebinding" + + // FVSVolumeIDPrefix is the CSI volume ID prefix for the FVS FileVolume CR workflow + // (fv::). + FVSVolumeIDPrefix = "fv:" + // AttributeIsLinkedClone represents if this is a linked clone request AttributeIsLinkedClone = "csi.vsphere.volume/fast-provisioning" diff --git a/pkg/csi/service/common/vsphereutil.go b/pkg/csi/service/common/vsphereutil.go index 2b00776465..bec8a2d7cc 100644 --- a/pkg/csi/service/common/vsphereutil.go +++ b/pkg/csi/service/common/vsphereutil.go @@ -1217,7 +1217,7 @@ func GetCnsVolumeType(ctx context.Context, volumeId string) string { log := logger.GetLogger(ctx) var volumeType string // Determine volume type based on volume ID prefix - if strings.HasPrefix(volumeId, "file:") { + if strings.HasPrefix(volumeId, "file:") || strings.HasPrefix(volumeId, FVSVolumeIDPrefix) { volumeType = FileVolumeType } else { volumeType = BlockVolumeType diff --git a/pkg/csi/service/wcp/controller.go b/pkg/csi/service/wcp/controller.go index 0611fa5d0b..183d2cbf0e 100644 --- a/pkg/csi/service/wcp/controller.go +++ b/pkg/csi/service/wcp/controller.go @@ -41,9 +41,15 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ctrlconfig "sigs.k8s.io/controller-runtime/pkg/client/config" + fvsapis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/filevolume" "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/crypto" cnsvolume "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume" cnsvsphere "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/vsphere" @@ -59,6 +65,7 @@ import ( "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeinfo" cnsvolumeinfov1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeinfo/v1alpha1" "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeoperationrequest" + k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes" ) const ( @@ -87,6 +94,8 @@ var ( isPodVMOnStretchSupervisorFSSEnabled bool // IsMultipleClustersPerVsphereZoneFSSEnabled is true when supports_multiple_clusters_per_zone FSS is enabled. IsMultipleClustersPerVsphereZoneFSSEnabled bool + // isVsanFileVolumeServiceFSSEnabled is true when supports_vsan_fileservice capability is enabled on the supervisor. + isVsanFileVolumeServiceFSSEnabled bool ) var getCandidateDatastores = cnsvsphere.GetCandidateDatastoresInCluster @@ -113,11 +122,14 @@ type snapshotLockManager struct { } type controller struct { - manager *common.Manager - authMgr common.AuthorizationService - topologyMgr commoncotypes.ControllerTopologyService - snapshotLockMgr *snapshotLockManager - k8sClient kubernetes.Interface + manager *common.Manager + authMgr common.AuthorizationService + topologyMgr commoncotypes.ControllerTopologyService + snapshotLockMgr *snapshotLockManager + k8sClient kubernetes.Interface + dynamicClient dynamic.Interface + namespaceLister corelisters.NamespaceLister + fileVolumeClient ctrlclient.Client csi.UnimplementedControllerServer csi.UnimplementedSnapshotMetadataServer } @@ -181,6 +193,12 @@ func (c *controller) Init(config *cnsconfig.Config, version string) error { common.FCDTransactionSupport) IsMultipleClustersPerVsphereZoneFSSEnabled = commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.MultipleClustersPerVsphereZone) + isVsanFileVolumeServiceFSSEnabled = commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, + common.VsanFileVolumeService) + if !commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.VsanFileVolumeService) { + go commonco.ContainerOrchestratorUtility.HandleLateEnablementOfCapability(ctx, cnstypes.CnsClusterFlavorWorkload, + common.VsanFileVolumeService, "", "") + } if !IsMultipleClustersPerVsphereZoneFSSEnabled { go commonco.ContainerOrchestratorUtility.HandleLateEnablementOfCapability(ctx, cnstypes.CnsClusterFlavorWorkload, common.MultipleClustersPerVsphereZone, "", "") @@ -195,10 +213,6 @@ func (c *controller) Init(config *cnsconfig.Config, version string) error { go commonco.ContainerOrchestratorUtility.HandleLateEnablementOfCapability(ctx, cnstypes.CnsClusterFlavorWorkload, common.SharedDiskFss, "", "") } - if !commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.VsanFileVolumeService) { - go commonco.ContainerOrchestratorUtility.HandleLateEnablementOfCapability(ctx, cnstypes.CnsClusterFlavorWorkload, - common.VsanFileVolumeService, "", "") - } if idempotencyHandlingEnabled { log.Info("CSI Volume manager idempotency handling feature flag is enabled.") operationStore, err = cnsvolumeoperationrequest.InitVolumeOperationRequestInterface(ctx, @@ -256,6 +270,36 @@ func (c *controller) Init(config *cnsconfig.Config, version string) error { } log.Info("Initialized Kubernetes client") + if isVsanFileVolumeServiceFSSEnabled { + c.dynamicClient, err = dynamic.NewForConfig(cfg) + if err != nil { + log.Errorf("failed to create dynamic Kubernetes client. err=%v", err) + return err + } + log.Info("Initialized dynamic Kubernetes client") + + fvsScheme := runtime.NewScheme() + if err = fvsapis.AddToScheme(fvsScheme); err != nil { + log.Errorf("failed to add FileVolume API types to scheme. err=%v", err) + return err + } + c.fileVolumeClient, err = ctrlclient.New(cfg, ctrlclient.Options{Scheme: fvsScheme}) + if err != nil { + log.Errorf("failed to create FileVolume Kubernetes client. err=%v", err) + return err + } + log.Info("Initialized FileVolume Kubernetes client") + + im := k8s.NewInformer(ctx, c.k8sClient, true) + im.InitNamespaceInformer() + im.Listen() + if nsSynced := im.NamespaceInformerSynced(); nsSynced != nil && !cache.WaitForCacheSync(ctx.Done(), nsSynced) { + return logger.LogNewErrorf(log, "FVS namespace informer cache sync failed") + } + c.namespaceLister = im.GetNamespaceLister() + log.Info("Namespace informer for FVS initialized") + } + vc, err := common.GetVCenter(ctx, c.manager) if err != nil { log.Errorf("failed to get vcenter. err=%v", err) @@ -1745,6 +1789,14 @@ func (c *controller) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequ return nil, csifault.CSIUnimplementedFault, logger.LogNewErrorCode(log, codes.Unimplemented, "file volume feature is disabled on the cluster") } + scName := req.Parameters[common.AttributeStorageClassName] + useFVS, err := shouldProvisionVsanFileVolumeViaFVS(ctx, scName) + if err != nil { + return nil, csifault.CSIInvalidArgumentFault, err + } + if useFVS { + return c.createFileVolumeViaFVS(ctx, req) + } // Block file volume provisioning if FSS Workload_Domain_Isolation_Supported is enabled but // 'fileVolumeActivated' field is set to false in vSphere config secret. if isWorkloadDomainIsolationEnabled && diff --git a/pkg/csi/service/wcp/fvs_filevolume.go b/pkg/csi/service/wcp/fvs_filevolume.go new file mode 100644 index 0000000000..719dde2b33 --- /dev/null +++ b/pkg/csi/service/wcp/fvs_filevolume.go @@ -0,0 +1,476 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wcp + +import ( + "context" + "fmt" + "math/rand" + "strings" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + fvv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/filevolume/v1alpha1" + csifault "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/fault" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common/commonco" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger" + cnsoperatorutil "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util" +) + +const ( + fvsGroup = "fvs.vcf.broadcom.com" + fvsVersion = "v1alpha1" + // fvsWaitStep is the poll interval while waiting for the FileVolume CR to reach Ready and populate + // status (FVS controllers reconcile asynchronously; we re-read on this cadence instead of hammering + // the API). + fvsWaitStep = 5 * time.Second + // fvsWaitMax is the maximum time to wait before failing CreateVolume so the CSI RPC does not block indefinitely + // if the FileVolume never becomes Ready or status fields stay empty. + fvsWaitMax = 5 * time.Minute + // AnnotationVPCNetworkConfig is set on supervisor namespaces; value is the VPCNetworkConfiguration CR name. + AnnotationVPCNetworkConfig = "nsx.vmware.com/vpc_network_config" + // NamespaceLabelFVSInstance marks FVS instance namespaces (supervisor namespaces that host FileVolume CRs). + // Expected label value is "true". + NamespaceLabelFVSInstance = "fvs_instance_namespace" +) + +var ( + vpcNetworkConfigurationGVR = schema.GroupVersionResource{ + Group: "crd.nsx.vmware.com", + Version: "v1alpha1", + Resource: "vpcnetworkconfigurations", + } + fvsFileVolumeGVR = schema.GroupVersionResource{ + Group: fvsGroup, + Version: fvsVersion, + Resource: "filevolumes", + } + fvsFileVolumeServiceGVR = schema.GroupVersionResource{ + Group: fvsGroup, + Version: fvsVersion, + Resource: "filevolumeservices", + } +) + +// fvsZonesForNamespace resolves zone labels for a supervisor namespace for FVS filtering and topology. +// Unit tests may replace this hook; production uses the container orchestrator. +var fvsZonesForNamespace = func(ns string) map[string]struct{} { + return commonco.ContainerOrchestratorUtility.GetZonesForNamespace(ns) +} + +// isVsanFileServicePolicyStorageClass returns true for supervisor storage classes that route +// volume provisioning through the FVS FileVolume CR workflow when NSX_VPC and capability match. +func isVsanFileServicePolicyStorageClass(storageClassName string) bool { + switch storageClassName { + case common.StorageClassVsanFileServicePolicy, + common.StorageClassVsanFileServicePolicyLateBinding: + return true + default: + return false + } +} + +// vpcPathFromVPCNetworkConfiguration returns status.vpcs[0].vpcPath from a VPCNetworkConfiguration CR +// (crd.nsx.vmware.com/v1alpha1). VPC matching uses this path, not status.vpcs[0].name. +func vpcPathFromVPCNetworkConfiguration(obj *unstructured.Unstructured) string { + vpcs, found, err := unstructured.NestedSlice(obj.Object, "status", "vpcs") + if err == nil && found && len(vpcs) > 0 { + if vpc, ok := vpcs[0].(map[string]interface{}); ok { + if path, ok := vpc["vpcPath"].(string); ok && path != "" { + return path + } + } + } + return "" +} + +// findVPCNetworkConfigurationForNamespace loads the supervisor Namespace, reads the +// AnnotationVPCNetworkConfig annotation for the VPCNetworkConfiguration object name, and returns that +// cluster-scoped VPCNetworkConfiguration CR (crd.nsx.vmware.com/v1alpha1). +func (c *controller) findVPCNetworkConfigurationForNamespace(ctx context.Context, namespace string) ( + *unstructured.Unstructured, error) { + if c.namespaceLister == nil { + return nil, fmt.Errorf("namespace lister is not initialized") + } + ns, err := c.namespaceLister.Get(namespace) + if err != nil { + return nil, fmt.Errorf("get namespace %q from informer cache: %w", namespace, err) + } + crName := ns.Annotations[AnnotationVPCNetworkConfig] + if crName == "" { + return nil, fmt.Errorf("namespace %q has no %q annotation", namespace, AnnotationVPCNetworkConfig) + } + cr, err := c.dynamicClient.Resource(vpcNetworkConfigurationGVR).Get(ctx, crName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("get VPCNetworkConfiguration %q: %w", crName, err) + } + return cr, nil +} + +// getVPCPathForNamespace returns status.vpcs[0].vpcPath for a supervisor namespace from its +// VPCNetworkConfiguration CR referenced by the namespace annotation. +func (c *controller) getVPCPathForNamespace(ctx context.Context, namespace string) (string, error) { + cr, err := c.findVPCNetworkConfigurationForNamespace(ctx, namespace) + if err != nil { + return "", err + } + p := vpcPathFromVPCNetworkConfiguration(cr) + if p == "" { + return "", fmt.Errorf("no VPC path in status.vpcs for VPCNetworkConfiguration %q", cr.GetName()) + } + return p, nil +} + +// listFVSCandidateInstanceNamespaces returns supervisor namespaces that may host the FVS FileVolume CR: +// same VPC path as the consumer (status.vpcs[0].vpcPath on VPCNetworkConfiguration), excluding the PVC namespace, +// and with at least one requested zone present on the namespace (zone CRs). +// Each namespace must have AnnotationVPCNetworkConfig pointing at its cluster-scoped VPCNetworkConfiguration; +// namespaces without the annotation are skipped. +func (c *controller) listFVSCandidateInstanceNamespaces(ctx context.Context, pvcNamespace string, + requestedZones []string) ([]string, error) { + log := logger.GetLogger(ctx) + if len(requestedZones) == 0 { + return nil, fmt.Errorf("requested zones must be non-empty to select FVS instance namespaces") + } + + vpcPath, err := c.getVPCPathForNamespace(ctx, pvcNamespace) + if err != nil { + return nil, fmt.Errorf("failed to resolve VPC path for namespace %q: %w", pvcNamespace, err) + } + log.Infof("listFVSCandidateInstanceNamespaces: consumer namespace %q uses VPC path %q, requested zones %v", + pvcNamespace, vpcPath, requestedZones) + + if c.namespaceLister == nil { + return nil, fmt.Errorf("namespace lister is not initialized") + } + instanceNSSelector := labels.SelectorFromSet(labels.Set{ + NamespaceLabelFVSInstance: "true", + }) + nsObjs, err := c.namespaceLister.List(instanceNSSelector) + if err != nil { + return nil, fmt.Errorf("list namespaces from informer cache: %w", err) + } + + var candidateNamespaces []string + for _, ns := range nsObjs { + if ns.Name == pvcNamespace { + continue + } + crName := ns.Annotations[AnnotationVPCNetworkConfig] + if crName == "" { + log.Debugf("listFVSCandidateInstanceNamespaces: skipping namespace %q: missing %q annotation", + ns.Name, AnnotationVPCNetworkConfig) + continue + } + cr, err := c.dynamicClient.Resource(vpcNetworkConfigurationGVR).Get(ctx, crName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("get VPCNetworkConfiguration %q for namespace %q: %w", crName, ns.Name, err) + } + nsVPCPath := vpcPathFromVPCNetworkConfiguration(cr) + if nsVPCPath != vpcPath { + continue + } + log.Infof("listFVSCandidateInstanceNamespaces: namespace %q matches consumer VPC path %q "+ + "(VPCNetworkConfiguration %q)", + ns.Name, vpcPath, crName) + if !namespaceHasAnyRequestedZone(ns.Name, requestedZones) { + log.Infof("listFVSCandidateInstanceNamespaces: skipping namespace %q: "+ + "no overlap with requested zones %v", + ns.Name, requestedZones) + continue + } + log.Infof("listFVSCandidateInstanceNamespaces: adding namespace %q to FVS candidate list "+ + "(VPC path %q, requested zones %v)", + ns.Name, vpcPath, requestedZones) + candidateNamespaces = append(candidateNamespaces, ns.Name) + } + if len(candidateNamespaces) == 0 { + return nil, fmt.Errorf("no FVS candidate namespace for VPC path %q and zones %v (consumer namespace %q)", + vpcPath, requestedZones, pvcNamespace) + } + return candidateNamespaces, nil +} + +// instanceNamespaceHasReadyFileVolumeService returns nil if the namespace has at least one +// FileVolumeService (fvs.vcf.broadcom.com/v1alpha1) whose status.healthState is Ready (case-insensitive). +// Otherwise it returns an error describing a missing list, no CRs, or no healthy service. +func (c *controller) instanceNamespaceHasReadyFileVolumeService(ctx context.Context, instanceNS string) error { + list, err := c.dynamicClient.Resource(fvsFileVolumeServiceGVR).Namespace(instanceNS).List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + if len(list.Items) == 0 { + return fmt.Errorf("no FileVolumeService in namespace %q", instanceNS) + } + for _, item := range list.Items { + obj := item.Object + if hs, found, _ := unstructured.NestedString(obj, "status", "healthState"); found && + strings.EqualFold(hs, "Ready") { + return nil + } + } + return fmt.Errorf("no healthy FileVolumeService in namespace %q", instanceNS) +} + +// namespaceHasAnyRequestedZone reports whether the supervisor namespace is associated with at least one +// of the topology zones requested for the volume (via ContainerOrchestratorUtility zone CRs). +func namespaceHasAnyRequestedZone(namespace string, requestedZones []string) bool { + if len(requestedZones) == 0 { + return true + } + zonesInNS := fvsZonesForNamespace(namespace) + for _, z := range requestedZones { + if _, ok := zonesInNS[z]; ok { + return true + } + } + return false +} + +// topologyListFromZoneMap builds a CSI Topology list from a set of zone names, with each entry +// labeling topology.kubernetes.io/zone to that zone value. +func topologyListFromZoneMap(zones map[string]struct{}) []*csi.Topology { + var out []*csi.Topology + for z := range zones { + out = append(out, &csi.Topology{ + Segments: map[string]string{v1.LabelTopologyZone: z}, + }) + } + return out +} + +// fvsAccessibleTopology sets CSI AccessibleTopology from zone assignments on the PVC (workload) +// namespace and the instance namespace (where the FileVolume CR runs). +// +// If the volume is placed where the PVC namespace and instance namespace share at least one zone (the +// requested topology aligns with a zone assigned to the PVC namespace and that zone is also on the +// instance namespace), those shared zones are set as AccessibleTopology for the volume. +// +// If the two namespaces share no zone (including when the volume would not sit on overlapping namespace +// zones), all zones assigned to the PVC namespace are set as AccessibleTopology for the volume. +func fvsAccessibleTopology(pvcNamespace, instanceNS string) ([]*csi.Topology, error) { + pvcNamespaceZones := fvsZonesForNamespace(pvcNamespace) + if len(pvcNamespaceZones) == 0 { + return nil, fmt.Errorf("no zones assigned to PVC namespace %q", pvcNamespace) + } + instanceNamespaceZones := fvsZonesForNamespace(instanceNS) + if len(instanceNamespaceZones) == 0 { + return nil, fmt.Errorf("no zones assigned to instance namespace %q", instanceNS) + } + + shared := make(map[string]struct{}) + for z := range instanceNamespaceZones { + if _, ok := pvcNamespaceZones[z]; ok { + shared[z] = struct{}{} + } + } + if len(shared) == 0 { + return topologyListFromZoneMap(pvcNamespaceZones), nil + } + return topologyListFromZoneMap(shared), nil +} + +// createFileVolumeViaFVS provisions a file volume by creating a FileVolume CR in the FVS instance namespace. +func (c *controller) createFileVolumeViaFVS(ctx context.Context, req *csi.CreateVolumeRequest) ( + *csi.CreateVolumeResponse, string, error) { + log := logger.GetLogger(ctx) + pvcNamespace := req.Parameters[common.AttributePvcNamespace] + pvcName := req.Parameters[common.AttributePvcName] + if pvcNamespace == "" || pvcName == "" { + return nil, csifault.CSIInvalidArgumentFault, logger.LogNewErrorCode(log, codes.InvalidArgument, + "missing PVC name or namespace in CreateVolume parameters") + } + + if req.GetAccessibilityRequirements() == nil { + return nil, csifault.CSIInvalidArgumentFault, logger.LogNewErrorCode(log, codes.InvalidArgument, + "accessibility requirements are required for FVS file volume provisioning") + } + + pvcUID, err := common.ExtractVolumeIDFromPVName(ctx, req.GetName()) + if err != nil { + return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal, + "failed to extract PVC UID from volume name %q: %v", req.GetName(), err) + } + fvName := pvcUID + + volSizeBytes := int64(common.DefaultGbDiskSize * common.GbInBytes) + if req.GetCapacityRange() != nil && req.GetCapacityRange().RequiredBytes != 0 { + volSizeBytes = int64(req.GetCapacityRange().GetRequiredBytes()) + } + qty := resource.NewQuantity(volSizeBytes, resource.BinarySI) + + requestedZones, err := GetZonesFromAccessibilityRequirements(ctx, req.GetAccessibilityRequirements()) + if err != nil { + return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal, + "failed to read zones from accessibility requirements: %v", err) + } + + candidateNamespaces, err := c.listFVSCandidateInstanceNamespaces(ctx, pvcNamespace, requestedZones) + if err != nil { + return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.FailedPrecondition, + "%v", err) + } + + // Phase 1: see if a FileVolume CR with this name (PVC UID) already exists in any candidate instance namespace. + var instanceNS string + for _, ns := range candidateNamespaces { + fvExisting := &fvv1alpha1.FileVolume{} + getErr := c.fileVolumeClient.Get(ctx, ctrlclient.ObjectKey{Namespace: ns, Name: fvName}, fvExisting) + if getErr == nil { + instanceNS = ns + log.Infof("reusing existing FileVolume %s/%s for PVC %s/%s", ns, fvName, pvcNamespace, pvcName) + break + } + if !apierrors.IsNotFound(getErr) { + log.Warnf("could not get FileVolume %q in namespace %q (will try next candidate): %v", + fvName, ns, getErr) + continue + } + } + + // Phase 2: no existing FileVolume — pick one candidate namespace and create the CR. + if instanceNS == "" { + var targetNS string + // Randomize order so repeated creates do not always probe the same instance namespace first (hotspot). + rand.Shuffle(len(candidateNamespaces), func(i, j int) { + candidateNamespaces[i], candidateNamespaces[j] = candidateNamespaces[j], candidateNamespaces[i] + }) + for _, ns := range candidateNamespaces { + if svcErr := c.instanceNamespaceHasReadyFileVolumeService(ctx, ns); svcErr != nil { + log.Debugf("skipping instance namespace %q for FileVolume create: %v", ns, svcErr) + continue + } + targetNS = ns + break + } + if targetNS == "" { + return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.FailedPrecondition, + "no instance namespace with a healthy FileVolumeService for requested zones %v", requestedZones) + } + + fvTyped := &fvv1alpha1.FileVolume{ + TypeMeta: metav1.TypeMeta{ + APIVersion: schema.GroupVersion{Group: fvsGroup, Version: fvsVersion}.String(), + Kind: "FileVolume", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fvName, + Namespace: targetNS, + }, + Spec: fvv1alpha1.FileVolumeSpec{ + PvcUID: pvcUID, + Size: *qty, + Protocols: []fvv1alpha1.FileVolumeProtocol{fvv1alpha1.FileVolumeProtocolNFSv4}, + }, + } + log.Infof("Creating FileVolume CR for PVC %s/%s: %+v", pvcNamespace, pvcName, fvTyped) + createErr := c.fileVolumeClient.Create(ctx, fvTyped) + if createErr != nil { + if !apierrors.IsAlreadyExists(createErr) { + return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal, + "failed to create FileVolume %s/%s: %v", targetNS, fvName, createErr) + } + log.Infof("FileVolume %s/%s already exists, monitoring status for PVC %s/%s", + targetNS, fvName, pvcNamespace, pvcName) + } else { + log.Infof("Created FileVolume %s/%s for PVC %s/%s", targetNS, fvName, pvcNamespace, pvcName) + } + instanceNS = targetNS + } + + fv := &fvv1alpha1.FileVolume{} + var exportPath, endpoint string + err = wait.PollUntilContextTimeout(ctx, fvsWaitStep, fvsWaitMax, true, func(ctx context.Context) (bool, error) { + if err := c.fileVolumeClient.Get(ctx, ctrlclient.ObjectKey{Namespace: instanceNS, Name: fvName}, fv); err != nil { + return false, err + } + phase := fv.Status.Phase + if phase == "" { + return false, nil + } + if strings.EqualFold(string(phase), string(fvv1alpha1.FileVolumePhaseError)) { + return false, fmt.Errorf("FileVolume %s/%s entered Error phase", instanceNS, fvName) + } + if !strings.EqualFold(string(phase), string(fvv1alpha1.FileVolumePhaseReady)) { + return false, nil + } + exportPath = fv.Status.ExportPath + endpoint = fv.Status.Endpoint + return exportPath != "" && endpoint != "", nil + }) + if err != nil { + return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.DeadlineExceeded, + "timeout or error waiting for FileVolume %s/%s to become Ready with export path and endpoint: %v", + instanceNS, fvName, err) + } + + volumeID := fmt.Sprintf("%s%s:%s", common.FVSVolumeIDPrefix, instanceNS, fvName) + nfsv41Export := endpoint + ":" + exportPath + attributes := map[string]string{ + common.AttributeDiskType: common.DiskTypeFileVolume, + common.Nfsv4ExportPathAnnotationKey: nfsv41Export, + } + + resp := &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: volumeID, + CapacityBytes: volSizeBytes, + VolumeContext: attributes, + }, + } + + topo, err := fvsAccessibleTopology(pvcNamespace, instanceNS) + if err != nil { + return nil, csifault.CSIInternalFault, logger.LogNewErrorCodef(log, codes.Internal, + "failed to compute accessible topology: %v", err) + } + resp.Volume.AccessibleTopology = topo + + return resp, "", nil +} + +// shouldProvisionVsanFileVolumeViaFVS encapsulates routing to the FVS CR workflow. +func shouldProvisionVsanFileVolumeViaFVS(ctx context.Context, storageClassName string) (bool, error) { + if !isVsanFileServicePolicyStorageClass(storageClassName) { + return false, nil + } + np, err := cnsoperatorutil.GetNetworkProvider(ctx) + if err != nil { + return false, err + } + if np != cnsoperatorutil.VPCNetworkProvider { + return false, status.Errorf(codes.FailedPrecondition, + "storage class %q requires NSX_VPC network provider (current network provider: %q)", + storageClassName, np) + } + if !isVsanFileVolumeServiceFSSEnabled { + return false, nil + } + return true, nil +} diff --git a/pkg/csi/service/wcp/fvs_filevolume_test.go b/pkg/csi/service/wcp/fvs_filevolume_test.go new file mode 100644 index 0000000000..2a1185af30 --- /dev/null +++ b/pkg/csi/service/wcp/fvs_filevolume_test.go @@ -0,0 +1,438 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wcp + +import ( + "context" + "testing" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + k8sfake "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + + csifault "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/fault" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/unittestcommon" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common/commonco" + cnsoperatorutil "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util" +) + +func TestMain(m *testing.M) { + co, err := unittestcommon.GetFakeContainerOrchestratorInterface(common.Kubernetes) + if err != nil { + panic(err) + } + commonco.ContainerOrchestratorUtility = co + m.Run() +} + +func TestIsVsanFileServicePolicyStorageClass(t *testing.T) { + require.True(t, isVsanFileServicePolicyStorageClass(common.StorageClassVsanFileServicePolicy)) + require.True(t, isVsanFileServicePolicyStorageClass(common.StorageClassVsanFileServicePolicyLateBinding)) + require.False(t, isVsanFileServicePolicyStorageClass("some-other-sc")) + require.False(t, isVsanFileServicePolicyStorageClass("")) +} + +func TestVPCPathFromVPCNetworkConfiguration(t *testing.T) { + obj := &unstructured.Unstructured{Object: map[string]interface{}{ + "status": map[string]interface{}{ + "vpcs": []interface{}{ + map[string]interface{}{ + "name": "fvs-vpc-1", + "vpcPath": "/orgs/default/projects/default/vpcs/fvs-vpc", + }, + }, + }, + }} + require.Equal(t, "/orgs/default/projects/default/vpcs/fvs-vpc", vpcPathFromVPCNetworkConfiguration(obj)) + + empty := &unstructured.Unstructured{Object: map[string]interface{}{}} + require.Equal(t, "", vpcPathFromVPCNetworkConfiguration(empty)) + + malformed := &unstructured.Unstructured{Object: map[string]interface{}{ + "status": map[string]interface{}{ + "vpcs": []interface{}{"not-a-map"}, + }, + }} + require.Equal(t, "", vpcPathFromVPCNetworkConfiguration(malformed)) + + nameOnly := &unstructured.Unstructured{Object: map[string]interface{}{ + "status": map[string]interface{}{ + "vpcs": []interface{}{ + map[string]interface{}{"name": "no-path"}, + }, + }, + }} + require.Equal(t, "", vpcPathFromVPCNetworkConfiguration(nameOnly)) +} + +func TestTopologyListFromZoneMap(t *testing.T) { + zones := map[string]struct{}{ + "zone-a": {}, + "zone-b": {}, + } + topo := topologyListFromZoneMap(zones) + require.Len(t, topo, 2) + seen := make(map[string]bool) + for _, tinfo := range topo { + z := tinfo.Segments[v1.LabelTopologyZone] + require.NotEmpty(t, z) + seen[z] = true + } + require.True(t, seen["zone-a"]) + require.True(t, seen["zone-b"]) +} + +func TestNamespaceHasAnyRequestedZone_EmptyRequested(t *testing.T) { + require.True(t, namespaceHasAnyRequestedZone("any-ns", nil)) + require.True(t, namespaceHasAnyRequestedZone("any-ns", []string{})) +} + +func TestNamespaceHasAnyRequestedZone_Intersection(t *testing.T) { + orig := fvsZonesForNamespace + defer func() { fvsZonesForNamespace = orig }() + fvsZonesForNamespace = func(ns string) map[string]struct{} { + if ns == "has-zone" { + return map[string]struct{}{"z1": {}, "z2": {}} + } + return nil + } + + require.True(t, namespaceHasAnyRequestedZone("has-zone", []string{"z1"})) + require.False(t, namespaceHasAnyRequestedZone("has-zone", []string{"z99"})) + require.False(t, namespaceHasAnyRequestedZone("other-ns", []string{"z1"})) +} + +func TestFvsAccessibleTopology(t *testing.T) { + orig := fvsZonesForNamespace + defer func() { fvsZonesForNamespace = orig }() + fvsZonesForNamespace = func(ns string) map[string]struct{} { + switch ns { + case "pvc-ns": + return map[string]struct{}{"z1": {}, "z2": {}} + case "inst-ns": + return map[string]struct{}{"z2": {}, "z3": {}} + case "inst-no-overlap": + // shares no zones with pvc-ns → publish all PVC namespace zones + return map[string]struct{}{"z9": {}} + case "no-zones": + return nil + default: + return nil + } + } + + topo, err := fvsAccessibleTopology("pvc-ns", "inst-ns") + require.NoError(t, err) + require.Len(t, topo, 1) + require.Equal(t, "z2", topo[0].Segments[v1.LabelTopologyZone]) + + topo2, err := fvsAccessibleTopology("pvc-ns", "inst-no-overlap") + require.NoError(t, err) + require.Len(t, topo2, 2) + + _, err = fvsAccessibleTopology("no-zones", "inst-ns") + require.Error(t, err) +} + +// testNamespaceLister builds a synced Namespace lister for unit tests (production uses controller init). +func testNamespaceLister(t *testing.T, k8s kubernetes.Interface) corelisters.NamespaceLister { + t.Helper() + factory := informers.NewSharedInformerFactory(k8s, 0) + stopCh := make(chan struct{}) + t.Cleanup(func() { close(stopCh) }) + nsInformer := factory.Core().V1().Namespaces().Informer() + factory.Start(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, nsInformer.HasSynced)) + return factory.Core().V1().Namespaces().Lister() +} + +func newTestDynamicClient(t *testing.T, objs ...runtime.Object) *fake.FakeDynamicClient { + t.Helper() + gvrToListKind := map[schema.GroupVersionResource]string{ + vpcNetworkConfigurationGVR: "VPCNetworkConfigurationList", + fvsFileVolumeGVR: "FileVolumeList", + fvsFileVolumeServiceGVR: "FileVolumeServiceList", + } + return fake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), gvrToListKind, objs...) +} + +func testVPCNetworkConfigurationCR(name, vpcPath string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "crd.nsx.vmware.com/v1alpha1", + "kind": "VPCNetworkConfiguration", + "metadata": map[string]interface{}{ + "name": name, + }, + "status": map[string]interface{}{ + "vpcs": []interface{}{ + map[string]interface{}{ + "name": "display-name-ignored-for-matching", + "vpcPath": vpcPath, + }, + }, + }, + }, + } +} + +func TestFindVPCNetworkConfigurationForNamespace(t *testing.T) { + ctx := context.Background() + vpcCR := "consumer-ns-11111111-1111-1111-1111-111111111111" + k8s := k8sfake.NewClientset(&v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "consumer-ns", + Annotations: map[string]string{ + AnnotationVPCNetworkConfig: vpcCR, + }, + }, + }) + dyn := newTestDynamicClient(t, testVPCNetworkConfigurationCR(vpcCR, "/orgs/default/projects/default/vpcs/vpc-a")) + + c := &controller{k8sClient: k8s, dynamicClient: dyn, namespaceLister: testNamespaceLister(t, k8s)} + obj, err := c.findVPCNetworkConfigurationForNamespace(ctx, "consumer-ns") + require.NoError(t, err) + require.Equal(t, vpcCR, obj.GetName()) + require.Equal(t, "/orgs/default/projects/default/vpcs/vpc-a", vpcPathFromVPCNetworkConfiguration(obj)) + + k8sNoAnn := k8sfake.NewClientset(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}) + c2 := &controller{k8sClient: k8sNoAnn, dynamicClient: dyn, namespaceLister: testNamespaceLister(t, k8sNoAnn)} + _, err = c2.findVPCNetworkConfigurationForNamespace(ctx, "n2") + require.Error(t, err) +} + +func TestGetVPCPathForNamespace(t *testing.T) { + ctx := context.Background() + vpcCR := "ns1-22222222-2222-2222-2222-222222222222" + sharedPath := "/orgs/default/projects/default/vpcs/shared-vpc" + k8s := k8sfake.NewClientset(&v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ns1", + Annotations: map[string]string{ + AnnotationVPCNetworkConfig: vpcCR, + }, + }, + }) + dyn := newTestDynamicClient(t, testVPCNetworkConfigurationCR(vpcCR, sharedPath)) + + c := &controller{k8sClient: k8s, dynamicClient: dyn, namespaceLister: testNamespaceLister(t, k8s)} + path, err := c.getVPCPathForNamespace(ctx, "ns1") + require.NoError(t, err) + require.Equal(t, sharedPath, path) + + dynNoStatus := newTestDynamicClient(t, &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "crd.nsx.vmware.com/v1alpha1", + "kind": "VPCNetworkConfiguration", + "metadata": map[string]interface{}{"name": vpcCR}, + "status": map[string]interface{}{}, + }, + }) + c2 := &controller{k8sClient: k8s, dynamicClient: dynNoStatus, namespaceLister: testNamespaceLister(t, k8s)} + _, err = c2.getVPCPathForNamespace(ctx, "ns1") + require.Error(t, err) +} + +func TestListFVSCandidateInstanceNamespaces(t *testing.T) { + ctx := context.Background() + consumerAnn := "pvc-ns-33333333-3333-3333-3333-333333333333" + instAnn := "inst-ns-44444444-4444-4444-4444-444444444444" + + k8s := k8sfake.NewClientset( + &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-ns", + Annotations: map[string]string{ + AnnotationVPCNetworkConfig: consumerAnn, + }, + }, + }, + &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "inst-ns", + Labels: map[string]string{ + NamespaceLabelFVSInstance: "true", + }, + Annotations: map[string]string{ + AnnotationVPCNetworkConfig: instAnn, + }, + }, + }, + &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "kube-system"}}, + ) + sameVPCPath := "/orgs/default/projects/default/vpcs/same-vpc" + dyn := newTestDynamicClient(t, + testVPCNetworkConfigurationCR(consumerAnn, sameVPCPath), + testVPCNetworkConfigurationCR(instAnn, sameVPCPath), + ) + + origZones := fvsZonesForNamespace + defer func() { fvsZonesForNamespace = origZones }() + fvsZonesForNamespace = func(ns string) map[string]struct{} { + if ns == "pvc-ns" || ns == "inst-ns" { + return map[string]struct{}{"zone-a": {}} + } + return nil + } + + c := &controller{k8sClient: k8s, dynamicClient: dyn, namespaceLister: testNamespaceLister(t, k8s)} + out, err := c.listFVSCandidateInstanceNamespaces(ctx, "pvc-ns", []string{"zone-a"}) + require.NoError(t, err) + require.Equal(t, []string{"inst-ns"}, out) + + _, err = c.listFVSCandidateInstanceNamespaces(ctx, "pvc-ns", nil) + require.Error(t, err) +} + +func TestInstanceNamespaceHasReadyFileVolumeService(t *testing.T) { + ctx := context.Background() + fvs := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "fvs.vcf.broadcom.com/v1alpha1", + "kind": "FileVolumeService", + "metadata": map[string]interface{}{ + "name": "fvs1", + "namespace": "ns1", + }, + "status": map[string]interface{}{ + "healthState": "Ready", + }, + }, + } + dyn := newTestDynamicClient(t, fvs) + c := &controller{dynamicClient: dyn} + require.NoError(t, c.instanceNamespaceHasReadyFileVolumeService(ctx, "ns1")) + + notReady := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "fvs.vcf.broadcom.com/v1alpha1", + "kind": "FileVolumeService", + "metadata": map[string]interface{}{ + "name": "fvs2", + "namespace": "ns2", + }, + "status": map[string]interface{}{ + "healthState": "NotReady", + }, + }, + } + dyn2 := newTestDynamicClient(t, notReady) + c2 := &controller{dynamicClient: dyn2} + require.Error(t, c2.instanceNamespaceHasReadyFileVolumeService(ctx, "ns2")) + + dynEmpty := newTestDynamicClient(t) + c3 := &controller{dynamicClient: dynEmpty} + require.Error(t, c3.instanceNamespaceHasReadyFileVolumeService(ctx, "empty")) +} + +func TestCreateFileVolumeViaFVS_ValidationAndPVC(t *testing.T) { + ctx := context.Background() + c := &controller{k8sClient: k8sfake.NewClientset()} + + t.Run("missing PVC parameters", func(t *testing.T) { + req := &csi.CreateVolumeRequest{Parameters: map[string]string{}} + _, fault, err := c.createFileVolumeViaFVS(ctx, req) + require.Error(t, err) + require.Equal(t, csifault.CSIInvalidArgumentFault, fault) + }) + + t.Run("missing accessibility requirements", func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Parameters: map[string]string{ + common.AttributePvcNamespace: "ns", + common.AttributePvcName: "pvc", + }, + } + _, fault, err := c.createFileVolumeViaFVS(ctx, req) + require.Error(t, err) + require.Equal(t, csifault.CSIInvalidArgumentFault, fault) + }) + + t.Run("volume name without extractable PVC UID", func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Name: "invalid-volume-name", + Parameters: map[string]string{ + common.AttributePvcNamespace: "ns", + common.AttributePvcName: "pvc", + }, + AccessibilityRequirements: &csi.TopologyRequirement{ + Preferred: []*csi.Topology{{Segments: map[string]string{v1.LabelTopologyZone: "z1"}}}, + }, + } + _, fault, err := c.createFileVolumeViaFVS(ctx, req) + require.Error(t, err) + require.Equal(t, csifault.CSIInternalFault, fault) + require.Contains(t, err.Error(), "failed to extract PVC UID") + }) +} + +func TestShouldProvisionVsanFileVolumeViaFVS(t *testing.T) { + ctx := context.Background() + + ok, err := shouldProvisionVsanFileVolumeViaFVS(ctx, "other-sc") + require.NoError(t, err) + require.False(t, ok) + + t.Run("non-VPC network provider returns FailedPrecondition", func(t *testing.T) { + orig := cnsoperatorutil.GetNetworkProviderFunc + defer func() { cnsoperatorutil.GetNetworkProviderFunc = orig }() + cnsoperatorutil.GetNetworkProviderFunc = func(ctx context.Context) (string, error) { + return "NSX_T", nil + } + _, err := shouldProvisionVsanFileVolumeViaFVS(ctx, common.StorageClassVsanFileServicePolicy) + require.Error(t, err) + }) + + t.Run("VPC provider FSS disabled", func(t *testing.T) { + orig := cnsoperatorutil.GetNetworkProviderFunc + defer func() { cnsoperatorutil.GetNetworkProviderFunc = orig }() + cnsoperatorutil.GetNetworkProviderFunc = func(ctx context.Context) (string, error) { + return cnsoperatorutil.VPCNetworkProvider, nil + } + oldFSS := isVsanFileVolumeServiceFSSEnabled + defer func() { isVsanFileVolumeServiceFSSEnabled = oldFSS }() + isVsanFileVolumeServiceFSSEnabled = false + ok, err := shouldProvisionVsanFileVolumeViaFVS(ctx, common.StorageClassVsanFileServicePolicy) + require.NoError(t, err) + require.False(t, ok) + }) + + t.Run("VPC provider FSS enabled", func(t *testing.T) { + orig := cnsoperatorutil.GetNetworkProviderFunc + defer func() { cnsoperatorutil.GetNetworkProviderFunc = orig }() + cnsoperatorutil.GetNetworkProviderFunc = func(ctx context.Context) (string, error) { + return cnsoperatorutil.VPCNetworkProvider, nil + } + oldFSS := isVsanFileVolumeServiceFSSEnabled + defer func() { isVsanFileVolumeServiceFSSEnabled = oldFSS }() + isVsanFileVolumeServiceFSSEnabled = true + ok, err := shouldProvisionVsanFileVolumeViaFVS(ctx, common.StorageClassVsanFileServicePolicy) + require.NoError(t, err) + require.True(t, ok) + }) +} diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index 4502980e8b..4b592b0b9e 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -185,14 +185,21 @@ func (im *InformerManager) AddPVListener(ctx context.Context, add func(obj inter return nil } +// InitNamespaceInformer registers the cluster Namespace shared informer with the factory (no event handlers). +// Idempotent. Call before Listen when only the Namespace lister/cache is needed (e.g. WCP FVS). +func (im *InformerManager) InitNamespaceInformer() { + if im.namespaceInformer != nil { + return + } + im.namespaceInformer = im.informerFactory.Core().V1().Namespaces().Informer() + im.namespaceSynced = im.namespaceInformer.HasSynced +} + // AddNamespaceListener hooks up add, update, delete callbacks. func (im *InformerManager) AddNamespaceListener(ctx context.Context, add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) error { log := logger.GetLogger(ctx) - if im.namespaceInformer == nil { - im.namespaceInformer = im.informerFactory.Core().V1().Namespaces().Informer() - } - im.namespaceSynced = im.namespaceInformer.HasSynced + im.InitNamespaceInformer() _, err := im.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: add, @@ -290,6 +297,22 @@ func (im *InformerManager) GetPodLister() corelisters.PodLister { return im.informerFactory.Core().V1().Pods().Lister() } +// Client returns the Kubernetes clientset associated with this informer manager. +func (im *InformerManager) Client() clientset.Interface { + return im.client +} + +// GetNamespaceLister returns the Namespace lister for the shared factory (register the informer via +// InitNamespaceInformer or AddNamespaceListener before relying on cache contents). +func (im *InformerManager) GetNamespaceLister() corelisters.NamespaceLister { + return im.informerFactory.Core().V1().Namespaces().Lister() +} + +// NamespaceInformerSynced returns the namespace informer's HasSynced func, or nil if not registered. +func (im *InformerManager) NamespaceInformerSynced() cache.InformerSynced { + return im.namespaceSynced +} + // Listen starts the Informers. func (im *InformerManager) Listen() (stopCh <-chan struct{}) { go im.informerFactory.Start(im.stopCh) diff --git a/pkg/syncer/cnsoperator/util/util.go b/pkg/syncer/cnsoperator/util/util.go index e7254c6cb8..6014dd90c8 100644 --- a/pkg/syncer/cnsoperator/util/util.go +++ b/pkg/syncer/cnsoperator/util/util.go @@ -259,11 +259,15 @@ func getSnatIpFromNamespaceNetworkInfo(ctx context.Context, dc dynamic.Interface return snatIP, nil } -// GetNetworkProvider reads the network-config configmap in Supervisor cluster. +// GetNetworkProviderFunc is the implementation used by GetNetworkProvider; tests may replace it to avoid +// monkey-patching the real Kubernetes client. +var GetNetworkProviderFunc = getNetworkProviderFromConfigMap + +// getNetworkProviderFromConfigMap reads the network-config configmap in Supervisor cluster. // Returns the network provider as NSXT_CONTAINER_PLUGIN for NSX-T, or // VSPHERE_NETWORK for VDS. Otherwise, returns an error, if network provider is // not present in the configmap. -func GetNetworkProvider(ctx context.Context) (string, error) { +func getNetworkProviderFromConfigMap(ctx context.Context) (string, error) { log := logger.GetLogger(ctx) k8sclient, err := k8s.NewClient(ctx) if err != nil { @@ -287,6 +291,11 @@ func GetNetworkProvider(ctx context.Context) (string, error) { wcpNetworkConfigMap, kubeSystemNamespace) } +// GetNetworkProvider reads the network-config configmap in Supervisor cluster. +func GetNetworkProvider(ctx context.Context) (string, error) { + return GetNetworkProviderFunc(ctx) +} + // GetVCDatacenterFromConfig returns datacenter registered for each vCenter func GetVCDatacentersFromConfig(cfg *config.Config) ([]string, string, error) { dcList := make([]string, 0)