Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/OrchestrationSessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Tests
using System.Threading.Tasks;
using DurableTask.AzureStorage.Messaging;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Storage;
using DurableTask.AzureStorage.Tracking;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
Expand Down Expand Up @@ -223,5 +224,82 @@ public void AbortAllSessions_NoSessions_DoesNotThrow()
manager.GetStats(out _, out _, out int count);
Assert.AreEqual(0, count, "Should still have no active sessions");
}

/// <summary>
/// Regression test: a node that sits in the ready-for-processing queue after it has already been
/// removed from the pending-batch list must not produce a session. The call is expected to keep
/// waiting and end through cancellation of the supplied token.
/// </summary>
[TestMethod]
public async Task GetNextSessionAsync_DrainedReadyQueueNode_IsIgnored()
{
    var settings = new AzureStorageOrchestrationServiceSettings
    {
        StorageAccountClientProvider = new StorageAccountClientProvider("UseDevelopmentStorage=true"),
    };
    var stats = new AzureStorageOrchestrationServiceStats();
    var trackingStore = new Mock<ITrackingStore>();

    using var manager = new OrchestrationSessionManager(
        "testaccount",
        settings,
        stats,
        trackingStore.Object);

    var storageClient = new AzureStorageClient(settings);
    var messageManager = new MessageManager(settings, storageClient, settings.TaskHubName);
    var controlQueue = new ControlQueue(storageClient, "partition-0", messageManager);

    // Build the "drained" state by hand: the node is added to and then removed from the
    // pending-batch list, yet is still placed on the ready queue.
    object pendingBatch = CreatePendingBatch(controlQueue);
    object node = AddPendingBatchNode(manager, pendingBatch);
    RemovePendingBatchNode(manager, node);
    EnqueueReadyForProcessingNode(manager, node);

    using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
    bool canceled = false;
    try
    {
        await manager.GetNextSessionAsync(entitiesOnly: false, cts.Token);
    }
    catch (OperationCanceledException)
    {
        // Expected outcome: nothing was dispatchable, so the call ran until the token fired.
        canceled = true;
    }

    Assert.IsTrue(canceled, "Expected cancellation after the drained node was skipped.");
}

/// <summary>
/// Creates an instance of the non-public nested <c>PendingMessageBatch</c> type of
/// <see cref="OrchestrationSessionManager"/> through reflection, using fixed instance and
/// execution IDs.
/// </summary>
/// <param name="controlQueue">The control queue passed as the first constructor argument.</param>
/// <returns>The newly constructed batch, typed as <see cref="object"/> because the type is not accessible.</returns>
static object CreatePendingBatch(ControlQueue controlQueue)
{
    Type pendingBatchType = typeof(OrchestrationSessionManager)
        .GetNestedType("PendingMessageBatch", BindingFlags.NonPublic);

    // Fail with a readable message if the nested type is renamed, instead of an
    // ArgumentNullException from Activator.CreateInstance.
    Assert.IsNotNull(pendingBatchType, "Nested type 'PendingMessageBatch' was not found on OrchestrationSessionManager.");

    return Activator.CreateInstance(
        pendingBatchType,
        BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
        binder: null,
        args: new object[] { controlQueue, "instance1", "execution1" },
        culture: null);
}

/// <summary>
/// Appends <paramref name="pendingBatch"/> to the manager's private
/// <c>pendingOrchestrationMessageBatches</c> collection by invoking its <c>AddLast</c> overload
/// that accepts the batch type.
/// </summary>
/// <param name="manager">The session manager whose private collection is modified.</param>
/// <param name="pendingBatch">The batch instance to append.</param>
/// <returns>The value returned by <c>AddLast</c> (the node created for the batch).</returns>
static object AddPendingBatchNode(OrchestrationSessionManager manager, object pendingBatch)
{
    object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
    MethodInfo addLast = pendingBatches.GetType().GetMethod("AddLast", new[] { pendingBatch.GetType() });

    // Surface a changed collection type as a clear test failure rather than a NullReferenceException.
    Assert.IsNotNull(addLast, "Method 'AddLast' taking the batch type was not found on the pending-batch collection.");

    return addLast.Invoke(pendingBatches, new object[] { pendingBatch });
}

