Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ pub enum Feature {
/// Stream minidumps to objectstore.
#[serde(rename = "projects:relay-minidump-uploads")]
MinidumpUploads,
/// Enable relay billing outcome generation.
#[serde(rename = "organizations:relay-generate-billing-outcome")]
GenerateBillingOutcome,

/// Enables OTLP spans to use the Span V2 processing pipeline in Relay.
///
Expand Down
48 changes: 46 additions & 2 deletions relay-server/src/metrics/outcomes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ impl MetricOutcomes {
pub fn track(&self, scoping: Scoping, buckets: &[impl TrackableBucket], outcome: Outcome) {
let timestamp = Utc::now();

// Never emit accepted outcomes for surrogate metrics.
// These are handled from within Sentry.
// Accepted outcomes go through `track_accepted_outcome`, which does
// additional work to prevent billing double-counting.
if !matches!(outcome, Outcome::Accepted) {
let SourceQuantities {
transactions,
Expand Down Expand Up @@ -58,6 +58,50 @@ impl MetricOutcomes {
}
}
}

/// Emits accepted outcomes, for the provided list of buckets.
///
/// Additionally, adds a marker tag `billing_outcome_accepted` to all buckets for which an
/// outcome has been emitted.
pub fn track_accepted_outcome(&self, scoping: Scoping, buckets: &mut [Bucket]) {
let timestamp = Utc::now();
for bucket in buckets {
let summary = bucket.summary();
match summary {
BucketSummary::Spans {
count,
is_segment,
was_transaction: _,
} => {
if count == 0 {
continue;
}

let categories = match is_segment {
true => [DataCategory::Span, DataCategory::Transaction].as_slice(),
false => [DataCategory::Span].as_slice(),
};
Comment on lines +80 to +83

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The track_accepted_outcome function incorrectly generates a transaction outcome for any segment span by only checking is_segment and ignoring was_transaction, leading to billing inaccuracies.
Severity: HIGH

Suggested Fix

Modify the track_accepted_outcome function to align with the logic in extract_quantities. The function should check for both is_segment and was_transaction being true before emitting a DataCategory::Transaction outcome. This ensures that only segments originating from transactions are counted as such for billing.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: relay-server/src/metrics/outcomes.rs#L80-L83

Potential issue: The `track_accepted_outcome` function has logic that is inconsistent
with `extract_quantities`. It emits a `DataCategory::Transaction` outcome for any span
where `is_segment` is true, but it ignores the `was_transaction` flag. The
`extract_quantities` function, however, requires both `is_segment` and `was_transaction`
to be true to count a transaction. This discrepancy will cause over-billing, as segment
spans that did not originate from a transaction (e.g., from raw span ingestion) will be
incorrectly billed as transactions.

Did we get this right? 👍 / 👎 to inform future reviews.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No bug here I think.


bucket
.tags
.insert("billing_outcome_accepted".to_owned(), "true".to_owned());

for category in categories {
self.outcomes.send(TrackOutcome {
timestamp,
scoping,
outcome: Outcome::Accepted,
event_id: None,
remote_addr: None,
category: *category,
quantity: count as u32,
});
}
}
BucketSummary::None => continue,
};
}
}
}

/// The return value of [`TrackableBucket::summary`].
Expand Down
15 changes: 14 additions & 1 deletion relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
use relay_config::{Config, HttpEncoding, UpstreamDescriptor};
use relay_dynamic_config::Feature;
use relay_event_normalization::{ClockDriftProcessor, GeoIpLookup};
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::ClientReport;
Expand Down Expand Up @@ -1171,6 +1172,7 @@ impl EnvelopeProcessorService {
///
/// This function runs the following steps:
/// - rate limiting
/// - emit billing outcomes
/// - submit to `StoreForwarder`
#[cfg(feature = "processing")]
async fn encode_metrics_processing(
Expand All @@ -1188,14 +1190,25 @@ impl EnvelopeProcessorService {
..
} in message.buckets.into_values()
{
let buckets = self
let mut buckets = self
.rate_limit_buckets(scoping, &project_info, buckets)
.await;

if buckets.is_empty() {
continue;
}

if project_info
.config
.features
.has(Feature::GenerateBillingOutcome)
{
// Emit metric billing outcomes.
self.inner
.metric_outcomes
.track_accepted_outcome(scoping, &mut buckets);
}

let retention = project_info
.config
.event_retention
Expand Down
50 changes: 46 additions & 4 deletions tests/integration/test_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ def test_ai_spans_example_transaction(
outcomes_consumer = outcomes_consumer()

project_id = 42
mini_sentry.add_full_project_config(project_id)

project = mini_sentry.add_full_project_config(project_id)
project["config"].setdefault("features", []).extend(
["organizations:relay-generate-billing-outcome"]
)
mini_sentry.global_config["aiModelMetadata"] = {
"version": 1,
"models": {
Expand Down Expand Up @@ -1296,14 +1298,54 @@ def test_ai_spans_example_transaction(
},
]

num_messages = 3
if relay_emits_accepted_outcome:
num_messages = 13
outcomes = outcomes_consumer.get_aggregated_outcomes(n=num_messages)

if relay_emits_accepted_outcome:
assert outcomes_consumer.get_aggregated_outcomes() == [
assert outcomes == [
{
"category": DataCategory.TRANSACTION.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 1,
},
{
"category": DataCategory.SPAN.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 10,
},
{
"category": DataCategory.SPAN_INDEXED.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 10,
}
},
]
else:
assert outcomes == [
{
"category": DataCategory.TRANSACTION.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 1,
},
{
"category": DataCategory.SPAN.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 10,
},
]
9 changes: 6 additions & 3 deletions tests/integration/test_attachment_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,11 @@ def test_attachment_ref_validation(
event_id = "515539018c9b4260a6f999572f1661ee"
project_id = 42
project_config = mini_sentry.add_full_project_config(project_id)
project_config["config"].setdefault("features", []).append(
"projects:relay-upload-endpoint"
project_config["config"].setdefault("features", []).extend(
[
"projects:relay-upload-endpoint",
"organizations:relay-generate-billing-outcome",
]
)

relay = relay_with_processing()
Expand All @@ -275,7 +278,7 @@ def test_attachment_ref_validation(

relay.send_envelope(project_id, envelope)

outcomes = outcomes_consumer.get_outcomes(n=3 if event_type == "transaction" else 2)
outcomes = outcomes_consumer.get_outcomes(n=5 if event_type == "transaction" else 2)
o = {DataCategory(o["category"]): o for o in outcomes}
assert o[DataCategory.ATTACHMENT]["reason"] == "invalid_placeholder_attachment"
assert o[DataCategory.ATTACHMENT]["quantity"] == expected_bytes_quantity
Expand Down
19 changes: 18 additions & 1 deletion tests/integration/test_attachmentsv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def test_attachment_with_matching_span_store(
project_config["config"]["features"] = [
"projects:span-v2-experimental-processing",
"projects:span-v2-attachment-processing",
"organizations:relay-generate-billing-outcome",
]
relay = relay_with_processing()

Expand Down Expand Up @@ -424,8 +425,16 @@ def test_attachment_with_matching_span_store(
objectstore = objectstore(usecase="trace_attachments", project_id=project_id)
assert objectstore.get(metadata["attachment_id"]).payload.read() == body

outcomes = outcomes_consumer.get_aggregated_outcomes(n=3)
outcomes = outcomes_consumer.get_aggregated_outcomes(n=5)
assert outcomes == [
{
"category": DataCategory.TRANSACTION.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 1,
},
{
"category": DataCategory.ATTACHMENT.value,
"key_id": 123,
Expand All @@ -434,6 +443,14 @@ def test_attachment_with_matching_span_store(
"project_id": 42,
"quantity": 23,
},
{
"category": DataCategory.SPAN.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 1,
},
{
"category": DataCategory.SPAN_INDEXED.value,
"key_id": 123,
Expand Down
1 change: 1 addition & 0 deletions tests/integration/test_otlp_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def test_otlp_logs_multiple_records(
project_config = mini_sentry.add_full_project_config(project_id)
project_config["config"]["features"] = [
"organizations:ourlogs-ingestion",
"organizations:relay-generate-billing-outcome",
]
project_config["config"]["retentions"] = {
"log": {"standard": 30, "downsampled": 13 * 30},
Expand Down
Loading