diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs index 18f4eccd5f..ff60ac5aaf 100644 --- a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs @@ -8,6 +8,11 @@ // even if the process is interrupted mid-loop, but may also result in chat history that is not // yet finalized (e.g., tool calls without results) being persisted, which may be undesirable in some cases. // +// Additionally, this sample demonstrates the MessageInjectingChatClient feature, which allows tool +// code to inject new user messages during the function execution loop. When a tool or anything else enqueues +// a message via MessageInjectingChatClient.EnqueueMessages during the tool execution loop, the PerServiceCallChatHistoryPersistingChatClient +// detects the pending message before the next service call and includes the injected message in the request. +// // To use end-of-run persistence instead (atomic run semantics), remove the // RequirePerServiceCallChatHistoryPersistence = true setting (or set it to false). End-of-run // persistence is the default behavior. @@ -54,6 +59,37 @@ static string GetTime([Description("The city name.")] string city) => _ => $"{city}: time data not available." }; +// This tool demonstrates message injection during the function execution loop. +// When called, it checks travel advisories for a city. If an advisory is active, it uses +// the ambient run context to resolve MessageInjectingChatClient and injects a follow-up user message +// asking for alternative destinations. The model will process this injected message on the next +// service call — even though the parent FunctionInvokingChatClient loop would otherwise stop. +[Description("Check current travel advisories for a city.")] +static string CheckTravelAdvisory([Description("The city name.")] string city) +{ + // Simulated travel advisory data. + var advisory = city.ToUpperInvariant() switch + { + "LONDON" => "Travel advisory: Severe fog warnings in London. Flights may be delayed or cancelled.", + "SEATTLE" => "Travel advisory: Heavy rainfall expected. Flooding possible in low-lying areas.", + _ => null + }; + + if (advisory is null) + { + return $"{city}: No active travel advisories."; + } + + // When an advisory is found, inject a follow-up question so the model automatically + // suggests alternatives without the user needing to ask. + var runContext = AIAgent.CurrentRunContext!; + runContext.Agent.GetService()?.EnqueueMessages( + runContext.Session!, + [new ChatMessage(ChatRole.User, $"Given the travel advisory for {city}, what alternative cities would you recommend instead?")]); + + return advisory; +} + // Create the agent — per-service-call persistence is enabled via RequirePerServiceCallChatHistoryPersistence. // The in-memory ChatHistoryProvider is used by default when the service does not require service stored chat // history, so for those cases, we can inspect the chat history via session.TryGetInMemoryChatHistory(). @@ -65,10 +101,11 @@ static string GetTime([Description("The city name.")] string city) => { Name = "WeatherAssistant", RequirePerServiceCallChatHistoryPersistence = true, + EnableMessageInjection = true, ChatOptions = new() { - Instructions = "You are a helpful assistant. When asked about multiple cities, call the appropriate tool for each city.", - Tools = [AIFunctionFactory.Create(GetWeather), AIFunctionFactory.Create(GetTime)] + Instructions = "You are a helpful travel assistant. When asked about cities, call the appropriate tools for each city.", + Tools = [AIFunctionFactory.Create(GetWeather), AIFunctionFactory.Create(GetTime), AIFunctionFactory.Create(CheckTravelAdvisory)] }, }); @@ -109,6 +146,18 @@ async Task RunNonStreamingAsync() response = await agent.RunAsync(FollowUp2, session); PrintAgentResponse(response.Text); PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); + + // Fourth turn — demonstrates message injection during the function loop. + // The CheckTravelAdvisory tool detects an advisory for London and injects a follow-up + // user message asking for alternative cities. After the tool completes, the internal loop + // in PerServiceCallChatHistoryPersistingChatClient detects the pending injected message + // and calls the service again, so the model answers the follow-up automatically. + const string TravelPrompt = "I'm planning to travel to London next week. Check if there are any travel advisories."; + PrintUserMessage(TravelPrompt); + + response = await agent.RunAsync(TravelPrompt, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After travel advisory run", ref lastChatHistorySize, ref lastConversationId); } async Task RunStreamingAsync() @@ -181,6 +230,30 @@ async Task RunStreamingAsync() Console.WriteLine(); PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); + + // Fourth turn — demonstrates message injection during the function loop (streaming). + // The CheckTravelAdvisory tool detects an advisory for London and injects a follow-up + // user message asking for alternative cities. After the tool completes, the internal loop + // in PerServiceCallChatHistoryPersistingChatClient detects the pending injected message + // and calls the service again, so the model answers the follow-up automatically. + const string TravelPrompt = "I'm planning to travel to London next week. Check if there are any travel advisories."; + PrintUserMessage(TravelPrompt); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(TravelPrompt, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During travel advisory run", ref lastChatHistorySize, ref lastConversationId); + } + + Console.WriteLine(); + PrintChatHistory(session, "After travel advisory run", ref lastChatHistorySize, ref lastConversationId); } void PrintUserMessage(string message) diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs index e7340cec19..fad6b4e316 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs @@ -151,6 +151,36 @@ public sealed class ChatClientAgentOptions [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] public bool RequirePerServiceCallChatHistoryPersistence { get; set; } + /// + /// Gets or sets a value indicating whether to include a + /// in the chat client pipeline. + /// + /// + /// + /// When set to , a is added to the pipeline + /// between the and the inner client. This enables external code + /// (such as tool delegates) to inject messages into the function execution loop via the + /// class, which can be resolved from the chat client using + /// GetService<MessageInjectingChatClient>(). + /// + /// + /// This setting can be used independently of , + /// however it is recommended to also enable per-service-call persistence when using message injection + /// so that injected messages are persisted to chat history between service calls. + /// + /// + /// When setting the setting to and + /// to , ensure that your custom chat client stack + /// includes a . You can add one manually via the + /// extension method. + /// + /// + /// + /// Default is . + /// + [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] + public bool EnableMessageInjection { get; set; } + /// /// Creates a new instance of with the same values as this instance. /// @@ -168,5 +198,6 @@ public ChatClientAgentOptions Clone() WarnOnChatHistoryProviderConflict = this.WarnOnChatHistoryProviderConflict, ThrowOnChatHistoryProviderConflict = this.ThrowOnChatHistoryProviderConflict, RequirePerServiceCallChatHistoryPersistence = this.RequirePerServiceCallChatHistoryPersistence, + EnableMessageInjection = this.EnableMessageInjection, }; } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs index 5cf87aa950..22027a44a6 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientBuilderExtensions.cs @@ -114,4 +114,38 @@ public static ChatClientBuilder UsePerServiceCallChatHistoryPersistence(this Cha { return builder.Use(innerClient => new PerServiceCallChatHistoryPersistingChatClient(innerClient)); } + + /// + /// Adds a to the chat client pipeline. + /// + /// + /// + /// This decorator enables external code (such as tool delegates) to inject messages into the function + /// execution loop. It should be positioned between the and + /// the (or the leaf ) + /// in the pipeline. + /// + /// + /// The can be retrieved from the chat client via + /// GetService<MessageInjectingChatClient> to enqueue messages from tool delegates or other code. + /// + /// + /// This extension method is intended for use with custom chat client stacks when + /// is . + /// When is (the default), + /// the automatically includes this decorator in the pipeline when + /// is . + /// + /// + /// This decorator only works within the context of a running and will throw an + /// exception if used in any other stack. + /// + /// + /// The to add the decorator to. + /// The for chaining. + [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] + public static ChatClientBuilder UseMessageInjection(this ChatClientBuilder builder) + { + return builder.Use(innerClient => new MessageInjectingChatClient(innerClient)); + } } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs index b2c48ec572..5859e98032 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs @@ -63,13 +63,21 @@ internal static IChatClient WithDefaultAgentMiddleware(this IChatClient chatClie }); } - // PerServiceCallChatHistoryPersistingChatClient is only injected when RequirePerServiceCallChatHistoryPersistence is enabled. - // It is registered after FunctionInvokingChatClient so that it sits between FIC and the leaf client. + // MessageInjectingChatClient is injected when EnableMessageInjection is enabled. + // It is registered after FunctionInvokingChatClient so that it sits between FIC and the inner client. // ChatClientBuilder.Build applies factories in reverse order, making the first Use() call outermost. - // By adding our decorator second, the resulting pipeline is: - // FunctionInvokingChatClient → PerServiceCallChatHistoryPersistingChatClient → leaf IChatClient - // This allows the decorator to simulate service-stored chat history by loading history before - // each service call, persisting after each call, and returning a sentinel ConversationId. + // MessageInjectingChatClient enables injecting messages during the function loop and looping when needed. + if (options?.EnableMessageInjection is true) + { + chatBuilder.Use(innerClient => new MessageInjectingChatClient(innerClient)); + } + + // PerServiceCallChatHistoryPersistingChatClient is injected when RequirePerServiceCallChatHistoryPersistence is enabled. + // It is registered after MessageInjectingChatClient (if present) so it sits closest to the leaf client. + // The resulting pipeline is: + // FunctionInvokingChatClient → [MessageInjectingChatClient] → [PerServiceCallChatHistoryPersistingChatClient] → leaf IChatClient + // PerServiceCallChatHistoryPersistingChatClient simulates service-stored chat history by loading history + // before each service call, persisting after each call, and returning a sentinel ConversationId. if (options?.RequirePerServiceCallChatHistoryPersistence is true) { chatBuilder.Use(innerClient => new PerServiceCallChatHistoryPersistingChatClient(innerClient)); diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/MessageInjectingChatClient.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/MessageInjectingChatClient.cs new file mode 100644 index 0000000000..5d43b55f3e --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/MessageInjectingChatClient.cs @@ -0,0 +1,320 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; +using Microsoft.Shared.DiagnosticIds; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI; + +/// +/// A delegating chat client that supports injecting messages into the function execution loop. +/// +/// +/// +/// This decorator enables external code (such as tool delegates) to enqueue messages that will be +/// sent to the underlying model at the next opportunity. It sits between the +/// and the (or the leaf ) +/// in a pipeline. +/// +/// +/// The injected messages queue is stored per-session in the , ensuring +/// isolation between concurrent sessions. +/// +/// +/// After each service call, if no actionable is returned but injected +/// messages are pending, the decorator loops internally and calls the inner client again with the new +/// messages. When actionable function calls are present, control returns to the parent +/// loop. +/// +/// +/// This chat client must be used within the context of a running . It retrieves the +/// current session from , which is set automatically when an agent's +/// or +/// +/// method is called. +/// +/// +[Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] +public sealed class MessageInjectingChatClient : DelegatingChatClient +{ + /// + /// The key used to store the pending injected messages queue in the session's . + /// + internal const string PendingMessagesStateKey = "MessageInjectingChatClient.PendingInjectedMessages"; + + /// + /// Initializes a new instance of the class. + /// + /// The underlying chat client that will handle the core operations. + public MessageInjectingChatClient(IChatClient innerClient) + : base(innerClient) + { + } + + /// + public override async Task GetResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + CancellationToken cancellationToken = default) + { + var session = GetRequiredSession(); + var queue = GetOrCreateQueue(session); + + var newMessages = DrainInjectedMessages(queue, messages as IList ?? messages.ToList()); + + // Loop to process injected messages: after each service call, if no actionable function calls + // are pending but new messages have been injected into the queue, we call the service again + // so the model can process them. The loop exits when the response contains actionable + // function calls (handed off to the parent FunctionInvokingChatClient) or the queue is empty. + while (true) + { + var response = await base.GetResponseAsync(newMessages, options, cancellationToken).ConfigureAwait(false); + + // If the response contains actionable function calls, the parent FunctionInvokingChatClient + // loop will iterate — return immediately so it can process them. + if (HasActionableFunctionCalls(response.Messages)) + { + return response; + } + + // No actionable function calls. If there are pending injected messages, loop again + // to send them to the service. Otherwise, we're done. + bool queueEmpty; + lock (queue) + { + queueEmpty = queue.Count == 0; + } + + if (queueEmpty) + { + return response; + } + + // Propagate any ConversationId returned by the service so subsequent iterations + // continue within the same conversation. + UpdateOptionsForNextIteration(ref options, response.ConversationId); + + newMessages = DrainInjectedMessages(queue, Array.Empty()); + } + } + + /// + public override async IAsyncEnumerable GetStreamingResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var session = GetRequiredSession(); + var queue = GetOrCreateQueue(session); + + var newMessages = DrainInjectedMessages(queue, messages as IList ?? messages.ToList()); + + // Loop to process injected messages: after each service call, if no actionable function calls + // are pending but new messages have been injected into the queue, we call the service again + // so the model can process them. The loop exits when the response contains actionable + // function calls (handed off to the parent FunctionInvokingChatClient) or the queue is empty. + while (true) + { + bool hasActionableFunctionCalls = false; + string? lastConversationId = null; + + var enumerator = base.GetStreamingResponseAsync(newMessages, options, cancellationToken).GetAsyncEnumerator(cancellationToken); + try + { + while (await enumerator.MoveNextAsync().ConfigureAwait(false)) + { + var update = enumerator.Current; + + // Check each update for actionable function call content as it streams through. + if (!hasActionableFunctionCalls && HasActionableFunctionCalls(update)) + { + hasActionableFunctionCalls = true; + } + + // Track the latest ConversationId from the stream. + if (update.ConversationId is not null) + { + lastConversationId = update.ConversationId; + } + + yield return update; + } + } + finally + { + await enumerator.DisposeAsync().ConfigureAwait(false); + } + + // If the response contains actionable function calls, the parent FunctionInvokingChatClient + // loop will iterate — return immediately so it can process them. + if (hasActionableFunctionCalls) + { + yield break; + } + + // No actionable function calls. If there are pending injected messages, loop again + // to send them to the service. Otherwise, we're done. + bool queueEmpty; + lock (queue) + { + queueEmpty = queue.Count == 0; + } + + if (queueEmpty) + { + yield break; + } + + // Propagate any ConversationId returned by the service so subsequent iterations + // continue within the same conversation. + UpdateOptionsForNextIteration(ref options, lastConversationId); + + newMessages = DrainInjectedMessages(queue, Array.Empty()); + } + } + + /// + /// Enqueues one or more messages to be used at the next opportunity. + /// + /// + /// This method is thread-safe and can be called concurrently from tool delegates or other code + /// while the function execution loop is in progress. The enqueued messages will be picked up + /// at the next opportunity. + /// + /// The agent session to enqueue messages for. + /// The messages to enqueue. + public void EnqueueMessages(AgentSession session, IEnumerable messages) + { + Throw.IfNull(session); + Throw.IfNull(messages); + + var queue = GetOrCreateQueue(session); + + lock (queue) + { + foreach (var message in messages) + { + queue.Add(message); + } + } + } + + /// + /// Gets or creates the pending injected messages queue from the session's . + /// + private static List GetOrCreateQueue(AgentSession session) + { + if (session.StateBag.TryGetValue>(PendingMessagesStateKey, out var queue)) + { + return queue!; + } + + var newQueue = new List(); + session.StateBag.SetValue(PendingMessagesStateKey, newQueue); + return newQueue; + } + + /// + /// Gets the current from the run context. + /// + private static AgentSession GetRequiredSession() + { + var runContext = AIAgent.CurrentRunContext + ?? throw new InvalidOperationException( + $"{nameof(MessageInjectingChatClient)} can only be used within the context of a running AIAgent. " + + "Ensure that the chat client is being invoked as part of an AIAgent.RunAsync or AIAgent.RunStreamingAsync call."); + + return runContext.Session + ?? throw new InvalidOperationException( + $"{nameof(MessageInjectingChatClient)} requires a session. " + + "The current run context does not have a session."); + } + + /// + /// Drains all pending injected messages from the queue and returns a new list combining + /// the original messages with the drained messages. The original list is never modified. + /// + private static IList DrainInjectedMessages(List queue, IList newMessages) + { + lock (queue) + { + if (queue.Count == 0) + { + return newMessages; + } + + var combined = new List(newMessages); + combined.AddRange(queue); + queue.Clear(); + return combined; + } + } + + /// + /// Determines whether any message in the list contains a + /// that is not marked as . + /// + private static bool HasActionableFunctionCalls(IList responseMessages) + { + for (int i = 0; i < responseMessages.Count; i++) + { + var contents = responseMessages[i].Contents; + for (int j = 0; j < contents.Count; j++) + { + if (contents[j] is FunctionCallContent fcc && !fcc.InformationalOnly) + { + return true; + } + } + } + + return false; + } + + /// + /// Determines whether a streaming update contains a + /// that is not marked as . + /// + private static bool HasActionableFunctionCalls(ChatResponseUpdate update) + { + var contents = update.Contents; + for (int i = 0; i < contents.Count; i++) + { + if (contents[i] is FunctionCallContent fcc && !fcc.InformationalOnly) + { + return true; + } + } + + return false; + } + + /// + /// Propagates the from the service response into + /// so that subsequent loop iterations continue within the + /// same conversation. Clones before mutating to avoid + /// affecting the caller's instance. + /// + private static void UpdateOptionsForNextIteration(ref ChatOptions? options, string? conversationId) + { + if (options is null) + { + if (conversationId is not null) + { + options = new() { ConversationId = conversationId }; + } + } + else if (options.ConversationId != conversationId) + { + options = options.Clone(); + options.ConversationId = conversationId; + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/MessageInjectingChatClientTests.cs b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/MessageInjectingChatClientTests.cs new file mode 100644 index 0000000000..93f6b02b03 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/MessageInjectingChatClientTests.cs @@ -0,0 +1,493 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Moq.Protected; + +namespace Microsoft.Agents.AI.UnitTests; + +/// +/// Unit tests for . +/// +public class MessageInjectingChatClientTests +{ + /// + /// Verifies that is resolvable via GetService when the decorator is active. + /// + [Fact] + public void GetService_ReturnsMessageInjectingChatClient_WhenDecoratorActive() + { + // Arrange + Mock mockService = new(); + ChatClientAgent agent = new(mockService.Object, options: new() + { + EnableMessageInjection = true, + }); + + // Act + var injector = agent.ChatClient.GetService(); + + // Assert + Assert.NotNull(injector); + } + + /// + /// Verifies that is null when the decorator is not active. + /// + [Fact] + public void GetService_ReturnsNull_WhenDecoratorNotActive() + { + // Arrange + Mock mockService = new(); + ChatClientAgent agent = new(mockService.Object, options: new()); + + // Act + var injector = agent.ChatClient.GetService(); + + // Assert + Assert.Null(injector); + } + + /// + /// Verifies that messages enqueued on the session before RunAsync are included in the service call messages. + /// + [Fact] + public async Task RunAsync_IncludesInjectedMessages_WhenEnqueuedBeforeCallAsync() + { + // Arrange + List capturedMessages = []; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Callback((IEnumerable msgs, ChatOptions? _, CancellationToken _) => + capturedMessages.AddRange(msgs)) + .ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + RequirePerServiceCallChatHistoryPersistence = true, + EnableMessageInjection = true, + }); + + // Create session and enqueue a message directly onto the session's StateBag queue before calling RunAsync + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + var queue = new List(); + queue.Add(new ChatMessage(ChatRole.User, "injected message")); + session!.StateBag.SetValue("MessageInjectingChatClient.PendingInjectedMessages", queue); + + // Act + await agent.RunAsync([new(ChatRole.User, "original")], session); + + // Assert — the service should have received both the original and injected messages + Assert.Contains(capturedMessages, m => m.Text == "original"); + Assert.Contains(capturedMessages, m => m.Text == "injected message"); + } + + /// + /// Verifies that the queue is drained after a call (messages are not re-delivered on subsequent calls). + /// + [Fact] + public async Task RunAsync_DrainsQueue_MessagesNotRedeliveredAsync() + { + // Arrange + List capturedMessages = []; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Callback((IEnumerable msgs, ChatOptions? _, CancellationToken _) => + capturedMessages.AddRange(msgs)) + .ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + RequirePerServiceCallChatHistoryPersistence = true, + EnableMessageInjection = true, + }); + + // Create session and enqueue a message directly onto the session's StateBag queue + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + var queue = new List(); + queue.Add(new ChatMessage(ChatRole.User, "injected once")); + session!.StateBag.SetValue("MessageInjectingChatClient.PendingInjectedMessages", queue); + + // Act + await agent.RunAsync([new(ChatRole.User, "first call")], session); + + // Assert — the injected message was included in the service call + Assert.Contains(capturedMessages, m => m.Text == "injected once"); + + // Assert — the session's queue is now empty (drained) + Assert.Empty(queue); + } + + /// + /// Verifies that the internal loop fires when no actionable FunctionCallContent is returned + /// but there are pending injected messages in the queue. + /// + [Fact] + public async Task RunAsync_LoopsInternally_WhenNoActionableFCCButPendingMessagesAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + MessageInjectingChatClient? injectorRef = null; + ChatClientAgentSession? sessionRef = null; + + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns((IEnumerable msgs, ChatOptions? _, CancellationToken _) => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + // First call — simulate that something enqueues a message (e.g., a provider or background task) + injectorRef!.EnqueueMessages(sessionRef!, [new ChatMessage(ChatRole.User, "injected during first call")]); + } + + // Return a plain text response (no FunctionCallContent) to trigger the internal loop + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, $"response {serviceCallCount}")])); + }); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + RequirePerServiceCallChatHistoryPersistence = true, + EnableMessageInjection = true, + }); + + injectorRef = agent.ChatClient.GetService()!; + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + sessionRef = session; + await agent.RunAsync([new(ChatRole.User, "original")], session); + + // Assert — should have made 2 service calls (internal loop triggered by the injected message) + Assert.Equal(2, serviceCallCount); + } + + /// + /// Verifies that the internal loop does NOT fire when the response contains actionable + /// FunctionCallContent, even if there are pending injected messages. + /// + [Fact] + public async Task RunAsync_DoesNotLoopInternally_WhenActionableFCCPresentAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + MessageInjectingChatClient? injectorRef = null; + ChatClientAgentSession? sessionRef = null; + + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns((IEnumerable msgs, ChatOptions? _, CancellationToken _) => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + // Enqueue a message during the first call + injectorRef!.EnqueueMessages(sessionRef!, [new ChatMessage(ChatRole.User, "injected")]); + // Return a response with an actionable FunctionCallContent + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, + [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + // Subsequent calls return plain text (the FCC loop will call back after tool execution) + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final")])); + }); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + RequirePerServiceCallChatHistoryPersistence = true, + EnableMessageInjection = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + injectorRef = agent.ChatClient.GetService()!; + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + sessionRef = session; + await agent.RunAsync([new(ChatRole.User, "original")], session); + + // Assert — The first service call returned actionable FCC, so no internal injected-message loop + // occurred there. The FCC loop invokes the tool and calls the service again (second call). + // The injected message should be picked up by the second service call (drained at start of + // GetResponseAsync), but no extra internal loop should fire. Exactly 2 service calls expected. + Assert.Equal(2, serviceCallCount); + } + + /// + /// Verifies that the internal loop fires when the response contains only InformationalOnly + /// FunctionCallContent (which are not actionable) and there are pending injected messages. + /// + [Fact] + public async Task RunAsync_LoopsInternally_WhenOnlyInformationalOnlyFCCAndPendingMessagesAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + MessageInjectingChatClient? injectorRef = null; + ChatClientAgentSession? sessionRef = null; + + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns((IEnumerable msgs, ChatOptions? _, CancellationToken _) => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + // Enqueue a message during the first call + injectorRef!.EnqueueMessages(sessionRef!, [new ChatMessage(ChatRole.User, "injected")]); + // Return a response with InformationalOnly FCC (not actionable) + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, + [new FunctionCallContent("call1", "myTool", new Dictionary()) { InformationalOnly = true }])])); + } + + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final")])); + }); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + RequirePerServiceCallChatHistoryPersistence = true, + EnableMessageInjection = true, + }); + + injectorRef = agent.ChatClient.GetService()!; + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + sessionRef = session; + await agent.RunAsync([new(ChatRole.User, "original")], session); + + // Assert — InformationalOnly FCC is NOT actionable, so internal loop should trigger + Assert.Equal(2, serviceCallCount); + } + + /// + /// Verifies that when the inner client returns a ConversationId on the first call, the + /// MessageInjectingChatClient propagates it to options on subsequent loop iterations. + /// + [Fact] + public async Task RunAsync_PropagatesConversationId_AcrossInternalLoopIterationsAsync() + { + // Arrange + int serviceCallCount = 0; + List capturedConversationIds = []; + MessageInjectingChatClient? injectorRef = null; + ChatClientAgentSession? sessionRef = null; + + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns((IEnumerable _, ChatOptions? opts, CancellationToken _) => + { + serviceCallCount++; + capturedConversationIds.Add(opts?.ConversationId); + + if (serviceCallCount == 1) + { + // First call: inject a message and return a ConversationId + injectorRef!.EnqueueMessages(sessionRef!, [new ChatMessage(ChatRole.User, "injected")]); + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "first response")]) + { + ConversationId = "conv-123", + }); + } + + // Second call (from loop): should have the propagated ConversationId + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "second response")])); + }); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + EnableMessageInjection = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + injectorRef = agent.ChatClient.GetService()!; + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + sessionRef = session; + await agent.RunAsync([new(ChatRole.User, "hello")], session); + + // Assert — The second call should have received the ConversationId propagated from the first response + Assert.Equal(2, serviceCallCount); + Assert.Null(capturedConversationIds[0]); // First call: no ConversationId yet + Assert.Equal("conv-123", capturedConversationIds[1]); // Second call: propagated from first response + } + + /// + /// Verifies that a session with pending injected messages can be serialized and deserialized, + /// and that the deserialized session correctly delivers the injected messages on the next run. + /// + [Fact] + public async Task RunAsync_DeliversInjectedMessages_AfterSessionSerializationRoundTripAsync() + { + // Arrange + List capturedMessagesFirstRun = []; + List capturedMessagesSecondRun = []; + int runCount = 0; + Mock mockService = new(); + MessageInjectingChatClient? injectorRef = null; + ChatClientAgentSession? sessionRef = null; + + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns((IEnumerable msgs, ChatOptions? _, CancellationToken _) => + { + if (runCount == 1) + { + capturedMessagesFirstRun.AddRange(msgs); + + // Inject a message during the first run — this will remain pending (not drained) + // because we return an actionable FCC that causes the parent loop to take over. + injectorRef!.EnqueueMessages(sessionRef!, [new ChatMessage(ChatRole.User, "injected before serialization")]); + + // Return actionable FCC so the injection loop does NOT drain the message + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, + [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + // Second run (after deserialization) — capture what messages come through + capturedMessagesSecondRun.AddRange(msgs); + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final response")])); + }); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + RequirePerServiceCallChatHistoryPersistence = true, + EnableMessageInjection = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + injectorRef = agent.ChatClient.GetService()!; + + // Act — First run: inject a message that stays pending + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + sessionRef = session; + runCount = 1; + await agent.RunAsync([new(ChatRole.User, "first run message")], session); + + // Serialize the session and deserialize into a new instance + var serialized = await agent.SerializeSessionAsync(session!); + var deserializedSession = await agent.DeserializeSessionAsync(serialized) as ChatClientAgentSession; + + // Second run on the deserialized session — the injected message should be delivered + runCount = 2; + sessionRef = deserializedSession; + await agent.RunAsync([new(ChatRole.User, "second run message")], deserializedSession); + + // Assert — the second run should include the injected message from before serialization + Assert.Contains(capturedMessagesSecondRun, m => m.Text == "injected before serialization"); + Assert.Contains(capturedMessagesSecondRun, m => m.Text == "second run message"); + } +}