/// <summary>
/// Removes <paramref name="node"/> from the manager's private
/// <c>pendingOrchestrationMessageBatches</c> collection by invoking its <c>Remove</c> overload
/// that accepts the node type.
/// </summary>
/// <param name="manager">The session manager whose private collection is modified.</param>
/// <param name="node">The node previously returned when the batch was added.</param>
static void RemovePendingBatchNode(OrchestrationSessionManager manager, object node)
{
    object pendingBatches = GetPrivateField(manager, "pendingOrchestrationMessageBatches");
    MethodInfo remove = pendingBatches.GetType().GetMethod("Remove", new[] { node.GetType() });

    // Surface a changed collection type as a clear test failure rather than a NullReferenceException.
    Assert.IsNotNull(remove, "Method 'Remove' taking the node type was not found on the pending-batch collection.");

    remove.Invoke(pendingBatches, new object[] { node });
}

/// <summary>
/// Places <paramref name="node"/> on the manager's private
/// <c>orchestrationsReadyForProcessingQueue</c> by invoking its <c>Enqueue</c> method.
/// </summary>
/// <param name="manager">The session manager whose private queue is modified.</param>
/// <param name="node">The pending-batch node to enqueue.</param>
static void EnqueueReadyForProcessingNode(OrchestrationSessionManager manager, object node)
{
    object readyQueue = GetPrivateField(manager, "orchestrationsReadyForProcessingQueue");
    MethodInfo enqueue = readyQueue.GetType().GetMethod("Enqueue");

    // Surface a changed queue type as a clear test failure rather than a NullReferenceException.
    Assert.IsNotNull(enqueue, "Method 'Enqueue' was not found on the ready-for-processing queue.");

    enqueue.Invoke(readyQueue, new object[] { node });
}

/// <summary>
/// Reads the value of a non-public instance field from <paramref name="target"/> using reflection.
/// The test fails if no such field exists on the target's runtime type.
/// </summary>
/// <param name="target">The object that owns the field.</param>
/// <param name="fieldName">The exact name of the non-public instance field.</param>
/// <returns>The current value of the field.</returns>
static object GetPrivateField(object target, string fieldName)
{
    const BindingFlags PrivateInstance = BindingFlags.Instance | BindingFlags.NonPublic;

    Type ownerType = target.GetType();
    FieldInfo fieldInfo = ownerType.GetField(fieldName, PrivateInstance);
    Assert.IsNotNull(fieldInfo);

    return fieldInfo.GetValue(target);
}
}
}
45 changes: 45 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace DurableTask.AzureStorage.Messaging
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;

class ControlQueue : TaskHubQueue, IDisposable
{
Expand Down Expand Up @@ -209,6 +210,50 @@ public override Task AbandonMessageAsync(MessageData message, SessionBase? sessi
return base.AbandonMessageAsync(message, session);
}

/// <summary>
/// Abandons a message with zero visibility timeout so it becomes immediately visible
/// for another partition owner to pick up. This is used during drain to avoid stranding
/// messages that were dequeued but not yet promoted to active sessions.
/// </summary>
/// <param name="message">The dequeued message whose underlying queue message is updated.</param>
/// <returns>
/// A task that completes once the update attempt has finished. A failed update is logged as a
/// warning and is not rethrown.
/// </returns>
public async Task AbandonMessageForDrainAsync(MessageData message)
{
    QueueMessage queueMessage = message.OriginalQueueMessage;

    // Stop tracking the message as pending before touching storage.
    this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.MessageId, out _);

    TaskMessage taskMessage = message.TaskMessage;
    OrchestrationInstance instance = taskMessage.OrchestrationInstance;

    this.settings.Logger.AbandoningMessage(
        this.storageAccountName,
        this.settings.TaskHubName,
        taskMessage.Event.EventType.ToString(),
        Utils.GetTaskEventId(taskMessage.Event),
        queueMessage.MessageId,
        instance.InstanceId,
        instance.ExecutionId,
        this.storageQueue.Name,
        message.SequenceNumber,
        queueMessage.PopReceipt,
        visibilityTimeoutSeconds: 0);

