diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 15b86392c6e..de90dbe16be 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -105,6 +105,9 @@ pub enum Feature { /// Stream minidumps to objectstore. #[serde(rename = "projects:relay-minidump-uploads")] MinidumpUploads, + /// Enable relay billing outcome generation. + #[serde(rename = "organizations:relay-generate-billing-outcome")] + GenerateBillingOutcome, /// Enables OTLP spans to use the Span V2 processing pipeline in Relay. /// diff --git a/relay-server/src/metrics/outcomes.rs b/relay-server/src/metrics/outcomes.rs index 0d97359baf5..9a7f37007ab 100644 --- a/relay-server/src/metrics/outcomes.rs +++ b/relay-server/src/metrics/outcomes.rs @@ -28,8 +28,8 @@ impl MetricOutcomes { pub fn track(&self, scoping: Scoping, buckets: &[impl TrackableBucket], outcome: Outcome) { let timestamp = Utc::now(); - // Never emit accepted outcomes for surrogate metrics. - // These are handled from within Sentry. + // Accepted outcomes go through `track_accepted_outcome`, which does + // additional work to prevent billing double-counting. if !matches!(outcome, Outcome::Accepted) { let SourceQuantities { transactions, @@ -58,6 +58,50 @@ impl MetricOutcomes { } } } + + /// Emits accepted outcomes, for the provided list of buckets. + /// + /// Additionally, adds a marker tag `billing_outcome_accepted` to all buckets for which an + /// outcome has been emitted. + pub fn track_accepted_outcome(&self, scoping: Scoping, buckets: &mut [Bucket]) { + let timestamp = Utc::now(); + for bucket in buckets { + let summary = bucket.summary(); + match summary { + BucketSummary::Spans { + count, + is_segment, + was_transaction: _, + } => { + if count == 0 { + continue; + } + + let categories = match is_segment { + true => [DataCategory::Span, DataCategory::Transaction].as_slice(), + false => [DataCategory::Span].as_slice(), + }; + + bucket + .tags + .insert("billing_outcome_accepted".to_owned(), "true".to_owned()); + + for category in categories { + self.outcomes.send(TrackOutcome { + timestamp, + scoping, + outcome: Outcome::Accepted, + event_id: None, + remote_addr: None, + category: *category, + quantity: count as u32, + }); + } + } + BucketSummary::None => continue, + }; + } + } } /// The return value of [`TrackableBucket::summary`]. diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 4792eb541a5..db9c6008d5a 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -19,6 +19,7 @@ use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token}; use relay_common::time::UnixTimestamp; use relay_config::{Config, HttpEncoding, UpstreamDescriptor}; +use relay_dynamic_config::Feature; use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup}; use relay_event_schema::processor::ProcessingAction; use relay_event_schema::protocol::ClientReport; @@ -1171,6 +1172,7 @@ impl EnvelopeProcessorService { /// /// This function runs the following steps: /// - rate limiting + /// - emit billing outcomes /// - submit to `StoreForwarder` #[cfg(feature = "processing")] async fn encode_metrics_processing( @@ -1188,7 +1190,7 @@ impl EnvelopeProcessorService { .. } in message.buckets.into_values() { - let buckets = self + let mut buckets = self .rate_limit_buckets(scoping, &project_info, buckets) .await; @@ -1196,6 +1198,17 @@ impl EnvelopeProcessorService { continue; } + if project_info + .config + .features + .has(Feature::GenerateBillingOutcome) + { + // Emit metric billing outcomes. + self.inner + .metric_outcomes + .track_accepted_outcome(scoping, &mut buckets); + } + let retention = project_info .config .event_retention diff --git a/tests/integration/test_ai.py b/tests/integration/test_ai.py index eb4a8985123..0f7f39bd21a 100644 --- a/tests/integration/test_ai.py +++ b/tests/integration/test_ai.py @@ -32,8 +32,10 @@ def test_ai_spans_example_transaction( outcomes_consumer = outcomes_consumer() project_id = 42 - mini_sentry.add_full_project_config(project_id) - + project = mini_sentry.add_full_project_config(project_id) + project["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) mini_sentry.global_config["aiModelMetadata"] = { "version": 1, "models": { @@ -1296,8 +1298,29 @@ def test_ai_spans_example_transaction( }, ] + num_messages = 3 + if relay_emits_accepted_outcome: + num_messages = 13 + outcomes = outcomes_consumer.get_aggregated_outcomes(n=num_messages) + if relay_emits_accepted_outcome: - assert outcomes_consumer.get_aggregated_outcomes() == [ + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 10, + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -1305,5 +1328,24 @@ def test_ai_spans_example_transaction( "outcome": 0, "project_id": 42, "quantity": 10, - } + }, + ] + else: + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 10, + }, ] diff --git a/tests/integration/test_attachment_ref.py b/tests/integration/test_attachment_ref.py index 2601e854e7a..ebbe44d0aba 100644 --- a/tests/integration/test_attachment_ref.py +++ b/tests/integration/test_attachment_ref.py @@ -254,8 +254,11 @@ def test_attachment_ref_validation( event_id = "515539018c9b4260a6f999572f1661ee" project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"].setdefault("features", []).append( - "projects:relay-upload-endpoint" + project_config["config"].setdefault("features", []).extend( + [ + "projects:relay-upload-endpoint", + "organizations:relay-generate-billing-outcome", + ] ) relay = relay_with_processing() @@ -275,7 +278,7 @@ def test_attachment_ref_validation( relay.send_envelope(project_id, envelope) - outcomes = outcomes_consumer.get_outcomes(n=3 if event_type == "transaction" else 2) + outcomes = outcomes_consumer.get_outcomes(n=5 if event_type == "transaction" else 2) o = {DataCategory(o["category"]): o for o in outcomes} assert o[DataCategory.ATTACHMENT]["reason"] == "invalid_placeholder_attachment" assert o[DataCategory.ATTACHMENT]["quantity"] == expected_bytes_quantity diff --git a/tests/integration/test_attachmentsv2.py b/tests/integration/test_attachmentsv2.py index 936a234b55e..46b0561eeac 100644 --- a/tests/integration/test_attachmentsv2.py +++ b/tests/integration/test_attachmentsv2.py @@ -353,6 +353,7 @@ def test_attachment_with_matching_span_store( project_config["config"]["features"] = [ "projects:span-v2-experimental-processing", "projects:span-v2-attachment-processing", + "organizations:relay-generate-billing-outcome", ] relay = relay_with_processing() @@ -424,8 +425,16 @@ def test_attachment_with_matching_span_store( objectstore = objectstore(usecase="trace_attachments", project_id=project_id) assert objectstore.get(metadata["attachment_id"]).payload.read() == body - outcomes = outcomes_consumer.get_aggregated_outcomes(n=3) + outcomes = outcomes_consumer.get_aggregated_outcomes(n=5) assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, { "category": DataCategory.ATTACHMENT.value, "key_id": 123, @@ -434,6 +443,14 @@ def test_attachment_with_matching_span_store( "project_id": 42, "quantity": 23, }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, diff --git a/tests/integration/test_otlp_logs.py b/tests/integration/test_otlp_logs.py index 80c7b57b8de..6145facdf21 100644 --- a/tests/integration/test_otlp_logs.py +++ b/tests/integration/test_otlp_logs.py @@ -187,6 +187,7 @@ def test_otlp_logs_multiple_records( project_config = mini_sentry.add_full_project_config(project_id) project_config["config"]["features"] = [ "organizations:ourlogs-ingestion", + "organizations:relay-generate-billing-outcome", ] project_config["config"]["retentions"] = { "log": {"standard": 30, "downsampled": 13 * 30}, diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index e3a6f7a81e3..72b1a68a3de 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -256,7 +256,11 @@ def test_outcomes_non_processing(relay, mini_sentry, event_type): Send one event that generates an outcome and verify that we get an outcomes batch with all necessary information set. """ - config = {"outcomes": {"emit_outcomes": True, "batch_size": 1, "batch_interval": 1}} + config = { + "outcomes": { + "emit_outcomes": True, + } + } relay = relay(mini_sentry, config) @@ -305,9 +309,7 @@ def test_outcomes_not_sent_when_disabled(relay, mini_sentry): Set batching to a very short interval and verify that we don't receive any outcome when we disable outcomes. """ - config = { - "outcomes": {"emit_outcomes": False, "batch_size": 1, "batch_interval": 1} - } + config = {"outcomes": {"emit_outcomes": False}} relay = relay(mini_sentry, config) @@ -407,8 +409,6 @@ def test_outcome_source(relay, mini_sentry): config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, "source": "my-layer", } } @@ -445,8 +445,6 @@ def test_outcome_forwarding( processing_config = { "outcomes": { "emit_outcomes": False, # The default, overridden by processing.enabled: true - "batch_size": 1, - "batch_interval": 1, "source": "processing-layer", } } @@ -457,8 +455,6 @@ def test_outcome_forwarding( intermediate_config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, "source": "intermediate-layer", } } @@ -514,8 +510,6 @@ def test_outcomes_forwarding_rate_limited( processing_config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, "source": "processing-layer", } } @@ -525,8 +519,6 @@ def test_outcomes_forwarding_rate_limited( config_downstream = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, "source": "downstream-layer", } } @@ -762,7 +754,7 @@ def test_outcomes_rate_limit( Pass a transaction that is rate limited and check whether a rate limit outcome is emitted. """ - config = {"outcomes": {"emit_outcomes": True, "batch_size": 1, "batch_interval": 1}} + config = {"outcomes": {"emit_outcomes": True}} relay = relay_with_processing(config) project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) @@ -901,9 +893,6 @@ def test_filtered_event_outcome_client_reports(relay, mini_sentry): "outcomes": { "emit_outcomes": "as_client_reports", "source": "downstream-layer", - "aggregator": { - "flush_interval": 1, - }, } }, ) @@ -931,11 +920,6 @@ def test_filtered_event_outcome_kafka(relay, mini_sentry): { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "flush_interval": 1, - }, } }, ) @@ -1064,8 +1048,6 @@ def test_outcomes_aggregate_inbound_filters( { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, "aggregator": { "flush_interval": 1, }, @@ -1125,8 +1107,6 @@ def test_graceful_shutdown(relay, mini_sentry): "limits": {"shutdown_timeout": 1}, "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, "aggregator": { "flush_interval": 10, }, @@ -1201,7 +1181,9 @@ def test_profile_outcomes( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id)["config"] - project_config.setdefault("features", []).append("organizations:profiling") + project_config.setdefault("features", []).extend( + ["organizations:profiling", "organizations:relay-generate-billing-outcome"] + ) project_config["sampling"] = { "version": 2, "rules": [ @@ -1221,18 +1203,8 @@ def test_profile_outcomes( config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 1, - }, "source": "processing-relay", }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, } # The innermost Relay needs to be in processing mode @@ -1277,9 +1249,6 @@ def make_envelope(transaction_name): project_id, make_envelope("ho") ) # should be kept by dynamic sampling - outcomes = outcomes_consumer.get_outcomes() - outcomes.sort(key=lambda o: sorted(o.items())) - expected_source = { 0: "processing-relay", 1: "pop-relay", @@ -1287,6 +1256,15 @@ def make_envelope(transaction_name): 2: "pop-relay", }[num_intermediate_relays] expected_outcomes = [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + "source": "processing-relay", + }, { "category": DataCategory.ATTACHMENT.value, # attachment "key_id": 123, @@ -1307,6 +1285,15 @@ def make_envelope(transaction_name): "reason": "Sampled:3000", "source": expected_source, }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 4, + "source": "processing-relay", + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -1337,8 +1324,10 @@ def make_envelope(transaction_name): "source": expected_source, }, ] - for outcome in outcomes: - outcome.pop("timestamp") + outcomes = outcomes_consumer.get_aggregated_outcomes(n=12) + outcomes.sort(key=lambda o: sorted(o.items())) + + assert outcomes == expected_outcomes, outcomes metrics = [ m @@ -1347,8 +1336,6 @@ def make_envelope(transaction_name): ] assert sum(metric["value"] for metric in metrics) == 2 - assert outcomes == expected_outcomes, outcomes - assert profiles_consumer.get_profile() assert profiles_consumer.get_profile() @@ -1379,7 +1366,9 @@ def test_profile_outcomes_invalid( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id)["config"] - project_config.setdefault("features", []).append("organizations:profiling") + project_config.setdefault("features", []).extend( + ["organizations:profiling", "organizations:relay-generate-billing-outcome"] + ) config = { "outcomes": { @@ -1418,6 +1407,15 @@ def make_envelope(): outcomes.sort(key=lambda o: sorted(o.items())) assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + "timestamp": time_within_delta(), + }, { "category": DataCategory.PROFILE.value, "key_id": 123, @@ -1438,6 +1436,15 @@ def make_envelope(): "reason": expected_outcome, "timestamp": time_within_delta(), }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + "timestamp": time_within_delta(), + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -1465,7 +1472,9 @@ def test_profile_outcomes_too_many( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id)["config"] - project_config.setdefault("features", []).append("organizations:profiling") + project_config.setdefault("features", []).extend( + ["organizations:profiling", "organizations:relay-generate-billing-outcome"] + ) config = { "outcomes": { @@ -1505,10 +1514,19 @@ def make_envelope(): envelope = make_envelope() upstream.send_envelope(project_id, envelope) - outcomes = outcomes_consumer.get_outcomes() + outcomes = outcomes_consumer.get_outcomes(n=5) outcomes.sort(key=lambda o: sorted(o.items())) assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + "timestamp": time_within_delta(), + }, { "category": DataCategory.PROFILE.value, "key_id": 123, @@ -1529,6 +1547,15 @@ def make_envelope(): "reason": "profiling_too_many_profiles", "timestamp": time_within_delta(), }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + "timestamp": time_within_delta(), + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -1565,7 +1592,9 @@ def test_profile_outcomes_rate_limited( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id)["config"] - project_config.setdefault("features", []).append("organizations:profiling") + project_config.setdefault("features", []).extend( + ["organizations:profiling", "organizations:relay-generate-billing-outcome"] + ) project_config["quotas"] = [ { "id": f"test_rate_limiting_{uuid.uuid4().hex}", @@ -1579,7 +1608,6 @@ def test_profile_outcomes_rate_limited( "outcomes": { "emit_outcomes": True, "aggregator": { - "bucket_interval": 1, "flush_interval": 1, }, } @@ -1613,22 +1641,22 @@ def test_profile_outcomes_rate_limited( outcomes = outcomes_consumer.get_outcomes() expected_categories = [ - (DataCategory.PROFILE, 1), - (DataCategory.PROFILE_INDEXED, 1), + (DataCategory.PROFILE.value, 1), + (DataCategory.PROFILE_INDEXED.value, 1), ] # If the platform header is set, the outcome can be emitted in the fast path, for all limits, # if the header is missing, it can only be enforced with consistent rate limiting, which only # happens for the `profile_ui` category (as the rate limit can't be enforced in the fast path). if with_platform_header or quota_category == "profile_ui": - expected_categories.append((DataCategory.PROFILE_UI, 1)) + expected_categories.append((DataCategory.PROFILE_UI.value, 1)) if quota_category == "transaction": # Transaction got rate limited as well: expected_categories += [ - (DataCategory.TRANSACTION, 1), - (DataCategory.TRANSACTION_INDEXED, 1), - (DataCategory.SPAN, 2), - (DataCategory.SPAN_INDEXED, 2), + (DataCategory.TRANSACTION.value, 1), + (DataCategory.TRANSACTION_INDEXED.value, 1), + (DataCategory.SPAN.value, 2), + (DataCategory.SPAN_INDEXED.value, 2), ] expected_outcomes = [ @@ -1640,7 +1668,6 @@ def test_profile_outcomes_rate_limited( "project_id": 42, "quantity": quantity, "reason": "profiles_exceeded", - "timestamp": time_within_delta(), } for (category, quantity) in expected_categories ] @@ -1648,16 +1675,40 @@ def test_profile_outcomes_rate_limited( if quota_category != "transaction": expected_outcomes.append( { - "category": DataCategory.SPAN_INDEXED, + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + } + ) + + expected_outcomes.append( + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + } + ) + + expected_outcomes.append( + { + "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, "org_id": 1, "outcome": 0, "project_id": 42, "quantity": 2, - "timestamp": time_within_delta(), } ) + for outcome in outcomes: + outcome.pop("timestamp") + outcomes.sort(key=lambda o: sorted(o.items())) expected_outcomes.sort(key=lambda o: sorted(o.items())) @@ -1686,12 +1737,6 @@ def test_profile_outcomes_rate_limited_when_dynamic_sampling_drops( config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 0, - }, }, } @@ -1749,6 +1794,9 @@ def test_span_outcomes( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id)["config"] + project_config.setdefault("features", []).extend( + ["organizations:profiling", "organizations:relay-generate-billing-outcome"] + ) project_config["sampling"] = { "version": 2, "rules": [ @@ -1768,18 +1816,8 @@ def test_span_outcomes( config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 1, - }, "source": "processing-relay", }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, } # The innermost Relay needs to be in processing mode @@ -1816,18 +1854,26 @@ def make_envelope(transaction_name): project_id, make_envelope("ho") ) # should be kept by dynamic sampling - outcomes = outcomes_consumer.get_outcomes() - outcomes.sort(key=lambda o: sorted(o.items())) - expected_source = { 0: "processing-relay", 1: "pop-relay", 2: "pop-relay", }[num_intermediate_relays] - assert outcomes == [ + outcomes = outcomes_consumer.get_aggregated_outcomes(n=10) + outcomes.sort(key=lambda o: sorted(o.items())) + + expected_outcomes = [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + "source": "processing-relay", + }, { - "timestamp": time_within_delta(), "category": DataCategory.TRANSACTION_INDEXED.value, "key_id": 123, "org_id": 1, @@ -1838,7 +1884,15 @@ def make_envelope(transaction_name): "source": expected_source, }, { - "timestamp": time_within_delta(), + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 4, + "source": "processing-relay", + }, + { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, "org_id": 1, @@ -1848,7 +1902,6 @@ def make_envelope(transaction_name): "source": "processing-relay", }, { - "timestamp": time_within_delta(), "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, "org_id": 1, @@ -1860,6 +1913,8 @@ def make_envelope(transaction_name): }, ] + assert outcomes == expected_outcomes + def test_span_outcomes_invalid( mini_sentry, @@ -1962,18 +2017,8 @@ def test_replay_outcomes_item_failed( config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 1, - }, "source": "pop-relay", }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, } upstream = relay_with_processing(config) diff --git a/tests/integration/test_profile_chunks.py b/tests/integration/test_profile_chunks.py index d5b1f49ceb2..6ef2a055dd4 100644 --- a/tests/integration/test_profile_chunks.py +++ b/tests/integration/test_profile_chunks.py @@ -144,8 +144,11 @@ def test_profile_chunk_outcomes_invalid( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id)["config"] - project_config.setdefault("features", []).append( - "organizations:continuous-profiling" + project_config.setdefault("features", []).extend( + [ + "organizations:continuous-profiling", + "organizations:relay-generate-billing-outcome", + ] ) upstream = relay_with_processing(TEST_CONFIG) diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 6abf5bcf004..b2bc3b747db 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -55,7 +55,9 @@ def test_span_extraction( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) - project_config["config"].setdefault("features", []) + project_config["config"].setdefault("features", []).append( + "organizations:relay-generate-billing-outcome" + ) if performance_issues_spans: project_config["config"]["features"].append( "organizations:performance-issues-spans" @@ -284,8 +286,30 @@ def test_span_extraction( spans_consumer.assert_empty() + num_messages = 3 + if relay_emits_accepted_outcome: + num_messages = 5 + + outcomes = outcomes_consumer.get_aggregated_outcomes(n=num_messages) + if relay_emits_accepted_outcome: - assert outcomes_consumer.get_aggregated_outcomes() == [ + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -293,7 +317,26 @@ def test_span_extraction( "outcome": 0, "project_id": 42, "quantity": 2, - } + }, + ] + else: + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + }, ] @@ -666,6 +709,9 @@ def test_rate_limit_indexed_consistent( relay = relay_with_processing() project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) project_config["config"]["quotas"] = [ { "categories": ["span_indexed"], @@ -694,11 +740,19 @@ def summarize_outcomes(): relay.send_envelope(project_id, envelope) spans = spans_consumer.get_spans(n=3, timeout=10) assert len(spans) == 3 - assert summarize_outcomes() == {(16, 0): 3} # SpanIndexed, Accepted + assert summarize_outcomes() == { + (16, 0): 3, + (12, 0): 3, + (2, 0): 1, + } # SpanIndexed, Accepted # Second batch is limited relay.send_envelope(project_id, envelope) - assert summarize_outcomes() == {(16, 2): 3} # SpanIndexed, RateLimited + assert summarize_outcomes() == { + (16, 2): 3, + (12, 0): 3, + (2, 0): 1, + } # SpanIndexed, RateLimited spans_consumer.assert_empty() outcomes_consumer.assert_empty() @@ -715,6 +769,9 @@ def test_rate_limit_consistent_extracted( relay = relay_with_processing(options=TEST_CONFIG) project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).append( + "organizations:relay-generate-billing-outcome" + ) project_config["config"]["quotas"] = [ { "categories": ["span"], @@ -759,7 +816,11 @@ def summarize_outcomes(): spans = spans_consumer.get_spans(n=2, timeout=10) # one for the transaction, one for the contained span assert len(spans) == 2 - assert summarize_outcomes() == {(16, 0): 2} # SpanIndexed, Accepted + assert summarize_outcomes() == { + (16, 0): 2, + (12, 0): 2, + (2, 0): 1, + } # SpanIndexed, Accepted # A limit only for span_indexed does not affect extracted metrics metrics = metrics_consumer.get_metrics(n=4) span_count = sum( @@ -843,6 +904,10 @@ def test_rate_limit_is_consistent_between_transaction_and_spans( relay = relay_with_processing(options=TEST_CONFIG) project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) + project_config["config"]["quotas"] = [ { "categories": [category], @@ -886,7 +951,11 @@ def summarize_outcomes(): # We have one nested span and the transaction itself becomes a span spans = spans_consumer.get_spans(n=2, timeout=10) assert len(spans) == 2 - assert summarize_outcomes() == {(16, 0): 2} # SpanIndexed, Accepted + assert summarize_outcomes() == { + (16, 0): 2, + (2, 0): 1, + (12, 0): 2, + } # SpanIndexed, Accepted assert span_usage_metric() == 2 # Second batch nothing passes @@ -904,7 +973,9 @@ def summarize_outcomes(): assert span_usage_metric() == 0 elif category == "transaction_indexed": assert summarize_outcomes() == { + (2, 0): 1, (9, 2): 1, # TransactionIndexed, Rate Limited + (12, 0): 2, (16, 2): 2, # SpanIndexed, Rate Limited } assert span_usage_metric() == 2 @@ -933,7 +1004,9 @@ def summarize_outcomes(): # We do not check indexed limits on the fast path, # so we count the correct number of spans (ignoring the span_count header): assert summarize_outcomes() == { + (2, 0): 1, (9, 2): 1, # TransactionIndexed, Rate Limited + (12, 0): 2, (16, 2): 2, # SpanIndexed, Rate Limited } # Metrics are always correct: @@ -1015,10 +1088,18 @@ def test_dynamic_sampling( outcomes_consumer = outcomes_consumer() project_id = 42 - mini_sentry.add_basic_project_config(project_id) + config = mini_sentry.add_basic_project_config(project_id) + config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) sampling_config = mini_sentry.add_basic_project_config(43) + sampling_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) + sampling_public_key = sampling_config["publicKeys"][0]["publicKey"] + sampling_config["config"]["txNameRules"] = [ { "pattern": "/auth/login/*/**", @@ -1083,14 +1164,20 @@ def summarize_outcomes(outcomes): if sample_rate == 1.0: spans = spans_consumer.get_spans(timeout=10, n=3) assert len(spans) == 3 - outcomes = outcomes_consumer.get_outcomes(timeout=10, n=3) - assert summarize_outcomes(outcomes) == {(16, 0): 3} # SpanIndexed, Accepted + outcomes = outcomes_consumer.get_outcomes(timeout=10, n=6) + assert summarize_outcomes(outcomes) == { + (16, 0): 3, + (12, 0): 3, + (2, 0): 1, + } # SpanIndexed, Accepted else: - outcomes = outcomes_consumer.get_outcomes(timeout=10, n=1) + outcomes = outcomes_consumer.get_outcomes(timeout=10, n=4) assert summarize_outcomes(outcomes) == { (16, 1): 3, # SpanIndexed, Filtered + (12, 0): 3, + (2, 0): 1, } - assert {o["reason"] for o in outcomes} == { + assert {o["reason"] for o in outcomes if o["outcome"] != 0} == { "Sampled:3000", } diff --git a/tests/integration/test_spans_standalone.py b/tests/integration/test_spans_standalone.py index 142943d47f5..f841e7c36fe 100644 --- a/tests/integration/test_spans_standalone.py +++ b/tests/integration/test_spans_standalone.py @@ -150,8 +150,11 @@ def test_lcp_span( project_config["config"]["performanceScore"] = { "profiles": performance_score_profiles } + project_config["config"].setdefault( + "features", ["organizations:relay-generate-billing-outcome"] + ) if mode == "v2": - project_config["config"].setdefault("features", []).append( + project_config["config"]["features"].append( "projects:span-v2-experimental-processing" ) @@ -329,7 +332,7 @@ def test_lcp_span( "type": "c", "value": 1.0, "timestamp": time_within_delta(ts), - "tags": {"is_segment": "false"}, + "tags": {"is_segment": "false", "billing_outcome_accepted": "true"}, "retention_days": 90, "received_at": time_within(ts, precision="s"), }, @@ -353,8 +356,12 @@ def test_cls_span( project_config["config"]["performanceScore"] = { "profiles": performance_score_profiles } + project_config["config"].setdefault("features", []).append( + "organizations:relay-generate-billing-outcome" + ) + if mode == "v2": - project_config["config"].setdefault("features", []).append( + project_config["config"]["features"].append( "projects:span-v2-experimental-processing" ) @@ -538,7 +545,7 @@ def test_cls_span( "type": "c", "value": 1.0, "timestamp": time_within_delta(ts), - "tags": {"is_segment": "false"}, + "tags": {"is_segment": "false", "billing_outcome_accepted": "true"}, "retention_days": 90, "received_at": time_within(ts, precision="s"), }, @@ -562,6 +569,9 @@ def test_inp_span( project_config["config"]["performanceScore"] = { "profiles": performance_score_profiles } + project_config["config"].setdefault("features", []).append( + "organizations:relay-generate-billing-outcome" + ) if mode == "v2": project_config["config"].setdefault("features", []).append( "projects:span-v2-experimental-processing" @@ -710,7 +720,7 @@ def test_inp_span( "type": "c", "value": 1.0, "timestamp": time_within_delta(ts), - "tags": {"is_segment": "false"}, + "tags": {"is_segment": "false", "billing_outcome_accepted": "true"}, "retention_days": 90, "received_at": time_within(ts, precision="s"), }, diff --git a/tests/integration/test_spansv2.py b/tests/integration/test_spansv2.py index 367324929d1..35370c9de8d 100644 --- a/tests/integration/test_spansv2.py +++ b/tests/integration/test_spansv2.py @@ -65,6 +65,9 @@ def test_spansv2_basic( project_config["config"].update( {"retentions": {"span": {"standard": 42, "downsampled": 1337}}} ) + project_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) relay = relay(relay_with_processing(options=TEST_CONFIG), options=TEST_CONFIG) @@ -200,6 +203,7 @@ def test_spansv2_basic( "tags": { "was_transaction": "false", "is_segment": "true", + "billing_outcome_accepted": "true", }, "timestamp": time_within_delta(), "type": "c", @@ -207,8 +211,30 @@ def test_spansv2_basic( }, ] + num_messages = 2 + if relay_emits_accepted_outcome: + num_messages = 3 + + outcomes = outcomes_consumer.get_aggregated_outcomes(n=num_messages) + if relay_emits_accepted_outcome: - assert outcomes_consumer.get_aggregated_outcomes() == [ + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -216,7 +242,27 @@ def test_spansv2_basic( "outcome": 0, "project_id": 42, "quantity": 1, - } + }, + ] + + else: + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, ] @@ -236,6 +282,9 @@ def test_spansv2_trimming_basic( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).append( + "organizations:relay-generate-billing-outcome" + ) project_config["config"].update( { "retentions": {"span": {"standard": 42, "downsampled": 1337}}, @@ -409,6 +458,7 @@ def test_spansv2_trimming_basic( "tags": { "was_transaction": "false", "is_segment": "true", + "billing_outcome_accepted": "true", }, "timestamp": time_within_delta(), "type": "c", @@ -645,10 +695,16 @@ def test_spansv2_ds_sampled( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) add_sampling_config(project_config, sample_rate=0.0, rule_type="trace") sampling_project_id = 43 sampling_config = mini_sentry.add_basic_project_config(sampling_project_id) + sampling_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) sampling_config["organizationId"] = project_config["organizationId"] add_sampling_config(sampling_config, sample_rate=0.9, rule_type="trace") @@ -734,6 +790,7 @@ def test_spansv2_ds_sampled( "retention_days": 90, "tags": { "is_segment": "false", + "billing_outcome_accepted": "true", }, "timestamp": time_within_delta(), "type": "c", @@ -748,6 +805,7 @@ def test_spansv2_ds_sampled( "tags": { "was_transaction": "false", "is_segment": "true", + "billing_outcome_accepted": "true", }, "timestamp": time_within_delta(), "type": "c", @@ -755,7 +813,25 @@ def test_spansv2_ds_sampled( }, ] - assert outcomes_consumer.get_aggregated_outcomes(n=2) == [ + outcomes = outcomes_consumer.get_aggregated_outcomes(n=5) + + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 2, + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -763,7 +839,7 @@ def test_spansv2_ds_sampled( "outcome": 0, "project_id": 42, "quantity": 2, - } + }, ] @@ -786,10 +862,18 @@ def test_spansv2_ds_root_in_different_org( project_id = 42 project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) + add_sampling_config(project_config, sample_rate=0.0, rule_type="trace") sampling_project_id = 43 sampling_config = mini_sentry.add_basic_project_config(sampling_project_id) + sampling_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) + sampling_config["organizationId"] = 99 add_sampling_config(sampling_config, sample_rate=1.0, rule_type="trace") @@ -838,6 +922,7 @@ def test_spansv2_ds_root_in_different_org( "retention_days": 90, "tags": { "is_segment": "false", + "billing_outcome_accepted": "true", }, "timestamp": time_within_delta(), "type": "c", @@ -845,16 +930,27 @@ def test_spansv2_ds_root_in_different_org( }, ] - assert outcomes_consumer.get_outcome() == { - "category": DataCategory.SPAN_INDEXED.value, - "key_id": 123, - "org_id": 1, - "outcome": 1, - "project_id": 42, - "quantity": 1, - "reason": "Sampled:0", - "timestamp": time_within_delta(), - } + assert outcomes_consumer.get_outcomes(n=2) == [ + { + "category": DataCategory.SPAN_INDEXED.value, + "key_id": 123, + "org_id": 1, + "outcome": 1, + "project_id": 42, + "quantity": 1, + "reason": "Sampled:0", + "timestamp": time_within_delta(), + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + "timestamp": time_within_delta(), + }, + ] spans_consumer.assert_empty() diff --git a/tests/integration/test_spansv2_otel.py b/tests/integration/test_spansv2_otel.py index 4d297b13939..4613dc5fc2a 100644 --- a/tests/integration/test_spansv2_otel.py +++ b/tests/integration/test_spansv2_otel.py @@ -48,7 +48,10 @@ def test_span_ingestion( relay = relay(relay_with_processing()) project_id = 42 - mini_sentry.add_full_project_config(project_id) + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"].setdefault("features", []).extend( + ["organizations:relay-generate-billing-outcome"] + ) ts = datetime.now(timezone.utc) @@ -170,15 +173,40 @@ def test_span_ingestion( "project_id": 42, "received_at": time_within_delta(), "retention_days": 90, - "tags": {"is_segment": "true", "was_transaction": "false"}, + "tags": { + "is_segment": "true", + "was_transaction": "false", + "billing_outcome_accepted": "true", + }, "timestamp": time_within_delta(), "type": "c", "value": 1.0, }, ] + num_messages = 2 + if relay_emits_accepted_outcome: + num_messages = 3 + outcomes = outcomes_consumer.get_aggregated_outcomes(n=num_messages) + if relay_emits_accepted_outcome: - assert outcomes_consumer.get_aggregated_outcomes() == [ + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, { "category": DataCategory.SPAN_INDEXED.value, "key_id": 123, @@ -186,5 +214,25 @@ def test_span_ingestion( "outcome": 0, "project_id": 42, "quantity": 1, - } + }, + ] + + else: + assert outcomes == [ + { + "category": DataCategory.TRANSACTION.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, + { + "category": DataCategory.SPAN.value, + "key_id": 123, + "org_id": 1, + "outcome": 0, + "project_id": 42, + "quantity": 1, + }, ]