diff --git a/bpf/common/common.c b/bpf/common/common.c index 65dfc6680d..23dfdfe9f5 100644 --- a/bpf/common/common.c +++ b/bpf/common/common.c @@ -27,3 +27,4 @@ const tcp_large_buffer_t *unused_9 __attribute__((unused)); const otel_span_t *unused_10 __attribute__((unused)); const mongo_go_client_req_t *unused_11 __attribute__((unused)); const dns_req_t *unused_12 __attribute__((unused)); +const channel_link_trace_t *unused_13 __attribute__((unused)); diff --git a/bpf/common/common.h b/bpf/common/common.h index 82c0124f04..86a6719157 100644 --- a/bpf/common/common.h +++ b/bpf/common/common.h @@ -239,9 +239,16 @@ typedef struct otel_span { span_description_t span_description; pid_info pid; otel_attributes_t span_attrs; - u8 _epad[6]; + u8 _span_pad[6]; } otel_span_t; +typedef struct channel_link_trace { + u8 type; // Must be first + u8 _pad[7]; + tp_info_t span_tp; + tp_info_t linked_span_tp; +} channel_link_trace_t; + typedef struct mongo_go_client_req { u8 type; // Must be first u8 err; diff --git a/bpf/common/event_defs.h b/bpf/common/event_defs.h index 20f3f19cbb..2c4026d324 100644 --- a/bpf/common/event_defs.h +++ b/bpf/common/event_defs.h @@ -21,3 +21,4 @@ #define EVENT_GO_MONGO 14 #define EVENT_FAILED_CONNECT 15 #define EVENT_DNS_REQUEST 16 +#define EVENT_GO_CHANNEL_LINK 17 diff --git a/bpf/gotracer/go_common.h b/bpf/gotracer/go_common.h index 875e6f089e..c6a0ad02a8 100644 --- a/bpf/gotracer/go_common.h +++ b/bpf/gotracer/go_common.h @@ -18,6 +18,7 @@ #include "common/tp_info.h" #include +#include #include #include #include @@ -84,6 +85,14 @@ struct { __uint(pinning, OBI_PIN_INTERNAL); } go_trace_map SEC(".maps"); +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, go_addr_key_t); // span pointer + __type(value, otel_span_t); + __uint(max_entries, MAX_CONCURRENT_CUSTOM_SPANS); + __uint(pinning, OBI_PIN_INTERNAL); +} active_spans SEC(".maps"); + struct { __uint(type, BPF_MAP_TYPE_LRU_HASH); __type(key, go_addr_key_t); // key: goroutine diff --git a/bpf/gotracer/go_offsets.h b/bpf/gotracer/go_offsets.h index 5280ead345..4e9defa81b 100644 --- a/bpf/gotracer/go_offsets.h +++ b/bpf/gotracer/go_offsets.h @@ -84,6 +84,10 @@ typedef enum { _tracer_delegate_pos, _tracer_attribute_opt_off, _error_string_off, + // go runtime channels + _hchan_dataqsiz_pos, + _hchan_sendx_pos, + _hchan_recvx_pos, // go jsonrpc _jsonrpc_request_header_service_method_pos, // go mongodb diff --git a/bpf/gotracer/go_runtime.c b/bpf/gotracer/go_runtime.c index c676ef6f73..cdd2b0704b 100644 --- a/bpf/gotracer/go_runtime.c +++ b/bpf/gotracer/go_runtime.c @@ -26,6 +26,9 @@ #include #include +#include +#include + #include #include @@ -105,6 +108,308 @@ int obi_uprobe_runtime_newproc1_return(struct pt_regs *ctx) { return 0; } +static __always_inline bool read_hchan_offsets(off_table_t *ot, u64 *dataqsiz, u64 *sendx, u64 *recvx) { + if (!ot || !dataqsiz || !sendx || !recvx) { + return false; + } + + *dataqsiz = go_offset_of(ot, (go_offset){.v = _hchan_dataqsiz_pos}); + *sendx = go_offset_of(ot, (go_offset){.v = _hchan_sendx_pos}); + *recvx = go_offset_of(ot, (go_offset){.v = _hchan_recvx_pos}); + + return *dataqsiz != (u64)-1 && *sendx != (u64)-1 && *recvx != (u64)-1; +} + +static __always_inline bool current_otel_handoff(struct pt_regs *ctx, chan_handoff_t *handoff) { + if (!handoff) { + return false; + } + + void *goroutine_addr = (void *)GOROUTINE_PTR(ctx); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + grpc_srv_func_invocation_t *grpc_server_inv = + bpf_map_lookup_elem(&ongoing_grpc_server_requests, &g_key); + if (grpc_server_inv && valid_trace(grpc_server_inv->tp.trace_id)) { + tp_clone(&handoff->tp, &grpc_server_inv->tp); + return true; + } + + grpc_client_func_invocation_t *grpc_client_inv = + bpf_map_lookup_elem(&ongoing_grpc_client_requests, &g_key); + if (grpc_client_inv && valid_trace(grpc_client_inv->tp.trace_id)) { + tp_clone(&handoff->tp, &grpc_client_inv->tp); + return true; + } + + server_http_func_invocation_t *http_server_inv = + bpf_map_lookup_elem(&ongoing_http_server_requests, &g_key); + if (http_server_inv && valid_trace(http_server_inv->tp.trace_id)) { + tp_clone(&handoff->tp, &http_server_inv->tp); + return true; + } + + tp_info_t *kafka_go_tp = bpf_map_lookup_elem(&produce_traceparents_by_goroutine, &g_key); + if (kafka_go_tp && valid_trace(kafka_go_tp->trace_id)) { + tp_clone(&handoff->tp, kafka_go_tp); + return true; + } + + mongo_go_client_req_t *mongo = bpf_map_lookup_elem(&ongoing_mongo_requests, &g_key); + if (mongo && valid_trace(mongo->tp.trace_id)) { + tp_clone(&handoff->tp, &mongo->tp); + return true; + } + + redis_client_req_t *redis = bpf_map_lookup_elem(&ongoing_redis_requests, &g_key); + if (redis && valid_trace(redis->tp.trace_id)) { + tp_clone(&handoff->tp, &redis->tp); + return true; + } + + sql_func_invocation_t *sql = bpf_map_lookup_elem(&ongoing_sql_queries, &g_key); + if (sql && valid_trace(sql->tp.trace_id)) { + tp_clone(&handoff->tp, &sql->tp); + return true; + } + + const u64 pid_tgid = bpf_get_current_pid_tgid(); + obi_ctx_info_t *obi_ctx = obi_ctx__get(pid_tgid); + if (!obi_ctx) { + return false; + } + + if (!valid_trace(obi_ctx->trace_id) || *((u64 *)obi_ctx->span_id) == 0) { + return false; + } + + __builtin_memcpy(handoff->tp.trace_id, obi_ctx->trace_id, sizeof(handoff->tp.trace_id)); + __builtin_memcpy(handoff->tp.span_id, obi_ctx->span_id, sizeof(handoff->tp.span_id)); + *((u64 *)handoff->tp.parent_id) = 0; + handoff->tp.flags = 0; + return true; +} + +static __always_inline void emit_channel_handoff(chan_handoff_t *sender, chan_handoff_t *receiver) { + if (!sender || !receiver || !valid_trace(sender->tp.trace_id) || + !valid_trace(receiver->tp.trace_id)) { + return; + } + + if (*((u64 *)sender->tp.span_id) == *((u64 *)receiver->tp.span_id) && + *((u64 *)sender->tp.trace_id) == *((u64 *)receiver->tp.trace_id) && + *((u64 *)(sender->tp.trace_id + 8)) == *((u64 *)(receiver->tp.trace_id + 8))) { + return; + } + + channel_link_trace_t *trace = bpf_ringbuf_reserve(&events, sizeof(*trace), 0); + if (!trace) { + return; + } + + trace->type = EVENT_GO_CHANNEL_LINK; + tp_clone(&trace->span_tp, &sender->tp); + tp_clone(&trace->linked_span_tp, &receiver->tp); + bpf_ringbuf_submit(trace, get_flags()); +} + +static __always_inline bool read_channel_u64(void *chan_ptr, u64 offset, u64 *value) { + if (!chan_ptr || !value) { + return false; + } + + bpf_probe_read_user(value, sizeof(*value), chan_ptr + offset); + return true; +} + +static __always_inline u64 circular_slot(u64 index, u64 dataqsiz) { + if (dataqsiz == 0) { + return 0; + } + return index == 0 ? dataqsiz - 1 : index - 1; +} + +static __always_inline int channel_send_start(struct pt_regs *ctx) { + void *goroutine_addr = (void *)GOROUTINE_PTR(ctx); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + chan_func_invocation_t invocation = {.chan_ptr = (u64)GO_PARAM1(ctx)}; + bpf_map_update_elem(&chansend_invocations, &g_key, &invocation, BPF_ANY); + return 0; +} + +static __always_inline int channel_send_return(struct pt_regs *ctx) { + off_table_t *ot = get_offsets_table(); + u64 hchan_dataqsiz_off = 0; + u64 hchan_sendx_off = 0; + u64 hchan_recvx_off = 0; + void *goroutine_addr = (void *)GOROUTINE_PTR(ctx); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + chan_func_invocation_t *invocation = bpf_map_lookup_elem(&chansend_invocations, &g_key); + if (!invocation) { + return 0; + } + + if (!read_hchan_offsets(ot, &hchan_dataqsiz_off, &hchan_sendx_off, &hchan_recvx_off)) { + bpf_map_delete_elem(&chansend_invocations, &g_key); + return 0; + } + + chan_handoff_t sender = {0}; + if (current_otel_handoff(ctx, &sender)) { + u64 dataqsiz = 0; + read_channel_u64((void *)invocation->chan_ptr, hchan_dataqsiz_off, &dataqsiz); + + if (dataqsiz == 0) { + chan_handoff_t *receiver = + bpf_map_lookup_elem(&direct_channel_receivers, &invocation->chan_ptr); + if (receiver) { + emit_channel_handoff(&sender, receiver); + bpf_map_delete_elem(&direct_channel_receivers, &invocation->chan_ptr); + } else { + bpf_map_update_elem( + &direct_channel_senders, &invocation->chan_ptr, &sender, BPF_ANY); + } + } else { + u64 sendx = 0; + read_channel_u64((void *)invocation->chan_ptr, hchan_sendx_off, &sendx); + + chan_handoff_key_t key = { + .chan_ptr = invocation->chan_ptr, + .slot = circular_slot(sendx, dataqsiz), + }; + bpf_map_update_elem(&buffered_channel_handoffs, &key, &sender, BPF_ANY); + } + } + + bpf_map_delete_elem(&chansend_invocations, &g_key); + return 0; +} + +static __always_inline int channel_recv_start(struct pt_regs *ctx) { + off_table_t *ot = get_offsets_table(); + u64 hchan_dataqsiz_off = 0; + u64 hchan_sendx_off = 0; + u64 hchan_recvx_off = 0; + void *goroutine_addr = (void *)GOROUTINE_PTR(ctx); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + chan_func_invocation_t invocation = {.chan_ptr = (u64)GO_PARAM1(ctx)}; + chan_handoff_t receiver = {0}; + if (current_otel_handoff(ctx, &receiver)) { + tp_clone(&invocation.tp, &receiver.tp); + invocation.has_tp = 1; + } + bpf_map_update_elem(&chanrecv_invocations, &g_key, &invocation, BPF_ANY); + + if (!read_hchan_offsets(ot, &hchan_dataqsiz_off, &hchan_sendx_off, &hchan_recvx_off)) { + return 0; + } + + u64 dataqsiz = 0; + read_channel_u64((void *)invocation.chan_ptr, hchan_dataqsiz_off, &dataqsiz); + if (dataqsiz == 0 && invocation.has_tp) { + bpf_map_update_elem(&direct_channel_receivers, &invocation.chan_ptr, &receiver, BPF_ANY); + } + + return 0; +} + +static __always_inline int channel_recv_return(struct pt_regs *ctx) { + off_table_t *ot = get_offsets_table(); + u64 hchan_dataqsiz_off = 0; + u64 hchan_sendx_off = 0; + u64 hchan_recvx_off = 0; + void *goroutine_addr = (void *)GOROUTINE_PTR(ctx); + go_addr_key_t g_key = {}; + go_addr_key_from_id(&g_key, goroutine_addr); + + chan_func_invocation_t *invocation = bpf_map_lookup_elem(&chanrecv_invocations, &g_key); + if (!invocation) { + return 0; + } + + if (!read_hchan_offsets(ot, &hchan_dataqsiz_off, &hchan_sendx_off, &hchan_recvx_off)) { + bpf_map_delete_elem(&chanrecv_invocations, &g_key); + return 0; + } + + chan_handoff_t receiver = {0}; + bool have_receiver = current_otel_handoff(ctx, &receiver); + if (!have_receiver && invocation->has_tp) { + tp_clone(&receiver.tp, &invocation->tp); + have_receiver = true; + } + + u64 dataqsiz = 0; + read_channel_u64((void *)invocation->chan_ptr, hchan_dataqsiz_off, &dataqsiz); + + if (dataqsiz == 0) { + bpf_map_delete_elem(&direct_channel_receivers, &invocation->chan_ptr); + chan_handoff_t *sender = + bpf_map_lookup_elem(&direct_channel_senders, &invocation->chan_ptr); + if (sender) { + if (have_receiver) { + emit_channel_handoff(sender, &receiver); + } + bpf_map_delete_elem(&direct_channel_senders, &invocation->chan_ptr); + } + } else { + u64 recvx = 0; + read_channel_u64((void *)invocation->chan_ptr, hchan_recvx_off, &recvx); + + chan_handoff_key_t key = { + .chan_ptr = invocation->chan_ptr, + .slot = circular_slot(recvx, dataqsiz), + }; + chan_handoff_t *sender = bpf_map_lookup_elem(&buffered_channel_handoffs, &key); + if (sender) { + if (have_receiver) { + emit_channel_handoff(sender, &receiver); + } + bpf_map_delete_elem(&buffered_channel_handoffs, &key); + } + } + + bpf_map_delete_elem(&chanrecv_invocations, &g_key); + return 0; +} + +SEC("uprobe/runtime_chansend1") +int obi_uprobe_runtime_chansend1(struct pt_regs *ctx) { + return channel_send_start(ctx); +} + +SEC("uprobe/runtime_chansend1_return") +int obi_uprobe_runtime_chansend1_return(struct pt_regs *ctx) { + return channel_send_return(ctx); +} + +SEC("uprobe/runtime_chanrecv1") +int obi_uprobe_runtime_chanrecv1(struct pt_regs *ctx) { + return channel_recv_start(ctx); +} + +SEC("uprobe/runtime_chanrecv1_return") +int obi_uprobe_runtime_chanrecv1_return(struct pt_regs *ctx) { + return channel_recv_return(ctx); +} + +SEC("uprobe/runtime_chanrecv2") +int obi_uprobe_runtime_chanrecv2(struct pt_regs *ctx) { + return channel_recv_start(ctx); +} + +SEC("uprobe/runtime_chanrecv2_return") +int obi_uprobe_runtime_chanrecv2_return(struct pt_regs *ctx) { + return channel_recv_return(ctx); +} + enum gstatus { // _Gidle: just allocated, not yet initialized g_idle = 0, diff --git a/bpf/gotracer/go_sdk.c b/bpf/gotracer/go_sdk.c index b6c05fc445..fe569266d9 100644 --- a/bpf/gotracer/go_sdk.c +++ b/bpf/gotracer/go_sdk.c @@ -51,16 +51,6 @@ struct { __uint(pinning, OBI_PIN_INTERNAL); } span_names SEC(".maps"); -// this is a large value data structure, increase -// concurrent_custom_spans carefully. -struct { - __uint(type, BPF_MAP_TYPE_HASH); - __type(key, go_addr_key_t); // span pointer - __type(value, otel_span_t); - __uint(max_entries, MAX_CONCURRENT_CUSTOM_SPANS); - __uint(pinning, OBI_PIN_INTERNAL); -} active_spans SEC(".maps"); - struct { __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); __type(key, int); @@ -233,25 +223,30 @@ int obi_uprobe_tracer_Start_Returns(struct pt_regs *ctx) { read_attrs_from_opts(span, (void *)span_info->opts_ptr, span_info->opts_len); } - unsigned char tp_buf[TP_MAX_VAL_LENGTH]; tp_info_t *tp = tp_info_from_parent_go(&g_key, &span->parent_go); - if (tp) { - __builtin_memcpy(&span->prev_tp, tp, sizeof(tp_info_t)); - tp_from_parent(&span->tp, tp); - span->tp.flags = tp->flags; - urand_bytes(span->tp.span_id, SPAN_ID_SIZE_BYTES); - encode_hex(tp_buf, span->tp.parent_id, SPAN_ID_SIZE_BYTES); - - if (span->parent_go) { - go_addr_key_t gp_key = {}; - go_addr_key_from_id(&gp_key, (void *)span->parent_go); - update_tp_parent_go(&gp_key, &span->tp); - - // reusing gp_key to save stack space - go_addr_key_from_id(&gp_key, span_ptr); - - bpf_map_update_elem(&active_spans, &gp_key, span, BPF_ANY); - } + if (!tp) { + bpf_map_delete_elem(&span_names, &g_key); + return 0; + } + + __builtin_memcpy(&span->prev_tp, tp, sizeof(tp_info_t)); + tp_from_parent(&span->tp, tp); + span->tp.flags = tp->flags; + urand_bytes(span->tp.span_id, SPAN_ID_SIZE_BYTES); + + if (span->parent_go) { + go_addr_key_t gp_key = {}; + go_addr_key_from_id(&gp_key, (void *)span->parent_go); + update_tp_parent_go(&gp_key, &span->tp); + + // reusing gp_key to save stack space + go_addr_key_from_id(&gp_key, span_ptr); + + bpf_map_update_elem(&active_spans, &gp_key, span, BPF_ANY); + } else { + go_addr_key_t s_key = {}; + go_addr_key_from_id(&s_key, span_ptr); + bpf_map_update_elem(&active_spans, &s_key, span, BPF_ANY); } bpf_map_delete_elem(&span_names, &g_key); @@ -260,9 +255,10 @@ int obi_uprobe_tracer_Start_Returns(struct pt_regs *ctx) { SEC("uprobe/nonRecordingSpan_End") int obi_uprobe_nonRecordingSpan_End(struct pt_regs *ctx) { + void *goroutine_addr = (void *)GOROUTINE_PTR(ctx); void *span_ptr = (void *)GO_PARAM1(ctx); bpf_dbg_printk("=== uprobe/nonRecordingSpan_End ==="); - bpf_dbg_printk("goroutine_addr=%lx, span_ptr=%lx", (void *)GOROUTINE_PTR(ctx), span_ptr); + bpf_dbg_printk("goroutine_addr=%lx, span_ptr=%lx", goroutine_addr, span_ptr); go_addr_key_t s_key = {}; go_addr_key_from_id(&s_key, span_ptr); diff --git a/bpf/gotracer/maps/runtime.h b/bpf/gotracer/maps/runtime.h index fb372dc9a4..9db15551d2 100644 --- a/bpf/gotracer/maps/runtime.h +++ b/bpf/gotracer/maps/runtime.h @@ -6,9 +6,70 @@ #include #include +#include +#include +#include +#include + +typedef struct chan_func_invocation { + u64 chan_ptr; + tp_info_t tp; + u8 has_tp; + u8 _pad[7]; +} chan_func_invocation_t; + +typedef struct chan_handoff_key { + u64 chan_ptr; + u64 slot; +} chan_handoff_key_t; + +typedef struct chan_handoff { + tp_info_t tp; +} chan_handoff_t; + struct { __uint(type, BPF_MAP_TYPE_LRU_HASH); __type(key, void *); // *m __type(value, u32); __uint(max_entries, 5000); } mptr_to_root_tid SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, go_addr_key_t); + __type(value, chan_func_invocation_t); + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} chansend_invocations SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, go_addr_key_t); + __type(value, chan_func_invocation_t); + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} chanrecv_invocations SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, chan_handoff_key_t); + __type(value, chan_handoff_t); + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} buffered_channel_handoffs SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, u64); + __type(value, chan_handoff_t); + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} direct_channel_senders SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, u64); + __type(value, chan_handoff_t); + __uint(max_entries, MAX_CONCURRENT_REQUESTS); + __uint(pinning, OBI_PIN_INTERNAL); +} direct_channel_receivers SEC(".maps"); diff --git a/configs/offsets/tracker_input.json b/configs/offsets/tracker_input.json index 1f06ce8953..bc83d5b5d7 100644 --- a/configs/offsets/tracker_input.json +++ b/configs/offsets/tracker_input.json @@ -26,7 +26,8 @@ "net/rpc.Request": ["ServiceMethod"], "database/sql.driverConn": ["ci"], "net/textproto.Reader": ["R"], - "bufio.Reader": ["buf", "w"] + "bufio.Reader": ["buf", "w"], + "runtime.hchan": ["dataqsiz", "sendx", "recvx"] } }, "golang.org/x/net": { diff --git a/devdocs/context-propagation.md b/devdocs/context-propagation.md index 48cc596765..8357696f0c 100644 --- a/devdocs/context-propagation.md +++ b/devdocs/context-propagation.md @@ -345,6 +345,45 @@ The Go runtime calls `runtime.casgstatus` on every goroutine status transition. This fires on every context switch, so `traces_ctx_v1` is always in sync with whichever goroutine is currently executing on the OS thread. +OBI also hooks the runtime channel wrappers `runtime.chansend1`, `runtime.chanrecv1`, and `runtime.chanrecv2` to detect work handed between goroutines through a `chan`. + +When a send and receive are matched, the Go runtime probe reads the active OBI trace context on the sending goroutine and the receiving goroutine and emits a dedicated channel-link event. User-space keeps that link temporarily and attaches it when the normal OBI span event for either side is decoded. The resulting OTLP spans contain ordinary span links. + +Example: + +```go +package main + +import ( + "net/http" +) + +type job struct { + id int +} + +var work = make(chan job) + +func main() { + http.HandleFunc("/receive", func(rw http.ResponseWriter, _ *http.Request) { + item := <-work + _, _ = rw.Write([]byte("received job")) + _ = item + }) + + http.HandleFunc("/dispatch", func(rw http.ResponseWriter, _ *http.Request) { + work <- job{id: 1} + _, _ = rw.Write([]byte("dispatched job")) + }) + + _ = http.ListenAndServe(":8080", nil) +} +``` + +In this example, the `/receive` and `/dispatch` handlers run in separate goroutines and produce separate OBI HTTP server spans. OBI observes the `chan` handoff, emits a channel-link event in eBPF space, and the exported `/receive` and `/dispatch` spans carry reciprocal span links so the relationship is visible downstream even though the two requests are separate traces. + +Current limitation: this channel-linking path covers the direct runtime send/receive wrappers above. `select`-based receive paths use different runtime internals and are not covered by these probes. + #### Node.js — `async_hooks` before callback + `uv_fs_access` uprobe **Files**: `pkg/internal/nodejs/fdextractor.js`, `bpf/generictracer/nodejs.c` diff --git a/examples/go-channel-links/README.md b/examples/go-channel-links/README.md new file mode 100644 index 0000000000..9352dea50b --- /dev/null +++ b/examples/go-channel-links/README.md @@ -0,0 +1,69 @@ +# Go Channel Links Example + +This example demonstrates Go span linking across a user-space channel handoff. + +The app exposes two HTTP endpoints, `/receive` and `/dispatch`. Each cycle: + +- serves `GET /receive`, which blocks waiting on an unbuffered Go `chan` +- serves `GET /dispatch`, which sends work into that same `chan` +- creates a channel handoff between two separate HTTP handler goroutines + +OBI observes the `runtime.chansend1` and `runtime.chanrecv1`/`runtime.chanrecv2` handoff and emits link metadata for the active OBI spans involved in that handoff. In this example those are ordinary OBI HTTP server spans for `/dispatch` and `/receive`. + +## Topology + +- `channel-links-app`: small Go HTTP service with `/receive` and `/dispatch` +- `traffic-generator`: repeatedly opens a waiting `/receive` request and then triggers `/dispatch` +- `obi`: instruments the Go service and exports traces and metrics over OTLP +- `lgtm`: Grafana LGTM stack for viewing telemetry + +## Run + +```bash +cd examples/go-channel-links +docker compose up -d --build +``` + +By default, the compose file builds the `obi` image from your current workspace and tags it as `obi-go-channel-links:local`. + +If the stack is already running and you changed the app, OBI config, or OBI source, recreate the app and OBI containers: + +```bash +docker compose up -d --build --force-recreate channel-links-app obi +``` + +Useful endpoints: + +- app receive: `http://localhost:8080/receive` +- app dispatch: `http://localhost:8080/dispatch` +- Grafana: `http://localhost:3000` (`admin` / `admin`) + +If you want to trigger the handoff manually: + +```bash +curl http://localhost:8080/receive & +sleep 1 +curl http://localhost:8080/dispatch +wait +``` + +## View The Links + +1. Open `http://localhost:3000` and sign in with `admin` / `admin`. +2. Open Grafana Explore. +3. Select the `Tempo` data source. +4. Trigger the `/receive` then `/dispatch` pair a few times, or let the traffic generator run. +5. Open recent traces produced by `channel-links-app` and inspect the top-level HTTP server spans for `/receive` and `/dispatch`. + +What to look for: + +- the `GET /receive` server span has a `Links` section +- the `GET /dispatch` server span has a `Links` section +- each link points to the peer trace's `processing` span created by OBI +- the two requests remain separate traces; the relationship is expressed with links, not parent-child edges + +## Notes + +- The compose stack uses the same privileged OBI + LGTM pattern as the NGINX example. +- You can still override the OBI image explicitly with `OBI_IMAGE=... docker compose up -d`. +- The example intentionally uses a plain blocking receive (`item := <-workCh`). `select`-based channel receive paths are not covered by this probe set. diff --git a/examples/go-channel-links/app/Dockerfile b/examples/go-channel-links/app/Dockerfile new file mode 100644 index 0000000000..eaf7db0b3f --- /dev/null +++ b/examples/go-channel-links/app/Dockerfile @@ -0,0 +1,21 @@ +FROM golang:1.25.8@sha256:f55a6ec7f24aedc1ed66e2641fdc52de01f2d24d6e49d1fa38582c07dd5f601d AS builder + +ARG TARGETARCH + +ENV GOARCH=$TARGETARCH + +WORKDIR /src + +COPY go.mod go.mod +COPY go.sum go.sum +COPY examples/go-channel-links/app examples/go-channel-links/app + +RUN go build -o /out/channel-links-app ./examples/go-channel-links/app + +FROM debian:bookworm-slim@sha256:74d56e3931e0d5a1dd51f8c8a2466d21de84a271cd3b5a733b803aa91abf4421 + +COPY --from=builder /out/channel-links-app /channel-links-app + +EXPOSE 8080 + +CMD ["/channel-links-app"] diff --git a/examples/go-channel-links/app/main.go b/examples/go-channel-links/app/main.go new file mode 100644 index 0000000000..c8656be2e4 --- /dev/null +++ b/examples/go-channel-links/app/main.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "encoding/json" + "log/slog" + "net/http" + "os" + "sync/atomic" +) + +type job struct { + ID int64 +} + +type response struct { + JobID int64 `json:"job_id"` + Result string `json:"result"` +} + +var ( + jobCounter atomic.Int64 + workCh = make(chan job) +) + +func main() { + port := envOrDefault("PORT", "8080") + + mux := http.NewServeMux() + mux.HandleFunc("/", func(rw http.ResponseWriter, _ *http.Request) { + _, _ = rw.Write([]byte("try GET /receive and GET /dispatch\n")) + }) + mux.HandleFunc("/receive", receive) + mux.HandleFunc("/dispatch", dispatch) + + addr := ":" + port + slog.Info("starting channel links example", "addr", addr) + if err := http.ListenAndServe(addr, mux); err != nil { + slog.Error("server stopped", "error", err) + os.Exit(1) + } +} + +func dispatch(rw http.ResponseWriter, req *http.Request) { + jobID := jobCounter.Add(1) + workCh <- job{ID: jobID} + + out := response{ + JobID: jobID, + Result: "dispatched to waiting receiver", + } + + rw.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(rw).Encode(out); err != nil { + slog.Warn("encode response", "error", err, "path", req.URL.Path) + } +} + +func receive(rw http.ResponseWriter, req *http.Request) { + item := <-workCh + + rw.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(rw).Encode(response{ + JobID: item.ID, + Result: "received from channel", + }); err != nil { + slog.Warn("encode response", "error", err, "path", req.URL.Path) + } +} + +func envOrDefault(name, fallback string) string { + if value := os.Getenv(name); value != "" { + return value + } + + return fallback +} diff --git a/examples/go-channel-links/docker-compose.yml b/examples/go-channel-links/docker-compose.yml new file mode 100644 index 0000000000..501bedd5d3 --- /dev/null +++ b/examples/go-channel-links/docker-compose.yml @@ -0,0 +1,52 @@ +services: + channel-links-app: + build: + context: ../.. + dockerfile: examples/go-channel-links/app/Dockerfile + container_name: channel-links-app + ports: + - "8080:8080" + + traffic-generator: + image: curlimages/curl:8.16.0@sha256:5a91ea0c9c3ee27b4abe657b68cf6bf0676afa13b236b3bda34283cb3924d4f6 + platform: linux/amd64 + container_name: channel-links-traffic-generator + command: + - sh + - -c + - | + while true; do + curl -fsS --max-time 5 http://channel-links-app:8080/receive >/dev/null & + recv_pid=$$! + sleep 1 + curl -fsS http://channel-links-app:8080/dispatch >/dev/null || true + wait "$$recv_pid" || true + sleep 2 + done + depends_on: + - channel-links-app + + obi: + image: ${OBI_IMAGE:-obi-go-channel-links:local} + build: + context: ../.. + dockerfile: Dockerfile + container_name: obi-go-channel-links-example + command: + - --config=/config/obi-config.yaml + pid: host + privileged: true + volumes: + - ./obi-config.yaml:/config/obi-config.yaml:ro + depends_on: + - channel-links-app + - traffic-generator + - lgtm + + lgtm: + image: grafana/otel-lgtm:0.22.1@sha256:52a197a4aad6203af126c148c6194472b48f88d5481f246f5d3657023b30ac9f + container_name: lgtm-go-channel-links-example + ports: + - "3000:3000" + - "4317:4317" + - "4318:4318" diff --git a/examples/go-channel-links/obi-config.yaml b/examples/go-channel-links/obi-config.yaml new file mode 100644 index 0000000000..932764159d --- /dev/null +++ b/examples/go-channel-links/obi-config.yaml @@ -0,0 +1,10 @@ +otel_metrics_export: + endpoint: http://lgtm:4318 +otel_traces_export: + endpoint: http://lgtm:4318 +discovery: + instrument: + - namespace: channel-links-example + name: channel-links-app + containers_only: true + exe_path: /channel-links-app diff --git a/pkg/appolly/app/request/span.go b/pkg/appolly/app/request/span.go index 0069c57771..c293af5bda 100644 --- a/pkg/appolly/app/request/span.go +++ b/pkg/appolly/app/request/span.go @@ -194,6 +194,12 @@ type MessagingInfo struct { Partition int `json:"partition"` } +type SpanLink struct { + TraceID trace.TraceID `json:"traceID"` + SpanID trace.SpanID `json:"spanID"` + TraceFlags uint8 `json:"traceFlags,string"` +} + type GraphQL struct { Document string `json:"document"` OperationName string `json:"operationName"` @@ -412,6 +418,7 @@ type Span struct { Elasticsearch *Elasticsearch `json:"-"` AWS *AWS `json:"-"` GenAI *GenAI `json:"-"` + Links []SpanLink `json:"links,omitempty"` // RequestHeaders stores extracted HTTP request headers based on enrichment rules. // Keys are canonical header names, values are all header values (possibly obfuscated). diff --git a/pkg/ebpf/common/common.go b/pkg/ebpf/common/common.go index a94fb1123f..eebe1eb819 100644 --- a/pkg/ebpf/common/common.go +++ b/pkg/ebpf/common/common.go @@ -34,7 +34,7 @@ import ( "go.opentelemetry.io/obi/pkg/pipe/msg" ) -//go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 -type http_request_trace_t -type sql_request_trace_t -type http_info_t -type connection_info_t -type http2_grpc_request_t -type tcp_req_t -type kafka_client_req_t -type kafka_go_req_t -type redis_client_req_t -type tcp_large_buffer_t -type otel_span_t -type mongo_go_client_req_t -type dns_req_t Bpf ../../../bpf/common/common.c -- -I../../../bpf +//go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 -type http_request_trace_t -type sql_request_trace_t -type http_info_t -type connection_info_t -type http2_grpc_request_t -type tcp_req_t -type kafka_client_req_t -type kafka_go_req_t -type redis_client_req_t -type tcp_large_buffer_t -type otel_span_t -type channel_link_trace_t -type mongo_go_client_req_t -type dns_req_t Bpf ../../../bpf/common/common.c -- -I../../../bpf // HTTPRequestTrace contains information from an HTTP request as directly received from the // eBPF layer. This contains low-level C structures for accurate binary read from ring buffer. @@ -72,6 +72,7 @@ const ( EventTypeGoMongo = 14 // EVENT_GO_MONGO - Go MongoDB spans EventTypeFailedConnect = 15 // EVENT_FAILED_CONNECT - Failed Connections EventTypeDNS = 16 // EVENT_DNS_REQUEST - DNS events + EventTypeGoChannelLink = 17 // EVENT_GO_CHANNEL_LINK - Go channel handoff span links ) // Kernel-side classification @@ -188,6 +189,7 @@ type EBPFParseContext struct { kafkaTopicUUIDToName *simplelru.LRU[kafkaparser.UUID, string] payloadExtraction config.PayloadExtraction dnsEvents *expirable.LRU[dnsparser.DNSId, *request.Span] + pendingSpanLinks *pendingSpanLinks emitSpans func([]request.Span) } @@ -301,9 +303,19 @@ func NewEBPFParseContext(cfg *config.EBPFTracer, spansChan *msg.Queue[[]request. payloadExtraction: payloadExtraction, dnsEvents: dnsEvents, emitSpans: emitSpans, + pendingSpanLinks: newPendingSpanLinks(), } } +func finalizeParsedSpan(parseCtx *EBPFParseContext, span request.Span, ignore bool, err error) (request.Span, bool, error) { + if err != nil || ignore || parseCtx == nil { + return span, ignore, err + } + + parseCtx.consumePendingSpanLinks(&span) + return span, false, nil +} + func (ctx *EBPFParseContext) emitExtraSpans(spans ...request.Span) { if ctx == nil || ctx.emitSpans == nil || len(spans) == 0 { return @@ -330,29 +342,41 @@ func ReadBPFTraceAsSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, reco switch eventType { case EventTypeSQL: - return ReadSQLRequestTraceAsSpan(record) + span, ignore, err := ReadSQLRequestTraceAsSpan(record) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeKHTTP: - return ReadHTTPInfoIntoSpan(parseCtx, record, filter) + span, ignore, err := ReadHTTPInfoIntoSpan(parseCtx, record, filter) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeKHTTP2: - return ReadHTTP2InfoIntoSpan(parseCtx, record, filter) + span, ignore, err := ReadHTTP2InfoIntoSpan(parseCtx, record, filter) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeTCP: - return ReadTCPRequestIntoSpan(parseCtx, cfg, record, filter) + span, ignore, err := ReadTCPRequestIntoSpan(parseCtx, cfg, record, filter) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeGoSarama: - return ReadGoSaramaRequestIntoSpan(record) + span, ignore, err := ReadGoSaramaRequestIntoSpan(record) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeGoRedis: - return ReadGoRedisRequestIntoSpan(record) + span, ignore, err := ReadGoRedisRequestIntoSpan(record) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeGoMongo: - return ReadGoMongoRequestIntoSpan(record) + span, ignore, err := ReadGoMongoRequestIntoSpan(record) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeGoKafkaGo: - return ReadGoKafkaGoRequestIntoSpan(record) + span, ignore, err := ReadGoKafkaGoRequestIntoSpan(record) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeTCPLargeBuffer: return appendTCPLargeBuffer(parseCtx, record) case EventOTelSDKGo: - return ReadGoOTelEventIntoSpan(record) + span, ignore, err := ReadGoOTelEventIntoSpan(record) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeFailedConnect: - return ReadFailedConnectIntoSpan(record, filter) + span, ignore, err := ReadFailedConnectIntoSpan(record, filter) + return finalizeParsedSpan(parseCtx, span, ignore, err) case EventTypeDNS: return readDNSEventIntoSpan(parseCtx, record) + case EventTypeGoChannelLink: + return readGoChannelLinkEvent(parseCtx, record) } event, err := ReinterpretCast[HTTPRequestTrace](record.RawSample) @@ -360,7 +384,7 @@ func ReadBPFTraceAsSpan(parseCtx *EBPFParseContext, cfg *config.EBPFTracer, reco return request.Span{}, true, err } - return HTTPRequestTraceToSpan(event), false, nil + return finalizeParsedSpan(parseCtx, HTTPRequestTraceToSpan(event), false, nil) } func ReinterpretCast[T any](b []byte) (*T, error) { diff --git a/pkg/ebpf/common/go_channel_link_transform.go b/pkg/ebpf/common/go_channel_link_transform.go new file mode 100644 index 0000000000..b56d83f576 --- /dev/null +++ b/pkg/ebpf/common/go_channel_link_transform.go @@ -0,0 +1,160 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ebpfcommon // import "go.opentelemetry.io/obi/pkg/ebpf/common" + +import ( + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" +) + +type channelLinkTrace struct { + Type uint8 + _ [7]byte + SpanTp channelLinkTP + LinkedSpanTp channelLinkTP +} + +type channelLinkTP struct { + TraceId [16]byte + SpanId [8]byte + ParentId [8]byte + Ts uint64 + Flags uint8 + _ [7]byte +} + +const ( + maxPendingSpanLinks = 1024 + pendingSpanLinksTTL = 5 * time.Minute + maxLinksPerSpan = 8 +) + +type spanLinkKey struct { + traceID trace.TraceID + spanID trace.SpanID +} + +type pendingSpanLinks struct { + cache *expirable.LRU[spanLinkKey, []request.SpanLink] +} + +func newPendingSpanLinks() *pendingSpanLinks { + return &pendingSpanLinks{ + cache: expirable.NewLRU[spanLinkKey, []request.SpanLink]( + maxPendingSpanLinks, + nil, + pendingSpanLinksTTL, + ), + } +} + +func readGoChannelLinkEvent(parseCtx *EBPFParseContext, record *ringbuf.Record) (request.Span, bool, error) { + if parseCtx == nil || parseCtx.pendingSpanLinks == nil { + return request.Span{}, true, nil + } + + event, err := ReinterpretCast[channelLinkTrace](record.RawSample) + if err != nil { + return request.Span{}, true, err + } + + parseCtx.pendingSpanLinks.recordPair( + tpToSpanLinkKey(&event.SpanTp), + tpToSpanLink(&event.LinkedSpanTp), + ) + parseCtx.pendingSpanLinks.recordPair( + tpToSpanLinkKey(&event.LinkedSpanTp), + tpToSpanLink(&event.SpanTp), + ) + + return request.Span{}, true, nil +} + +func (ctx *EBPFParseContext) consumePendingSpanLinks(span *request.Span) { + if ctx == nil || ctx.pendingSpanLinks == nil || span == nil { + return + } + + if !span.TraceID.IsValid() || !span.SpanID.IsValid() { + return + } + + ctx.pendingSpanLinks.consume(span) +} + +func tpToSpanLinkKey(tp *channelLinkTP) spanLinkKey { + return spanLinkKey{ + traceID: trace.TraceID(tp.TraceId), + spanID: trace.SpanID(tp.SpanId), + } +} + +func tpToSpanLink(tp *channelLinkTP) request.SpanLink { + return request.SpanLink{ + TraceID: trace.TraceID(tp.TraceId), + SpanID: trace.SpanID(tp.SpanId), + TraceFlags: tp.Flags, + } +} + +func (p *pendingSpanLinks) recordPair(key spanLinkKey, link request.SpanLink) { + if p == nil || p.cache == nil { + return + } + + if !key.traceID.IsValid() || !key.spanID.IsValid() || !link.TraceID.IsValid() || !link.SpanID.IsValid() { + return + } + + links, _ := p.cache.Get(key) + for _, existing := range links { + if existing.TraceID == link.TraceID && existing.SpanID == link.SpanID { + return + } + } + + if len(links) >= maxLinksPerSpan { + return + } + + links = append(links, link) + p.cache.Add(key, links) +} + +func (p *pendingSpanLinks) consume(span *request.Span) { + if p == nil || p.cache == nil || span == nil { + return + } + + key := spanLinkKey{ + traceID: span.TraceID, + spanID: span.SpanID, + } + + links, ok := p.cache.Get(key) + if !ok || len(links) == 0 { + return + } + + for _, link := range links { + duplicate := false + for _, existing := range span.Links { + if existing.TraceID == link.TraceID && existing.SpanID == link.SpanID { + duplicate = true + break + } + } + if duplicate { + continue + } + span.Links = append(span.Links, link) + } + + p.cache.Remove(key) +} diff --git a/pkg/ebpf/common/go_channel_link_transform_test.go b/pkg/ebpf/common/go_channel_link_transform_test.go new file mode 100644 index 0000000000..f21e9cbffa --- /dev/null +++ b/pkg/ebpf/common/go_channel_link_transform_test.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ebpfcommon + +import ( + "testing" + "unsafe" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/obi/pkg/appolly/app/request" + "go.opentelemetry.io/obi/pkg/internal/ebpf/ringbuf" +) + +func TestReadGoChannelLinkEvent(t *testing.T) { + parseCtx := &EBPFParseContext{ + pendingSpanLinks: newPendingSpanLinks(), + } + + leftTraceID := trace.TraceID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} + leftSpanID := trace.SpanID{2, 2, 2, 2, 2, 2, 2, 2} + rightTraceID := trace.TraceID{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} + rightSpanID := trace.SpanID{4, 4, 4, 4, 4, 4, 4, 4} + + event := channelLinkTrace{ + Type: EventTypeGoChannelLink, + } + copy(event.SpanTp.TraceId[:], leftTraceID[:]) + copy(event.SpanTp.SpanId[:], leftSpanID[:]) + event.SpanTp.Flags = TPFlagSampled + copy(event.LinkedSpanTp.TraceId[:], rightTraceID[:]) + copy(event.LinkedSpanTp.SpanId[:], rightSpanID[:]) + event.LinkedSpanTp.Flags = TPFlagSampled + + raw := unsafe.Slice((*byte)(unsafe.Pointer(&event)), int(unsafe.Sizeof(event))) + record := &ringbuf.Record{RawSample: append([]byte(nil), raw...)} + + span, ignore, err := readGoChannelLinkEvent(parseCtx, record) + require.NoError(t, err) + assert.True(t, ignore) + assert.Equal(t, request.Span{}, span) + + leftSpan := request.Span{TraceID: leftTraceID, SpanID: leftSpanID} + parseCtx.consumePendingSpanLinks(&leftSpan) + if assert.Len(t, leftSpan.Links, 1) { + assert.Equal(t, rightTraceID, leftSpan.Links[0].TraceID) + assert.Equal(t, rightSpanID, leftSpan.Links[0].SpanID) + assert.Equal(t, uint8(TPFlagSampled), leftSpan.Links[0].TraceFlags) + } + + rightSpan := request.Span{TraceID: rightTraceID, SpanID: rightSpanID} + parseCtx.consumePendingSpanLinks(&rightSpan) + if assert.Len(t, rightSpan.Links, 1) { + assert.Equal(t, leftTraceID, rightSpan.Links[0].TraceID) + assert.Equal(t, leftSpanID, rightSpan.Links[0].SpanID) + assert.Equal(t, uint8(TPFlagSampled), rightSpan.Links[0].TraceFlags) + } +} + +func TestPendingSpanLinksDeduplicates(t *testing.T) { + pending := newPendingSpanLinks() + + traceID := trace.TraceID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} + spanID := trace.SpanID{2, 2, 2, 2, 2, 2, 2, 2} + linkedTraceID := trace.TraceID{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} + linkedSpanID := trace.SpanID{4, 4, 4, 4, 4, 4, 4, 4} + + key := spanLinkKey{traceID: traceID, spanID: spanID} + link := request.SpanLink{TraceID: linkedTraceID, SpanID: linkedSpanID, TraceFlags: TPFlagSampled} + + pending.recordPair(key, link) + pending.recordPair(key, link) + + span := request.Span{TraceID: traceID, SpanID: spanID} + pending.consume(&span) + if assert.Len(t, span.Links, 1) { + assert.Equal(t, linkedTraceID, span.Links[0].TraceID) + assert.Equal(t, linkedSpanID, span.Links[0].SpanID) + } +} diff --git a/pkg/ebpf/tracer_linux.go b/pkg/ebpf/tracer_linux.go index 5e29e31181..c50c387dea 100644 --- a/pkg/ebpf/tracer_linux.go +++ b/pkg/ebpf/tracer_linux.go @@ -330,6 +330,7 @@ func (pt *ProcessTracer) NewExecutable(exe *link.Executable, ie *Instrumentable) } for _, p := range pt.Programs { + p.ProcessBinary(ie.FileInfo) p.RegisterOffsets(ie.FileInfo, ie.Offsets) // Go style Uprobes diff --git a/pkg/export/otel/traces_test.go b/pkg/export/otel/traces_test.go index 716c617b24..ea5bbfcde3 100644 --- a/pkg/export/otel/traces_test.go +++ b/pkg/export/otel/traces_test.go @@ -173,6 +173,44 @@ func TestGenerateTraces(t *testing.T) { assert.NotEqual(t, spans.At(1).SpanID().String(), spans.At(2).SpanID().String()) }) + t.Run("test with span links", func(t *testing.T) { + start := time.Now() + traceID, _ := trace.TraceIDFromHex("eae56fbbec9505c102e8aabfc6b5c481") + spanID, _ := trace.SpanIDFromHex("89cbc1f60aab3b04") + linkTraceID, _ := trace.TraceIDFromHex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + linkSpanID, _ := trace.SpanIDFromHex("bbbbbbbbbbbbbbbb") + span := &request.Span{ + Type: request.EventTypeManualSpan, + RequestStart: start.UnixNano(), + Start: start.UnixNano(), + End: start.Add(time.Second).UnixNano(), + Method: "work", + TraceID: traceID, + SpanID: spanID, + Service: svc.Attrs{UID: svc.UID{Name: "links"}}, + Links: []request.SpanLink{{ + TraceID: linkTraceID, + SpanID: linkSpanID, + TraceFlags: 1, + }}, + } + + traces := tracesgen.GenerateTracesWithAttributes( + cache, + &span.Service, + []attribute.KeyValue{}, + hostID, + groupFromSpanAndAttributes(span, []attribute.KeyValue{}), + reporterName, + ) + + otelSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, 1, otelSpan.Links().Len()) + assert.Equal(t, pcommon.TraceID(linkTraceID), otelSpan.Links().At(0).TraceID()) + assert.Equal(t, pcommon.SpanID(linkSpanID), otelSpan.Links().At(0).SpanID()) + assert.Equal(t, uint32(1), otelSpan.Links().At(0).Flags()) + }) + t.Run("test with subtraces - generated ids", func(t *testing.T) { start := time.Now() span := &request.Span{ diff --git a/pkg/export/otel/tracesgen/tracesgen.go b/pkg/export/otel/tracesgen/tracesgen.go index 974bdd92af..d91c86c601 100644 --- a/pkg/export/otel/tracesgen/tracesgen.go +++ b/pkg/export/otel/tracesgen/tracesgen.go @@ -163,6 +163,19 @@ func GenerateTracesWithAttributes( if span.ParentSpanID.IsValid() { s.SetParentSpanID(pcommon.SpanID(span.ParentSpanID)) } + if len(span.Links) > 0 { + otelLinks := s.Links() + otelLinks.EnsureCapacity(len(span.Links)) + for _, link := range span.Links { + if !link.TraceID.IsValid() || !link.SpanID.IsValid() { + continue + } + otelLink := otelLinks.AppendEmpty() + otelLink.SetTraceID(pcommon.TraceID(link.TraceID)) + otelLink.SetSpanID(pcommon.SpanID(link.SpanID)) + otelLink.SetFlags(uint32(link.TraceFlags)) + } + } // Set span attributes m := AttrsToMap(attrs) diff --git a/pkg/internal/ebpf/gotracer/gotracer.go b/pkg/internal/ebpf/gotracer/gotracer.go index 4571fa125f..278b6f37a3 100644 --- a/pkg/internal/ebpf/gotracer/gotracer.go +++ b/pkg/internal/ebpf/gotracer/gotracer.go @@ -49,6 +49,8 @@ type Tracer struct { closers []io.Closer disabledRouteHarvesting bool supportsBPFLoop bool + channelLinkOffsetsByIno map[uint64]bool + currentBinaryIno uint64 } func New(pidFilter ebpfcommon.ServiceFilter, cfg *obi.Config, metrics imetrics.Reporter) *Tracer { @@ -70,6 +72,7 @@ func New(pidFilter ebpfcommon.ServiceFilter, cfg *obi.Config, metrics imetrics.R metrics: metrics, disabledRouteHarvesting: disabledRouteHarvesting, supportsBPFLoop: ebpfcommon.SupportsEBPFLoops(log, cfg.EBPF.OverrideBPFLoopEnabled), + channelLinkOffsetsByIno: map[uint64]bool{}, } } @@ -130,6 +133,20 @@ func (p *Tracer) constants() map[string]any { func (p *Tracer) SetupTailCalls() {} func (p *Tracer) RegisterOffsets(fileInfo *exec.FileInfo, offsets *goexec.Offsets) { + channelLinkEnabled := false + if offsets != nil { + _, haveDataqsiz := offsets.Field[goexec.HchanDataqsizPos].(uint64) + _, haveSendx := offsets.Field[goexec.HchanSendxPos].(uint64) + _, haveRecvx := offsets.Field[goexec.HchanRecvxPos].(uint64) + channelLinkEnabled = haveDataqsiz && haveSendx && haveRecvx + } + + p.channelLinkOffsetsByIno[fileInfo.Ino] = channelLinkEnabled + if !channelLinkEnabled { + p.log.Debug("disabling Go channel link probes for binary with missing runtime.hchan offsets", + "pid", fileInfo.Pid, "ino", fileInfo.Ino, "cmd", fileInfo.CmdExePath) + } + offTable := BpfOffTableT{} // Set the field offsets and the logLevel for the Go BPF program in a map for _, field := range []goexec.GoOffset{ @@ -203,6 +220,10 @@ func (p *Tracer) RegisterOffsets(fileInfo *exec.FileInfo, offsets *goexec.Offset goexec.GrpcClientStreamStream, // go manual spans goexec.GoTracerDelegatePos, + // go runtime channels + goexec.HchanDataqsizPos, + goexec.HchanSendxPos, + goexec.HchanRecvxPos, // go jsonrpc goexec.GoJsonrpcRequestHeaderServiceMethodPos, // go mongodb @@ -261,7 +282,14 @@ func (p *Tracer) RegisterOffsets(fileInfo *exec.FileInfo, offsets *goexec.Offset } } -func (p *Tracer) ProcessBinary(_ *exec.FileInfo) {} +func (p *Tracer) ProcessBinary(fileInfo *exec.FileInfo) { + if fileInfo == nil { + p.currentBinaryIno = 0 + return + } + + p.currentBinaryIno = fileInfo.Ino +} func (p *Tracer) AddCloser(c ...io.Closer) { p.closers = append(p.closers, c...) @@ -586,6 +614,25 @@ func (p *Tracer) GoProbes() map[string][]*ebpfcommon.ProbeDesc { }}, } + channelLinkEnabled := true + if p.currentBinaryIno != 0 { + channelLinkEnabled = p.channelLinkOffsetsByIno[p.currentBinaryIno] + } + if channelLinkEnabled { + m["runtime.chansend1"] = []*ebpfcommon.ProbeDesc{{ + Start: p.bpfObjects.ObiUprobeRuntimeChansend1, + End: p.bpfObjects.ObiUprobeRuntimeChansend1Return, + }} + m["runtime.chanrecv1"] = []*ebpfcommon.ProbeDesc{{ + Start: p.bpfObjects.ObiUprobeRuntimeChanrecv1, + End: p.bpfObjects.ObiUprobeRuntimeChanrecv1Return, + }} + m["runtime.chanrecv2"] = []*ebpfcommon.ProbeDesc{{ + Start: p.bpfObjects.ObiUprobeRuntimeChanrecv2, + End: p.bpfObjects.ObiUprobeRuntimeChanrecv2Return, + }} + } + // HTTP Header extraction // with bpf_loop we scan the buffer with a single uprobe - this is less overhead // otherwise we have a probe per header net/textproto.(*Reader).readContinuedLineSlice diff --git a/pkg/internal/goexec/offsets.json b/pkg/internal/goexec/offsets.json index b355c59002..8fba0ab9fd 100755 --- a/pkg/internal/goexec/offsets.json +++ b/pkg/internal/goexec/offsets.json @@ -1289,6 +1289,52 @@ } ] } + }, + "runtime.hchan": { + "dataqsiz": { + "versions": { + "oldest": "1.17.0", + "newest": "1.26.1" + }, + "offsets": [ + { + "offset": 8, + "since": "1.17.0" + } + ] + }, + "recvx": { + "versions": { + "oldest": "1.17.0", + "newest": "1.26.1" + }, + "offsets": [ + { + "offset": 48, + "since": "1.17.0" + }, + { + "offset": 56, + "since": "1.23.0" + } + ] + }, + "sendx": { + "versions": { + "oldest": "1.17.0", + "newest": "1.26.1" + }, + "offsets": [ + { + "offset": 40, + "since": "1.17.0" + }, + { + "offset": 48, + "since": "1.23.0" + } + ] + } } } } \ No newline at end of file diff --git a/pkg/internal/goexec/structmembers.go b/pkg/internal/goexec/structmembers.go index c138f2ad59..9ee29e7b1b 100644 --- a/pkg/internal/goexec/structmembers.go +++ b/pkg/internal/goexec/structmembers.go @@ -106,6 +106,10 @@ const ( GoTracerDelegatePos GoTracerAttributeOptOffset GoErrorStringOffset + // go runtime channels + HchanDataqsizPos + HchanSendxPos + HchanRecvxPos // go jsonrpc GoJsonrpcRequestHeaderServiceMethodPos // go mongodb @@ -419,6 +423,14 @@ var structMembers = map[string]structInfo{ "delegate": GoTracerDelegatePos, }, }, + "runtime.hchan": { + lib: "go", + fields: map[string]GoOffset{ + "dataqsiz": HchanDataqsizPos, + "sendx": HchanSendxPos, + "recvx": HchanRecvxPos, + }, + }, "go.mongodb.org/mongo-driver/mongo.Collection": { lib: "go.mongodb.org/mongo-driver", fields: map[string]GoOffset{