"""Mux per-cue WAVs into one storyboard's recording.mp4 at narration offsets. Reads two manifests inside ``output//``: * ``audio/index.json`` (synth output) — per-cue WAV filename + measured duration. Generated BEFORE recording in one batched Qwen3-TTS call. * ``narration.json`` (recorder output) — per-cue ``videoTimeMs`` against the trimmed video. Generated DURING recording. Joins them by ``cueIndex`` (index in the cue list, 1:1 between manifests), runs ffmpeg with one ``adelay`` per cue plus a single ``amix``, copies the video stream, and writes ``output//recording.narrated.mp4``. Run from the ``video/`` directory after recording: uv run --project tts python tts/mux.py --storyboard recording """ from __future__ import annotations import argparse import json import shutil import subprocess import sys from pathlib import Path def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--storyboard", required=True, help="Storyboard slug (matches Storyboard.name in src/storyboard.ts).", ) parser.add_argument( "--output-dir", type=Path, default=Path("output"), help="Root output directory; per-storyboard files live in //.", ) parser.add_argument( "--replace", action="store_true", help="After muxing, atomically replace the storyboard's recording.mp4.", ) return parser.parse_args() def main() -> int: args = parse_args() if not shutil.which("ffmpeg"): print("[mux] ffmpeg not on PATH", file=sys.stderr) return 1 storyboard_dir = args.output_dir / args.storyboard audio_dir = storyboard_dir / "audio" narration_path = storyboard_dir / "narration.json" video_path = storyboard_dir / "recording.mp4" out_path = storyboard_dir / "recording.narrated.mp4" audio_index_path = audio_dir / "index.json" if not audio_index_path.exists(): print( f"[mux] {audio_index_path} not found; run tts/synth.py first", file=sys.stderr, ) return 1 if not narration_path.exists(): print( f"[mux] {narration_path} not found; the recorder must run before mux", file=sys.stderr, ) return 1 if not video_path.exists(): print(f"[mux] video not found: {video_path}", file=sys.stderr) return 1 audio_index = json.loads(audio_index_path.read_text()) audio_items = [it for it in audio_index.get("items", []) if it.get("wav")] if not audio_items: print("[mux] synth produced no cues; copying video unchanged", file=sys.stderr) shutil.copyfile(video_path, out_path) return 0 narration = json.loads(narration_path.read_text()) nar_cues = list(narration.get("cues", [])) if len(nar_cues) != len(audio_items): print( f"[mux] cue count mismatch: synth has {len(audio_items)} cues, " f"recorder logged {len(nar_cues)}. Re-run preflight + synth + record.", file=sys.stderr, ) return 1 # Sort audio items by cueIndex so list-order matches the recorder's # cue list (which is also in cue order). Then pair 1:1. audio_by_index = {int(it["cueIndex"]): it for it in audio_items} items = [] for i, nar in enumerate(nar_cues): audio = audio_by_index.get(i) if audio is None: print(f"[mux] no synth wav for cue {i}", file=sys.stderr) return 1 items.append( { "cueIndex": i, "wav": audio["wav"], "durationMs": int(audio["durationMs"]), "videoTimeMs": int(nar["videoTimeMs"]), "text": nar.get("text", ""), } ) # Refuse to mux overlapping cues — amix would silently mash voices on top # of each other. Sort by start so the order matches what we'll actually # play, then check that each cue ends before the next one starts. ordered = sorted(items, key=lambda it: it["videoTimeMs"]) overlaps: list[str] = [] for prev, nxt in zip(ordered, ordered[1:]): prev_end = prev["videoTimeMs"] + prev["durationMs"] nxt_start = nxt["videoTimeMs"] if prev_end > nxt_start: overlaps.append( f"cue {prev['cueIndex']} ends at {prev_end}ms but cue {nxt['cueIndex']} " f"starts at {nxt_start}ms (overlap {prev_end - nxt_start}ms)" ) if overlaps: raise SystemExit( "[mux] refusing to produce overlapping narration:\n - " + "\n - ".join(overlaps) ) cmd: list[str] = ["ffmpeg", "-y", "-loglevel", "warning", "-i", str(video_path)] for it in items: cmd += ["-i", str(audio_dir / it["wav"])] filter_parts: list[str] = [] mix_inputs: list[str] = [] for n, it in enumerate(items, start=1): delay_ms = max(0, it["videoTimeMs"]) label = f"a{n}" # adelay needs one delay per channel; "all=1" applies the same delay # to every channel, which is what we want for mono narration. filter_parts.append( f"[{n}:a]aresample=async=1,adelay={delay_ms}|{delay_ms}:all=1[{label}]" ) mix_inputs.append(f"[{label}]") mix = ( f"{''.join(mix_inputs)}amix=inputs={len(items)}" f":duration=longest:dropout_transition=0:normalize=0[aout]" ) filter_complex = ";".join(filter_parts + [mix]) cmd += [ "-filter_complex", filter_complex, "-map", "0:v:0", "-map", "[aout]", "-c:v", "copy", "-c:a", "aac", "-b:a", "192k", "-shortest", "-movflags", "+faststart", str(out_path), ] print( f"[mux] [{args.storyboard}] muxing {len(items)} narration cues into {out_path}", flush=True, ) result = subprocess.run(cmd) if result.returncode != 0: print(f"[mux] ffmpeg exited {result.returncode}", file=sys.stderr) return result.returncode if args.replace: out_path.replace(video_path) print(f"[mux] replaced {video_path} with narrated copy", flush=True) return 0 if __name__ == "__main__": raise SystemExit(main())