Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/xdist/scheduler/loadscope.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ def remove_node(self, node: WorkerController) -> str | None:
node has no more pending items.
"""
workload = self.assigned_work.pop(node)
# Also remove from registered_collections to allow replacement workers
# to properly register their collections (#1189)
self.registered_collections.pop(node, None)

if not self._pending_of(workload):
return None

Expand All @@ -196,8 +200,14 @@ def remove_node(self, node: WorkerController) -> str | None:
"Unable to identify crashitem on a workload with pending items"
)

# Made uncompleted work unit available again
self.workqueue.update(workload)
# Only requeue work units that have pending items (#1323)
# Completed work units should not be requeued as they would
# result in empty nodeids_indexes causing workers to hang
pending_work = OrderedDict()
for scope, work_unit in workload.items():
if not all(work_unit.values()):
pending_work[scope] = work_unit
self.workqueue.update(pending_work)

for node in self.assigned_work:
self._reschedule(node)
Expand Down Expand Up @@ -231,7 +241,9 @@ def add_node_collection(
self.collection, collection, other_node.gateway.id, node.gateway.id
)
self.log(msg)
return
# Still register collection so the node can participate
# _check_nodes_have_same_collection() will report the mismatch
# when schedule() is called (#1189)

self.registered_collections[node] = list(collection)

Expand Down