def _discard_partial_output(path: str) -> None:
    """Best-effort removal of a file left behind by a failed ffmpeg run."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # ffmpeg failed before it created the output; nothing to clean up.
        pass
    except OSError as exc:
        logger.warning("Could not remove incomplete output %s (%s).", path, exc)


def _mux_audio(
    source_path: str,
    silent_video_path: str,
    target_path: str,
    timeout: float | None = 300,
) -> bool:
    """
    Combine the video of `silent_video_path` with the audio of `source_path`.

    The streams are muxed with ffmpeg using stream-copy (no re-encode) and the
    result is written to `target_path`. The ``-map 1:a?`` mapping is optional:
    when the source has no audio stream ffmpeg still succeeds and the output
    simply contains the video only, in which case this function returns True.

    This function never raises. On any failure it logs a warning, removes a
    possibly incomplete `target_path`, and returns False so the caller can
    fall back to the silent video.

    Args:
        source_path: Path of the original video whose audio should be copied.
        silent_video_path: Path of the processed video that has no audio.
        target_path: Path of the muxed output. Must differ from
            `silent_video_path`; an existing file is overwritten.
        timeout: Maximum number of seconds to wait for ffmpeg. `None` waits
            indefinitely. Defaults to 300.

    Returns:
        True if ffmpeg exited successfully and `target_path` was written,
        False otherwise (ffmpeg missing, identical input/output path,
        non-zero exit code, timeout, or failure to start the process).
    """
    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path is None:
        logger.warning(
            "ffmpeg not found on PATH; audio will not be copied to %s.",
            target_path,
        )
        return False

    # With "-y" ffmpeg would overwrite the very file it is reading from,
    # destroying the processed video. Refuse instead of corrupting it.
    if os.path.abspath(silent_video_path) == os.path.abspath(target_path):
        logger.warning(
            "Silent video and target are the same file (%s); "
            "audio will not be copied.",
            target_path,
        )
        return False

    cmd = [
        ffmpeg_path,
        "-nostdin",  # never read interactive commands from stdin
        "-loglevel", "error",  # keep stderr limited to real errors
        "-y",  # overwrite output without prompt
        "-i", silent_video_path,  # input 0: processed video (no audio)
        "-i", source_path,  # input 1: original source (for audio)
        "-c", "copy",  # stream-copy, no re-encode
        "-map", "0:v:0",  # video track from processed file
        "-map", "1:a?",  # audio tracks from source (optional)
        # NOTE: -shortest ends the output with the shortest stream, so audio
        # shorter than the video also trims the video.
        "-shortest",
        target_path,
    ]
    try:
        result = subprocess.run(
            cmd,
            # Detach stdin so ffmpeg cannot consume the caller's input or
            # block when running without a terminal.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired:
        logger.warning(
            "ffmpeg timed out; audio will not be copied to %s.", target_path
        )
        _discard_partial_output(target_path)
        return False
    except Exception as exc:  # pragma: no cover
        # Deliberately broad: muxing is best-effort and must not break the
        # caller, which falls back to the silent video on False.
        logger.warning("ffmpeg failed (%s); audio will not be copied.", exc)
        _discard_partial_output(target_path)
        return False

    if result.returncode != 0:
        logger.warning(
            "ffmpeg exited with code %d; audio may not be copied. "
            "stderr: %s",
            result.returncode,
            result.stderr.decode(errors="replace"),
        )
        # A failed run can leave a truncated container behind.
        _discard_partial_output(target_path)
        return False
    return True
+ use_audio = copy_audio and shutil.which("ffmpeg") is not None + if use_audio: + target_dir = os.path.dirname(os.path.abspath(target_path)) + target_ext = os.path.splitext(target_path)[1] or ".mp4" + tmp_fd, silent_path = tempfile.mkstemp( + suffix=target_ext, dir=target_dir + ) + os.close(tmp_fd) + else: + silent_path = target_path + frame_read_queue: Queue[tuple[int, npt.NDArray[np.uint8]] | None] = Queue( maxsize=prefetch ) @@ -304,68 +382,76 @@ def writer_thread(video_sink: VideoSink) -> None: video_sink.write_frame(frame=frame) reader_worker = threading.Thread(target=reader_thread, daemon=True) - with VideoSink(target_path=target_path, video_info=video_info) as video_sink: - writer_worker = threading.Thread( - target=writer_thread, - args=(video_sink,), - daemon=True, - ) - - reader_worker.start() - writer_worker.start() - - progress_bar = tqdm( - total=total_frames, - disable=not show_progress, - desc=progress_message, - ) - - exception_in_worker: Exception | None = None - read_finished = False + try: + with VideoSink(target_path=silent_path, video_info=video_info) as video_sink: + writer_worker = threading.Thread( + target=writer_thread, + args=(video_sink,), + daemon=True, + ) + + reader_worker.start() + writer_worker.start() + + progress_bar = tqdm( + total=total_frames, + disable=not show_progress, + desc=progress_message, + ) + + exception_in_worker: Exception | None = None + read_finished = False - try: - while True: - read_item = frame_read_queue.get() - if read_item is None: - read_finished = True - break - - frame_index, frame = read_item - try: - processed_frame = callback(frame, frame_index) - frame_write_queue.put(processed_frame) - progress_bar.update(1) - except Exception as exc: - exception_in_worker = exc - break - finally: try: - frame_write_queue.put(None, timeout=1) - except Full: - # Queue is full; this is a best-effort attempt to enqueue the sentinel. 
- # If we cannot enqueue it, the writer thread will still complete based - # on previously queued frames or other shutdown conditions. - pass - if not read_finished: while True: - # Use timeout to prevent indefinite blocking if reader thread fails + read_item = frame_read_queue.get() + if read_item is None: + read_finished = True + break + + frame_index, frame = read_item try: - read_item = frame_read_queue.get(timeout=1) - if read_item is None: - break - # If we timeout waiting for a frame, only assume failure if reader - # thread is no longer alive. Otherwise, keep waiting as the reader - # may simply be slow (for example, due to a slow source). - except Empty: - if not reader_worker.is_alive(): - break - # Reader is still alive; continue waiting for frames. - continue - reader_worker.join(timeout=10) - writer_worker.join(timeout=10) - progress_bar.close() - if exception_in_worker is not None: - raise exception_in_worker + processed_frame = callback(frame, frame_index) + frame_write_queue.put(processed_frame) + progress_bar.update(1) + except Exception as exc: + exception_in_worker = exc + break + finally: + try: + frame_write_queue.put(None, timeout=1) + except Full: + pass + if not read_finished: + while True: + try: + read_item = frame_read_queue.get(timeout=1) + if read_item is None: + break + except Empty: + if not reader_worker.is_alive(): + break + continue + reader_worker.join(timeout=10) + writer_worker.join(timeout=10) + progress_bar.close() + if exception_in_worker is not None: + raise exception_in_worker + + # Video sink is now closed — mux audio if requested. + if use_audio: + success = _mux_audio( + source_path=source_path, + silent_video_path=silent_path, + target_path=target_path, + ) + if not success: + # ffmpeg failed or no audio — promote silent file to target. 
+ shutil.move(silent_path, target_path) + silent_path = target_path # prevent double-cleanup + finally: + if use_audio and silent_path != target_path and os.path.exists(silent_path): + os.remove(silent_path) class FPSMonitor: