Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 59 additions & 5 deletions folder_paths.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
import time
import mimetypes
import logging
Expand All @@ -7,6 +8,34 @@

from comfy.cli_args import args

# exFAT does not update directory mtime when files are added/removed,
# so the cache invalidation needs an extra check on such filesystems.
_exfat_checked = False
_is_exfat_cache: dict[str, bool] = {}

def _is_exfat(path: str) -> bool:
"""Check if the given path resides on an exFAT volume (Windows only)."""
global _exfat_checked, _is_exfat_cache
if sys.platform != "win32":
return False
# Cache per drive letter
drive = os.path.splitdrive(path)[0]
if drive in _is_exfat_cache:
return _is_exfat_cache[drive]
try:
import ctypes
kernel32 = ctypes.windll.kernel32
# Get volume information
fs_name = ctypes.create_unicode_buffer(256)
if kernel32.GetVolumeInformationW(drive + "\\", None, 0, None, None, None, fs_name, 256):
result = fs_name.value.upper() == "EXFAT"
else:
result = False
except Exception:
result = False
_is_exfat_cache[drive] = result
return result

supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.pt2', '.bin', '.pth', '.safetensors', '.pkl', '.sft'}

folder_names_and_paths: dict[str, tuple[list[str], set[str]]] = {}
Expand Down Expand Up @@ -308,19 +337,22 @@ def get_folder_paths(folder_name: str) -> list[str]:
folder_name = map_legacy(folder_name)
return folder_names_and_paths[folder_name][0][:]

def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) -> tuple[list[str], dict[str, float]]:
def recursive_search(directory: str, excluded_dir_names: list[str] | None=None) -> tuple[list[str], dict[str, float], dict[str, int]]:
if not os.path.isdir(directory):
return [], {}
return [], {}, {}

if excluded_dir_names is None:
excluded_dir_names = []

result = []
dirs = {}
entry_counts = {}

# Attempt to add the initial directory to dirs with error handling
try:
dirs[directory] = os.path.getmtime(directory)
if _is_exfat(directory):
entry_counts[directory] = len(os.listdir(directory))
except FileNotFoundError:
logging.warning(f"Warning: Unable to access {directory}. Skipping this path.")

Expand All @@ -343,11 +375,13 @@ def recursive_search(directory: str, excluded_dir_names: list[str] | None=None)
path: str = os.path.join(dirpath, d)
try:
dirs[path] = os.path.getmtime(path)
if _is_exfat(path):
entry_counts[path] = len(os.listdir(path))
except FileNotFoundError:
logging.warning(f"Warning: Unable to access {path}. Skipping this path.")
continue
logging.debug("found {} files".format(len(result)))
return result, dirs
return result, dirs, entry_counts

def filter_files_extensions(files: Collection[str], extensions: Collection[str]) -> list[str]:
return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files)))
Expand Down Expand Up @@ -390,11 +424,16 @@ def get_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float], f
output_list = set()
folders = folder_names_and_paths[folder_name]
output_folders = {}
all_entry_counts = {}
for x in folders[0]:
files, folders_all = recursive_search(x, excluded_dir_names=[".git"])
files, folders_all, entry_counts = recursive_search(x, excluded_dir_names=[".git"])
output_list.update(filter_files_extensions(files, folders[1]))
output_folders = {**output_folders, **folders_all}
# Store entry counts with a key suffix to avoid colliding with mtime keys
for k, v in entry_counts.items():
all_entry_counts[k + "::entry_count"] = v

output_folders.update(all_entry_counts)
return sorted(list(output_list)), output_folders, time.perf_counter()

def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float], float] | None:
Expand All @@ -410,10 +449,25 @@ def cached_filename_list_(folder_name: str) -> tuple[list[str], dict[str, float]
out = filename_list_cache[folder_name]

for x in out[1]:
# Skip entry count keys used for exFAT cache invalidation
if x.endswith("::entry_count"):
continue
time_modified = out[1][x]
folder = x
if os.path.getmtime(folder) != time_modified:
try:
current_mtime = os.path.getmtime(folder)
except FileNotFoundError:
return None
if current_mtime != time_modified:
return None
# exFAT does not update directory mtime on file changes;
# use entry count as a fallback invalidation signal.
if _is_exfat(folder):
try:
if len(os.listdir(folder)) != out[1][folder + "::entry_count"]:
return None
except (KeyError, FileNotFoundError):
return None

folders = folder_names_and_paths[folder_name]
for x in folders[0]:
Expand Down