Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bpf/common/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ const tcp_large_buffer_t *unused_9 __attribute__((unused));
const otel_span_t *unused_10 __attribute__((unused));
const mongo_go_client_req_t *unused_11 __attribute__((unused));
const dns_req_t *unused_12 __attribute__((unused));
const channel_link_trace_t *unused_13 __attribute__((unused));
9 changes: 8 additions & 1 deletion bpf/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,16 @@ typedef struct otel_span {
span_description_t span_description;
pid_info pid;
otel_attributes_t span_attrs;
u8 _epad[6];
u8 _span_pad[6];
} otel_span_t;

typedef struct channel_link_trace {
u8 type; // Must be first
u8 _pad[7];
tp_info_t span_tp;
tp_info_t linked_span_tp;
} channel_link_trace_t;

typedef struct mongo_go_client_req {
u8 type; // Must be first
u8 err;
Expand Down
1 change: 1 addition & 0 deletions bpf/common/event_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
#define EVENT_GO_MONGO 14
#define EVENT_FAILED_CONNECT 15
#define EVENT_DNS_REQUEST 16
#define EVENT_GO_CHANNEL_LINK 17
9 changes: 9 additions & 0 deletions bpf/gotracer/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "common/tp_info.h"
#include <bpfcore/utils.h>

#include <common/common.h>
#include <common/go_addr_key.h>
#include <common/map_sizing.h>
#include <common/pin_internal.h>
Expand Down Expand Up @@ -84,6 +85,14 @@ struct {
__uint(pinning, OBI_PIN_INTERNAL);
} go_trace_map SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, go_addr_key_t); // span pointer
__type(value, otel_span_t);
__uint(max_entries, MAX_CONCURRENT_CUSTOM_SPANS);
__uint(pinning, OBI_PIN_INTERNAL);
} active_spans SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, go_addr_key_t); // key: goroutine
Expand Down
4 changes: 4 additions & 0 deletions bpf/gotracer/go_offsets.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ typedef enum {
_tracer_delegate_pos,
_tracer_attribute_opt_off,
_error_string_off,
// go runtime channels
_hchan_dataqsiz_pos,
_hchan_sendx_pos,
_hchan_recvx_pos,
// go jsonrpc
_jsonrpc_request_header_service_method_pos,
// go mongodb
Expand Down
305 changes: 305 additions & 0 deletions bpf/gotracer/go_runtime.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include <gotracer/maps/redis.h>
#include <gotracer/maps/runtime.h>

#include <common/event_defs.h>
#include <common/ringbuf.h>

#include <gotracer/types/grpc.h>
#include <gotracer/types/nethttp.h>

Expand Down Expand Up @@ -105,6 +108,308 @@ int obi_uprobe_runtime_newproc1_return(struct pt_regs *ctx) {
return 0;
}

static __always_inline bool read_hchan_offsets(off_table_t *ot, u64 *dataqsiz, u64 *sendx, u64 *recvx) {
if (!ot || !dataqsiz || !sendx || !recvx) {
return false;
}

*dataqsiz = go_offset_of(ot, (go_offset){.v = _hchan_dataqsiz_pos});
*sendx = go_offset_of(ot, (go_offset){.v = _hchan_sendx_pos});
*recvx = go_offset_of(ot, (go_offset){.v = _hchan_recvx_pos});

return *dataqsiz != (u64)-1 && *sendx != (u64)-1 && *recvx != (u64)-1;
}

static __always_inline bool current_otel_handoff(struct pt_regs *ctx, chan_handoff_t *handoff) {
if (!handoff) {
return false;
}

void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

grpc_srv_func_invocation_t *grpc_server_inv =
bpf_map_lookup_elem(&ongoing_grpc_server_requests, &g_key);
if (grpc_server_inv && valid_trace(grpc_server_inv->tp.trace_id)) {
tp_clone(&handoff->tp, &grpc_server_inv->tp);
return true;
}

grpc_client_func_invocation_t *grpc_client_inv =
bpf_map_lookup_elem(&ongoing_grpc_client_requests, &g_key);
if (grpc_client_inv && valid_trace(grpc_client_inv->tp.trace_id)) {
tp_clone(&handoff->tp, &grpc_client_inv->tp);
return true;
}

server_http_func_invocation_t *http_server_inv =
bpf_map_lookup_elem(&ongoing_http_server_requests, &g_key);
if (http_server_inv && valid_trace(http_server_inv->tp.trace_id)) {
tp_clone(&handoff->tp, &http_server_inv->tp);
return true;
}

tp_info_t *kafka_go_tp = bpf_map_lookup_elem(&produce_traceparents_by_goroutine, &g_key);
if (kafka_go_tp && valid_trace(kafka_go_tp->trace_id)) {
tp_clone(&handoff->tp, kafka_go_tp);
return true;
}

mongo_go_client_req_t *mongo = bpf_map_lookup_elem(&ongoing_mongo_requests, &g_key);
if (mongo && valid_trace(mongo->tp.trace_id)) {
tp_clone(&handoff->tp, &mongo->tp);
return true;
}

redis_client_req_t *redis = bpf_map_lookup_elem(&ongoing_redis_requests, &g_key);
if (redis && valid_trace(redis->tp.trace_id)) {
tp_clone(&handoff->tp, &redis->tp);
return true;
}

sql_func_invocation_t *sql = bpf_map_lookup_elem(&ongoing_sql_queries, &g_key);
if (sql && valid_trace(sql->tp.trace_id)) {
tp_clone(&handoff->tp, &sql->tp);
return true;
}

const u64 pid_tgid = bpf_get_current_pid_tgid();
obi_ctx_info_t *obi_ctx = obi_ctx__get(pid_tgid);
if (!obi_ctx) {
return false;
}

if (!valid_trace(obi_ctx->trace_id) || *((u64 *)obi_ctx->span_id) == 0) {
return false;
}

__builtin_memcpy(handoff->tp.trace_id, obi_ctx->trace_id, sizeof(handoff->tp.trace_id));
__builtin_memcpy(handoff->tp.span_id, obi_ctx->span_id, sizeof(handoff->tp.span_id));
*((u64 *)handoff->tp.parent_id) = 0;
handoff->tp.flags = 0;
return true;
}

static __always_inline void emit_channel_handoff(chan_handoff_t *sender, chan_handoff_t *receiver) {
if (!sender || !receiver || !valid_trace(sender->tp.trace_id) ||
!valid_trace(receiver->tp.trace_id)) {
return;
}

if (*((u64 *)sender->tp.span_id) == *((u64 *)receiver->tp.span_id) &&
*((u64 *)sender->tp.trace_id) == *((u64 *)receiver->tp.trace_id) &&
*((u64 *)(sender->tp.trace_id + 8)) == *((u64 *)(receiver->tp.trace_id + 8))) {
return;
}

channel_link_trace_t *trace = bpf_ringbuf_reserve(&events, sizeof(*trace), 0);
if (!trace) {
return;
}

trace->type = EVENT_GO_CHANNEL_LINK;
tp_clone(&trace->span_tp, &sender->tp);
tp_clone(&trace->linked_span_tp, &receiver->tp);
bpf_ringbuf_submit(trace, get_flags());
}

static __always_inline bool read_channel_u64(void *chan_ptr, u64 offset, u64 *value) {
if (!chan_ptr || !value) {
return false;
}

bpf_probe_read_user(value, sizeof(*value), chan_ptr + offset);
return true;
}

static __always_inline u64 circular_slot(u64 index, u64 dataqsiz) {
if (dataqsiz == 0) {
return 0;
}
return index == 0 ? dataqsiz - 1 : index - 1;
}

static __always_inline int channel_send_start(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

chan_func_invocation_t invocation = {.chan_ptr = (u64)GO_PARAM1(ctx)};
bpf_map_update_elem(&chansend_invocations, &g_key, &invocation, BPF_ANY);
return 0;
}

static __always_inline int channel_send_return(struct pt_regs *ctx) {
off_table_t *ot = get_offsets_table();
u64 hchan_dataqsiz_off = 0;
u64 hchan_sendx_off = 0;
u64 hchan_recvx_off = 0;
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

chan_func_invocation_t *invocation = bpf_map_lookup_elem(&chansend_invocations, &g_key);
if (!invocation) {
return 0;
}

if (!read_hchan_offsets(ot, &hchan_dataqsiz_off, &hchan_sendx_off, &hchan_recvx_off)) {
bpf_map_delete_elem(&chansend_invocations, &g_key);
return 0;
}

chan_handoff_t sender = {0};
if (current_otel_handoff(ctx, &sender)) {
u64 dataqsiz = 0;
read_channel_u64((void *)invocation->chan_ptr, hchan_dataqsiz_off, &dataqsiz);

if (dataqsiz == 0) {
chan_handoff_t *receiver =
bpf_map_lookup_elem(&direct_channel_receivers, &invocation->chan_ptr);
if (receiver) {
emit_channel_handoff(&sender, receiver);
bpf_map_delete_elem(&direct_channel_receivers, &invocation->chan_ptr);
} else {
bpf_map_update_elem(
&direct_channel_senders, &invocation->chan_ptr, &sender, BPF_ANY);
}
} else {
u64 sendx = 0;
read_channel_u64((void *)invocation->chan_ptr, hchan_sendx_off, &sendx);

chan_handoff_key_t key = {
.chan_ptr = invocation->chan_ptr,
.slot = circular_slot(sendx, dataqsiz),
};
bpf_map_update_elem(&buffered_channel_handoffs, &key, &sender, BPF_ANY);
}
}

bpf_map_delete_elem(&chansend_invocations, &g_key);
return 0;
}

static __always_inline int channel_recv_start(struct pt_regs *ctx) {
off_table_t *ot = get_offsets_table();
u64 hchan_dataqsiz_off = 0;
u64 hchan_sendx_off = 0;
u64 hchan_recvx_off = 0;
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

chan_func_invocation_t invocation = {.chan_ptr = (u64)GO_PARAM1(ctx)};
chan_handoff_t receiver = {0};
if (current_otel_handoff(ctx, &receiver)) {
tp_clone(&invocation.tp, &receiver.tp);
invocation.has_tp = 1;
}
bpf_map_update_elem(&chanrecv_invocations, &g_key, &invocation, BPF_ANY);

if (!read_hchan_offsets(ot, &hchan_dataqsiz_off, &hchan_sendx_off, &hchan_recvx_off)) {
return 0;
}

u64 dataqsiz = 0;
read_channel_u64((void *)invocation.chan_ptr, hchan_dataqsiz_off, &dataqsiz);
if (dataqsiz == 0 && invocation.has_tp) {
bpf_map_update_elem(&direct_channel_receivers, &invocation.chan_ptr, &receiver, BPF_ANY);
}

return 0;
}

static __always_inline int channel_recv_return(struct pt_regs *ctx) {
off_table_t *ot = get_offsets_table();
u64 hchan_dataqsiz_off = 0;
u64 hchan_sendx_off = 0;
u64 hchan_recvx_off = 0;
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

chan_func_invocation_t *invocation = bpf_map_lookup_elem(&chanrecv_invocations, &g_key);
if (!invocation) {
return 0;
}

if (!read_hchan_offsets(ot, &hchan_dataqsiz_off, &hchan_sendx_off, &hchan_recvx_off)) {
bpf_map_delete_elem(&chanrecv_invocations, &g_key);
return 0;
}

chan_handoff_t receiver = {0};
bool have_receiver = current_otel_handoff(ctx, &receiver);
if (!have_receiver && invocation->has_tp) {
tp_clone(&receiver.tp, &invocation->tp);
have_receiver = true;
}

u64 dataqsiz = 0;
read_channel_u64((void *)invocation->chan_ptr, hchan_dataqsiz_off, &dataqsiz);

if (dataqsiz == 0) {
bpf_map_delete_elem(&direct_channel_receivers, &invocation->chan_ptr);
chan_handoff_t *sender =
bpf_map_lookup_elem(&direct_channel_senders, &invocation->chan_ptr);
if (sender) {
if (have_receiver) {
emit_channel_handoff(sender, &receiver);
}
bpf_map_delete_elem(&direct_channel_senders, &invocation->chan_ptr);
}
} else {
u64 recvx = 0;
read_channel_u64((void *)invocation->chan_ptr, hchan_recvx_off, &recvx);

chan_handoff_key_t key = {
.chan_ptr = invocation->chan_ptr,
.slot = circular_slot(recvx, dataqsiz),
};
chan_handoff_t *sender = bpf_map_lookup_elem(&buffered_channel_handoffs, &key);
if (sender) {
if (have_receiver) {
emit_channel_handoff(sender, &receiver);
}
bpf_map_delete_elem(&buffered_channel_handoffs, &key);
}
}

bpf_map_delete_elem(&chanrecv_invocations, &g_key);
return 0;
}

SEC("uprobe/runtime_chansend1")
int obi_uprobe_runtime_chansend1(struct pt_regs *ctx) {
return channel_send_start(ctx);
}

SEC("uprobe/runtime_chansend1_return")
int obi_uprobe_runtime_chansend1_return(struct pt_regs *ctx) {
return channel_send_return(ctx);
}

SEC("uprobe/runtime_chanrecv1")
int obi_uprobe_runtime_chanrecv1(struct pt_regs *ctx) {
return channel_recv_start(ctx);
}

SEC("uprobe/runtime_chanrecv1_return")
int obi_uprobe_runtime_chanrecv1_return(struct pt_regs *ctx) {
return channel_recv_return(ctx);
}

SEC("uprobe/runtime_chanrecv2")
int obi_uprobe_runtime_chanrecv2(struct pt_regs *ctx) {
return channel_recv_start(ctx);
}

SEC("uprobe/runtime_chanrecv2_return")
int obi_uprobe_runtime_chanrecv2_return(struct pt_regs *ctx) {
return channel_recv_return(ctx);
}

enum gstatus {
// _Gidle: just allocated, not yet initialized
g_idle = 0,
Expand Down
Loading
Loading