[bugfix] add optional MEDIA_DECODE_TIMEOUT to prevent silent media-decode hang#9541
[bugfix] add optional MEDIA_DECODE_TIMEOUT to prevent silent media-decode hang#9541HaozheZhang6 wants to merge 1 commit into
Conversation
…code hang Corrupt or unsupported audio/video clips can make the native decoders (librosa->audioread/ffmpeg, decord) deadlock in C while holding the GIL, which silently hangs a DataLoader worker forever with GPUs idle and no error logged. Add an opt-in hard wall-clock timeout: when MEDIA_DECODE_TIMEOUT (seconds) > 0, the decode runs in a forked worker that is killed on overrun and raises TimeoutError so one bad clip cannot freeze the whole run. Default (unset/0) keeps the original in-process path with zero overhead. Applied to load_audio (the reproduced hang path); the helper is reusable for the decord video paths. Fixes modelscope#9507
There was a problem hiding this comment.
Code Review
This pull request introduces a subprocess-based timeout mechanism (_decode_with_timeout) for media decoding to prevent silent hangs in DataLoader workers, along with corresponding unit tests. A critical review comment points out that using multiprocessing.SimpleQueue can lead to a deadlock if the decoded payload exceeds the OS pipe buffer limit, as the parent process joins the child before reading from the queue. It is recommended to use multiprocessing.Pipe and poll(timeout) instead to avoid this issue.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _decode_worker(queue, func, args, kwargs): | ||
| try: | ||
| queue.put((True, func(*args, **kwargs))) | ||
| except Exception as e: | ||
| queue.put((False, e)) | ||
|
|
||
|
|
||
| def _decode_with_timeout(func: Callable[..., _T], *args, **kwargs) -> _T: | ||
| # Native media decoders (audioread/ffmpeg, decord) can deadlock in C while holding the GIL on a | ||
| # corrupt/unsupported clip, silently hanging a DataLoader worker forever; a signal-based timeout | ||
| # can't interrupt them. When `MEDIA_DECODE_TIMEOUT` (seconds) > 0, decode in a killable subprocess. | ||
| timeout = get_env_args('media_decode_timeout', float, 0) | ||
| if not timeout or timeout <= 0: | ||
| return func(*args, **kwargs) | ||
| import multiprocessing as mp | ||
|
|
||
| # Fork the decode worker: load_audio runs inside the data pipeline where fork is already the | ||
| # norm (PyTorch DataLoader), and unlike forkserver/spawn it does not re-import the training | ||
| # entrypoint per call. Fall back to the default context where fork is unavailable. | ||
| try: | ||
| ctx = mp.get_context('fork') | ||
| except ValueError: | ||
| ctx = mp.get_context() | ||
| queue = ctx.SimpleQueue() | ||
| process = ctx.Process(target=_decode_worker, args=(queue, func, args, kwargs)) | ||
| process.start() | ||
| process.join(timeout) | ||
| if process.is_alive(): | ||
| process.terminate() | ||
| process.join() | ||
| raise TimeoutError(f'Media decode exceeded MEDIA_DECODE_TIMEOUT={timeout}s and was killed ' | ||
| '(likely a corrupt or unsupported clip).') | ||
| if process.exitcode != 0: | ||
| raise RuntimeError(f'Media decode subprocess exited abnormally (exitcode={process.exitcode}).') | ||
| ok, payload = queue.get() | ||
| if not ok: | ||
| raise payload | ||
| return payload |
There was a problem hiding this comment.
Using multiprocessing.SimpleQueue can lead to a silent deadlock when the decoded payload is large.
The Issue
SimpleQueue is backed by an OS pipe. If the serialized payload (e.g., decoded audio arrays or video frames) exceeds the OS pipe buffer limit (typically 64KB on Linux), the child process's queue.put() call will block indefinitely until the parent process reads from the queue. However, the parent process calls process.join(timeout) before reading from the queue. This creates a classic deadlock: the child is blocked waiting for the parent to read, and the parent is blocked waiting for the child to finish. This will cause a false TimeoutError and terminate the child process for any large media file.
The Solution
Use multiprocessing.Pipe and poll(timeout) instead. This allows the parent process to detect if data is available to read (or if the child has exited/errored) and read from the pipe, which unblocks the child process if the buffer fills up.
def _decode_worker(conn, func, args, kwargs):
try:
conn.send((True, func(*args, **kwargs)))
except Exception as e:
conn.send((False, e))
finally:
conn.close()
def _decode_with_timeout(func: Callable[..., _T], *args, **kwargs) -> _T:
# Native media decoders (audioread/ffmpeg, decord) can deadlock in C while holding the GIL on a
# corrupt/unsupported clip, silently hanging a DataLoader worker forever; a signal-based timeout
# can't interrupt them. When `MEDIA_DECODE_TIMEOUT` (seconds) > 0, decode in a killable subprocess.
timeout = get_env_args('media_decode_timeout', float, 0)
if not timeout or timeout <= 0:
return func(*args, **kwargs)
import multiprocessing as mp
# Fork the decode worker: load_audio runs inside the data pipeline where fork is already the
# norm (PyTorch DataLoader), and unlike forkserver/spawn it does not re-import the training
# entrypoint per call. Fall back to the default context where fork is unavailable.
try:
ctx = mp.get_context('fork')
except ValueError:
ctx = mp.get_context()
parent_conn, child_conn = ctx.Pipe(duplex=False)
process = ctx.Process(target=_decode_worker, args=(child_conn, func, args, kwargs))
process.start()
child_conn.close()
if parent_conn.poll(timeout):
try:
ok, payload = parent_conn.recv()
except EOFError:
process.join()
raise RuntimeError(f'Media decode subprocess exited abnormally (exitcode={process.exitcode}).')
process.join()
if not ok:
raise payload
return payload
else:
process.terminate()
process.join()
raise TimeoutError(f'Media decode exceeded MEDIA_DECODE_TIMEOUT={timeout}s and was killed '
'(likely a corrupt or unsupported clip).')
PR type
PR information
Fixes #9507.
During multimodal SFT a corrupt or unsupported clip can make the native media decoders deadlock in C while holding the GIL —
librosa.loadfalling back toaudioread.ffdec.FFmpegAudioFile(the known ffmpeg pipe-starvation hang), anddecord.get_batch. The decode runs in-loop in the data pipeline with no bound, so a single bad sample silently hangs a DataLoader worker forever: process alive, GPUs at 0%, no exception, no log line. A signal-based timeout can't help here — the decoder holds the GIL, soSIGALRMnever fires.This adds an opt-in hard wall-clock timeout. When
MEDIA_DECODE_TIMEOUT(seconds) > 0, the decode runs in a forked worker that is killed on overrun and raisesTimeoutError, so one bad clip can't freeze the run. Unset/0 (default) keeps the original in-process path with zero overhead — non-breaking._decode_with_timeout(func, *args, **kwargs)— generic, env-gated viaget_env_args('media_decode_timeout', ...)to match the existing env-arg convention.load_audiodecodes through it (body split into_load_audioso it's picklable for the worker).forkcontext on purpose:load_audioruns inside the data pipeline where fork is already the norm (the DataLoader itself forks workers), and unlike forkserver/spawn it doesn't re-import the training entrypoint per call. Falls back to the default context where fork is unavailable.Scope is the audio path (the one reproduced in the issue). The helper is reusable for the decord
load_video_*paths — glad to extend it to them in this PR if you'd prefer one change.One note: on a multi-threaded host process Python prints a
fork()DeprecationWarning, the same one the DataLoader triggers; kept fork deliberately for the reasons above, but happy to switch the mechanism if you'd rather.Experiment results
tests/utils/test_vision_utils.py(CPU-only, no GPU/model/network):End-to-end on a real 16 kHz WAV,
load_audioreturns an identical array (shape(3200,)) with the timeout off (in-process) and on (forked decode), confirming the wrapped path decodes correctly. isort + flake8 + yapf (repo config) clean on the changed files.