Skip to content

Commit 71ab41f

Browse files
feat(active-job): Add performance metrics to process span
Record cpu_time, idle_time, gc_time, and allocations as span attributes on the perform.active_job process span, mirroring the metrics that ActiveSupport::Notifications::Event captures internally.
1 parent e040789 commit 71ab41f

2 files changed

Lines changed: 115 additions & 0 deletions

File tree

  • instrumentation/active_job
    • lib/opentelemetry/instrumentation/active_job/handlers
    • test/opentelemetry/instrumentation/active_job/handlers

instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,37 @@ def start_span(name, _id, payload)
3737
{ span: span, ctx_token: attach_consumer_context(span, parent_context) }
3838
end
3939

40+
# Overrides `Default#start` to also snapshot performance metrics at job start
41+
#
42+
# @param name [String] of the Event
43+
# @param id [String] of the event
44+
# @param payload [Hash] containing job run information
45+
# @return [Hash] the payload passed as a method argument
46+
def start(name, id, payload)
47+
payload.merge!(__otel: start_span(name, id, payload), __otel_metrics: snapshot_metrics)
48+
rescue StandardError => e
49+
OpenTelemetry.handle_error(exception: e)
50+
end
51+
52+
# Overrides `Default#finish` to record performance metrics on the span
53+
#
54+
# @param _name [String] of the Event (unused)
55+
# @param _id [String] of the event (unused)
56+
# @param payload [Hash] containing job run information
57+
def finish(_name, _id, payload)
58+
otel = payload.delete(:__otel)
59+
metrics_start = payload.delete(:__otel_metrics)
60+
span = otel&.fetch(:span)
61+
token = otel&.fetch(:ctx_token)
62+
63+
record_metrics(span, metrics_start) if span && metrics_start
64+
on_exception(payload[:error] || payload[:exception_object], span)
65+
rescue StandardError => e
66+
OpenTelemetry.handle_error(exception: e)
67+
ensure
68+
finish_span(span, token)
69+
end
70+
4071
# This method attaches a span to multiple contexts:
4172
# 1. Registers the ingress span as the top level ActiveJob span.
4273
# This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error
@@ -50,6 +81,56 @@ def attach_consumer_context(span, parent_context)
5081

5182
OpenTelemetry::Context.attach(internal_context)
5283
end
84+
85+
private
86+
87+
def snapshot_metrics
88+
{
89+
monotonic_time: Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond),
90+
cpu_time: now_cpu,
91+
gc_time: now_gc,
92+
allocations: GC.stat(:total_allocated_objects)
93+
}
94+
end
95+
96+
def record_metrics(span, start)
97+
finish = snapshot_metrics
98+
99+
duration = finish[:monotonic_time] - start[:monotonic_time]
100+
cpu_time = finish[:cpu_time] - start[:cpu_time]
101+
idle_time = [duration - cpu_time, 0.0].max
102+
gc_time = (finish[:gc_time] - start[:gc_time]) / 1_000_000.0
103+
allocations = finish[:allocations] - start[:allocations]
104+
105+
span.set_attributes(
106+
'messaging.active_job.job.cpu_time' => cpu_time,
107+
'messaging.active_job.job.idle_time' => idle_time,
108+
'messaging.active_job.job.gc_time' => gc_time,
109+
'messaging.active_job.job.allocations' => allocations
110+
)
111+
end
112+
113+
if GC.respond_to?(:total_time)
114+
def now_gc
115+
GC.total_time
116+
end
117+
else
118+
def now_gc
119+
0
120+
end
121+
end
122+
123+
begin
124+
Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID, :float_millisecond)
125+
126+
def now_cpu
127+
Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID, :float_millisecond)
128+
end
129+
rescue StandardError
130+
def now_cpu
131+
0.0
132+
end
133+
end
53134
end
54135
end
55136
end

instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,40 @@
101101
end
102102

103103
describe 'attributes' do
104+
describe 'performance metrics' do
105+
it 'records allocations on the process span' do
106+
TestJob.perform_now
107+
108+
_(process_span.attributes['messaging.active_job.job.allocations']).must_be :>, 0
109+
end
110+
111+
it 'records cpu_time on the process span' do
112+
TestJob.perform_now
113+
114+
_(process_span.attributes['messaging.active_job.job.cpu_time']).must_be :>=, 0.0
115+
end
116+
117+
it 'records idle_time on the process span' do
118+
TestJob.perform_now
119+
120+
_(process_span.attributes['messaging.active_job.job.idle_time']).must_be :>=, 0.0
121+
end
122+
123+
it 'records gc_time on the process span' do
124+
TestJob.perform_now
125+
126+
_(process_span.attributes['messaging.active_job.job.gc_time']).must_be :>=, 0.0
127+
end
128+
129+
it 'does not record performance metrics on the publish span' do
130+
TestJob.perform_later
131+
132+
%w[allocations cpu_time idle_time gc_time].each do |metric|
133+
_(publish_span.attributes["messaging.active_job.job.#{metric}"]).must_be_nil
134+
end
135+
end
136+
end
137+
104138
describe 'active_job.priority' do
105139
it 'is unset for unprioritized jobs' do
106140
TestJob.perform_later

0 commit comments

Comments
 (0)