Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,37 @@ def start_span(name, _id, payload)
{ span: span, ctx_token: attach_consumer_context(span, parent_context) }
end

# Overrides `Default#start` to also snapshot performance metrics at job start
#
# The span handle is written to the payload *before* the metrics snapshot is taken.
# If the snapshot raises, `:__otel` is therefore already in the payload, so `finish`
# can still find the span and its context token and clean them up.
#
# @param name [String] of the Event
# @param id [String] of the event
# @param payload [Hash] containing job run information
# @return [Hash] the payload passed as a method argument (also returned when an error was handled)
def start(name, id, payload)
  payload[:__otel] = start_span(name, id, payload)
  # Taken after the span is started so span creation is not part of the measured window.
  payload[:__otel_metrics] = snapshot_metrics
  payload
rescue StandardError => e
  # Instrumentation must never break the job; report the error and carry on.
  OpenTelemetry.handle_error(exception: e)
  payload
end

# Overrides `Default#finish` to record performance metrics on the span
#
# Removes the `:__otel` and `:__otel_metrics` entries that `start` placed in the payload,
# records the metric deltas on the span, hands the job's error (if any) to `on_exception`,
# and always calls `finish_span` with the span and context token.
#
# Metrics are optional: a failure while recording them is reported and then ignored, so it
# cannot prevent `on_exception` from seeing the job's error.
#
# @param _name [String] of the Event (unused)
# @param _id [String] of the event (unused)
# @param payload [Hash] containing job run information
def finish(_name, _id, payload)
  otel = payload.delete(:__otel)
  metrics_start = payload.delete(:__otel_metrics)
  span = otel&.fetch(:span)
  token = otel&.fetch(:ctx_token)

  begin
    # Skipped when `start` could not produce a span or a metrics snapshot.
    record_metrics(span, metrics_start) if span && metrics_start
  rescue StandardError => e
    OpenTelemetry.handle_error(exception: e)
  end

  on_exception(payload[:error] || payload[:exception_object], span)
rescue StandardError => e
  OpenTelemetry.handle_error(exception: e)
ensure
  # Runs even when the payload lookups above failed; span/token are nil in that case.
  finish_span(span, token)
end

# This method attaches a span to multiple contexts:
# 1. Registers the ingress span as the top level ActiveJob span.
# This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error
Expand All @@ -50,6 +81,54 @@ def attach_consumer_context(span, parent_context)

OpenTelemetry::Context.attach(internal_context)
end

private

# Captures the current values of the counters used to compute per-job metrics.
#
# @return [Hash] with the keys
#   `:monotonic_time` (monotonic clock reading, Float milliseconds),
#   `:cpu_time` (value of `now_cpu`),
#   `:gc_time` (value of `now_gc`) and
#   `:allocations` (`GC.stat(:total_allocated_objects)`)
def snapshot_metrics
  wall_clock_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  cpu_reading = now_cpu
  gc_reading = now_gc
  allocated_objects = GC.stat(:total_allocated_objects)

  {
    monotonic_time: wall_clock_ms,
    cpu_time: cpu_reading,
    gc_time: gc_reading,
    allocations: allocated_objects
  }
end

# Takes a fresh snapshot, diffs it against the one taken at job start and
# writes the resulting values to the span as attributes.
#
# @param span [#set_attribute] span that receives the metric attributes
# @param start [Hash] snapshot produced by `snapshot_metrics` when the job started
def record_metrics(span, start)
  current = snapshot_metrics
  delta = ->(key) { current[key] - start[key] }

  elapsed_ms = delta.call(:monotonic_time)
  cpu_ms = delta.call(:cpu_time)
  # Time not spent on CPU; clamped so clock skew can never yield a negative value.
  idle_ms = [elapsed_ms - cpu_ms, 0.0].max
  # GC.total_time is documented in nanoseconds; dividing by 1_000_000.0 yields milliseconds.
  gc_ms = delta.call(:gc_time) / 1_000_000.0
  allocated = delta.call(:allocations)

  span.set_attribute('messaging.active_job.job.cpu_time', cpu_ms)
  span.set_attribute('messaging.active_job.job.idle_time', idle_ms)
  span.set_attribute('messaging.active_job.job.gc_time', gc_ms)
  span.set_attribute('messaging.active_job.job.allocations', allocated)
end

# `now_gc` is chosen once, when this code is loaded: runtimes whose `GC` does not
# respond to `total_time` get a constant-zero stand-in, so GC deltas are always 0.
gc_total_time_missing = !GC.respond_to?(:total_time)

if gc_total_time_missing
  # @return [Integer] always 0 (GC timing unavailable)
  def now_gc
    0
  end
else
  # @return [Integer] the current value of `GC.total_time`
  def now_gc
    GC.total_time
  end
end

# Probe the per-thread CPU clock once at load time. Any StandardError raised by the
# probe (for example the constant being undefined or the clock id being rejected)
# means the clock is treated as unavailable.
thread_cpu_clock_available =
  begin
    Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID, :float_millisecond)
    true
  rescue StandardError
    false
  end

if thread_cpu_clock_available
  # @return [Float] current thread CPU time in milliseconds
  def now_cpu
    Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID, :float_millisecond)
  end
else
  # @return [Float] always 0.0 (thread CPU clock unavailable)
  def now_cpu
    0.0
  end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,40 @@
end

describe 'attributes' do
describe 'performance metrics' do
  # Builds the full span attribute key for a metric name.
  attribute_for = ->(metric) { "messaging.active_job.job.#{metric}" }

  it 'records allocations on the process span' do
    TestJob.perform_now

    allocations = process_span.attributes[attribute_for.call('allocations')]
    _(allocations).must_be :>, 0
  end

  # The timing metrics may legitimately be zero for a trivial job, so only
  # assert that they are present and non-negative.
  %w[cpu_time idle_time gc_time].each do |metric|
    it "records #{metric} on the process span" do
      TestJob.perform_now

      recorded = process_span.attributes[attribute_for.call(metric)]
      _(recorded).must_be :>=, 0.0
    end
  end

  it 'does not record performance metrics on the publish span' do
    TestJob.perform_later

    %w[allocations cpu_time idle_time gc_time].each do |metric|
      recorded = publish_span.attributes[attribute_for.call(metric)]
      _(recorded).must_be_nil
    end
  end
end

describe 'active_job.priority' do
it 'is unset for unprioritized jobs' do
TestJob.perform_later
Expand Down
Loading