    try
    {
        // A zero visibility timeout is what makes the message visible again right away.
        await this.storageQueue.UpdateMessageAsync(
            queueMessage,
            TimeSpan.Zero,
            clientRequestId: null);
    }
    catch (Exception e)
    {
        // Best effort: a failure here is only logged, so the caller's drain can continue with
        // the remaining messages.
        this.settings.Logger.PartitionManagerWarning(
            this.storageAccountName,
            this.settings.TaskHubName,
            this.settings.WorkerId,
            this.Name,
            $"Failed to abandon message {queueMessage.MessageId} during drain: {e.Message}");
    }
}

public override Task DeleteMessageAsync(MessageData message, SessionBase? session = null)
{
this.stats.PendingOrchestratorMessages.TryRemove(message.OriginalQueueMessage.MessageId, out _);
Expand Down
57 changes: 56 additions & 1 deletion src/DurableTask.AzureStorage/OrchestrationSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,60 @@ public async Task DrainAsync(string partitionId, CloseReason reason, Cancellatio
}
finally
{
// Remove the partition from memory
// Make dequeued-but-undispatched messages visible before dropping the partition.
await this.AbandonPendingMessagesAsync(partitionId);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 2d9f611. Drain now waits for the dequeue loop to stop before scanning pending batches, and AddMessageToPendingOrchestration returns messages for immediate zero-timeout abandon if the control queue has already been released.


this.RemoveQueue(partitionId, reason, caller);
}
}

/// <summary>
/// Abandons all pending (dequeued but not yet dispatched) messages for the specified partition,
/// making them immediately visible in the Azure Storage queue for the new partition owner.
/// This prevents a throughput gap equal to the visibility timeout duration when a partition
/// is released during drain.
/// </summary>
/// <param name="partitionId">
/// The partition whose pending batches are removed; compared case-insensitively against each
/// batch's control queue name.
/// </param>
/// <returns>A task that completes when every collected message has been handed to the abandon call.</returns>
async Task AbandonPendingMessagesAsync(string partitionId)
{
    var messagesToAbandon = new List<(ControlQueue Queue, MessageData Message)>();

    // Collect and detach under the lock; the storage calls happen after the lock is released.
    lock (this.messageAndSessionLock)
    {
        LinkedListNode<PendingMessageBatch>? node = this.pendingOrchestrationMessageBatches.First;
        while (node != null)
        {
            // Capture the successor first: removing the node detaches it from the list.
            LinkedListNode<PendingMessageBatch>? next = node.Next;
            PendingMessageBatch batch = node.Value;

            if (string.Equals(batch.ControlQueue.Name, partitionId, StringComparison.OrdinalIgnoreCase))
            {
                foreach (MessageData message in batch.Messages)
                {
                    messagesToAbandon.Add((batch.ControlQueue, message));
                }

                // After removal the node no longer belongs to this list, which code still
                // holding a reference to the node can observe through node.List.
                this.pendingOrchestrationMessageBatches.Remove(node);
            }

            node = next;
        }
    }

    if (messagesToAbandon.Count == 0)
    {
        return;
    }

    this.settings.Logger.PartitionManagerInfo(
        this.storageAccountName,
        this.settings.TaskHubName,
        this.settings.WorkerId,
        partitionId,
        $"Abandoning {messagesToAbandon.Count} pending message(s) during drain to make them immediately visible for the new partition owner.");

    await messagesToAbandon.ParallelForEachAsync(
        this.settings.MaxStorageOperationConcurrency,
        item => item.Queue.AbandonMessageForDrainAsync(item.Message));
}

/// <summary>
/// This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it
/// queries table storage to ensure that each message has a matching record in the Instances table. If not, this method
Expand Down Expand Up @@ -592,6 +641,12 @@ async Task ScheduleOrchestrationStatePrefetch(

lock (this.messageAndSessionLock)
{
// Drain may have removed this batch after it was queued for dispatch.
if (node.List != this.pendingOrchestrationMessageBatches)
{
continue;
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved in c7a6691. GetNextSessionAsync now treats released queues separately: ready nodes are removed and abandoned immediately, and the delayed different-generation requeue path rechecks release state before reattaching. Added regressions for both released ready nodes and released delayed requeue nodes.


PendingMessageBatch nextBatch = node.Value;
this.pendingOrchestrationMessageBatches.Remove(node);

Expand Down
Loading