Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ UpdatePollCompletion(
CxPlatUpdateExecutionContexts(Worker);
}

void
CxPlatWorkerPoolWorkerDrainEvents(
_In_ CXPLAT_WORKER* Worker
);

void
CxPlatProcessEvents(
Comment thread
anrossi marked this conversation as resolved.
Outdated
_In_ CXPLAT_WORKER* Worker
);

BOOLEAN
CxPlatWorkerPoolInitWorker(
_Inout_ CXPLAT_WORKER* Worker,
Expand Down Expand Up @@ -449,6 +459,17 @@ CxPlatWorkerPoolDelete(
#else
UNREFERENCED_PARAMETER(RefType);
#endif

if (RefType == CXPLAT_WORKER_POOL_REF_EXTERNAL) {
Comment thread
anrossi marked this conversation as resolved.
Outdated
//
// In the case of external execution, it's possible for ExecutionDelete
// to run before all the queues have been drained of internal cleanup work.
// Run all the workers until there's nothing left to do here.
//
for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) {
CxPlatWorkerPoolWorkerDrainEvents(&WorkerPool->Workers[i]);
}
}
CxPlatRundownReleaseAndWait(&WorkerPool->Rundown);

#if DEBUG
Expand Down Expand Up @@ -666,6 +687,42 @@ CxPlatWorkerPoolWorkerPoll(
return Worker->State.WaitTime;
}

void
CxPlatWorkerPoolWorkerDrainEvents(
Comment thread
anrossi marked this conversation as resolved.
Outdated
_In_ CXPLAT_WORKER* Worker
)
{
#if DEBUG
uint32_t Iterations = 0;
#endif
do {
Worker->State.TimeNow = CxPlatTimeUs64();
Worker->State.ThreadID = CxPlatCurThreadID();

CxPlatRunExecutionContexts(Worker);
if (Worker->State.WaitTime && InterlockedFetchAndClearBoolean(&Worker->Running)) {
Worker->State.TimeNow = CxPlatTimeUs64();
CxPlatRunExecutionContexts(Worker); // Run once more to handle race conditions
}

//
// Set the wait time to zero here to process as soon as possible.
// Otherwise, CxPlatProcessEvents may wait this many milliseconds.
//
Worker->State.WaitTime = 0;
Comment thread
anrossi marked this conversation as resolved.
Outdated

//
// Assume there is no work to do, and this will update to zero if work was done.
//
Worker->State.NoWorkCount = 1;
Comment thread
anrossi marked this conversation as resolved.
Outdated
CxPlatProcessEvents(Worker);

#if DEBUG
CXPLAT_DBG_ASSERTMSG(++Iterations < 10, "Is the library still active?");
Comment thread
anrossi marked this conversation as resolved.
Outdated
#endif
} while (Worker->State.NoWorkCount == 0);
Comment thread
anrossi marked this conversation as resolved.
Outdated
}

#define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second
#define DYNAMIC_POOL_PRUNE_COUNT 8

Expand Down
Loading