diff --git a/internal/test/integration/k8s/manifests/05-hostpid-daemonset.yml b/internal/test/integration/k8s/manifests/05-hostpid-daemonset.yml new file mode 100644 index 0000000000..a5b2d6d1a9 --- /dev/null +++ b/internal/test/integration/k8s/manifests/05-hostpid-daemonset.yml @@ -0,0 +1,43 @@ +# A DaemonSet with hostPID=true that serves HTTP traffic. +# Used to test that spans from this DaemonSet are correctly attributed to it +# (and not to other pods sharing the host PID namespace). +apiVersion: v1 +kind: Service +metadata: + name: hostpid-httpserver +spec: + selector: + app: hostpid-httpserver + ports: + - port: 8082 + name: http + targetPort: http +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: hostpid-httpserver + labels: + app: hostpid-httpserver +spec: + selector: + matchLabels: + app: hostpid-httpserver + template: + metadata: + name: hostpid-httpserver + labels: + app: hostpid-httpserver + spec: + hostPID: true + containers: + - name: hostpid-httpserver + image: testserver:dev + imagePullPolicy: Never # loaded into Kind from localhost + ports: + - containerPort: 8082 + hostPort: 8082 + name: http + env: + - name: LOG_LEVEL + value: "DEBUG" diff --git a/internal/test/integration/k8s/manifests/06-obi-daemonset-sharedpidns.yml b/internal/test/integration/k8s/manifests/06-obi-daemonset-sharedpidns.yml new file mode 100644 index 0000000000..8744f8bb39 --- /dev/null +++ b/internal/test/integration/k8s/manifests/06-obi-daemonset-sharedpidns.yml @@ -0,0 +1,68 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: obi-config +data: + obi-config.yml: | + attributes: + kubernetes: + enable: true + log_level: debug + discovery: + instrument: + - k8s_deployment_name: testserver + - k8s_daemonset_name: hostpid-httpserver + routes: + patterns: + - /pingpong + unmatched: path + otel_traces_export: + endpoint: http://jaeger:4318 +--- +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: obi +spec: + selector: + matchLabels: + instrumentation: obi + template: + metadata: + labels: + instrumentation: obi + teardown: delete + spec: + hostPID: true + serviceAccountName: obi + volumes: + - name: obi-config + configMap: + name: obi-config + - name: testoutput + persistentVolumeClaim: + claimName: testoutput + containers: + - name: obi + image: obi:dev + imagePullPolicy: Never + args: ["--config=/config/obi-config.yml"] + securityContext: + privileged: true + runAsUser: 0 + volumeMounts: + - mountPath: /config + name: obi-config + - mountPath: /testoutput + name: testoutput + env: + - name: GOCOVERDIR + value: "/testoutput" + - name: OTEL_EBPF_DISCOVERY_POLL_INTERVAL + value: "500ms" + - name: OTEL_EBPF_METRICS_INTERVAL + value: "10ms" + - name: OTEL_EBPF_BPF_BATCH_TIMEOUT + value: "10ms" + - name: OTEL_EBPF_OTLP_TRACES_BATCH_TIMEOUT + value: "0ms" diff --git a/internal/test/integration/k8s/sharedpidns/k8s_sharedpidns_main_test.go b/internal/test/integration/k8s/sharedpidns/k8s_sharedpidns_main_test.go new file mode 100644 index 0000000000..e78060c88c --- /dev/null +++ b/internal/test/integration/k8s/sharedpidns/k8s_sharedpidns_main_test.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package sharedpidns tests that spans from pods sharing the host PID namespace +// (hostPID=true) are correctly attributed to their respective pods, rather than +// being misattributed to an arbitrary pod that happens to share the same +// PID namespace inode. +package sharedpidns + +import ( + "flag" + "fmt" + "log/slog" + "os" + "testing" + "time" + + "go.opentelemetry.io/obi/internal/test/integration/components/docker" + "go.opentelemetry.io/obi/internal/test/integration/components/kube" + k8s "go.opentelemetry.io/obi/internal/test/integration/k8s/common" + "go.opentelemetry.io/obi/internal/test/integration/k8s/common/testpath" + "go.opentelemetry.io/obi/internal/test/tools" +) + +const ( + testTimeout = 3 * time.Minute + jaegerQueryURL = "http://localhost:36686/api/traces" +) + +var cluster *kube.Kind + +func TestMain(m *testing.M) { + flag.Parse() + if testing.Short() { + fmt.Println("skipping integration tests in short mode") + return + } + + if err := docker.Build(os.Stdout, tools.ProjectDir(), + docker.ImageBuild{Tag: "testserver:dev", Dockerfile: k8s.DockerfileTestServer}, + docker.ImageBuild{Tag: "obi:dev", Dockerfile: k8s.DockerfileOBI}, + ); err != nil { + slog.Error("can't build docker images", "error", err) + os.Exit(-1) + } + + cluster = kube.NewKind("test-kind-cluster-sharedpidns", + kube.KindConfig(testpath.Manifests+"/00-kind.yml"), + kube.LocalImage("testserver:dev"), + kube.LocalImage("obi:dev"), + kube.Deploy(testpath.Manifests+"/01-volumes.yml"), + kube.Deploy(testpath.Manifests+"/01-serviceaccount.yml"), + kube.Deploy(testpath.Manifests+"/03-otelcol.yml"), + kube.Deploy(testpath.Manifests+"/04-jaeger.yml"), + // Deploy a normal Deployment (no hostPID) + kube.Deploy(testpath.Manifests+"/05-uninstrumented-service.yml"), + // Deploy a DaemonSet with hostPID=true serving HTTP on port 8082 + kube.Deploy(testpath.Manifests+"/05-hostpid-daemonset.yml"), + // Deploy OBI configured to instrument both + kube.Deploy(testpath.Manifests+"/06-obi-daemonset-sharedpidns.yml"), + ) + + cluster.Run(m) +} diff --git a/internal/test/integration/k8s/sharedpidns/k8s_sharedpidns_test.go b/internal/test/integration/k8s/sharedpidns/k8s_sharedpidns_test.go new file mode 100644 index 0000000000..cee623d784 --- /dev/null +++ b/internal/test/integration/k8s/sharedpidns/k8s_sharedpidns_test.go @@ -0,0 +1,150 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sharedpidns + +import ( + "context" + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "sigs.k8s.io/e2e-framework/pkg/envconf" + "sigs.k8s.io/e2e-framework/pkg/features" + + "go.opentelemetry.io/obi/internal/test/integration/components/jaeger" + k8s "go.opentelemetry.io/obi/internal/test/integration/k8s/common" +) + +// TestSharedPIDNamespaceAttribution verifies that when a DaemonSet with +// hostPID=true runs alongside a normal Deployment, OBI correctly attributes +// spans to their respective pods. Before the fix in PodContainerByPIDNs, +// multiple containers sharing the host PID namespace (init_pid_ns inode +// 4026531836) would cause spans to be arbitrarily attributed to whichever +// pod happened to be iterated first in a Go map — leading to cross-pod +// misattribution of service.name, k8s.namespace.name, k8s.pod.name, etc. +func TestSharedPIDNamespaceAttribution(t *testing.T) { + feat := features.New("Spans from hostPID pods are not misattributed to other pods"). + Assess("spans from the Deployment get the Deployment's k8s metadata", + func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { + require.EventuallyWithT(t, func(ct *assert.CollectT) { + // Generate traffic to the Deployment's testserver (port 8080) + resp, err := http.Get("http://localhost:38080/pingpong") + require.NoError(ct, err) + if resp == nil { + return + } + + func() { + if resp.Body != nil { + defer func() { + require.NoError(ct, resp.Body.Close()) + }() + } + + require.Equal(ct, http.StatusOK, resp.StatusCode) + }() + + resp, err = http.Get(jaegerQueryURL + "?service=testserver") + require.NoError(ct, err) + if resp == nil { + return + } + + var tq jaeger.TracesQuery + func() { + if resp.Body != nil { + defer func() { + require.NoError(ct, resp.Body.Close()) + }() + } + + require.Equal(ct, http.StatusOK, resp.StatusCode) + require.NoError(ct, json.NewDecoder(resp.Body).Decode(&tq)) + }() + traces := tq.FindBySpan(jaeger.Tag{Key: "url.path", Type: "string", Value: "/pingpong"}) + require.NotEmpty(ct, traces) + trace := traces[0] + require.NotEmpty(ct, trace.Spans) + + res := trace.FindByOperationName("GET /pingpong", "server") + require.Len(ct, res, 1) + parent := res[0] + + // The Deployment's spans must carry the Deployment's k8s metadata, + // NOT the hostPID DaemonSet's metadata + sd := jaeger.DiffAsRegexp([]jaeger.Tag{ + {Key: "k8s.pod.name", Type: "string", Value: "^testserver-.*"}, + {Key: "k8s.container.name", Type: "string", Value: "testserver"}, + {Key: "k8s.deployment.name", Type: "string", Value: "^testserver$"}, + {Key: "k8s.namespace.name", Type: "string", Value: "^default$"}, + {Key: "k8s.node.name", Type: "string", Value: ".+-control-plane$"}, + {Key: "k8s.pod.uid", Type: "string", Value: k8s.UUIDRegex}, + {Key: "k8s.pod.start_time", Type: "string", Value: k8s.TimeRegex}, + {Key: "service.instance.id", Type: "string", Value: "^default\\.testserver-.+\\.testserver"}, + }, trace.Processes[parent.ProcessID].Tags) + require.Empty(ct, sd, sd.String()) + }, testTimeout, 100*time.Millisecond) + return ctx + }, + ). + Assess("spans from the hostPID DaemonSet get the DaemonSet's k8s metadata", + func(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { + require.EventuallyWithT(t, func(ct *assert.CollectT) { + // Generate traffic to the hostPID DaemonSet's testserver (port 8082) + trafficResp, err := http.Get("http://localhost:38082/pingpong") + require.NoError(ct, err) + if trafficResp == nil { + return + } + defer trafficResp.Body.Close() + require.Equal(ct, http.StatusOK, trafficResp.StatusCode) + + jaegerResp, err := http.Get(jaegerQueryURL + "?service=hostpid-httpserver") + require.NoError(ct, err) + if jaegerResp == nil { + return + } + defer jaegerResp.Body.Close() + require.Equal(ct, http.StatusOK, jaegerResp.StatusCode) + var tq jaeger.TracesQuery + require.NoError(ct, json.NewDecoder(jaegerResp.Body).Decode(&tq)) + traces := tq.FindBySpan(jaeger.Tag{Key: "url.path", Type: "string", Value: "/pingpong"}) + require.NotEmpty(ct, traces) + trace := traces[0] + require.NotEmpty(ct, trace.Spans) + + res := trace.FindByOperationName("GET /pingpong", "server") + require.Len(ct, res, 1) + parent := res[0] + + // The DaemonSet's spans must carry the DaemonSet's k8s metadata, + // NOT the Deployment's metadata + sd := jaeger.DiffAsRegexp([]jaeger.Tag{ + {Key: "k8s.pod.name", Type: "string", Value: "^hostpid-httpserver-.*"}, + {Key: "k8s.container.name", Type: "string", Value: "hostpid-httpserver"}, + {Key: "k8s.daemonset.name", Type: "string", Value: "^hostpid-httpserver$"}, + {Key: "k8s.namespace.name", Type: "string", Value: "^default$"}, + {Key: "k8s.node.name", Type: "string", Value: ".+-control-plane$"}, + {Key: "k8s.pod.uid", Type: "string", Value: k8s.UUIDRegex}, + {Key: "k8s.pod.start_time", Type: "string", Value: k8s.TimeRegex}, + {Key: "service.instance.id", Type: "string", Value: "^default\\.hostpid-httpserver-.+\\.hostpid-httpserver"}, + }, trace.Processes[parent.ProcessID].Tags) + require.Empty(ct, sd, sd.String()) + + // Verify no deployment metadata leaks onto DaemonSet spans + sd = jaeger.DiffAsRegexp([]jaeger.Tag{ + {Key: "k8s.deployment.name", Type: "string"}, + }, trace.Processes[parent.ProcessID].Tags) + require.Equal(ct, jaeger.DiffResult{ + {ErrType: jaeger.ErrTypeMissing, Expected: jaeger.Tag{Key: "k8s.deployment.name", Type: "string"}}, + }, sd) + }, testTimeout, 100*time.Millisecond) + return ctx + }, + ).Feature() + cluster.TestEnv().Test(t, feat) +} diff --git a/pkg/kube/store.go b/pkg/kube/store.go index eca41d25b9..4e0b957e7b 100644 --- a/pkg/kube/store.go +++ b/pkg/kube/store.go @@ -378,26 +378,62 @@ func (s *Store) PodByContainerID(cid string) *kube.CachedObjMeta { return s.podsByContainer[cid] } -// PodContainerByPIDNs second return value: container Name -func (s *Store) PodContainerByPIDNs(pidns uint32) (*kube.CachedObjMeta, string) { +// PodContainerByPIDNs returns the pod metadata and container name for the +// process identified by Linux PID namespace pidns and host-visible PID +// hostPID. +// +// When multiple containers share a PID namespace (e.g. pods with hostPID=true +// all collapse onto the host's init_pid_ns inode), the host-visible PID is +// used to disambiguate. If the exact PID is not found but all entries in the +// namespace belong to the same container, the result is unambiguous and +// returned. When container IDs differ and the PID can't be matched, nil is +// returned to avoid silently misattributing spans to the wrong pod. +// Second return value: container Name. +func (s *Store) PodContainerByPIDNs(pidns uint32, hostPID app.PID) (*kube.CachedObjMeta, string) { s.access.RLock() defer s.access.RUnlock() - if infos, ok := s.namespaces[pidns]; ok { - for _, info := range infos { - if om, ok := s.podsByContainer[info.ContainerID]; ok { - oID := fetchOwnerID(om.Meta) - containerName := "" - if containerInfo, ok := s.containersByOwner.Get(oID, info.ContainerID); ok { - containerName = containerInfo.Name - } - return om, containerName - } - // we break here, the namespace is the same for all pids in the container - // we need to check one only - break + infos, ok := s.namespaces[pidns] + if !ok { + return nil, "" + } + // Prefer exact match by host PID. This is the only way to correctly + // disambiguate when a PID namespace is shared across multiple containers. + if hostPID != 0 { + if info, ok := infos[hostPID]; ok { + return s.podContainerByInfo(info) + } + } + // Fallback: only safe when every registered PID in this namespace points + // to the same container (i.e. the namespace is not actually shared). + var pick *container.Info + for _, info := range infos { + if pick == nil { + pick = info + continue } + if info.ContainerID != pick.ContainerID { + s.log.Debug("cannot disambiguate shared PID namespace without matching host PID; skipping k8s decoration", + "pidNs", pidns, "lookupHostPID", hostPID, "processes", len(infos)) + return nil, "" + } + } + if pick == nil { + return nil, "" + } + return s.podContainerByInfo(pick) +} + +func (s *Store) podContainerByInfo(info *container.Info) (*kube.CachedObjMeta, string) { + om, ok := s.podsByContainer[info.ContainerID] + if !ok { + return nil, "" + } + containerName := "" + oID := fetchOwnerID(om.Meta) + if containerInfo, ok := s.containersByOwner.Get(oID, info.ContainerID); ok { + containerName = containerInfo.Name } - return nil, "" + return om, containerName } func (s *Store) ObjectMetaByIP(ip string) *kube.CachedObjMeta { diff --git a/pkg/kube/store_test.go b/pkg/kube/store_test.go index 2f14d92959..14de0fe01f 100644 --- a/pkg/kube/store_test.go +++ b/pkg/kube/store_test.go @@ -1159,37 +1159,113 @@ func TestStore_PodContainerByPIDNs_MultiPID(t *testing.T) { store.addObjectMeta(podMeta) - t.Run("PodContainerByPIDNs returns correct pod for any PID", func(t *testing.T) { - // Should return the same pod regardless of which PID is actually found first - // (since they're all in the same namespace and container) - pod, containerName := store.PodContainerByPIDNs(pidNS) + t.Run("exact PID match returns correct pod", func(t *testing.T) { + pod, containerName := store.PodContainerByPIDNs(pidNS, 1001) + require.NotNil(t, pod, "Should find pod for exact PID") + assert.Equal(t, "test-pod", pod.Meta.Name) + assert.Equal(t, "test-container-name", containerName) + }) - require.NotNil(t, pod, "Should find pod for namespace") + t.Run("fallback works when all PIDs share same container", func(t *testing.T) { + // hostPID=0 means no exact match, but all PIDs share the same container + pod, containerName := store.PodContainerByPIDNs(pidNS, 0) + require.NotNil(t, pod, "Should find pod via unambiguous fallback") assert.Equal(t, "test-pod", pod.Meta.Name) assert.Equal(t, "test-container-name", containerName) }) t.Run("after deleting some PIDs, still finds pod", func(t *testing.T) { - // Delete one PID store.DeleteProcess(1001) - pod, containerName := store.PodContainerByPIDNs(pidNS) + pod, containerName := store.PodContainerByPIDNs(pidNS, 1002) require.NotNil(t, pod, "Should still find pod after deleting one PID") assert.Equal(t, "test-pod", pod.Meta.Name) assert.Equal(t, "test-container-name", containerName) }) t.Run("after deleting all PIDs, doesn't find pod", func(t *testing.T) { - // Delete remaining PIDs store.DeleteProcess(1002) store.DeleteProcess(1003) - pod, containerName := store.PodContainerByPIDNs(pidNS) + pod, containerName := store.PodContainerByPIDNs(pidNS, 1001) assert.Nil(t, pod, "Should not find pod after deleting all PIDs") assert.Empty(t, containerName) }) } +func TestStore_PodContainerByPIDNs_SharedNamespace(t *testing.T) { + originalInfoForPID := InfoForPID + defer func() { InfoForPID = originalInfoForPID }() + + store := createTestStore() + + // Simulate shared PID namespace (e.g. hostPID=true) with two different containers + // from two different pods, both mapping to the same PID namespace inode + hostPIDNs := uint32(4026531836) // typical host init_pid_ns inode + + InfoForPID = func(pid app.PID) (container.Info, error) { + switch pid { + case 100: + return container.Info{ContainerID: "container-app", PIDNamespace: hostPIDNs}, nil + case 200: + return container.Info{ContainerID: "container-daemonset", PIDNamespace: hostPIDNs}, nil + default: + return container.Info{}, assert.AnError + } + } + + store.AddProcess(100) + store.AddProcess(200) + + appPod := &informer.ObjectMeta{ + Name: "my-app-pod", Namespace: "app-ns", Kind: "Pod", + Pod: &informer.PodInfo{ + Owners: []*informer.Owner{{Name: "my-app", Kind: "Deployment"}}, + Containers: []*informer.ContainerInfo{{Id: "container-app", Name: "app"}}, + }, + } + daemonsetPod := &informer.ObjectMeta{ + Name: "node-proxy-xyz", Namespace: "kube-system", Kind: "Pod", + Pod: &informer.PodInfo{ + Owners: []*informer.Owner{{Name: "node-proxy", Kind: "DaemonSet"}}, + Containers: []*informer.ContainerInfo{{Id: "container-daemonset", Name: "proxy"}}, + }, + } + store.addObjectMeta(appPod) + store.addObjectMeta(daemonsetPod) + + t.Run("exact PID match disambiguates to app pod", func(t *testing.T) { + pod, containerName := store.PodContainerByPIDNs(hostPIDNs, 100) + require.NotNil(t, pod) + assert.Equal(t, "my-app-pod", pod.Meta.Name) + assert.Equal(t, "app-ns", pod.Meta.Namespace) + assert.Equal(t, "app", containerName) + }) + + t.Run("exact PID match disambiguates to daemonset pod", func(t *testing.T) { + pod, containerName := store.PodContainerByPIDNs(hostPIDNs, 200) + require.NotNil(t, pod) + assert.Equal(t, "node-proxy-xyz", pod.Meta.Name) + assert.Equal(t, "kube-system", pod.Meta.Namespace) + assert.Equal(t, "proxy", containerName) + }) + + t.Run("unknown PID in shared namespace returns nil", func(t *testing.T) { + // PID 999 is not registered; since the namespace has multiple different + // container IDs, we cannot safely pick one + pod, containerName := store.PodContainerByPIDNs(hostPIDNs, 999) + assert.Nil(t, pod, "Should return nil when PID not found and namespace is ambiguous") + assert.Empty(t, containerName) + }) + + t.Run("zero PID in shared namespace returns nil", func(t *testing.T) { + // hostPID=0 means no disambiguation available; should not pick randomly + pod, containerName := store.PodContainerByPIDNs(hostPIDNs, 0) + assert.Nil(t, pod, "Should return nil when no PID given and namespace is ambiguous") + assert.Empty(t, containerName) + }) +} + func TestStore_MultiPID_ConcurrentAccess(t *testing.T) { originalInfoForPID := InfoForPID defer func() { InfoForPID = originalInfoForPID }() diff --git a/pkg/transform/k8s.go b/pkg/transform/k8s.go index 1c53513d99..9f5504609f 100644 --- a/pkg/transform/k8s.go +++ b/pkg/transform/k8s.go @@ -162,7 +162,7 @@ func (md *metadataDecorator) nodeLoop(ctx context.Context) { } func (md *metadataDecorator) do(span *request.Span) { - if podMeta, containerName := md.store.PodContainerByPIDNs(span.Pid.Namespace); podMeta != nil { + if podMeta, containerName := md.store.PodContainerByPIDNs(span.Pid.Namespace, span.Pid.HostPID); podMeta != nil { AppendKubeMetadata(md.store, &span.Service, podMeta, md.clusterName, containerName) } else if span.Service.Metadata == nil { // do not leave the service attributes map as nil @@ -293,7 +293,7 @@ mainLoop: } md.log.Debug("annotating process event", "event", pe) - if podMeta, containerName := md.store.PodContainerByPIDNs(pe.File.Ns); podMeta != nil { + if podMeta, containerName := md.store.PodContainerByPIDNs(pe.File.Ns, pe.File.Pid); podMeta != nil { AppendKubeMetadata(md.store, &pe.File.Service, podMeta, md.clusterName, containerName) } else { // do not leave the service attributes map as nil @@ -343,7 +343,7 @@ func (md *procEventMetadataDecorator) handlePodUpdateEvent(pod *informer.ObjectM if peMap, ok := md.tracker.info(cnt.Id); ok { md.log.Debug("found missed pid info", "containerId", cnt.Id) for _, pe := range peMap { - if podMeta, containerName := md.store.PodContainerByPIDNs(pe.File.Ns); podMeta != nil { + if podMeta, containerName := md.store.PodContainerByPIDNs(pe.File.Ns, pe.File.Pid); podMeta != nil { md.log.Debug("resubmitting process event", "event", pe) AppendKubeMetadata(md.store, &pe.File.Service, podMeta, md.clusterName, containerName) md.output.Send(*pe)