diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb index 28981c9450..9ab8a07e4b 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/perform.rb @@ -37,6 +37,37 @@ def start_span(name, _id, payload) { span: span, ctx_token: attach_consumer_context(span, parent_context) } end + # Overrides `Default#start` to also snapshot performance metrics at job start + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] the payload passed as a method argument + def start(name, id, payload) + payload.merge!(__otel: start_span(name, id, payload), __otel_metrics: snapshot_metrics) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + end + + # Overrides `Default#finish` to record performance metrics on the span + # + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + def finish(_name, _id, payload) + otel = payload.delete(:__otel) + metrics_start = payload.delete(:__otel_metrics) + span = otel&.fetch(:span) + token = otel&.fetch(:ctx_token) + + record_metrics(span, metrics_start) if span && metrics_start + on_exception(payload[:error] || payload[:exception_object], span) + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + ensure + finish_span(span, token) + end + # This method attaches a span to multiple contexts: # 1. Registers the ingress span as the top level ActiveJob span. # This is used later to enrich the ingress span in children, e.g. setting span status to error when a child event like `discard` terminates due to an error @@ -50,6 +81,54 @@ def attach_consumer_context(span, parent_context) OpenTelemetry::Context.attach(internal_context) end + + private + + def snapshot_metrics + { + monotonic_time: Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond), + cpu_time: now_cpu, + gc_time: now_gc, + allocations: GC.stat(:total_allocated_objects) + } + end + + def record_metrics(span, start) + finish = snapshot_metrics + + duration = finish[:monotonic_time] - start[:monotonic_time] + cpu_time = finish[:cpu_time] - start[:cpu_time] + idle_time = [duration - cpu_time, 0.0].max + gc_time = (finish[:gc_time] - start[:gc_time]) / 1_000_000.0 + allocations = finish[:allocations] - start[:allocations] + + span.set_attribute('messaging.active_job.job.cpu_time', cpu_time) + span.set_attribute('messaging.active_job.job.idle_time', idle_time) + span.set_attribute('messaging.active_job.job.gc_time', gc_time) + span.set_attribute('messaging.active_job.job.allocations', allocations) + end + + if GC.respond_to?(:total_time) + def now_gc + GC.total_time + end + else + def now_gc + 0 + end + end + + begin + Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID, :float_millisecond) + + def now_cpu + Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID, :float_millisecond) + end + rescue StandardError + def now_cpu + 0.0 + end + end end end end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb index 5b97435fd7..42f8c5a127 100644 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/perform_test.rb @@ -101,6 +101,40 @@ end describe 'attributes' do + describe 'performance metrics' do + it 'records allocations on the process span' do + TestJob.perform_now + + _(process_span.attributes['messaging.active_job.job.allocations']).must_be :>, 0 + end + + it 'records cpu_time on the process span' do + TestJob.perform_now + + _(process_span.attributes['messaging.active_job.job.cpu_time']).must_be :>=, 0.0 + end + + it 'records idle_time on the process span' do + TestJob.perform_now + + _(process_span.attributes['messaging.active_job.job.idle_time']).must_be :>=, 0.0 + end + + it 'records gc_time on the process span' do + TestJob.perform_now + + _(process_span.attributes['messaging.active_job.job.gc_time']).must_be :>=, 0.0 + end + + it 'does not record performance metrics on the publish span' do + TestJob.perform_later + + %w[allocations cpu_time idle_time gc_time].each do |metric| + _(publish_span.attributes["messaging.active_job.job.#{metric}"]).must_be_nil + end + end + end + describe 'active_job.priority' do it 'is unset for unprioritized jobs' do TestJob.perform_later