Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions dotnet/src/Microsoft.Agents.AI.Foundry/ClientHeadersAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,15 @@ protected override Task<AgentResponse> RunCoreAsync(
CancellationToken cancellationToken = default)
{
var snapshot = TrySnapshot(options);
if (snapshot is null)
if (snapshot is not null)
{
return this.InnerAgent.RunAsync(messages, session, options, cancellationToken);
// AsyncLocal mutations made inside an awaited async method do not leak back to the
// caller after the method returns, so we do not need an explicit restore step here.
// See ClientHeadersScope remarks.
ClientHeadersScope.Current = snapshot;
}

return RunAsyncCoreAsync(messages, session, options, snapshot, cancellationToken);

async Task<AgentResponse> RunAsyncCoreAsync(
IEnumerable<ChatMessage> innerMessages,
AgentSession? innerSession,
AgentRunOptions? innerOptions,
Dictionary<string, string> innerSnapshot,
CancellationToken innerCt)
{
using var _ = ClientHeadersScope.Push(innerSnapshot);
return await this.InnerAgent.RunAsync(innerMessages, innerSession, innerOptions, innerCt).ConfigureAwait(false);
}
return this.InnerAgent.RunAsync(messages, session, options, cancellationToken);
}

/// <inheritdoc/>
Expand All @@ -69,7 +61,10 @@ protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingA
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var snapshot = TrySnapshot(options);
using var _ = snapshot is null ? default : ClientHeadersScope.Push(snapshot);
if (snapshot is not null)
{
ClientHeadersScope.Current = snapshot;
}

await foreach (var update in this.InnerAgent.RunStreamingAsync(messages, session, options, cancellationToken).ConfigureAwait(false))
{
Expand Down
44 changes: 18 additions & 26 deletions dotnet/src/Microsoft.Agents.AI.Foundry/ClientHeadersScope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,31 @@ namespace Microsoft.Agents.AI.Foundry;
/// <see cref="ClientHeadersPolicy"/> running inside the SCM transport pipeline.
/// </summary>
/// <remarks>
/// AsyncLocal flows the value into downstream awaits but does not roll the value back when the
/// setting method returns. This type pairs each <see cref="Push(IReadOnlyDictionary{string, string}?)"/>
/// with a disposable that explicitly restores the prior value, giving stack-style LIFO semantics
/// for nested or sequential per-call scopes on the same async flow.
/// <para>
/// <see cref="AsyncLocal{T}"/> propagates the value forward into every <c>await</c> on the same
/// async flow, but mutations made inside an awaited <c>async</c> method do <em>not</em> leak back
/// to the caller after the method returns. This means a method that assigns
/// <see cref="Current"/> at the top and then awaits inner work does not need any explicit
/// restoration step: the runtime restores the caller's view of the AsyncLocal automatically when
/// the method's task completes.
/// </para>
/// <para>
/// Setting <see cref="Current"/> from synchronous code, however, will leak to the caller because
/// no async-method boundary is crossed. All Agent Framework call sites of this carrier are
/// inside <c>async</c> methods (<see cref="ClientHeadersAgent"/>), so the natural restoration
/// suffices for our needs.
/// </para>
/// </remarks>
internal static class ClientHeadersScope
{
private static readonly AsyncLocal<IReadOnlyDictionary<string, string>?> s_current = new();

/// <summary>Gets the dictionary captured by the most recent <see cref="Push(IReadOnlyDictionary{string, string}?)"/> on this async flow.</summary>
public static IReadOnlyDictionary<string, string>? Current => s_current.Value;

/// <summary>
/// Pushes a new value as the current scope. Disposing the returned token restores the previous value.
/// Gets or sets the per-async-flow client-header snapshot read by <see cref="ClientHeadersPolicy"/>.
/// </summary>
/// <param name="headers">The header dictionary to surface to the policy. May be <see langword="null"/>.</param>
public static Scope Push(IReadOnlyDictionary<string, string>? headers)
{
var previous = s_current.Value;
s_current.Value = headers;
return new Scope(previous);
}

/// <summary>Disposable token that restores the previous scope on <see cref="Dispose"/>.</summary>
internal readonly struct Scope : System.IDisposable
public static IReadOnlyDictionary<string, string>? Current
{
private readonly IReadOnlyDictionary<string, string>? _previous;

internal Scope(IReadOnlyDictionary<string, string>? previous)
{
this._previous = previous;
}

public void Dispose() => s_current.Value = this._previous;
get => s_current.Value;
set => s_current.Value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,29 +245,33 @@ public async Task ClientHeadersAgent_Streaming_HasScopeAtFirstYieldAsync()
}

// -------------------------------------------------------------------------------------------
// 10. ClientHeadersScope.Push is LIFO and AsyncLocal-isolated (parallel runs don't leak)
// 10. ClientHeadersScope is AsyncLocal-isolated across parallel runs and auto-restores on
// async-method return (no explicit Dispose needed).
// -------------------------------------------------------------------------------------------

[Fact]
public async Task ClientHeadersScope_IsLifoAndAsyncLocalIsolatedAsync()
public async Task ClientHeadersScope_IsAsyncLocalIsolatedAndAutoRestoresAsync()
{
// Arrange
var dictA = new Dictionary<string, string> { ["x-client-end-user-id"] = "alice" };
var dictB = new Dictionary<string, string> { ["x-client-end-user-id"] = "bob" };

// Act / Assert
// Act / Assert: parallel async flows do not see each other's mutations.
await Task.WhenAll(
ProbeAsync(dictA, "alice"),
ProbeAsync(dictB, "bob"));

async Task ProbeAsync(Dictionary<string, string> dict, string expected)
{
using (ClientHeadersScope.Push(dict))
{
await Task.Yield();
Assert.Equal(expected, ClientHeadersScope.Current!["x-client-end-user-id"]);
}
ClientHeadersScope.Current = dict;
await Task.Yield();
Assert.Equal(expected, ClientHeadersScope.Current!["x-client-end-user-id"]);
}

// Assert: setting Current inside an awaited async method does not leak back to the caller
// after the method returns. This is the AsyncLocal natural-restoration behavior the
// ClientHeadersAgent relies on.
Assert.Null(ClientHeadersScope.Current);
}

// -------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -320,16 +324,19 @@ public async Task ClientHeadersPolicy_StampsWithSet_OverwritesPreExistingHeaderA
perTryPolicies: default,
beforeTransportPolicies: default);

var perCall = new Dictionary<string, string> { ["x-client-end-user-id"] = "alice" };

// Act
using (ClientHeadersScope.Push(perCall))
ClientHeadersScope.Current = new Dictionary<string, string> { ["x-client-end-user-id"] = "alice" };
try
{
var msg = pipeline.CreateMessage();
msg.Request.Method = "GET";
msg.Request.Uri = new Uri("https://example.test/");
await pipeline.SendAsync(msg);
}
finally
{
ClientHeadersScope.Current = null;
}

// Assert: the per-call value won.
Assert.Equal("alice", handler.Headers["x-client-end-user-id"]);
Expand Down
Loading