diff --git a/src/Dapr.Workflow.Abstractions/HistoryEventKind.cs b/src/Dapr.Workflow.Abstractions/HistoryEventKind.cs deleted file mode 100644 index ea31ed07f..000000000 --- a/src/Dapr.Workflow.Abstractions/HistoryEventKind.cs +++ /dev/null @@ -1,115 +0,0 @@ -// ------------------------------------------------------------------------ -// Copyright 2026 The Dapr Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ------------------------------------------------------------------------ - -namespace Dapr.Workflow; - -/// -/// Identifies the kind of a workflow history event returned in propagated history. -/// -public enum HistoryEventKind -{ - /// - /// Unknown or unsupported event type. - /// - Unknown = 0, - - /// - /// The workflow execution started. - /// - ExecutionStarted, - - /// - /// The workflow execution completed. - /// - ExecutionCompleted, - - /// - /// The workflow execution was terminated. - /// - ExecutionTerminated, - - /// - /// An activity task was scheduled. - /// - TaskScheduled, - - /// - /// An activity task completed successfully. - /// - TaskCompleted, - - /// - /// An activity task failed. - /// - TaskFailed, - - /// - /// A child workflow instance was created. - /// - SubOrchestrationInstanceCreated, - - /// - /// A child workflow instance completed successfully. - /// - SubOrchestrationInstanceCompleted, - - /// - /// A child workflow instance failed. - /// - SubOrchestrationInstanceFailed, - - /// - /// A durable timer was created. - /// - TimerCreated, - - /// - /// A durable timer fired. - /// - TimerFired, - - /// - /// The orchestrator started a processing turn. - /// - OrchestratorStarted, - - /// - /// The orchestrator completed a processing turn. - /// - OrchestratorCompleted, - - /// - /// An event was sent to another workflow instance. - /// - EventSent, - - /// - /// An external event was raised for this workflow instance. - /// - EventRaised, - - /// - /// The workflow continued as new. - /// - ContinueAsNew, - - /// - /// The workflow execution was suspended. - /// - ExecutionSuspended, - - /// - /// The workflow execution was resumed. - /// - ExecutionResumed -} diff --git a/src/Dapr.Workflow.Abstractions/PropagatedHistory.cs b/src/Dapr.Workflow.Abstractions/PropagatedHistory.cs index eda643b7d..a1c905c0c 100644 --- a/src/Dapr.Workflow.Abstractions/PropagatedHistory.cs +++ b/src/Dapr.Workflow.Abstractions/PropagatedHistory.cs @@ -15,71 +15,120 @@ namespace Dapr.Workflow; using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Linq; /// -/// Contains the workflow history that was propagated from ancestor workflow instances. -/// Each entry corresponds to a single ancestor's history. +/// Workflow history propagated from one or more ancestor workflows to a child workflow or activity. /// /// -/// A workflow receives propagated history when it is scheduled with a -/// other than . -/// Use to retrieve the propagated history -/// inside a workflow implementation. +/// A propagated history is an ordered list of values, +/// one per ancestor workflow. Order is execution order: index 0 is the oldest ancestor, +/// the last entry is the immediate parent. +/// +/// Use the Get* / TryGet* methods to walk the list by app, instance, or workflow name. +/// Mirrors the PropagatedHistory type in the Go and Python SDKs. +/// /// public sealed class PropagatedHistory { private readonly IReadOnlyList _entries; /// - /// Initializes a new instance of with the given entries. + /// Initializes a new from the given workflow entries. /// - /// The propagated history entries from ancestor workflows. + /// + /// Workflow entries in execution order (ancestor first, immediate parent last). + /// public PropagatedHistory(IReadOnlyList entries) { _entries = entries ?? throw new ArgumentNullException(nameof(entries)); } /// - /// Gets the ordered list of propagated history entries. - /// The first entry corresponds to the immediate parent workflow; subsequent entries - /// correspond to progressively older ancestors when is used. + /// Returns every workflow entry in the propagated history, in execution order + /// (ancestor first, immediate parent last). /// - public IReadOnlyList Entries => _entries; + public IReadOnlyList GetWorkflows() => _entries; /// - /// Returns a new containing only entries from the specified App ID. + /// Returns an ordered, deduplicated list of app IDs in this propagated history. /// - /// The Dapr App ID to filter by. - /// A filtered instance. - public PropagatedHistory FilterByAppId(string appId) + public IReadOnlyList GetAppIds() { - ArgumentException.ThrowIfNullOrWhiteSpace(appId); - return new PropagatedHistory( - _entries.Where(e => string.Equals(e.AppId, appId, StringComparison.OrdinalIgnoreCase)).ToList()); + var seen = new HashSet(StringComparer.OrdinalIgnoreCase); + var result = new List(_entries.Count); + foreach (var entry in _entries) + { + if (seen.Add(entry.AppId)) + { + result.Add(entry.AppId); + } + } + + return result; } /// - /// Returns a new containing only the entry with the specified instance ID. + /// Returns every workflow entry whose name matches, in execution order. Useful when the + /// list contains the same name more than once (e.g. recursion or ContinueAsNew). /// - /// The workflow instance ID to filter by. - /// A filtered instance. - public PropagatedHistory FilterByInstanceId(string instanceId) + /// The workflow name to filter by. + /// An empty list when no match is found. + public IReadOnlyList GetWorkflowsByName(string name) { - ArgumentException.ThrowIfNullOrWhiteSpace(instanceId); - return new PropagatedHistory( - _entries.Where(e => string.Equals(e.InstanceId, instanceId, StringComparison.Ordinal)).ToList()); + ArgumentException.ThrowIfNullOrWhiteSpace(name); + return _entries + .Where(e => string.Equals(e.Name, name, StringComparison.OrdinalIgnoreCase)) + .ToList(); + } + + /// + /// Tries to return the most recent workflow entry whose name matches. + /// + /// The workflow name to look up. + /// When this method returns , the last matching workflow entry; otherwise . + /// if a matching entry was found; otherwise . + public bool TryGetLastWorkflowByName(string name, [NotNullWhen(true)] out PropagatedHistoryEntry? result) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + for (var i = _entries.Count - 1; i >= 0; i--) + { + if (string.Equals(_entries[i].Name, name, StringComparison.OrdinalIgnoreCase)) + { + result = _entries[i]; + return true; + } + } + + result = null; + return false; + } + + /// + /// Returns every workflow entry produced by the given app, in execution order. + /// + /// The Dapr App ID to filter by. + /// An empty list when no match is found. + public IReadOnlyList GetWorkflowsByAppId(string appId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(appId); + return _entries + .Where(e => string.Equals(e.AppId, appId, StringComparison.OrdinalIgnoreCase)) + .ToList(); } /// - /// Returns a new containing only entries for the specified workflow name. + /// Returns every workflow entry produced by the given instance, in execution order. + /// Usually a single entry, except when the same instance reappears via ContinueAsNew. /// - /// The workflow name to filter by. - /// A filtered instance. - public PropagatedHistory FilterByWorkflowName(string workflowName) + /// The workflow instance ID to filter by. + /// An empty list when no match is found. + public IReadOnlyList GetWorkflowsByInstanceId(string instanceId) { - ArgumentException.ThrowIfNullOrWhiteSpace(workflowName); - return new PropagatedHistory( - _entries.Where(e => string.Equals(e.WorkflowName, workflowName, StringComparison.Ordinal)).ToList()); + ArgumentException.ThrowIfNullOrWhiteSpace(instanceId); + return _entries + .Where(e => string.Equals(e.InstanceId, instanceId, StringComparison.Ordinal)) + .ToList(); } } diff --git a/src/Dapr.Workflow.Abstractions/PropagatedHistoryActivityResult.cs b/src/Dapr.Workflow.Abstractions/PropagatedHistoryActivityResult.cs new file mode 100644 index 000000000..9ed27c855 --- /dev/null +++ b/src/Dapr.Workflow.Abstractions/PropagatedHistoryActivityResult.cs @@ -0,0 +1,37 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow; + +/// +/// A reconstructed view of a single activity invocation surfaced through propagated workflow history. +/// +/// The scheduled name of the activity. +/// Whether the activity was scheduled in the propagated history. +/// Whether the activity completed successfully. +/// Whether the activity failed. +/// The JSON-encoded input payload, or null when unset. +/// The JSON-encoded output payload, or null when the activity has not completed. +/// The failure details when is true, otherwise null. +/// +/// Mirrors the ActivityResult type in the Go and Python SDKs so cross-language +/// quickstarts and audit patterns line up. +/// +public sealed record PropagatedHistoryActivityResult( + string Name, + bool Started, + bool Completed, + bool Failed, + string? Input, + string? Output, + WorkflowTaskFailureDetails? FailureDetails); diff --git a/src/Dapr.Workflow.Abstractions/PropagatedHistoryChildWorkflowResult.cs b/src/Dapr.Workflow.Abstractions/PropagatedHistoryChildWorkflowResult.cs new file mode 100644 index 000000000..fb7ca74b6 --- /dev/null +++ b/src/Dapr.Workflow.Abstractions/PropagatedHistoryChildWorkflowResult.cs @@ -0,0 +1,34 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow; + +/// +/// A reconstructed view of a single child workflow invocation surfaced through propagated workflow history. +/// +/// The scheduled name of the child workflow. +/// Whether the child workflow was scheduled in the propagated history. +/// Whether the child workflow completed successfully. +/// Whether the child workflow failed. +/// The JSON-encoded output payload, or null when the child workflow has not completed. +/// The failure details when is true, otherwise null. +/// +/// Mirrors the ChildWorkflowResult type in the Go and Python SDKs. +/// +public sealed record PropagatedHistoryChildWorkflowResult( + string Name, + bool Started, + bool Completed, + bool Failed, + string? Output, + WorkflowTaskFailureDetails? FailureDetails); diff --git a/src/Dapr.Workflow.Abstractions/PropagatedHistoryEntry.cs b/src/Dapr.Workflow.Abstractions/PropagatedHistoryEntry.cs index 427f1b26f..abbc903ba 100644 --- a/src/Dapr.Workflow.Abstractions/PropagatedHistoryEntry.cs +++ b/src/Dapr.Workflow.Abstractions/PropagatedHistoryEntry.cs @@ -13,17 +13,120 @@ namespace Dapr.Workflow; +using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; /// -/// Represents a segment of propagated workflow history originating from a single ancestor workflow instance. +/// A single workflow's contribution to a propagated history: the ancestor workflow's identity, +/// plus the activities and child workflows it executed. /// -/// The Dapr App ID of the application that ran the ancestor workflow. -/// The instance ID of the ancestor workflow. -/// The name of the ancestor workflow. -/// The ordered list of history events from the ancestor workflow. -public sealed record PropagatedHistoryEntry( - string AppId, - string InstanceId, - string WorkflowName, - IReadOnlyList Events); +/// The instance ID of the ancestor workflow. +/// The Dapr App ID that ran the ancestor workflow. +/// The name of the ancestor workflow. +/// Activities resolved from this entry, in execution order. +/// Child workflows resolved from this entry, in execution order. +/// +/// One exists per ancestor workflow in a +/// . Use and +/// to look up specific items in this entry; +/// the plural Get*ByName variants return every occurrence in execution order. +/// +public sealed class PropagatedHistoryEntry( + string instanceId, + string appId, + string name, + IReadOnlyList activities, + IReadOnlyList childWorkflows) +{ + private readonly IReadOnlyList _activities = + activities ?? throw new ArgumentNullException(nameof(activities)); + private readonly IReadOnlyList _childWorkflows = + childWorkflows ?? throw new ArgumentNullException(nameof(childWorkflows)); + + /// The instance ID of the ancestor workflow this entry describes. + public string InstanceId { get; } = instanceId ?? throw new ArgumentNullException(nameof(instanceId)); + + /// The Dapr App ID that ran this ancestor workflow. + public string AppId { get; } = appId ?? throw new ArgumentNullException(nameof(appId)); + + /// The name of this ancestor workflow. + public string Name { get; } = name ?? throw new ArgumentNullException(nameof(name)); + + /// All activities executed in this entry, in execution order. + public IReadOnlyList Activities => _activities; + + /// All child workflows started in this entry, in execution order. + public IReadOnlyList ChildWorkflows => _childWorkflows; + + /// + /// Returns every activity in this entry whose scheduled name matches, in execution order. + /// + /// The activity name to filter by. + /// An empty list when no match is found. + public IReadOnlyList GetActivitiesByName(string name) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + return _activities + .Where(a => string.Equals(a.Name, name, StringComparison.OrdinalIgnoreCase)) + .ToList(); + } + + /// + /// Tries to return the most recent activity in this entry whose name matches. + /// + /// The activity name to look up. + /// When this method returns , the last matching activity; otherwise . + /// if a matching activity was found; otherwise . + public bool TryGetLastActivityByName(string name, [NotNullWhen(true)] out PropagatedHistoryActivityResult? result) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + for (var i = _activities.Count - 1; i >= 0; i--) + { + if (string.Equals(_activities[i].Name, name, StringComparison.OrdinalIgnoreCase)) + { + result = _activities[i]; + return true; + } + } + + result = null; + return false; + } + + /// + /// Returns every child workflow in this entry whose name matches, in execution order. + /// + /// The child workflow name to filter by. + /// An empty list when no match is found. + public IReadOnlyList GetChildWorkflowsByName(string name) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + return _childWorkflows + .Where(c => string.Equals(c.Name, name, StringComparison.OrdinalIgnoreCase)) + .ToList(); + } + + /// + /// Tries to return the most recent child workflow in this entry whose name matches. + /// + /// The child workflow name to look up. + /// When this method returns , the last matching child workflow; otherwise . + /// if a matching child workflow was found; otherwise . + public bool TryGetLastChildWorkflowByName(string name, [NotNullWhen(true)] out PropagatedHistoryChildWorkflowResult? result) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + for (var i = _childWorkflows.Count - 1; i >= 0; i--) + { + if (string.Equals(_childWorkflows[i].Name, name, StringComparison.OrdinalIgnoreCase)) + { + result = _childWorkflows[i]; + return true; + } + } + + result = null; + return false; + } +} diff --git a/src/Dapr.Workflow.Abstractions/PropagatedHistoryEvent.cs b/src/Dapr.Workflow.Abstractions/PropagatedHistoryEvent.cs deleted file mode 100644 index 44e700c94..000000000 --- a/src/Dapr.Workflow.Abstractions/PropagatedHistoryEvent.cs +++ /dev/null @@ -1,24 +0,0 @@ -// ------------------------------------------------------------------------ -// Copyright 2026 The Dapr Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ------------------------------------------------------------------------ - -namespace Dapr.Workflow; - -using System; - -/// -/// Represents a single event in a propagated workflow history segment. -/// -/// The unique event ID within the workflow instance history. -/// The kind of history event. -/// The UTC timestamp when the event occurred. -public sealed record PropagatedHistoryEvent(int EventId, HistoryEventKind Kind, DateTimeOffset Timestamp); diff --git a/src/Dapr.Workflow.Abstractions/WorkflowContext.cs b/src/Dapr.Workflow.Abstractions/WorkflowContext.cs index de355eaf5..2ed3b2852 100644 --- a/src/Dapr.Workflow.Abstractions/WorkflowContext.cs +++ b/src/Dapr.Workflow.Abstractions/WorkflowContext.cs @@ -341,15 +341,17 @@ public virtual Task CallChildWorkflowAsync( /// specified a other than . /// /// - /// Use , , - /// or to narrow down the returned entries. + /// Use and + /// / + /// to look up specific items in the propagated history. The plural Get*ByName variants + /// return every match. /// /// /// This method always returns the same value regardless of whether the workflow is replaying. /// /// /// - /// A containing entries from ancestor workflows, + /// A containing the list of ancestor workflow entries, /// or null if no history was propagated to this workflow instance. /// public abstract PropagatedHistory? GetPropagatedHistory(); diff --git a/src/Dapr.Workflow.Abstractions/WorkflowTaskOptions.cs b/src/Dapr.Workflow.Abstractions/WorkflowTaskOptions.cs index 86b166912..ede71b177 100644 --- a/src/Dapr.Workflow.Abstractions/WorkflowTaskOptions.cs +++ b/src/Dapr.Workflow.Abstractions/WorkflowTaskOptions.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,7 +18,23 @@ namespace Dapr.Workflow; /// /// The workflow retry policy. /// The App ID indicating the app in which to find the named activity to run. -public record WorkflowTaskOptions(WorkflowRetryPolicy? RetryPolicy = null, string? TargetAppId = null); +/// +/// Determines which ancestor history events are propagated to the scheduled activity or child workflow. +/// Defaults to , meaning no history is propagated. +/// +public record WorkflowTaskOptions( + WorkflowRetryPolicy? RetryPolicy = null, + string? TargetAppId = null, + HistoryPropagationScope PropagationScope = HistoryPropagationScope.None) +{ + /// + /// Returns a new with the specified history propagation scope. + /// + /// The propagation scope to apply. + /// A new options instance with the propagation scope set. + public WorkflowTaskOptions WithHistoryPropagation(HistoryPropagationScope scope) => + this with { PropagationScope = scope }; +} /// /// Options for controlling the behavior of child workflow execution. @@ -34,13 +50,20 @@ public record ChildWorkflowTaskOptions( string? InstanceId = null, WorkflowRetryPolicy? RetryPolicy = null, string? TargetAppId = null, - HistoryPropagationScope PropagationScope = HistoryPropagationScope.None) : WorkflowTaskOptions(RetryPolicy, TargetAppId) + HistoryPropagationScope PropagationScope = HistoryPropagationScope.None) + : WorkflowTaskOptions(RetryPolicy, TargetAppId, PropagationScope) { /// /// Returns a new with the specified history propagation scope. /// /// The propagation scope to apply. /// A new options instance with the propagation scope set. - public ChildWorkflowTaskOptions WithHistoryPropagation(HistoryPropagationScope scope) => + /// + /// Hides the base method (records cannot override) so the derived type is returned — + /// callers must hold a reference to preserve + /// . Calling through a reference + /// invokes the base method and returns a plain . + /// + public new ChildWorkflowTaskOptions WithHistoryPropagation(HistoryPropagationScope scope) => this with { PropagationScope = scope }; } diff --git a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs index cdbfbe3ab..2aa18deec 100644 --- a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs +++ b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs @@ -170,16 +170,29 @@ private async Task CallActivityInternalAsync(string name, object? input, W return await HandleHistoryMatch(name, earlyCompletion, taskId); } + var scheduleTaskAction = new ScheduleTaskAction + { + Name = name, + Input = _workflowSerializer.Serialize(input), + Router = router, + TaskExecutionId = taskExecutionId + }; + + var activityPropagationScope = options?.PropagationScope ?? HistoryPropagationScope.None; + if (activityPropagationScope != HistoryPropagationScope.None) + { + scheduleTaskAction.HistoryPropagationScope = activityPropagationScope switch + { + HistoryPropagationScope.OwnHistory => Dapr.DurableTask.Protobuf.HistoryPropagationScope.OwnHistory, + HistoryPropagationScope.Lineage => Dapr.DurableTask.Protobuf.HistoryPropagationScope.Lineage, + _ => Dapr.DurableTask.Protobuf.HistoryPropagationScope.None + }; + } + _pendingActions.Add(taskId, new WorkflowAction { Id = taskId, - ScheduleTask = new ScheduleTaskAction - { - Name = name, - Input = _workflowSerializer.Serialize(input), - Router = router, - TaskExecutionId = taskExecutionId - }, + ScheduleTask = scheduleTaskAction, Router = router }); @@ -989,72 +1002,155 @@ private static WorkflowTaskFailedException CreateTaskFailedException(TaskFailedE } /// - /// Converts a proto message to a domain . + /// Converts a proto message into a public-facing + /// by resolving each scheduled activity and child workflow + /// against its matching completion/failure event. /// /// - /// Each entry is the deterministic byte representation - /// of a single ; we parse the bytes here on demand rather than re-marshalling. - /// Malformed event bytes are skipped (mapped to nothing) so a single bad event cannot crash the workflow. + /// Each entry is the deterministic byte + /// representation of a single ; we parse the bytes here. + /// SDK retries reuse TaskExecutionId, so we match completions on + /// TaskScheduledId (the scheduling event ID) rather than execution ID. + /// Malformed event bytes are surfaced as exceptions — a runtime sending unparseable + /// proto bytes is a contract violation we should not hide. /// private static PropagatedHistoryEntry ConvertChunk(PropagatedHistoryChunk chunk) { - var events = new List(chunk.RawEvents.Count); + var events = new List(chunk.RawEvents.Count); foreach (var rawEvent in chunk.RawEvents) { - HistoryEvent? historyEvent; - try + events.Add(HistoryEvent.Parser.ParseFrom(rawEvent)); + } + + // Pre-index completion / failure events by TaskScheduledId so each scheduled + // activity or child workflow resolves in O(1) instead of rescanning the list. + var taskCompletions = new Dictionary(); + var taskFailures = new Dictionary(); + var childCompletions = new Dictionary(); + var childFailures = new Dictionary(); + foreach (var e in events) + { + switch (e.EventTypeCase) { - historyEvent = HistoryEvent.Parser.ParseFrom(rawEvent); + case HistoryEvent.EventTypeOneofCase.TaskCompleted: + taskCompletions[e.TaskCompleted.TaskScheduledId] = e; + break; + case HistoryEvent.EventTypeOneofCase.TaskFailed: + taskFailures[e.TaskFailed.TaskScheduledId] = e; + break; + case HistoryEvent.EventTypeOneofCase.ChildWorkflowInstanceCompleted: + childCompletions[e.ChildWorkflowInstanceCompleted.TaskScheduledId] = e; + break; + case HistoryEvent.EventTypeOneofCase.ChildWorkflowInstanceFailed: + childFailures[e.ChildWorkflowInstanceFailed.TaskScheduledId] = e; + break; } - catch (InvalidProtocolBufferException) + } + + var activities = new List(); + var childWorkflows = new List(); + foreach (var historyEvent in events) + { + switch (historyEvent.EventTypeCase) { - continue; + case HistoryEvent.EventTypeOneofCase.TaskScheduled: + activities.Add(ResolveActivity(historyEvent, taskCompletions, taskFailures)); + break; + case HistoryEvent.EventTypeOneofCase.ChildWorkflowInstanceCreated: + childWorkflows.Add(ResolveChildWorkflow(historyEvent, childCompletions, childFailures)); + break; } - - events.Add(new PropagatedHistoryEvent( - historyEvent.EventId, - MapEventKind(historyEvent), - MapTimestamp(historyEvent.Timestamp))); } return new PropagatedHistoryEntry( - chunk.AppId, chunk.InstanceId, + chunk.AppId, chunk.WorkflowName, - events); + activities, + childWorkflows); } /// - /// Maps a proto to a . + /// Builds a by matching TaskCompleted / + /// TaskFailed against the scheduling event's EventId. /// - private static HistoryEventKind MapEventKind(HistoryEvent e) => e.EventTypeCase switch + private static PropagatedHistoryActivityResult ResolveActivity( + HistoryEvent scheduleEvent, + IReadOnlyDictionary completions, + IReadOnlyDictionary failures) { - HistoryEvent.EventTypeOneofCase.ExecutionStarted => HistoryEventKind.ExecutionStarted, - HistoryEvent.EventTypeOneofCase.ExecutionCompleted => HistoryEventKind.ExecutionCompleted, - HistoryEvent.EventTypeOneofCase.ExecutionTerminated => HistoryEventKind.ExecutionTerminated, - HistoryEvent.EventTypeOneofCase.TaskScheduled => HistoryEventKind.TaskScheduled, - HistoryEvent.EventTypeOneofCase.TaskCompleted => HistoryEventKind.TaskCompleted, - HistoryEvent.EventTypeOneofCase.TaskFailed => HistoryEventKind.TaskFailed, - HistoryEvent.EventTypeOneofCase.ChildWorkflowInstanceCreated => HistoryEventKind.SubOrchestrationInstanceCreated, - HistoryEvent.EventTypeOneofCase.ChildWorkflowInstanceCompleted => HistoryEventKind.SubOrchestrationInstanceCompleted, - HistoryEvent.EventTypeOneofCase.ChildWorkflowInstanceFailed => HistoryEventKind.SubOrchestrationInstanceFailed, - HistoryEvent.EventTypeOneofCase.TimerCreated => HistoryEventKind.TimerCreated, - HistoryEvent.EventTypeOneofCase.TimerFired => HistoryEventKind.TimerFired, - HistoryEvent.EventTypeOneofCase.WorkflowStarted => HistoryEventKind.OrchestratorStarted, - HistoryEvent.EventTypeOneofCase.WorkflowCompleted => HistoryEventKind.OrchestratorCompleted, - HistoryEvent.EventTypeOneofCase.EventSent => HistoryEventKind.EventSent, - HistoryEvent.EventTypeOneofCase.EventRaised => HistoryEventKind.EventRaised, - HistoryEvent.EventTypeOneofCase.ContinueAsNew => HistoryEventKind.ContinueAsNew, - HistoryEvent.EventTypeOneofCase.ExecutionSuspended => HistoryEventKind.ExecutionSuspended, - HistoryEvent.EventTypeOneofCase.ExecutionResumed => HistoryEventKind.ExecutionResumed, - _ => HistoryEventKind.Unknown - }; + var scheduled = scheduleEvent.TaskScheduled; + var scheduleId = scheduleEvent.EventId; + var completed = false; + var failed = false; + string? output = null; + WorkflowTaskFailureDetails? failureDetails = null; + + if (completions.TryGetValue(scheduleId, out var completion)) + { + completed = true; + output = completion.TaskCompleted.Result; + } + + if (failures.TryGetValue(scheduleId, out var failure)) + { + failed = true; + failureDetails = MapFailureDetails(failure.TaskFailed.FailureDetails); + } + + return new PropagatedHistoryActivityResult( + Name: scheduled.Name, + Started: true, + Completed: completed, + Failed: failed, + Input: scheduled.Input, + Output: output, + FailureDetails: failureDetails); + } /// - /// Converts a proto Timestamp to a . + /// Builds a by matching the create event's + /// EventId against subsequent ChildWorkflowInstanceCompleted / + /// ChildWorkflowInstanceFailed events. /// - private static DateTimeOffset MapTimestamp(Timestamp? timestamp) => - timestamp is not null - ? new DateTimeOffset(timestamp.ToDateTime(), TimeSpan.Zero) - : DateTimeOffset.MinValue; + private static PropagatedHistoryChildWorkflowResult ResolveChildWorkflow( + HistoryEvent createEvent, + IReadOnlyDictionary completions, + IReadOnlyDictionary failures) + { + var created = createEvent.ChildWorkflowInstanceCreated; + var creationId = createEvent.EventId; + var completed = false; + var failed = false; + string? output = null; + WorkflowTaskFailureDetails? failureDetails = null; + + if (completions.TryGetValue(creationId, out var completion)) + { + completed = true; + output = completion.ChildWorkflowInstanceCompleted.Result; + } + + if (failures.TryGetValue(creationId, out var failure)) + { + failed = true; + failureDetails = MapFailureDetails(failure.ChildWorkflowInstanceFailed.FailureDetails); + } + + return new PropagatedHistoryChildWorkflowResult( + Name: created.Name, + Started: true, + Completed: completed, + Failed: failed, + Output: output, + FailureDetails: failureDetails); + } + + private static WorkflowTaskFailureDetails? MapFailureDetails(TaskFailureDetails? failure) => + failure is null + ? null + : new WorkflowTaskFailureDetails( + errorType: failure.ErrorType ?? "Exception", + errorMessage: failure.ErrorMessage ?? string.Empty, + stackTrace: failure.StackTrace); } diff --git a/test/Dapr.IntegrationTest.Workflow/HistoryPropagationWorkflowTests.cs b/test/Dapr.IntegrationTest.Workflow/HistoryPropagationWorkflowTests.cs index c8c1655e9..b63360d9a 100644 --- a/test/Dapr.IntegrationTest.Workflow/HistoryPropagationWorkflowTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/HistoryPropagationWorkflowTests.cs @@ -397,7 +397,7 @@ public override async Task RunAsync(WorkflowContext conte await context.CreateTimer(TimeSpan.Zero); return new PropagationTestResult( ChildReceivedPropagatedHistory: propagated is not null, - PropagatedEntryCount: propagated?.Entries.Count ?? 0); + PropagatedEntryCount: propagated?.GetWorkflows().Count ?? 0); } } diff --git a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowHistoryPropagationTests.cs b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowHistoryPropagationTests.cs index f7b55f809..e1148d910 100644 --- a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowHistoryPropagationTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowHistoryPropagationTests.cs @@ -24,9 +24,13 @@ namespace Dapr.Workflow.Test.Worker.Internal; /// /// Tests for workflow history propagation: the SDK API surface for declaring -/// a propagation scope on , propagating -/// the scope to the outgoing , and -/// exposing inbound propagated history through . +/// a propagation scope on / +/// , propagating the scope to the +/// outgoing / , +/// and exposing inbound propagated history through +/// as +/// values, each carrying typed / +/// records. /// public class WorkflowHistoryPropagationTests { @@ -58,7 +62,7 @@ private static HistoryEvent MakeEvent(int eventId, Action configur var ev = new HistoryEvent { EventId = eventId, - Timestamp = Timestamp.FromDateTime(timestamp ?? StartTime) + Timestamp = Timestamp.FromDateTime(timestamp ?? StartTime), }; configure(ev); return ev; @@ -80,8 +84,57 @@ private static PropagatedHistoryChunk MakeChunk(string appId, string instanceId, return chunk; } + private static HistoryEvent TaskScheduled(int eventId, string name, string? input = null) => + MakeEvent(eventId, e => e.TaskScheduled = new TaskScheduledEvent + { + Name = name, + Input = input, + }); + + private static HistoryEvent TaskCompleted(int eventId, int scheduledId, string? result = null) => + MakeEvent(eventId, e => e.TaskCompleted = new TaskCompletedEvent + { + TaskScheduledId = scheduledId, + Result = result, + }); + + private static HistoryEvent TaskFailed(int eventId, int scheduledId, string errorMessage) => + MakeEvent(eventId, e => e.TaskFailed = new TaskFailedEvent + { + TaskScheduledId = scheduledId, + FailureDetails = new TaskFailureDetails + { + ErrorType = "TestException", + ErrorMessage = errorMessage, + }, + }); + + private static HistoryEvent ChildCreated(int eventId, string name) => + MakeEvent(eventId, e => e.ChildWorkflowInstanceCreated = new ChildWorkflowInstanceCreatedEvent + { + Name = name, + }); + + private static HistoryEvent ChildCompleted(int eventId, int creationId, string? result = null) => + MakeEvent(eventId, e => e.ChildWorkflowInstanceCompleted = new ChildWorkflowInstanceCompletedEvent + { + TaskScheduledId = creationId, + Result = result, + }); + + private static HistoryEvent ChildFailed(int eventId, int creationId, string errorMessage) => + MakeEvent(eventId, e => e.ChildWorkflowInstanceFailed = new ChildWorkflowInstanceFailedEvent + { + TaskScheduledId = creationId, + FailureDetails = new TaskFailureDetails + { + ErrorType = "TestException", + ErrorMessage = errorMessage, + }, + }); + // ------------------------------------------------------------------ - // GetPropagatedHistory + // GetPropagatedHistory — entry shape and ordering // ------------------------------------------------------------------ [Fact] @@ -99,7 +152,7 @@ public void GetPropagatedHistory_ReturnsNull_WhenEmptyChunksProvided() } [Fact] - public void GetPropagatedHistory_ReturnsSingleEntry_WhenOneChunkPropagated() + public void GetPropagatedHistory_ReturnsSingleWorkflow_WhenOneEntryPropagated() { var chunk = MakeChunk("parent-app", "parent-instance", "ParentWorkflow", MakeEvent(1, e => e.ExecutionStarted = new ExecutionStartedEvent { Name = "ParentWorkflow" })); @@ -109,197 +162,292 @@ public void GetPropagatedHistory_ReturnsSingleEntry_WhenOneChunkPropagated() var history = context.GetPropagatedHistory(); Assert.NotNull(history); - Assert.Single(history.Entries); - Assert.Equal("parent-app", history.Entries[0].AppId); - Assert.Equal("parent-instance", history.Entries[0].InstanceId); - Assert.Equal("ParentWorkflow", history.Entries[0].WorkflowName); - Assert.Single(history.Entries[0].Events); - Assert.Equal(HistoryEventKind.ExecutionStarted, history.Entries[0].Events[0].Kind); + var workflows = history.GetWorkflows(); + Assert.Single(workflows); + Assert.Equal("parent-app", workflows[0].AppId); + Assert.Equal("parent-instance", workflows[0].InstanceId); + Assert.Equal("ParentWorkflow", workflows[0].Name); + Assert.Empty(workflows[0].Activities); + Assert.Empty(workflows[0].ChildWorkflows); } [Fact] - public void GetPropagatedHistory_PreservesChunkOrder() + public void GetPropagatedHistory_PreservesEntryOrder() { - var parent = MakeChunk("p-app", "p-inst", "ParentWf", - MakeEvent(1, e => e.ExecutionStarted = new ExecutionStartedEvent { Name = "ParentWf" })); + // Entries arrive oldest-first: grandparent at index 0, immediate parent last. var grandparent = MakeChunk("gp-app", "gp-inst", "GrandparentWf", MakeEvent(1, e => e.ExecutionStarted = new ExecutionStartedEvent { Name = "GrandparentWf" })); + var parent = MakeChunk("p-app", "p-inst", "ParentWf", + MakeEvent(1, e => e.ExecutionStarted = new ExecutionStartedEvent { Name = "ParentWf" })); - var context = CreateContext(incomingPropagatedHistory: [parent, grandparent]); + var context = CreateContext(incomingPropagatedHistory: [grandparent, parent]); var history = context.GetPropagatedHistory(); Assert.NotNull(history); - Assert.Equal(2, history.Entries.Count); - Assert.Equal("p-inst", history.Entries[0].InstanceId); - Assert.Equal("gp-inst", history.Entries[1].InstanceId); + var workflows = history.GetWorkflows(); + Assert.Equal(2, workflows.Count); + Assert.Equal("gp-inst", workflows[0].InstanceId); + Assert.Equal("p-inst", workflows[1].InstanceId); } [Fact] - public void GetPropagatedHistory_MapsAllHistoryEventKinds() + public void GetPropagatedHistory_ThrowsOnMalformedRawEvents() { - var mappings = new Dictionary + // A runtime sending unparseable proto bytes is a contract violation; surface it + // instead of silently masking the bug. + var chunk = new PropagatedHistoryChunk { - { MakeEvent(1, e => e.ExecutionStarted = new ExecutionStartedEvent()), HistoryEventKind.ExecutionStarted }, - { MakeEvent(2, e => e.ExecutionCompleted = new ExecutionCompletedEvent()), HistoryEventKind.ExecutionCompleted }, - { MakeEvent(3, e => e.ExecutionTerminated = new ExecutionTerminatedEvent()), HistoryEventKind.ExecutionTerminated }, - { MakeEvent(4, e => e.TaskScheduled = new TaskScheduledEvent { Name = "a" }), HistoryEventKind.TaskScheduled }, - { MakeEvent(5, e => e.TaskCompleted = new TaskCompletedEvent()), HistoryEventKind.TaskCompleted }, - { MakeEvent(6, e => e.TaskFailed = new TaskFailedEvent()), HistoryEventKind.TaskFailed }, - { MakeEvent(7, e => e.ChildWorkflowInstanceCreated = new ChildWorkflowInstanceCreatedEvent()), HistoryEventKind.SubOrchestrationInstanceCreated }, - { MakeEvent(8, e => e.ChildWorkflowInstanceCompleted = new ChildWorkflowInstanceCompletedEvent()), HistoryEventKind.SubOrchestrationInstanceCompleted }, - { MakeEvent(9, e => e.ChildWorkflowInstanceFailed = new ChildWorkflowInstanceFailedEvent()), HistoryEventKind.SubOrchestrationInstanceFailed }, - { MakeEvent(10, e => e.TimerCreated = new TimerCreatedEvent()), HistoryEventKind.TimerCreated }, - { MakeEvent(11, e => e.TimerFired = new TimerFiredEvent()), HistoryEventKind.TimerFired }, - { MakeEvent(12, e => e.WorkflowStarted = new WorkflowStartedEvent()), HistoryEventKind.OrchestratorStarted }, - { MakeEvent(13, e => e.WorkflowCompleted = new WorkflowCompletedEvent()), HistoryEventKind.OrchestratorCompleted }, - { MakeEvent(14, e => e.EventSent = new EventSentEvent()), HistoryEventKind.EventSent }, - { MakeEvent(15, e => e.EventRaised = new EventRaisedEvent()), HistoryEventKind.EventRaised }, - { MakeEvent(16, e => e.ContinueAsNew = new ContinueAsNewEvent()), HistoryEventKind.ContinueAsNew }, - { MakeEvent(17, e => e.ExecutionSuspended = new ExecutionSuspendedEvent()), HistoryEventKind.ExecutionSuspended }, - { MakeEvent(18, e => e.ExecutionResumed = new ExecutionResumedEvent()), HistoryEventKind.ExecutionResumed }, + AppId = "app", + InstanceId = "inst", + WorkflowName = "Wf", }; + chunk.RawEvents.Add(ByteString.CopyFrom([0xff, 0xff, 0xff, 0xff, 0xff])); - var chunk = MakeChunk("app", "inst", "Wf", mappings.Keys.ToArray()); - var context = CreateContext(incomingPropagatedHistory: [chunk]); - var entry = context.GetPropagatedHistory()!.Entries.Single(); + Assert.Throws(() => CreateContext(incomingPropagatedHistory: [chunk])); + } - Assert.Equal(mappings.Count, entry.Events.Count); - var expectedKinds = mappings.Values.ToList(); - for (var i = 0; i < entry.Events.Count; i++) - { - Assert.Equal(expectedKinds[i], entry.Events[i].Kind); - } + // ------------------------------------------------------------------ + // Activity resolution from raw events + // ------------------------------------------------------------------ + + [Fact] + public void Activity_ResolvedAs_CompletedWithInputAndOutput() + { + var chunk = MakeChunk("app", "inst", "Wf", + TaskScheduled(eventId: 1, name: "ValidateMerchant", input: "\"merchant-1\""), + TaskCompleted(eventId: 2, scheduledId: 1, result: "true")); + + var context = CreateContext(incomingPropagatedHistory: [chunk]); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); + Assert.True(workflow.TryGetLastActivityByName("ValidateMerchant", out var activity)); + + Assert.Equal("ValidateMerchant", activity.Name); + Assert.True(activity.Started); + Assert.True(activity.Completed); + Assert.False(activity.Failed); + Assert.Equal("\"merchant-1\"", activity.Input); + Assert.Equal("true", activity.Output); + Assert.Null(activity.FailureDetails); } [Fact] - public void GetPropagatedHistory_MapsUnsetEventTypeToUnknown() + public void Activity_ResolvedAs_FailedWithFailureDetails() { - // An event with no oneof case set should be mapped to Unknown rather than crashing. - var bareEvent = new HistoryEvent { EventId = 42, Timestamp = Timestamp.FromDateTime(StartTime) }; - var chunk = MakeChunk("app", "inst", "Wf", bareEvent); + var chunk = MakeChunk("app", "inst", "Wf", + TaskScheduled(eventId: 1, name: "ValidateCard"), + TaskFailed(eventId: 2, scheduledId: 1, errorMessage: "card declined")); var context = CreateContext(incomingPropagatedHistory: [chunk]); - var entry = context.GetPropagatedHistory()!.Entries.Single(); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); + Assert.True(workflow.TryGetLastActivityByName("ValidateCard", out var activity)); - Assert.Single(entry.Events); - Assert.Equal(HistoryEventKind.Unknown, entry.Events[0].Kind); - Assert.Equal(42, entry.Events[0].EventId); + Assert.False(activity.Completed); + Assert.True(activity.Failed); + Assert.NotNull(activity.FailureDetails); + Assert.Equal("card declined", activity.FailureDetails.ErrorMessage); } [Fact] - public void GetPropagatedHistory_SkipsMalformedRawEvents() + public void Activity_ResolvedAs_StartedOnly_WhenNotYetCompleted() { - var chunk = new PropagatedHistoryChunk - { - AppId = "app", - InstanceId = "inst", - WorkflowName = "Wf", - }; - // Add a malformed event (not a valid serialized HistoryEvent). - chunk.RawEvents.Add(ByteString.CopyFrom(new byte[] { 0xff, 0xff, 0xff, 0xff, 0xff })); - // Followed by a well-formed event. - chunk.RawEvents.Add(MakeEvent(7, e => e.TaskCompleted = new TaskCompletedEvent()).ToByteString()); + var chunk = MakeChunk("app", "inst", "Wf", + TaskScheduled(eventId: 1, name: "PendingCheck", input: "\"in\"")); var context = CreateContext(incomingPropagatedHistory: [chunk]); - var entry = context.GetPropagatedHistory()!.Entries.Single(); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); + Assert.True(workflow.TryGetLastActivityByName("PendingCheck", out var activity)); + + Assert.True(activity.Started); + Assert.False(activity.Completed); + Assert.False(activity.Failed); + Assert.Equal("\"in\"", activity.Input); + Assert.Null(activity.Output); + } + + [Fact] + public void TryGetLastActivityByName_ReturnsMostRecentInvocation_WhenRetried() + { + // Same activity scheduled twice — first completes, second fails. TryGet returns the failed (most recent). + var chunk = MakeChunk("app", "inst", "Wf", + TaskScheduled(eventId: 1, name: "ValidateCard", input: "\"first\""), + TaskCompleted(eventId: 2, scheduledId: 1, result: "true"), + TaskScheduled(eventId: 3, name: "ValidateCard", input: "\"second\""), + TaskFailed(eventId: 4, scheduledId: 3, errorMessage: "card declined")); - // Only the well-formed event survives. - Assert.Single(entry.Events); - Assert.Equal(7, entry.Events[0].EventId); - Assert.Equal(HistoryEventKind.TaskCompleted, entry.Events[0].Kind); + var context = CreateContext(incomingPropagatedHistory: [chunk]); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); + + var all = workflow.GetActivitiesByName("ValidateCard"); + Assert.Equal(2, all.Count); + Assert.True(all[0].Completed); + Assert.True(all[1].Failed); + + Assert.True(workflow.TryGetLastActivityByName("ValidateCard", out var last)); + Assert.True(last.Failed); + Assert.Equal("card declined", last.FailureDetails!.ErrorMessage); } [Fact] - public void GetPropagatedHistory_PreservesEventTimestamp() + public void TryGetLastActivityByName_ReturnsFalse_WhenMissing() { - var when = new DateTime(2026, 06, 15, 12, 30, 45, DateTimeKind.Utc); var chunk = MakeChunk("app", "inst", "Wf", - MakeEvent(1, e => e.ExecutionStarted = new ExecutionStartedEvent(), timestamp: when)); + TaskScheduled(eventId: 1, name: "Real")); var context = CreateContext(incomingPropagatedHistory: [chunk]); - var entry = context.GetPropagatedHistory()!.Entries.Single(); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); - Assert.Equal(when, entry.Events[0].Timestamp.UtcDateTime); + Assert.False(workflow.TryGetLastActivityByName("Missing", out var missing)); + Assert.Null(missing); } // ------------------------------------------------------------------ - // PropagatedHistory filters + // Child workflow resolution // ------------------------------------------------------------------ [Fact] - public void FilterByAppId_ReturnsOnlyMatchingEntries_CaseInsensitive() + public void ChildWorkflow_ResolvedAs_Completed() { - var history = new PropagatedHistory(new[] - { - new PropagatedHistoryEntry("app-a", "i1", "WfA", []), - new PropagatedHistoryEntry("app-b", "i2", "WfB", []), - new PropagatedHistoryEntry("APP-A", "i3", "WfA2", []), - }); + var chunk = MakeChunk("app", "inst", "Wf", + ChildCreated(eventId: 1, name: "ProcessPayment"), + ChildCompleted(eventId: 2, creationId: 1, result: "\"paid\"")); - var filtered = history.FilterByAppId("app-a"); + var context = CreateContext(incomingPropagatedHistory: [chunk]); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); + Assert.True(workflow.TryGetLastChildWorkflowByName("ProcessPayment", out var child)); - Assert.Equal(2, filtered.Entries.Count); - Assert.All(filtered.Entries, e => Assert.Equal("app-a", e.AppId, StringComparer.OrdinalIgnoreCase)); + Assert.True(child.Started); + Assert.True(child.Completed); + Assert.False(child.Failed); + Assert.Equal("\"paid\"", child.Output); } [Fact] - public void FilterByInstanceId_ReturnsOnlyMatchingEntry_CaseSensitive() + public void ChildWorkflow_ResolvedAs_Failed() { - var history = new PropagatedHistory(new[] - { - new PropagatedHistoryEntry("app", "Instance-1", "Wf", []), - new PropagatedHistoryEntry("app", "instance-2", "Wf", []), - }); + var chunk = MakeChunk("app", "inst", "Wf", + ChildCreated(eventId: 1, name: "ProcessPayment"), + ChildFailed(eventId: 2, creationId: 1, errorMessage: "boom")); + + var context = CreateContext(incomingPropagatedHistory: [chunk]); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); + Assert.True(workflow.TryGetLastChildWorkflowByName("ProcessPayment", out var child)); - Assert.Single(history.FilterByInstanceId("Instance-1").Entries); - Assert.Empty(history.FilterByInstanceId("instance-1").Entries); + Assert.True(child.Failed); + Assert.Equal("boom", child.FailureDetails!.ErrorMessage); } [Fact] - public void FilterByWorkflowName_ReturnsOnlyMatchingEntries_CaseSensitive() + public void TryGetLastChildWorkflowByName_ReturnsFalse_WhenMissing() { - var history = new PropagatedHistory(new[] - { - new PropagatedHistoryEntry("app", "i1", "PaymentWorkflow", []), - new PropagatedHistoryEntry("app", "i2", "OrderWorkflow", []), - new PropagatedHistoryEntry("app", "i3", "PaymentWorkflow", []), - new PropagatedHistoryEntry("app", "i4", "paymentworkflow", []), - }); + var chunk = MakeChunk("app", "inst", "Wf"); + var context = CreateContext(incomingPropagatedHistory: [chunk]); + Assert.True(context.GetPropagatedHistory()!.TryGetLastWorkflowByName("Wf", out var workflow)); - var filtered = history.FilterByWorkflowName("PaymentWorkflow"); + Assert.False(workflow.TryGetLastChildWorkflowByName("Missing", out var missing)); + Assert.Null(missing); + } - Assert.Equal(2, filtered.Entries.Count); - Assert.All(filtered.Entries, e => Assert.Equal("PaymentWorkflow", e.WorkflowName)); + // ------------------------------------------------------------------ + // PropagatedHistory-level helpers + // ------------------------------------------------------------------ + + [Fact] + public void GetAppIds_ReturnsOrderedDeduplicatedList() + { + var history = new PropagatedHistory([ + new PropagatedHistoryEntry("i1", "appA", "WfA", [], []), + new PropagatedHistoryEntry("i2", "appB", "WfB", [], []), + new PropagatedHistoryEntry("i3", "appA", "WfA2", [], []), + ]); + + Assert.Equal(["appA", "appB"], history.GetAppIds()); } [Fact] - public void Filters_ThrowOnEmptyOrWhitespace() + public void TryGetLastWorkflowByName_ReturnsMostRecent_WhenNameRepeated() + { + var history = new PropagatedHistory([ + new PropagatedHistoryEntry("wf-1", "app", "Loop", [], []), + new PropagatedHistoryEntry("wf-2", "app", "Loop", [], []), + ]); + + Assert.True(history.TryGetLastWorkflowByName("Loop", out var last)); + Assert.Equal("wf-2", last.InstanceId); + Assert.Equal(2, history.GetWorkflowsByName("Loop").Count); + } + + [Fact] + public void TryGetLastWorkflowByName_ReturnsFalse_WhenMissing() { var history = new PropagatedHistory([]); - Assert.Throws(() => history.FilterByAppId(string.Empty)); - Assert.Throws(() => history.FilterByInstanceId(" ")); - Assert.Throws(() => history.FilterByWorkflowName(string.Empty)); + Assert.False(history.TryGetLastWorkflowByName("Missing", out var missing)); + Assert.Null(missing); } [Fact] - public void PropagatedHistory_Ctor_ThrowsOnNullEntries() + public void PropagatedHistory_Ctor_ThrowsOnNullWorkflows() { Assert.Throws(() => new PropagatedHistory(null!)); } + [Fact] + public void PropagatedHistory_NameAndAppIdLookups_AreCaseInsensitive() + { + // Workflow / activity names register case-insensitively in WorkflowsFactory, + // and AppIds are matched case-insensitively elsewhere in the SDK. The propagated + // history lookups must follow the same contract. + var activity = new PropagatedHistoryActivityResult( + Name: "ValidateMerchant", Started: true, Completed: true, Failed: false, + Input: null, Output: null, FailureDetails: null); + var child = new PropagatedHistoryChildWorkflowResult( + Name: "FraudDetection", Started: true, Completed: true, Failed: false, + Output: null, FailureDetails: null); + var entry = new PropagatedHistoryEntry("inst-1", "AppA", "MerchantCheckout", [activity], [child]); + var history = new PropagatedHistory([ + entry, + new PropagatedHistoryEntry("inst-2", "appa", "OtherWf", [], []), + ]); + + Assert.Single(history.GetAppIds()); + Assert.Single(history.GetWorkflowsByAppId("APPA")); + Assert.Single(history.GetWorkflowsByName("merchantcheckout")); + Assert.True(history.TryGetLastWorkflowByName("MERCHANTCHECKOUT", out _)); + Assert.Single(entry.GetActivitiesByName("validatemerchant")); + Assert.True(entry.TryGetLastActivityByName("VALIDATEMERCHANT", out _)); + Assert.Single(entry.GetChildWorkflowsByName("frauddetection")); + Assert.True(entry.TryGetLastChildWorkflowByName("FRAUDDETECTION", out _)); + } + // ------------------------------------------------------------------ - // ChildWorkflowTaskOptions.WithHistoryPropagation + // Scheduling helpers — WithHistoryPropagation // ------------------------------------------------------------------ [Fact] - public void WithHistoryPropagation_SetsScope() + public void WorkflowTaskOptions_WithHistoryPropagation_SetsScope() + { + var options = new WorkflowTaskOptions().WithHistoryPropagation(HistoryPropagationScope.Lineage); + Assert.Equal(HistoryPropagationScope.Lineage, options.PropagationScope); + } + + [Fact] + public void WorkflowTaskOptions_WithHistoryPropagation_DoesNotMutateOriginal() + { + var original = new WorkflowTaskOptions(); + var updated = original.WithHistoryPropagation(HistoryPropagationScope.OwnHistory); + + Assert.Equal(HistoryPropagationScope.None, original.PropagationScope); + Assert.Equal(HistoryPropagationScope.OwnHistory, updated.PropagationScope); + } + + [Fact] + public void ChildWorkflowTaskOptions_WithHistoryPropagation_SetsScope() { var options = new ChildWorkflowTaskOptions().WithHistoryPropagation(HistoryPropagationScope.Lineage); Assert.Equal(HistoryPropagationScope.Lineage, options.PropagationScope); } [Fact] - public void WithHistoryPropagation_DoesNotMutateOriginal() + public void ChildWorkflowTaskOptions_WithHistoryPropagation_PreservesOtherFields() { var original = new ChildWorkflowTaskOptions(InstanceId: "id-1"); var updated = original.WithHistoryPropagation(HistoryPropagationScope.OwnHistory); @@ -309,8 +457,66 @@ public void WithHistoryPropagation_DoesNotMutateOriginal() Assert.Equal("id-1", updated.InstanceId); } + [Fact] + public void ChildWorkflowTaskOptions_WithHistoryPropagation_ReturnsDerivedType() + { + // Locks in the `new`-hiding behavior on the derived record: invoked on a + // ChildWorkflowTaskOptions reference, WithHistoryPropagation must return + // ChildWorkflowTaskOptions (not the base WorkflowTaskOptions) so InstanceId + // and other derived fields survive the with-expression. + var original = new ChildWorkflowTaskOptions(InstanceId: "id-1"); + var updated = original.WithHistoryPropagation(HistoryPropagationScope.Lineage); + + Assert.IsType(updated); + } + + // ------------------------------------------------------------------ + // Outbound action scope — activity path + // ------------------------------------------------------------------ + + [Fact] + public void CallActivityAsync_DefaultScope_LeavesActionScopeUnset() + { + var context = CreateContext(instanceId: "parent", appId: "my-app"); + _ = context.CallActivityAsync("Echo"); + + var action = context.PendingActions + .Select(a => a.ScheduleTask) + .First(a => a is not null); + + Assert.Equal(Dapr.DurableTask.Protobuf.HistoryPropagationScope.None, action.HistoryPropagationScope); + } + + [Fact] + public void CallActivityAsync_WithOwnHistory_SetsScopeOnAction() + { + var context = CreateContext(instanceId: "parent", appId: "my-app"); + _ = context.CallActivityAsync("Echo", + options: new WorkflowTaskOptions(PropagationScope: HistoryPropagationScope.OwnHistory)); + + var action = context.PendingActions + .Select(a => a.ScheduleTask) + .First(a => a is not null); + + Assert.Equal(Dapr.DurableTask.Protobuf.HistoryPropagationScope.OwnHistory, action.HistoryPropagationScope); + } + + [Fact] + public void CallActivityAsync_WithLineage_SetsScopeOnAction() + { + var context = CreateContext(instanceId: "parent", appId: "my-app"); + _ = context.CallActivityAsync("Echo", + options: new WorkflowTaskOptions().WithHistoryPropagation(HistoryPropagationScope.Lineage)); + + var action = context.PendingActions + .Select(a => a.ScheduleTask) + .First(a => a is not null); + + Assert.Equal(Dapr.DurableTask.Protobuf.HistoryPropagationScope.Lineage, action.HistoryPropagationScope); + } + // ------------------------------------------------------------------ - // CallChildWorkflowAsync — outbound HistoryPropagationScope on action + // Outbound action scope — child workflow path // ------------------------------------------------------------------ [Fact] @@ -323,7 +529,6 @@ public void CallChildWorkflowAsync_DefaultScope_LeavesActionScopeUnset() .Select(a => a.CreateChildWorkflow) .First(a => a is not null); - // Default = None => HistoryPropagationScope field is left at its proto default (None / unset). Assert.Equal(Dapr.DurableTask.Protobuf.HistoryPropagationScope.None, action.HistoryPropagationScope); } @@ -346,7 +551,7 @@ public void CallChildWorkflowAsync_WithLineage_SetsScopeOnAction() { var context = CreateContext(instanceId: "parent", appId: "my-app"); _ = context.CallChildWorkflowAsync("ChildWf", - options: new ChildWorkflowTaskOptions(PropagationScope: HistoryPropagationScope.Lineage)); + options: new ChildWorkflowTaskOptions().WithHistoryPropagation(HistoryPropagationScope.Lineage)); var action = context.PendingActions .Select(a => a.CreateChildWorkflow)