diff --git a/devdocs/config/CONFIG.md b/devdocs/config/CONFIG.md index efa35cf299..f3c3c08a8b 100644 --- a/devdocs/config/CONFIG.md +++ b/devdocs/config/CONFIG.md @@ -257,6 +257,12 @@ HTTPParsingPolicy defines the default action for http enrichment rules. |---|---|---|---|---|---|---| | `ebpf.payload_extraction.http.genai.gemini.enabled` | `boolean` | `OTEL_EBPF_HTTP_GEMINI_ENABLED` | `false` | | | Enable Google AI Studio (Gemini) payload extraction and parsing | +#### `ebpf.payload_extraction.http.genai.mcp` + +| YAML Path | Type | Env Var | Default | Values | Deprecated | Description | +|---|---|---|---|---|---|---| +| `ebpf.payload_extraction.http.genai.mcp.enabled` | `boolean` | `OTEL_EBPF_HTTP_MCP_ENABLED` | `false` | | | Enable Model Context Protocol (MCP) payload extraction and parsing | + #### `ebpf.payload_extraction.http.genai.openai` | YAML Path | Type | Env Var | Default | Values | Deprecated | Description | diff --git a/devdocs/config/config-schema.json b/devdocs/config/config-schema.json index f8a176ee78..cc8a450938 100644 --- a/devdocs/config/config-schema.json +++ b/devdocs/config/config-schema.json @@ -843,6 +843,10 @@ "$ref": "#/$defs/GeminiConfig", "description": "Google AI Studio (Gemini) payload extraction and parsing" }, + "mcp": { + "$ref": "#/$defs/MCPConfig", + "description": "Model Context Protocol (MCP) payload extraction and parsing" + }, "openai": { "$ref": "#/$defs/OpenAIConfig", "description": "Payload extraction and parsing" @@ -1482,6 +1486,17 @@ }, "type": "object" }, + "MCPConfig": { + "properties": { + "enabled": { + "type": "boolean", + "description": "Enable Model Context Protocol (MCP) payload extraction and parsing", + "default": false, + "x-env-var": "OTEL_EBPF_HTTP_MCP_ENABLED" + } + }, + "type": "object" + }, "MapsConfig": { "properties": { "global_scale_factor": { diff --git a/internal/test/integration/components/pythonmcp/Dockerfile b/internal/test/integration/components/pythonmcp/Dockerfile new file mode 100644 index 0000000000..dd86c5ec65 --- /dev/null +++ b/internal/test/integration/components/pythonmcp/Dockerfile @@ -0,0 +1,4 @@ +FROM python:3.14@sha256:61346539f7b26521a230e72c11da5ebd872924745074b19736e7d65ba748c366 +EXPOSE 8080 +COPY main.py /main.py +CMD ["python", "main.py"] diff --git a/internal/test/integration/components/pythonmcp/main.py b/internal/test/integration/components/pythonmcp/main.py new file mode 100644 index 0000000000..1e702473a6 --- /dev/null +++ b/internal/test/integration/components/pythonmcp/main.py @@ -0,0 +1,163 @@ +""" +MCP (Model Context Protocol) server for integration testing. + +Implements a subset of MCP methods over JSON-RPC 2.0 / HTTP +using only the Python standard library. +""" + +import json +import uuid +from http.server import BaseHTTPRequestHandler, HTTPServer + +SESSION_ID = str(uuid.uuid4()) + +KNOWN_TOOLS = { + "get-weather": "Sunny, 72\u00b0F in the requested location", + "calculator": "42", +} + +KNOWN_RESOURCES = { + "file:///home/user/documents/report.pdf": { + "uri": "file:///home/user/documents/report.pdf", + "mimeType": "application/pdf", + "text": "Sample report content", + }, +} + +KNOWN_PROMPTS = { + "analyze-code": { + "description": "Analyzes code for potential issues", + "messages": [ + { + "role": "user", + "content": {"type": "text", "text": "Analyze this code"}, + } + ], + }, +} + + +def make_response(result, req_id): + return {"jsonrpc": "2.0", "result": result, "id": req_id} + + +def make_error(code, message, req_id): + return { + "jsonrpc": "2.0", + "error": {"code": code, "message": message}, + "id": req_id, + } + + +def handle_initialize(params, req_id): + return make_response( + { + "protocolVersion": "2025-03-26", + "capabilities": {"tools": {}, "resources": {}, "prompts": {}}, + "serverInfo": {"name": "test-mcp-server", "version": "1.0"}, + }, + req_id, + ) + + +def handle_tools_list(_params, req_id): + tools = [{"name": name} for name in KNOWN_TOOLS] + return make_response({"tools": tools}, req_id) + + +def handle_tools_call(params, req_id): + name = params.get("name", "") if params else "" + if name not in KNOWN_TOOLS: + return make_error(-32602, f"Unknown tool: {name}", req_id) + content = [{"type": "text", "text": KNOWN_TOOLS[name]}] + return make_response({"content": content}, req_id) + + +def handle_resources_read(params, req_id): + uri = params.get("uri", "") if params else "" + if uri not in KNOWN_RESOURCES: + return make_error(-32602, f"Unknown resource: {uri}", req_id) + return make_response({"contents": [KNOWN_RESOURCES[uri]]}, req_id) + + +def handle_prompts_get(params, req_id): + name = params.get("name", "") if params else "" + if name not in KNOWN_PROMPTS: + return make_error(-32602, f"Unknown prompt: {name}", req_id) + return make_response(KNOWN_PROMPTS[name], req_id) + + +def handle_ping(_params, req_id): + return make_response({}, req_id) + + +DISPATCH = { + "initialize": handle_initialize, + "tools/list": handle_tools_list, + "tools/call": handle_tools_call, + "resources/read": handle_resources_read, + "prompts/get": handle_prompts_get, + "ping": handle_ping, +} + + +class Handler(BaseHTTPRequestHandler): + def do_POST(self): + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length).decode("utf-8") + + try: + req = json.loads(body) + except json.JSONDecodeError: + self._send_json( + make_error(-32700, "Parse error", None), 200 + ) + return + + if req.get("jsonrpc") != "2.0": + self._send_json( + make_error(-32600, "Invalid Request: missing jsonrpc 2.0", + req.get("id")), + 200, + ) + return + + method = req.get("method", "") + params = req.get("params") + req_id = req.get("id") + + handler = DISPATCH.get(method) + if handler is None: + self._send_json( + make_error(-32601, f"Method not found: {method}", req_id), 200 + ) + return + + resp = handler(params, req_id) + self._send_json(resp, 200) + + def _send_json(self, obj, status): + resp_body = json.dumps(obj).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(resp_body))) + self.send_header("Mcp-Session-Id", SESSION_ID) + self.end_headers() + self.wfile.write(resp_body) + + def do_GET(self): + if self.path == "/smoke": + self.send_response(200) + self.end_headers() + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format, *args): + print(f"[mcp-server] {format % args}") + + +if __name__ == "__main__": + server = HTTPServer(("0.0.0.0", 8080), Handler) + print("MCP server running on port 8080") + server.serve_forever() diff --git a/internal/test/integration/docker-compose-python-mcp.yml b/internal/test/integration/docker-compose-python-mcp.yml new file mode 100644 index 0000000000..27a86baeb0 --- /dev/null +++ b/internal/test/integration/docker-compose-python-mcp.yml @@ -0,0 +1,93 @@ +version: "3.8" + +services: + testserver: + build: + context: ../../../internal/test/integration/components/pythonmcp/ + dockerfile: Dockerfile + image: hatest-testserver-python-mcp + ports: + - "${TEST_SERVICE_PORTS}" + depends_on: + otelcol: + condition: service_started + + obi: + build: + context: ../../.. + dockerfile: ./internal/test/integration/components/obi/Dockerfile + volumes: + - ./configs/:/configs + - ./system/sys/kernel/security:/sys/kernel/security + - ../../../testoutput:/coverage + - ../../../testoutput/run-python-mcp:/var/run/obi + image: hatest-obi + privileged: true + network_mode: "service:testserver" + pid: "service:testserver" + environment: + OTEL_EBPF_CONFIG_PATH: "/configs/obi-config.yml" + GOCOVERDIR: "/coverage" + OTEL_EBPF_TRACE_PRINTER: "text" + OTEL_EBPF_OPEN_PORT: "${OTEL_EBPF_OPEN_PORT}" + OTEL_EBPF_DISCOVERY_POLL_INTERVAL: 500ms + OTEL_EBPF_EXECUTABLE_PATH: "${OTEL_EBPF_EXECUTABLE_PATH}" + OTEL_EBPF_SERVICE_NAMESPACE: "integration-test" + OTEL_EBPF_METRICS_INTERVAL: "10ms" + OTEL_EBPF_BPF_BATCH_TIMEOUT: "10ms" + OTEL_EBPF_OTLP_TRACES_BATCH_TIMEOUT: "1ms" + OTEL_EBPF_LOG_LEVEL: "DEBUG" + OTEL_EBPF_BPF_DEBUG: "TRUE" + OTEL_EBPF_HOSTNAME: "obi" + OTEL_EBPF_BPF_HTTP_REQUEST_TIMEOUT: "5s" + OTEL_EBPF_PROCESSES_INTERVAL: "100ms" + OTEL_EBPF_METRICS_FEATURES: "application" + OTEL_EBPF_BPF_BUFFER_SIZE_HTTP: 1024 + OTEL_EBPF_HTTP_MCP_ENABLED: true + depends_on: + testserver: + condition: service_started + + # OpenTelemetry Collector + otelcol: + image: otel/opentelemetry-collector-contrib:0.150.1@sha256:a516c26968aa1feb5e5fc0562e3338ea13755cb4f373603226bcc4e276374ad0 + container_name: otel-col + deploy: + resources: + limits: + memory: 125M + restart: unless-stopped + command: ["--config=/etc/otelcol-config/otelcol-config.yml"] + volumes: + - ./configs/:/etc/otelcol-config + ports: + - "4317" # OTLP over gRPC receiver + - "4318:4318" # OTLP over HTTP receiver + - "9464" # Prometheus exporter + - "8888" # metrics endpoint + depends_on: + prometheus: + condition: service_started + + # Prometheus + prometheus: + image: quay.io/prometheus/prometheus:v3.11.0@sha256:131bf4c9d8a0337782ea8b753249f4903afac01379f3cced87ceaf8ca82ab9f3 + container_name: prometheus + command: + - --config.file=/etc/prometheus/prometheus-config.yml + - --web.enable-lifecycle + - --web.route-prefix=/ + volumes: + - ./configs/:/etc/prometheus + ports: + - "9090:9090" + + jaeger: + image: jaegertracing/all-in-one:1.60@sha256:4fd2d70fa347d6a47e79fcb06b1c177e6079f92cba88b083153d56263082135e + ports: + - "16686:16686" # Query frontend + - "4317" # OTEL GRPC traces collector + - "4318" # OTEL HTTP traces collector + environment: + - COLLECTOR_OTLP_ENABLED=true + - LOG_LEVEL=debug diff --git a/internal/test/integration/red_test_python_jsonrpc.go b/internal/test/integration/red_test_python_jsonrpc.go index c6524d8199..460337c49a 100644 --- a/internal/test/integration/red_test_python_jsonrpc.go +++ b/internal/test/integration/red_test_python_jsonrpc.go @@ -66,10 +66,11 @@ func testPythonJSONRPCServer(t *testing.T) { require.GreaterOrEqual(ct, len(traces), 1) lastTrace := traces[len(traces)-1] - require.GreaterOrEqual(ct, len(lastTrace.Spans), 1) - span := lastTrace.Spans[0] - - assert.Equal(ct, "tools/list", span.OperationName) + // The trace may contain child spans ("in queue", "processing"); + // locate the JSON-RPC server span by its expected operation name. + res := lastTrace.FindByOperationName("tools/list", "server") + require.GreaterOrEqual(ct, len(res), 1) + span := res[0] tag, found := jaeger.FindIn(span.Tags, "rpc.method") assert.True(ct, found, "rpc.method tag not found") @@ -108,11 +109,9 @@ func testPythonJSONRPCServer(t *testing.T) { require.GreaterOrEqual(ct, len(traces), 1) lastTrace := traces[len(traces)-1] - require.GreaterOrEqual(ct, len(lastTrace.Spans), 1) - span := lastTrace.Spans[0] - - // Span name should be the JSON-RPC method - assert.Equal(ct, "nonexistent/method", span.OperationName) + res := lastTrace.FindByOperationName("nonexistent/method", "server") + require.GreaterOrEqual(ct, len(res), 1) + span := res[0] // Span status should be error tag, found := jaeger.FindIn(span.Tags, "otel.status_code") diff --git a/internal/test/integration/red_test_python_mcp.go b/internal/test/integration/red_test_python_mcp.go new file mode 100644 index 0000000000..7794fbcaae --- /dev/null +++ b/internal/test/integration/red_test_python_mcp.go @@ -0,0 +1,202 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package integration // import "go.opentelemetry.io/obi/internal/test/integration" + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + neturl "net/url" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/internal/test/integration/components/jaeger" +) + +// mcpCall sends a JSON-RPC 2.0 MCP request over HTTP and returns the response. +// Optional headers are applied as key-value pairs to the outgoing request. +func mcpCall(url, method string, id int, params any, headers ...string) (*http.Response, error) { + reqBody := map[string]any{ + "jsonrpc": "2.0", + "method": method, + "id": id, + } + if params != nil { + reqBody["params"] = params + } + body, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body)) //nolint:noctx + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + for i := 0; i+1 < len(headers); i += 2 { + req.Header.Set(headers[i], headers[i+1]) + } + return http.DefaultClient.Do(req) +} + +func testPythonMCPServer(t *testing.T) { + const ( + comm = "python3.14" + address = "http://localhost:8381/mcp" + ) + + var tq jaeger.TracesQuery + params := neturl.Values{} + params.Add("service", comm) + fullJaegerURL := fmt.Sprintf("%s?%s", jaegerQueryURL, params.Encode()) + + // Test 1: tools/call with a known tool — verify MCP span attributes. + require.EventuallyWithT(t, func(ct *assert.CollectT) { + resp, err := mcpCall(address, "tools/call", 1, map[string]any{"name": "get-weather"}, + "Mcp-Session-Id", "test-session-42") + require.NoError(ct, err) + require.Equal(ct, http.StatusOK, resp.StatusCode) + + resp, err = http.Get(fullJaegerURL) //nolint:noctx + require.NoError(ct, err) + if resp == nil { + return + } + require.Equal(ct, http.StatusOK, resp.StatusCode) + + require.NoError(ct, json.NewDecoder(resp.Body).Decode(&tq)) + + // Find traces with MCP method attribute + traces := tq.FindBySpan(jaeger.Tag{Key: "mcp.method.name", Type: "string", Value: "tools/call"}) + require.GreaterOrEqual(ct, len(traces), 1) + + lastTrace := traces[len(traces)-1] + // The trace may contain child spans ("in queue", "processing"); + // locate the MCP server span by its expected operation name. + res := lastTrace.FindByOperationName("execute_tool get-weather", "server") + require.GreaterOrEqual(ct, len(res), 1) + span := res[0] + + tag, found := jaeger.FindIn(span.Tags, "mcp.method.name") + assert.True(ct, found, "mcp.method.name tag not found") + assert.Equal(ct, "tools/call", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "gen_ai.operation.name") + assert.True(ct, found, "gen_ai.operation.name tag not found") + assert.Equal(ct, "execute_tool", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "gen_ai.tool.name") + assert.True(ct, found, "gen_ai.tool.name tag not found") + assert.Equal(ct, "get-weather", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "mcp.session.id") + assert.True(ct, found, "mcp.session.id tag not found") + assert.Equal(ct, "test-session-42", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "jsonrpc.request.id") + assert.True(ct, found, "jsonrpc.request.id tag not found") + assert.Equal(ct, "1", tag.Value) + }, testTimeout, 100*time.Millisecond) + + // Test 2: tools/call with an unknown tool — verify MCP error span attributes. + var tqErr jaeger.TracesQuery + require.EventuallyWithT(t, func(ct *assert.CollectT) { + resp, err := mcpCall(address, "tools/call", 2, map[string]any{"name": "nonexistent"}) + require.NoError(ct, err) + require.Equal(ct, http.StatusOK, resp.StatusCode) + + resp, err = http.Get(fullJaegerURL) //nolint:noctx + require.NoError(ct, err) + if resp == nil { + return + } + require.Equal(ct, http.StatusOK, resp.StatusCode) + + require.NoError(ct, json.NewDecoder(resp.Body).Decode(&tqErr)) + + // Find traces with the error tool call + traces := tqErr.FindBySpan( + jaeger.Tag{Key: "mcp.method.name", Type: "string", Value: "tools/call"}, + jaeger.Tag{Key: "gen_ai.tool.name", Type: "string", Value: "nonexistent"}, + ) + require.GreaterOrEqual(ct, len(traces), 1) + + lastTrace := traces[len(traces)-1] + res := lastTrace.FindByOperationName("execute_tool nonexistent", "server") + require.GreaterOrEqual(ct, len(res), 1) + span := res[0] + + // Span status should be error (MCP JSON-RPC error) + tag, found := jaeger.FindIn(span.Tags, "otel.status_code") + assert.True(ct, found, "otel.status_code tag not found") + assert.Equal(ct, "ERROR", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "rpc.response.status_code") + assert.True(ct, found, "rpc.response.status_code tag not found") + assert.Equal(ct, "-32602", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "error.message") + assert.True(ct, found, "error.message tag not found") + assert.Contains(ct, tag.Value.(string), "Unknown tool") + }, testTimeout, 100*time.Millisecond) +} + +func testPythonMCPInitialize(t *testing.T) { + const ( + comm = "python3.14" + address = "http://localhost:8381/mcp" + ) + + var tq jaeger.TracesQuery + params := neturl.Values{} + params.Add("service", comm) + fullJaegerURL := fmt.Sprintf("%s?%s", jaegerQueryURL, params.Encode()) + + require.EventuallyWithT(t, func(ct *assert.CollectT) { + resp, err := mcpCall(address, "initialize", 10, map[string]any{ + "protocolVersion": "2025-03-26", + "capabilities": map[string]any{}, + "clientInfo": map[string]any{"name": "TestClient", "version": "1.0"}, + }) + require.NoError(ct, err) + require.Equal(ct, http.StatusOK, resp.StatusCode) + + resp, err = http.Get(fullJaegerURL) //nolint:noctx + require.NoError(ct, err) + if resp == nil { + return + } + require.Equal(ct, http.StatusOK, resp.StatusCode) + + require.NoError(ct, json.NewDecoder(resp.Body).Decode(&tq)) + + traces := tq.FindBySpan(jaeger.Tag{Key: "mcp.method.name", Type: "string", Value: "initialize"}) + require.GreaterOrEqual(ct, len(traces), 1) + + lastTrace := traces[len(traces)-1] + res := lastTrace.FindByOperationName("initialize", "server") + require.GreaterOrEqual(ct, len(res), 1) + span := res[0] + + tag, found := jaeger.FindIn(span.Tags, "mcp.method.name") + assert.True(ct, found, "mcp.method.name tag not found") + assert.Equal(ct, "initialize", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "gen_ai.operation.name") + assert.True(ct, found, "gen_ai.operation.name tag not found") + assert.Equal(ct, "initialize", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "mcp.protocol.version") + assert.True(ct, found, "mcp.protocol.version tag not found") + assert.Equal(ct, "2025-03-26", tag.Value) + + tag, found = jaeger.FindIn(span.Tags, "jsonrpc.request.id") + assert.True(ct, found, "jsonrpc.request.id tag not found") + assert.Equal(ct, "10", tag.Value) + }, testTimeout, 100*time.Millisecond) +} diff --git a/internal/test/integration/suites_test.go b/internal/test/integration/suites_test.go index 65c8dee626..d4c9e7a0a9 100644 --- a/internal/test/integration/suites_test.go +++ b/internal/test/integration/suites_test.go @@ -639,6 +639,17 @@ func TestSuite_PythonJsonRPC(t *testing.T) { require.NoError(t, compose.Close()) } +func TestSuite_PythonMCP(t *testing.T) { + compose, err := docker.ComposeSuite("docker-compose-python-mcp.yml", path.Join(pathOutput, "test-suite-python-mcp.log")) + require.NoError(t, err) + + compose.Env = append(compose.Env, `OTEL_EBPF_OPEN_PORT=8080`, `OTEL_EBPF_EXECUTABLE_PATH=`, `TEST_SERVICE_PORTS=8381:8080`) + require.NoError(t, compose.Up()) + t.Run("Python MCP server span", testPythonMCPServer) + t.Run("Python MCP initialize", testPythonMCPInitialize) + require.NoError(t, compose.Close()) +} + func TestSuite_PythonElasticsearch(t *testing.T) { compose, err := docker.ComposeSuite("docker-compose-elasticsearch.yml", path.Join(pathOutput, "test-suite-elasticsearch.log")) require.NoError(t, err) diff --git a/pkg/appolly/app/request/span.go b/pkg/appolly/app/request/span.go index d7ccb1263b..ff761a1be3 100644 --- a/pkg/appolly/app/request/span.go +++ b/pkg/appolly/app/request/span.go @@ -97,13 +97,15 @@ const ( HTTPSubtypeGemini = 8 // http + Google AI Studio (Gemini) HTTPSubtypeJSONRPC = 9 // http + JSON-RPC HTTPSubtypeAWSBedrock = 10 // http + AWS Bedrock + HTTPSubtypeMCP = 11 // http + Model Context Protocol ) func IsGenAISubtype(subtype int) bool { return subtype == HTTPSubtypeOpenAI || subtype == HTTPSubtypeAnthropic || subtype == HTTPSubtypeGemini || - subtype == HTTPSubtypeAWSBedrock + subtype == HTTPSubtypeAWSBedrock || + subtype == HTTPSubtypeMCP } //nolint:cyclop @@ -253,6 +255,7 @@ type GenAI struct { Anthropic *VendorAnthropic Gemini *VendorGemini Bedrock *VendorBedrock + MCP *MCPCall } type OpenAIUsage struct { @@ -609,6 +612,28 @@ func (b *VendorBedrock) GetStopReason() string { return "" } +// MCPCall holds parsed data from a Model Context Protocol request/response. +type MCPCall struct { + Method string `json:"method"` // mcp.method.name + ToolName string `json:"toolName,omitempty"` // gen_ai.tool.name (tools/call) + ResourceURI string `json:"resourceUri,omitempty"` // mcp.resource.uri (resources/read) + PromptName string `json:"promptName,omitempty"` // gen_ai.prompt.name (prompts/get) + SessionID string `json:"sessionId,omitempty"` // mcp.session.id + ProtocolVer string `json:"protocolVer,omitempty"` // mcp.protocol.version + RequestID string `json:"requestId,omitempty"` // jsonrpc.request.id + ErrorCode int `json:"errorCode,omitempty"` // JSON-RPC error code + ErrorMessage string `json:"errorMessage,omitempty"` // JSON-RPC error message +} + +// OperationName returns the GenAI operation name for the MCP method. +// tools/call maps to execute_tool; other methods return the method name as-is. +func (m *MCPCall) OperationName() string { + if m.Method == "tools/call" { + return "execute_tool" + } + return m.Method +} + type JSONRPC struct { Method string `json:"method"` Version string `json:"version"` @@ -1017,10 +1042,16 @@ func SpanStatusMessage(span *Span) string { if span.SubType == HTTPSubtypeJSONRPC && span.JSONRPC != nil && span.JSONRPC.ErrorMessage != "" { return span.JSONRPC.ErrorMessage } + if span.SubType == HTTPSubtypeMCP && span.GenAI != nil && span.GenAI.MCP != nil && span.GenAI.MCP.ErrorMessage != "" { + return span.GenAI.MCP.ErrorMessage + } case EventTypeHTTP: if span.SubType == HTTPSubtypeJSONRPC && span.JSONRPC != nil && span.JSONRPC.ErrorMessage != "" { return span.JSONRPC.ErrorMessage } + if span.SubType == HTTPSubtypeMCP && span.GenAI != nil && span.GenAI.MCP != nil && span.GenAI.MCP.ErrorMessage != "" { + return span.GenAI.MCP.ErrorMessage + } } return "" } @@ -1036,6 +1067,11 @@ func HTTPSpanStatusCode(span *Span) string { return StatusCodeError } + // MCP errors are signaled in the JSON-RPC response body. + if span.SubType == HTTPSubtypeMCP && span.GenAI != nil && span.GenAI.MCP != nil && span.GenAI.MCP.ErrorCode != 0 { + return StatusCodeError + } + if span.Type == EventTypeHTTPClient { if span.Status < 400 { // this is possibly not needed, because in my experiments they @@ -1254,6 +1290,14 @@ func (s *Span) TraceName() string { return "invoke_model" } + if s.SubType == HTTPSubtypeMCP && s.GenAI != nil && s.GenAI.MCP != nil { + op := s.GenAI.MCP.OperationName() + if s.GenAI.MCP.ToolName != "" { + return op + " " + s.GenAI.MCP.ToolName + } + return op + } + if s.SubType == HTTPSubtypeJSONRPC && s.JSONRPC != nil { if s.JSONRPC.Method != "" { return s.JSONRPC.Method @@ -1537,6 +1581,9 @@ func (s *Span) GenAIOperationName() string { if s.GenAI.Bedrock != nil { return "invoke_model" } + if s.GenAI.MCP != nil { + return s.GenAI.MCP.OperationName() + } return "" } diff --git a/pkg/appolly/app/request/span_test.go b/pkg/appolly/app/request/span_test.go index 0a0614a5f9..bbedbac8c8 100644 --- a/pkg/appolly/app/request/span_test.go +++ b/pkg/appolly/app/request/span_test.go @@ -296,6 +296,106 @@ func TestSpanStatusMessage_JSONRPC(t *testing.T) { } } +func TestSpanStatusCode_MCP(t *testing.T) { + tests := []struct { + name string + span *Span + expectedCode string + }{ + { + name: "server span with MCP error", + span: &Span{ + Type: EventTypeHTTP, + Status: 200, + SubType: HTTPSubtypeMCP, + GenAI: &GenAI{MCP: &MCPCall{Method: "tools/call", ErrorCode: -32602, ErrorMessage: "Unknown tool"}}, + }, + expectedCode: StatusCodeError, + }, + { + name: "server span without MCP error", + span: &Span{ + Type: EventTypeHTTP, + Status: 200, + SubType: HTTPSubtypeMCP, + GenAI: &GenAI{MCP: &MCPCall{Method: "tools/call"}}, + }, + expectedCode: StatusCodeUnset, + }, + { + name: "client span with MCP error", + span: &Span{ + Type: EventTypeHTTPClient, + Status: 200, + SubType: HTTPSubtypeMCP, + GenAI: &GenAI{MCP: &MCPCall{Method: "tools/call", ErrorCode: -32600, ErrorMessage: "Invalid Request"}}, + }, + expectedCode: StatusCodeError, + }, + { + name: "client span without MCP error", + span: &Span{ + Type: EventTypeHTTPClient, + Status: 200, + SubType: HTTPSubtypeMCP, + GenAI: &GenAI{MCP: &MCPCall{Method: "tools/call"}}, + }, + expectedCode: StatusCodeUnset, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expectedCode, SpanStatusCode(tt.span)) + }) + } +} + +func TestSpanStatusMessage_MCP(t *testing.T) { + tests := []struct { + name string + span *Span + expectedMessage string + }{ + { + name: "server span with MCP error message", + span: &Span{ + Type: EventTypeHTTP, + Status: 200, + SubType: HTTPSubtypeMCP, + GenAI: &GenAI{MCP: &MCPCall{Method: "tools/call", ErrorCode: -32602, ErrorMessage: "Unknown tool"}}, + }, + expectedMessage: "Unknown tool", + }, + { + name: "client span with MCP error message", + span: &Span{ + Type: EventTypeHTTPClient, + Status: 200, + SubType: HTTPSubtypeMCP, + GenAI: &GenAI{MCP: &MCPCall{Method: "tools/call", ErrorCode: -32600, ErrorMessage: "Invalid Request"}}, + }, + expectedMessage: "Invalid Request", + }, + { + name: "server span without MCP error", + span: &Span{ + Type: EventTypeHTTP, + Status: 200, + SubType: HTTPSubtypeMCP, + GenAI: &GenAI{MCP: &MCPCall{Method: "tools/call"}}, + }, + expectedMessage: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expectedMessage, SpanStatusMessage(tt.span)) + }) + } +} + type jsonObject = map[string]any func deserializeJSONObject(data []byte) (jsonObject, error) { diff --git a/pkg/config/payload_extraction.go b/pkg/config/payload_extraction.go index 95eb0ad7b0..16207227d8 100644 --- a/pkg/config/payload_extraction.go +++ b/pkg/config/payload_extraction.go @@ -77,11 +77,14 @@ type GenAIConfig struct { Gemini GeminiConfig `yaml:"gemini"` // AWS Bedrock payload extraction and parsing Bedrock BedrockConfig `yaml:"bedrock"` + // Model Context Protocol (MCP) payload extraction and parsing + MCP MCPConfig `yaml:"mcp"` } func (g *GenAIConfig) Enabled() bool { return g.Anthropic.Enabled || g.OpenAI.Enabled || - g.Gemini.Enabled || g.Bedrock.Enabled + g.Gemini.Enabled || g.Bedrock.Enabled || + g.MCP.Enabled } type OpenAIConfig struct { @@ -104,6 +107,11 @@ type BedrockConfig struct { Enabled bool `yaml:"enabled" env:"OTEL_EBPF_HTTP_BEDROCK_ENABLED" validate:"boolean"` } +type MCPConfig struct { + // Enable Model Context Protocol (MCP) payload extraction and parsing + Enabled bool `yaml:"enabled" env:"OTEL_EBPF_HTTP_MCP_ENABLED" validate:"boolean"` +} + type JSONRPCConfig struct { // Enable JSON-RPC payload extraction and parsing Enabled bool `yaml:"enabled" env:"OTEL_EBPF_HTTP_JSONRPC_ENABLED" validate:"boolean"` diff --git a/pkg/config/payload_extraction_test.go b/pkg/config/payload_extraction_test.go index 8632c0e29d..c229042b0b 100644 --- a/pkg/config/payload_extraction_test.go +++ b/pkg/config/payload_extraction_test.go @@ -225,3 +225,23 @@ func TestEnrichmentConfig_Validate_EmptyRules(t *testing.T) { cfg := EnrichmentConfig{} assert.NoError(t, cfg.Validate()) } + +func TestGenAIConfig_Enabled(t *testing.T) { + tests := []struct { + name string + cfg GenAIConfig + enabled bool + }{ + {name: "all disabled", cfg: GenAIConfig{}, enabled: false}, + {name: "openai", cfg: GenAIConfig{OpenAI: OpenAIConfig{Enabled: true}}, enabled: true}, + {name: "anthropic", cfg: GenAIConfig{Anthropic: AnthropicConfig{Enabled: true}}, enabled: true}, + {name: "gemini", cfg: GenAIConfig{Gemini: GeminiConfig{Enabled: true}}, enabled: true}, + {name: "bedrock", cfg: GenAIConfig{Bedrock: BedrockConfig{Enabled: true}}, enabled: true}, + {name: "mcp", cfg: GenAIConfig{MCP: MCPConfig{Enabled: true}}, enabled: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.enabled, tt.cfg.Enabled()) + }) + } +} diff --git a/pkg/ebpf/common/http/mcp.go b/pkg/ebpf/common/http/mcp.go new file mode 100644 index 0000000000..6d92e72655 --- /dev/null +++ b/pkg/ebpf/common/http/mcp.go @@ -0,0 +1,240 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ebpfcommon // import "go.opentelemetry.io/obi/pkg/ebpf/common/http" + +import ( + "bytes" + "encoding/json" + "io" + "log/slog" + "net/http" + + "go.opentelemetry.io/obi/pkg/appolly/app/request" +) + +// mcpMethods enumerates known MCP JSON-RPC method names. +var mcpMethods = map[string]struct{}{ + "initialize": {}, + "notifications/initialized": {}, + "tools/call": {}, + "tools/list": {}, + "resources/read": {}, + "resources/list": {}, + "resources/subscribe": {}, + "resources/unsubscribe": {}, + "resources/templates/list": {}, + "prompts/get": {}, + "prompts/list": {}, + "completion/complete": {}, + "logging/setLevel": {}, + "notifications/cancelled": {}, + "notifications/resources/updated": {}, + "notifications/tools/list_changed": {}, + "notifications/prompts/list_changed": {}, + "ping": {}, +} + +// ambiguousMethods lists JSON-RPC method names shared with other protocols +// (e.g. LSP). Each entry maps to a disambiguator function that returns true +// when the request carries an MCP-specific signal beyond the method name. +// The Mcp-Session-Id header is checked before consulting this map; entries +// here only need to handle the no-session-header case. +var ambiguousMethods = map[string]func(json.RawMessage) bool{ + "initialize": hasMCPProtocolVersion, + "ping": func(json.RawMessage) bool { return false }, +} + +// mcpSessionHeader is the HTTP header that carries the MCP session identifier. +const mcpSessionHeader = "Mcp-Session-Id" + +// mcpRequest is the JSON-RPC 2.0 request envelope used by MCP. +type mcpRequest struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + ID json.RawMessage `json:"id,omitempty"` + Params json.RawMessage `json:"params,omitempty"` +} + +// mcpResponse is the JSON-RPC 2.0 response envelope used by MCP. +type mcpResponse struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *jsonRPCError `json:"error,omitempty"` +} + +// Param structures for extracting method-specific fields. + +type mcpToolCallParams struct { + Name string `json:"name"` +} + +type mcpResourceParams struct { + URI string `json:"uri"` +} + +type mcpPromptParams struct { + Name string `json:"name"` +} + +type mcpInitializeParams struct { + ProtocolVersion string `json:"protocolVersion"` +} + +type mcpInitializeResult struct { + ProtocolVersion string `json:"protocolVersion"` +} + +// MCPSpan detects and parses an MCP JSON-RPC request/response pair. +// It returns the enriched span and true when the request is a valid MCP call, +// or the original span and false otherwise. +func MCPSpan(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) { + if req.Method != http.MethodPost { + return *baseSpan, false + } + + sessionID := req.Header.Get(mcpSessionHeader) + if sessionID == "" && resp != nil && resp.Header != nil { + sessionID = resp.Header.Get(mcpSessionHeader) + } + + reqB, err := io.ReadAll(req.Body) + if err != nil { + return *baseSpan, false + } + req.Body = io.NopCloser(bytes.NewBuffer(reqB)) + + reqB = bytes.TrimSpace(reqB) + // NOTE: JSON-RPC 2.0 also permits batch requests (arrays), but MCP + // batch support is not implemented yet. Only single-object requests + // are handled here; batch requests fall through to the generic + // JSON-RPC parser in jsonrpc.go. + if len(reqB) == 0 || reqB[0] != '{' { + return *baseSpan, false + } + + var rpcReq mcpRequest + if err := json.Unmarshal(reqB, &rpcReq); err != nil { + return *baseSpan, false + } + + // MCP requires JSON-RPC 2.0. + if rpcReq.JSONRPC != "2.0" { + return *baseSpan, false + } + + if _, known := mcpMethods[rpcReq.Method]; !known { + // Not a recognized MCP method. Check whether the session header + // was present — that still qualifies the request as MCP even if + // the method is unknown (e.g. a custom extension method). + if sessionID == "" { + return *baseSpan, false + } + } else if disambiguate, ambiguous := ambiguousMethods[rpcReq.Method]; ambiguous && sessionID == "" { + // Generic method names like "initialize" and "ping" are shared + // with other JSON-RPC protocols (e.g. LSP). Without the MCP + // session header, consult the per-method disambiguator. + if !disambiguate(rpcReq.Params) { + return *baseSpan, false + } + } + + slog.Debug("MCP", "method", rpcReq.Method, "session", sessionID) + + result := &request.MCPCall{ + Method: rpcReq.Method, + SessionID: sessionID, + } + + if len(rpcReq.ID) > 0 && string(rpcReq.ID) != "null" { + result.RequestID = rawIDString(rpcReq.ID) + } + + parseMCPParams(rpcReq, result) + + // Parse response for error and protocol version. + if resp != nil && resp.Body != nil { + respB, err := getResponseBody(resp) + if err == nil && len(respB) > 0 { + parseMCPResponse(respB, result) + } + } + + baseSpan.SubType = request.HTTPSubtypeMCP + baseSpan.GenAI = &request.GenAI{ + MCP: result, + } + + return *baseSpan, true +} + +// hasMCPProtocolVersion checks whether the params contain a protocolVersion +// field, which is specific to MCP's initialize method. +func hasMCPProtocolVersion(params json.RawMessage) bool { + if len(params) == 0 { + return false + } + var p mcpInitializeParams + return json.Unmarshal(params, &p) == nil && p.ProtocolVersion != "" +} + +// parseMCPParams extracts method-specific fields from the request params. +func parseMCPParams(rpcReq mcpRequest, result *request.MCPCall) { + if len(rpcReq.Params) == 0 { + return + } + + switch rpcReq.Method { + case "tools/call": + var p mcpToolCallParams + if json.Unmarshal(rpcReq.Params, &p) == nil { + result.ToolName = p.Name + } + case "resources/read", "resources/subscribe", "resources/unsubscribe": + var p mcpResourceParams + if json.Unmarshal(rpcReq.Params, &p) == nil { + result.ResourceURI = p.URI + } + case "prompts/get": + var p mcpPromptParams + if json.Unmarshal(rpcReq.Params, &p) == nil { + result.PromptName = p.Name + } + case "initialize": + var p mcpInitializeParams + if json.Unmarshal(rpcReq.Params, &p) == nil { + result.ProtocolVer = p.ProtocolVersion + } + } +} + +// parseMCPResponse extracts error information and protocol version from the response. +func parseMCPResponse(data []byte, result *request.MCPCall) { + data = bytes.TrimSpace(data) + if len(data) == 0 || data[0] != '{' { + return + } + + var resp mcpResponse + if err := json.Unmarshal(data, &resp); err != nil { + return + } + + if resp.JSONRPC != "2.0" { + return + } + + if resp.Error != nil { + result.ErrorCode = resp.Error.Code + result.ErrorMessage = resp.Error.Message + } + + // For initialize responses, extract the negotiated protocol version. + if result.Method == "initialize" && len(resp.Result) > 0 { + var initResult mcpInitializeResult + if json.Unmarshal(resp.Result, &initResult) == nil && initResult.ProtocolVersion != "" { + result.ProtocolVer = initResult.ProtocolVersion + } + } +} diff --git a/pkg/ebpf/common/http/mcp_test.go b/pkg/ebpf/common/http/mcp_test.go new file mode 100644 index 0000000000..2749a1acce --- /dev/null +++ b/pkg/ebpf/common/http/mcp_test.go @@ -0,0 +1,432 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ebpfcommon + +import ( + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/obi/pkg/appolly/app/request" +) + +func makeMCPRequest(t *testing.T, method, url, body string) *http.Request { + t.Helper() + req, err := http.NewRequest(method, url, strings.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + return req +} + +const mcpToolCallRequest = `{ + "jsonrpc": "2.0", + "method": "tools/call", + "params": { + "name": "get-weather", + "arguments": {"location": "San Francisco"} + }, + "id": 1 +}` + +const mcpToolCallResponse = `{ + "jsonrpc": "2.0", + "result": { + "content": [{"type": "text", "text": "Sunny, 72°F"}] + }, + "id": 1 +}` + +const mcpToolCallErrorResponse = `{ + "jsonrpc": "2.0", + "error": { + "code": -32602, + "message": "Unknown tool: nonexistent" + }, + "id": 2 +}` + +const mcpResourceReadRequest = `{ + "jsonrpc": "2.0", + "method": "resources/read", + "params": { + "uri": "file:///home/user/documents/report.pdf" + }, + "id": 3 +}` + +const mcpResourceReadResponse = `{ + "jsonrpc": "2.0", + "result": { + "contents": [{"uri": "file:///home/user/documents/report.pdf", "mimeType": "application/pdf", "text": "..."}] + }, + "id": 3 +}` + +const mcpPromptGetRequest = `{ + "jsonrpc": "2.0", + "method": "prompts/get", + "params": { + "name": "analyze-code" + }, + "id": 4 +}` + +const mcpPromptGetResponse = `{ + "jsonrpc": "2.0", + "result": { + "description": "Analyzes code for potential issues", + "messages": [{"role": "user", "content": {"type": "text", "text": "Analyze this code"}}] + }, + "id": 4 +}` + +const mcpInitializeRequest = `{ + "jsonrpc": "2.0", + "method": "initialize", + "params": { + "protocolVersion": "2025-03-26", + "capabilities": {}, + "clientInfo": {"name": "TestClient", "version": "1.0"} + }, + "id": 5 +}` + +const mcpInitializeResponse = `{ + "jsonrpc": "2.0", + "result": { + "protocolVersion": "2025-03-26", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "TestServer", "version": "1.0"} + }, + "id": 5 +}` + +const mcpToolsListRequest = `{ + "jsonrpc": "2.0", + "method": "tools/list", + "params": {}, + "id": 6 +}` + +const mcpToolsListResponse = `{ + "jsonrpc": "2.0", + "result": { + "tools": [{"name": "get-weather", "description": "Get weather info"}] + }, + "id": 6 +}` + +const mcpPingRequest = `{ + "jsonrpc": "2.0", + "method": "ping", + "id": 7 +}` + +const mcpPingResponse = `{ + "jsonrpc": "2.0", + "result": {}, + "id": 7 +}` + +func mcpHeaders() http.Header { + h := http.Header{} + h.Set("Content-Type", "application/json") + return h +} + +func TestMCPSpan_ToolCall(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpToolCallRequest) + req.Header.Set("Mcp-Session-Id", "sess-abc-123") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpToolCallResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, request.HTTPSubtypeMCP, span.SubType) + assert.Equal(t, "tools/call", mcp.Method) + assert.Equal(t, "get-weather", mcp.ToolName) + assert.Equal(t, "sess-abc-123", mcp.SessionID) + assert.Equal(t, "1", mcp.RequestID) + assert.Equal(t, 0, mcp.ErrorCode) + assert.Empty(t, mcp.ErrorMessage) + assert.Equal(t, "execute_tool", mcp.OperationName()) +} + +func TestMCPSpan_ToolCallError(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpToolCallRequest) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpToolCallErrorResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "tools/call", mcp.Method) + assert.Equal(t, "get-weather", mcp.ToolName) + assert.Equal(t, -32602, mcp.ErrorCode) + assert.Equal(t, "Unknown tool: nonexistent", mcp.ErrorMessage) +} + +func TestMCPSpan_ResourceRead(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpResourceReadRequest) + req.Header.Set("Mcp-Session-Id", "sess-xyz-456") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpResourceReadResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "resources/read", mcp.Method) + assert.Equal(t, "file:///home/user/documents/report.pdf", mcp.ResourceURI) + assert.Equal(t, "sess-xyz-456", mcp.SessionID) + assert.Equal(t, "3", mcp.RequestID) + assert.Equal(t, "resources/read", mcp.OperationName()) +} + +func TestMCPSpan_PromptGet(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpPromptGetRequest) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpPromptGetResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "prompts/get", mcp.Method) + assert.Equal(t, "analyze-code", mcp.PromptName) + assert.Equal(t, "4", mcp.RequestID) + assert.Equal(t, "prompts/get", mcp.OperationName()) +} + +func TestMCPSpan_Initialize(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpInitializeRequest) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpInitializeResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "initialize", mcp.Method) + assert.Equal(t, "2025-03-26", mcp.ProtocolVer) + assert.Equal(t, "5", mcp.RequestID) + assert.Equal(t, "initialize", mcp.OperationName()) +} + +func TestMCPSpan_InitializeSessionIDFromResponseHeader(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpInitializeRequest) + headers := mcpHeaders() + headers.Set("Mcp-Session-Id", "sess-from-response") + resp := makePlainResponse(http.StatusOK, headers, mcpInitializeResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "initialize", mcp.Method) + assert.Equal(t, "sess-from-response", mcp.SessionID) + assert.Equal(t, "5", mcp.RequestID) +} + +func TestMCPSpan_ToolsList(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpToolsListRequest) + req.Header.Set("Mcp-Session-Id", "sess-tools-list") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpToolsListResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "tools/list", mcp.Method) + assert.Equal(t, "sess-tools-list", mcp.SessionID) + assert.Empty(t, mcp.ToolName) +} + +func TestMCPSpan_Ping(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpPingRequest) + req.Header.Set("Mcp-Session-Id", "sess-ping") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpPingResponse) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "ping", mcp.Method) + assert.Equal(t, "sess-ping", mcp.SessionID) + assert.Equal(t, "7", mcp.RequestID) +} + +func TestMCPSpan_NotMCP_UnknownMethod(t *testing.T) { + body := `{"jsonrpc": "2.0", "method": "eth_getBalance", "params": ["0xabc", "latest"], "id": 1}` + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8545", body) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), `{"jsonrpc":"2.0","result":"0x1","id":1}`) + + base := &request.Span{} + _, ok := MCPSpan(base, req, resp) + + assert.False(t, ok) +} + +func TestMCPSpan_NotMCP_GetMethod(t *testing.T) { + req := makeMCPRequest(t, http.MethodGet, "http://localhost:8080/mcp", "") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), "{}") + + base := &request.Span{} + _, ok := MCPSpan(base, req, resp) + + assert.False(t, ok) +} + +func TestMCPSpan_NotMCP_InvalidJSON(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", "not json") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), "{}") + + base := &request.Span{} + _, ok := MCPSpan(base, req, resp) + + assert.False(t, ok) +} + +func TestMCPSpan_NotMCP_EmptyBody(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", "") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), "{}") + + base := &request.Span{} + _, ok := MCPSpan(base, req, resp) + + assert.False(t, ok) +} + +func TestMCPSpan_NotMCP_NotJSONRPC2(t *testing.T) { + // A valid JSON object with an MCP method but without jsonrpc: "2.0" should be rejected. + body := `{"method": "tools/call", "params": {"name": "get-weather"}, "id": 1}` + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", body) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), "{}") + + base := &request.Span{} + _, ok := MCPSpan(base, req, resp) + + assert.False(t, ok) +} + +func TestMCPSpan_NotMCP_PingWithoutSession(t *testing.T) { + // "ping" is a generic JSON-RPC method shared with other protocols. + // Without the Mcp-Session-Id header it must not be classified as MCP. + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpPingRequest) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), mcpPingResponse) + + base := &request.Span{} + _, ok := MCPSpan(base, req, resp) + + assert.False(t, ok) +} + +func TestMCPSpan_NotMCP_InitializeWithoutProtocolVersion(t *testing.T) { + // "initialize" without protocolVersion in params and without session + // header looks like a generic JSON-RPC initialize (e.g. LSP). + body := `{"jsonrpc": "2.0", "method": "initialize", "params": {"capabilities": {}}, "id": 1}` + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", body) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), `{"jsonrpc":"2.0","result":{},"id":1}`) + + base := &request.Span{} + _, ok := MCPSpan(base, req, resp) + + assert.False(t, ok) +} + +func TestMCPSpan_UnknownMethodWithSessionHeader(t *testing.T) { + // An unknown method should still be detected as MCP if the session header is present. + body := `{"jsonrpc": "2.0", "method": "custom/extension", "params": {}, "id": 10}` + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", body) + req.Header.Set("Mcp-Session-Id", "sess-custom") + resp := makePlainResponse(http.StatusOK, mcpHeaders(), `{"jsonrpc":"2.0","result":{},"id":10}`) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + + mcp := span.GenAI.MCP + assert.Equal(t, "custom/extension", mcp.Method) + assert.Equal(t, "sess-custom", mcp.SessionID) + assert.Equal(t, "10", mcp.RequestID) +} + +func TestMCPSpan_StringRequestID(t *testing.T) { + body := `{"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": "req-abc-42"}` + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", body) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), `{"jsonrpc":"2.0","result":{"tools":[]},"id":"req-abc-42"}`) + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + assert.Equal(t, "req-abc-42", span.GenAI.MCP.RequestID) +} + +func TestMCPSpan_NoResponseBody(t *testing.T) { + req := makeMCPRequest(t, http.MethodPost, "http://localhost:8080/mcp", mcpToolCallRequest) + resp := makePlainResponse(http.StatusOK, mcpHeaders(), "") + + base := &request.Span{} + span, ok := MCPSpan(base, req, resp) + + require.True(t, ok) + require.NotNil(t, span.GenAI.MCP) + assert.Equal(t, "tools/call", span.GenAI.MCP.Method) + assert.Equal(t, "get-weather", span.GenAI.MCP.ToolName) + assert.Equal(t, 0, span.GenAI.MCP.ErrorCode) +} + +func TestMCPCall_OperationName(t *testing.T) { + tests := []struct { + method string + want string + }{ + {method: "tools/call", want: "execute_tool"}, + {method: "tools/list", want: "tools/list"}, + {method: "resources/read", want: "resources/read"}, + {method: "prompts/get", want: "prompts/get"}, + {method: "initialize", want: "initialize"}, + {method: "ping", want: "ping"}, + } + + for _, tt := range tests { + t.Run(tt.method, func(t *testing.T) { + mcp := &request.MCPCall{Method: tt.method} + assert.Equal(t, tt.want, mcp.OperationName()) + }) + } +} + +func TestIsGenAISubtype_MCP(t *testing.T) { + assert.True(t, request.IsGenAISubtype(request.HTTPSubtypeMCP)) +} diff --git a/pkg/ebpf/common/http_transform.go b/pkg/ebpf/common/http_transform.go index a79d1d7010..da1a49e4ff 100644 --- a/pkg/ebpf/common/http_transform.go +++ b/pkg/ebpf/common/http_transform.go @@ -197,6 +197,13 @@ func httpRequestResponseToSpan(parseCtx *EBPFParseContext, event *BPFHTTPInfo, r } } + if parseCtx != nil && parseCtx.payloadExtraction.HTTP.GenAI.MCP.Enabled { + span, ok := ebpfhttp.MCPSpan(&httpSpan, req, resp) + if ok { + return span + } + } + if parseCtx != nil && parseCtx.payloadExtraction.HTTP.JSONRPC.Enabled { span, ok := ebpfhttp.JSONRPCSpan(&httpSpan, req, resp) if ok { diff --git a/pkg/export/attributes/names/attrs.go b/pkg/export/attributes/names/attrs.go index 7be54cb879..bf0a54c630 100644 --- a/pkg/export/attributes/names/attrs.go +++ b/pkg/export/attributes/names/attrs.go @@ -245,6 +245,8 @@ const ( GenAIOutput = Name(semconv.GenAIOutputMessagesKey) GenAIMetadata = Name("gen_ai.metadata") GenAITools = Name(semconv.GenAIToolDefinitionsKey) + GenAIToolName = Name("gen_ai.tool.name") + GenAIPromptName = Name("gen_ai.prompt.name") ) // OBI specific GPU events @@ -260,6 +262,14 @@ const ( RPCResponseStatusCode = Name("rpc.response.status_code") ) +// MCP (Model Context Protocol) attributes +const ( + MCPMethodName = Name("mcp.method.name") + MCPSessionID = Name("mcp.session.id") + MCPProtocolVersion = Name("mcp.protocol.version") + MCPResourceURI = Name("mcp.resource.uri") +) + // DNS events const ( DNSQuestionName = Name(semconv.DNSQuestionNameKey) diff --git a/pkg/export/otel/traces_test.go b/pkg/export/otel/traces_test.go index 102b234f4d..f84f9ef3fc 100644 --- a/pkg/export/otel/traces_test.go +++ b/pkg/export/otel/traces_test.go @@ -1635,6 +1635,118 @@ func TestGenerateTracesAttributes(t *testing.T) { ensureTraceStrAttr(t, attrs, "jsonrpc.request.id", "42") ensureTraceStrAttr(t, attrs, "rpc.response.status_code", "-32600") }) + t.Run("test MCP server span with tool call success", func(t *testing.T) { + span := request.Span{ + Type: request.EventTypeHTTP, + Method: "POST", + Path: "/mcp", + Route: "/mcp", + Status: 200, + SubType: request.HTTPSubtypeMCP, + GenAI: &request.GenAI{ + MCP: &request.MCPCall{ + Method: "tools/call", + ToolName: "get-weather", + SessionID: "sess-abc", + ProtocolVer: "2025-03-26", + RequestID: "1", + }, + }, + } + tAttrs := tracesgen.TraceAttributesSelector(&span, map[attr.Name]struct{}{}) + traces := tracesgen.GenerateTracesWithAttributes(cache, &span.Service, []attribute.KeyValue{}, hostID, groupFromSpanAndAttributes(&span, tAttrs), reporterName) + + spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + topSpan := spans.At(spans.Len() - 1) + attrs := topSpan.Attributes() + status := topSpan.Status() + + assert.Equal(t, "execute_tool get-weather", topSpan.Name()) + assert.Equal(t, ptrace.StatusCodeUnset, status.Code()) + assert.Empty(t, status.Message()) + + ensureTraceStrAttr(t, attrs, "mcp.method.name", "tools/call") + ensureTraceStrAttr(t, attrs, "gen_ai.operation.name", "execute_tool") + ensureTraceStrAttr(t, attrs, "gen_ai.tool.name", "get-weather") + ensureTraceStrAttr(t, attrs, "mcp.session.id", "sess-abc") + ensureTraceStrAttr(t, attrs, "mcp.protocol.version", "2025-03-26") + ensureTraceStrAttr(t, attrs, "jsonrpc.request.id", "1") + ensureTraceAttrNotExists(t, attrs, "rpc.response.status_code") + }) + t.Run("test MCP server span with error", func(t *testing.T) { + span := request.Span{ + Type: request.EventTypeHTTP, + Method: "POST", + Path: "/mcp", + Route: "/mcp", + Status: 200, + SubType: request.HTTPSubtypeMCP, + GenAI: &request.GenAI{ + MCP: &request.MCPCall{ + Method: "tools/call", + ToolName: "nonexistent", + SessionID: "sess-abc", + RequestID: "2", + ErrorCode: -32602, + ErrorMessage: "Unknown tool: nonexistent", + }, + }, + } + tAttrs := tracesgen.TraceAttributesSelector(&span, map[attr.Name]struct{}{}) + traces := tracesgen.GenerateTracesWithAttributes(cache, &span.Service, []attribute.KeyValue{}, hostID, groupFromSpanAndAttributes(&span, tAttrs), reporterName) + + spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + topSpan := spans.At(spans.Len() - 1) + attrs := topSpan.Attributes() + status := topSpan.Status() + + assert.Equal(t, "execute_tool nonexistent", topSpan.Name()) + assert.Equal(t, ptrace.StatusCodeError, status.Code()) + assert.Equal(t, "Unknown tool: nonexistent", status.Message()) + + ensureTraceStrAttr(t, attrs, "mcp.method.name", "tools/call") + ensureTraceStrAttr(t, attrs, "gen_ai.tool.name", "nonexistent") + ensureTraceStrAttr(t, attrs, "mcp.session.id", "sess-abc") + ensureTraceStrAttr(t, attrs, "jsonrpc.request.id", "2") + ensureTraceStrAttr(t, attrs, "rpc.response.status_code", "-32602") + ensureTraceStrAttr(t, attrs, "error.message", "Unknown tool: nonexistent") + }) + t.Run("test MCP client span with error", func(t *testing.T) { + span := request.Span{ + Type: request.EventTypeHTTPClient, + Method: "POST", + Path: "/mcp", + Status: 200, + SubType: request.HTTPSubtypeMCP, + GenAI: &request.GenAI{ + MCP: &request.MCPCall{ + Method: "tools/call", + ToolName: "get-weather", + RequestID: "3", + ErrorCode: -32600, + ErrorMessage: "Invalid Request", + }, + }, + } + tAttrs := tracesgen.TraceAttributesSelector(&span, map[attr.Name]struct{}{}) + traces := tracesgen.GenerateTracesWithAttributes(cache, &span.Service, []attribute.KeyValue{}, hostID, groupFromSpanAndAttributes(&span, tAttrs), reporterName) + + spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + assert.Equal(t, 1, spans.Len()) + topSpan := spans.At(0) + attrs := topSpan.Attributes() + status := topSpan.Status() + + assert.Equal(t, "execute_tool get-weather", topSpan.Name()) + assert.Equal(t, ptrace.StatusCodeError, status.Code()) + assert.Equal(t, "Invalid Request", status.Message()) + + ensureTraceStrAttr(t, attrs, "mcp.method.name", "tools/call") + ensureTraceStrAttr(t, attrs, "gen_ai.tool.name", "get-weather") + ensureTraceStrAttr(t, attrs, "jsonrpc.request.id", "3") + ensureTraceStrAttr(t, attrs, "rpc.response.status_code", "-32600") + ensureTraceStrAttr(t, attrs, "error.message", "Invalid Request") + }) t.Run("test HTTP span without headers has no header attributes", func(t *testing.T) { span := request.Span{ Type: request.EventTypeHTTP, diff --git a/pkg/export/otel/tracesgen/tracesgen.go b/pkg/export/otel/tracesgen/tracesgen.go index 62b50d9d7e..0264161426 100644 --- a/pkg/export/otel/tracesgen/tracesgen.go +++ b/pkg/export/otel/tracesgen/tracesgen.go @@ -305,6 +305,43 @@ var ( spanMetricsSkip = attribute.Bool(string(attr.SkipSpanMetrics), true) ) +// mcpAttributes returns MCP span attributes following the OTEL MCP semantic conventions. +func mcpAttributes(span *request.Span) []attribute.KeyValue { + if span.SubType != request.HTTPSubtypeMCP || span.GenAI == nil || span.GenAI.MCP == nil { + return nil + } + mcp := span.GenAI.MCP + attrs := []attribute.KeyValue{ + attribute.String(string(attr.MCPMethodName), mcp.Method), + semconv.GenAIOperationNameKey.String(mcp.OperationName()), + } + if mcp.ToolName != "" { + attrs = append(attrs, attribute.String(string(attr.GenAIToolName), mcp.ToolName)) + } + if mcp.ResourceURI != "" { + attrs = append(attrs, attribute.String(string(attr.MCPResourceURI), mcp.ResourceURI)) + } + if mcp.PromptName != "" { + attrs = append(attrs, attribute.String(string(attr.GenAIPromptName), mcp.PromptName)) + } + if mcp.SessionID != "" { + attrs = append(attrs, attribute.String(string(attr.MCPSessionID), mcp.SessionID)) + } + if mcp.ProtocolVer != "" { + attrs = append(attrs, attribute.String(string(attr.MCPProtocolVersion), mcp.ProtocolVer)) + } + if mcp.RequestID != "" { + attrs = append(attrs, attribute.String(string(attr.JSONRPCRequestID), mcp.RequestID)) + } + if mcp.ErrorCode != 0 { + attrs = append(attrs, attribute.String(string(attr.RPCResponseStatusCode), strconv.Itoa(mcp.ErrorCode))) + if mcp.ErrorMessage != "" { + attrs = append(attrs, semconv.ErrorMessage(mcp.ErrorMessage)) + } + } + return attrs +} + // jsonRPCAttributes returns JSON-RPC span attributes following the OTEL RPC semantic conventions. func jsonRPCAttributes(span *request.Span) []attribute.KeyValue { if span.SubType != request.HTTPSubtypeJSONRPC || span.JSONRPC == nil { @@ -371,6 +408,7 @@ func TraceAttributesSelector(span *request.Span, optionalAttrs map[attr.Name]str attrs = append(attrs, semconv.GraphQLOperationName(span.GraphQL.OperationName)) attrs = append(attrs, request.GraphqlOperationType(span.GraphQL.OperationType)) } + attrs = append(attrs, mcpAttributes(span)...) attrs = append(attrs, jsonRPCAttributes(span)...) attrs = append(attrs, httpEnrichmentAttributes(span)...) case request.EventTypeGRPC: @@ -667,6 +705,8 @@ func TraceAttributesSelector(span *request.Span, optionalAttrs map[attr.Name]str } } + attrs = append(attrs, mcpAttributes(span)...) + attrs = append(attrs, jsonRPCAttributes(span)...) attrs = append(attrs, httpEnrichmentAttributes(span)...) case request.EventTypeGRPCClient: diff --git a/pkg/obi/config.go b/pkg/obi/config.go index a97443d6ac..a5fa71d81c 100644 --- a/pkg/obi/config.go +++ b/pkg/obi/config.go @@ -175,6 +175,9 @@ var DefaultConfig = Config{ Bedrock: config.BedrockConfig{ Enabled: false, }, + MCP: config.MCPConfig{ + Enabled: false, + }, }, Enrichment: config.EnrichmentConfig{ Enabled: false,