Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 144 additions & 58 deletions src/supervision/utils/video.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import annotations

import os
import shutil
import subprocess
import tempfile
import threading
import time
from collections import deque
Expand Down Expand Up @@ -208,6 +212,62 @@ def get_video_frames_generator(
video.release()


def _mux_audio(
source_path: str,
silent_video_path: str,
target_path: str,
) -> bool:
"""
Copy audio from *source_path* into *silent_video_path* and write the
result to *target_path* using ffmpeg. Returns True on success.

The mux command uses stream-copy so it is fast (no re-encode) and
format-agnostic. The ``-map 1:a?`` flag makes audio optional: if the
source has no audio stream ffmpeg exits cleanly and we fall back to
copying the silent video as-is.
"""
if shutil.which("ffmpeg") is None:
logger.warning(
"ffmpeg not found on PATH; audio will not be copied to %s.",
target_path,
)
return False

cmd = [
"ffmpeg",
"-y", # overwrite output without prompt
"-i", silent_video_path, # processed video (no audio)
"-i", source_path, # original source (for audio)
"-c", "copy", # stream-copy — no re-encode
"-map", "0:v:0", # video track from processed file
"-map", "1:a?", # audio track from source (optional)
"-shortest", # trim to shortest stream
target_path,
]
try:
result = subprocess.run(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
timeout=300,
)
if result.returncode != 0:
logger.warning(
"ffmpeg exited with code %d; audio may not be copied. "
"stderr: %s",
result.returncode,
result.stderr.decode(errors="replace"),
)
return False
return True
except subprocess.TimeoutExpired:
logger.warning("ffmpeg timed out; audio will not be copied to %s.", target_path)
return False
except Exception as exc: # pragma: no cover
logger.warning("ffmpeg failed (%s); audio will not be copied.", exc)
return False


def process_video(
source_path: str,
target_path: str,
Expand All @@ -218,6 +278,7 @@ def process_video(
writer_buffer: int = 32,
show_progress: bool = False,
progress_message: str = "Processing video",
copy_audio: bool = True,
) -> None:
"""
Process video frames asynchronously using a threaded pipeline.
Expand Down Expand Up @@ -251,6 +312,10 @@ def process_video(
show_progress: Whether to display a tqdm progress bar during processing.
Default is False.
progress_message: Description shown in the progress bar.
copy_audio: If True (default), attempt to copy the audio stream from
`source_path` into `target_path` using ffmpeg. Falls back gracefully
(with a warning) when ffmpeg is not installed or the source has no
audio. Set to False to skip audio copying entirely.

Returns:
None
Expand Down Expand Up @@ -280,6 +345,19 @@ def callback(frame, frame_index):
else video_info.total_frames or 0
)

# When audio copying is requested we write to a temporary silent file first,
# then mux the audio in a second pass with ffmpeg.
use_audio = copy_audio and shutil.which("ffmpeg") is not None
if use_audio:
target_dir = os.path.dirname(os.path.abspath(target_path))
target_ext = os.path.splitext(target_path)[1] or ".mp4"
tmp_fd, silent_path = tempfile.mkstemp(
suffix=target_ext, dir=target_dir
)
os.close(tmp_fd)
else:
silent_path = target_path

frame_read_queue: Queue[tuple[int, npt.NDArray[np.uint8]] | None] = Queue(
maxsize=prefetch
)
Expand All @@ -304,68 +382,76 @@ def writer_thread(video_sink: VideoSink) -> None:
video_sink.write_frame(frame=frame)

reader_worker = threading.Thread(target=reader_thread, daemon=True)
with VideoSink(target_path=target_path, video_info=video_info) as video_sink:
writer_worker = threading.Thread(
target=writer_thread,
args=(video_sink,),
daemon=True,
)

reader_worker.start()
writer_worker.start()

progress_bar = tqdm(
total=total_frames,
disable=not show_progress,
desc=progress_message,
)

exception_in_worker: Exception | None = None
read_finished = False
try:
with VideoSink(target_path=silent_path, video_info=video_info) as video_sink:
writer_worker = threading.Thread(
target=writer_thread,
args=(video_sink,),
daemon=True,
)

reader_worker.start()
writer_worker.start()

progress_bar = tqdm(
total=total_frames,
disable=not show_progress,
desc=progress_message,
)

exception_in_worker: Exception | None = None
read_finished = False

try:
while True:
read_item = frame_read_queue.get()
if read_item is None:
read_finished = True
break

frame_index, frame = read_item
try:
processed_frame = callback(frame, frame_index)
frame_write_queue.put(processed_frame)
progress_bar.update(1)
except Exception as exc:
exception_in_worker = exc
break
finally:
try:
frame_write_queue.put(None, timeout=1)
except Full:
# Queue is full; this is a best-effort attempt to enqueue the sentinel.
# If we cannot enqueue it, the writer thread will still complete based
# on previously queued frames or other shutdown conditions.
pass
if not read_finished:
while True:
# Use timeout to prevent indefinite blocking if reader thread fails
read_item = frame_read_queue.get()
if read_item is None:
read_finished = True
break

frame_index, frame = read_item
try:
read_item = frame_read_queue.get(timeout=1)
if read_item is None:
break
# If we timeout waiting for a frame, only assume failure if reader
# thread is no longer alive. Otherwise, keep waiting as the reader
# may simply be slow (for example, due to a slow source).
except Empty:
if not reader_worker.is_alive():
break
# Reader is still alive; continue waiting for frames.
continue
reader_worker.join(timeout=10)
writer_worker.join(timeout=10)
progress_bar.close()
if exception_in_worker is not None:
raise exception_in_worker
processed_frame = callback(frame, frame_index)
frame_write_queue.put(processed_frame)
progress_bar.update(1)
except Exception as exc:
exception_in_worker = exc
break
finally:
try:
frame_write_queue.put(None, timeout=1)
except Full:
pass
if not read_finished:
while True:
try:
read_item = frame_read_queue.get(timeout=1)
if read_item is None:
break
except Empty:
if not reader_worker.is_alive():
break
continue
reader_worker.join(timeout=10)
writer_worker.join(timeout=10)
progress_bar.close()
if exception_in_worker is not None:
raise exception_in_worker

# Video sink is now closed — mux audio if requested.
if use_audio:
success = _mux_audio(
source_path=source_path,
silent_video_path=silent_path,
target_path=target_path,
)
if not success:
# ffmpeg failed or no audio — promote silent file to target.
shutil.move(silent_path, target_path)
silent_path = target_path # prevent double-cleanup
finally:
if use_audio and silent_path != target_path and os.path.exists(silent_path):
os.remove(silent_path)


class FPSMonitor:
Expand Down
Loading