perfect-postcode/video/tts/mux.py

188 lines
6 KiB
Python

"""Mux per-cue WAVs into recording.mp4 at their narration offsets.

Reads two manifests:

* ``output/audio/index.json`` (synth output) — per-cue WAV filename + measured
  duration. Generated BEFORE recording in one batched Qwen3-TTS call.
* ``output/narration.json`` (recorder output) — per-cue ``videoTimeMs`` against
  the trimmed video. Generated DURING recording.

Joins them by ``cueIndex`` (index in the cue list, 1:1 between manifests),
runs ffmpeg with one ``adelay`` per cue plus a single ``amix``, copies the
video stream, and writes ``output/recording.narrated.mp4``.

Run from the ``video/`` directory after recording:

    uv run --project tts python tts/mux.py
"""
from __future__ import annotations
import argparse
import json
import shutil
import subprocess
import sys
from pathlib import Path
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the mux command-line options.

    :param argv: argument list to parse instead of ``sys.argv[1:]``. ``None``
        (the default) reads the process arguments, so existing callers that
        invoke ``parse_args()`` are unaffected.
    :returns: namespace with ``audio_dir``, ``narration``, ``video`` and
        ``out`` as ``Path`` values and ``replace`` as a bool.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring contains a bullet list and an example command;
        # the default formatter would re-wrap them into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--audio-dir", type=Path, default=Path("output/audio"))
    parser.add_argument(
        "--narration",
        type=Path,
        default=Path("output/narration.json"),
        help="Per-cue videoTimeMs manifest written by the recorder.",
    )
    parser.add_argument("--video", type=Path, default=Path("output/recording.mp4"))
    parser.add_argument(
        "--out",
        type=Path,
        default=Path("output/recording.narrated.mp4"),
    )
    parser.add_argument(
        "--replace",
        action="store_true",
        help="After muxing, atomically replace --video with --out.",
    )
    return parser.parse_args(argv)
def _pair_cues(audio_items: list[dict], nar_cues: list[dict]) -> list[dict]:
    """Join synth items with recorder cues by ``cueIndex``.

    :param audio_items: entries from the synth manifest that have a ``wav``.
    :param nar_cues: cue entries from the recorder manifest, in cue order.
    :returns: one dict per recorder cue, in recorder order, holding
        ``cueIndex``, ``wav``, ``durationMs``, ``videoTimeMs`` and ``text``.
    :raises ValueError: if a cue has no synth WAV, or a manifest entry is
        missing a required field or holds a non-numeric value.
    """
    try:
        # Index by cueIndex so pairing does not depend on the synth
        # manifest's list order.
        audio_by_index = {int(it["cueIndex"]): it for it in audio_items}
    except (KeyError, TypeError, ValueError) as err:
        raise ValueError(f"malformed synth manifest entry: {err!r}") from err

    items: list[dict] = []
    for i, nar in enumerate(nar_cues):
        audio = audio_by_index.get(i)
        if audio is None:
            raise ValueError(f"no synth wav for cue {i}")
        try:
            items.append(
                {
                    "cueIndex": i,
                    "wav": audio["wav"],
                    "durationMs": int(audio["durationMs"]),
                    # A negative offset is played at 0 (the delay cannot be
                    # negative). Clamp here so the overlap check sees the
                    # same start time that ffmpeg will use.
                    "videoTimeMs": max(0, int(nar["videoTimeMs"])),
                    "text": nar.get("text", ""),
                }
            )
        except (KeyError, TypeError, ValueError) as err:
            raise ValueError(f"malformed manifest entry for cue {i}: {err!r}") from err
    return items


def _find_overlaps(items: list[dict]) -> list[str]:
    """Describe each time-adjacent pair of cues where one is still playing when the next starts."""
    # Sort by start so the order matches what will actually play, then check
    # that each cue ends before the next one starts.
    ordered = sorted(items, key=lambda it: it["videoTimeMs"])
    overlaps: list[str] = []
    for prev, nxt in zip(ordered, ordered[1:]):
        prev_end = prev["videoTimeMs"] + prev["durationMs"]
        nxt_start = nxt["videoTimeMs"]
        if prev_end > nxt_start:
            overlaps.append(
                f"cue {prev['cueIndex']} ends at {prev_end}ms but cue {nxt['cueIndex']} "
                f"starts at {nxt_start}ms (overlap {prev_end - nxt_start}ms)"
            )
    return overlaps


def _build_ffmpeg_cmd(video: Path, audio_dir: Path, items: list[dict], out: Path) -> list[str]:
    """Build the ffmpeg argv that delays each cue, mixes them and muxes the result over ``video``."""
    cmd: list[str] = ["ffmpeg", "-y", "-loglevel", "warning", "-i", str(video)]
    for it in items:
        cmd += ["-i", str(audio_dir / it["wav"])]

    filter_parts: list[str] = []
    mix_inputs: list[str] = []
    # Input 0 is the video, so the cue WAVs are inputs 1..len(items).
    for n, it in enumerate(items, start=1):
        delay_ms = it["videoTimeMs"]
        label = f"a{n}"
        # adelay needs one delay per channel; "all=1" applies the same delay
        # to every channel, which is what we want for mono narration.
        filter_parts.append(
            f"[{n}:a]aresample=async=1,adelay={delay_ms}|{delay_ms}:all=1[{label}]"
        )
        mix_inputs.append(f"[{label}]")

    # The mixed track ends when the last cue ends. apad extends it with
    # silence so that "-shortest" below stops at the end of the video instead
    # of cutting the video off right after the final narration cue.
    mix = (
        f"{''.join(mix_inputs)}amix=inputs={len(items)}"
        f":duration=longest:dropout_transition=0:normalize=0,apad[aout]"
    )
    filter_complex = ";".join(filter_parts + [mix])
    cmd += [
        "-filter_complex",
        filter_complex,
        "-map",
        "0:v:0",
        "-map",
        "[aout]",
        "-c:v",
        "copy",
        "-c:a",
        "aac",
        "-b:a",
        "192k",
        "-shortest",
        "-movflags",
        "+faststart",
        str(out),
    ]
    return cmd


def main() -> int:
    """Mux the synthesized cue WAVs into the recorded video.

    Checks that ffmpeg and the input files exist, joins the synth and
    recorder manifests by cue index, refuses overlapping cues, then runs
    ffmpeg. With ``--replace`` the narrated file is renamed over ``--video``
    afterwards.

    :returns: process exit code: 0 on success, 1 on a validation failure,
        or ffmpeg's own non-zero exit code if the mux fails.
    """
    args = parse_args()
    if not shutil.which("ffmpeg"):
        print("[mux] ffmpeg not on PATH", file=sys.stderr)
        return 1
    audio_index_path = args.audio_dir / "index.json"
    if not audio_index_path.exists():
        print(
            f"[mux] {audio_index_path} not found; run tts/synth.py first",
            file=sys.stderr,
        )
        return 1
    if not args.narration.exists():
        print(
            f"[mux] {args.narration} not found; the recorder must run before mux",
            file=sys.stderr,
        )
        return 1
    if not args.video.exists():
        print(f"[mux] video not found: {args.video}", file=sys.stderr)
        return 1

    # JSON is UTF-8; do not depend on the platform's default encoding.
    audio_index = json.loads(audio_index_path.read_text(encoding="utf-8"))
    audio_items = [it for it in audio_index.get("items", []) if it.get("wav")]
    if not audio_items:
        print("[mux] synth produced no cues; copying video unchanged", file=sys.stderr)
        shutil.copyfile(args.video, args.out)
        return 0

    narration = json.loads(args.narration.read_text(encoding="utf-8"))
    nar_cues = list(narration.get("cues", []))
    if len(nar_cues) != len(audio_items):
        print(
            f"[mux] cue count mismatch: synth has {len(audio_items)} cues, "
            f"recorder logged {len(nar_cues)}. Re-run preflight + synth + record.",
            file=sys.stderr,
        )
        return 1

    try:
        items = _pair_cues(audio_items, nar_cues)
    except ValueError as err:
        print(f"[mux] {err}", file=sys.stderr)
        return 1

    # Refuse to mux overlapping cues — amix would silently mash voices on top
    # of each other.
    overlaps = _find_overlaps(items)
    if overlaps:
        print(
            "[mux] refusing to produce overlapping narration:\n - "
            + "\n - ".join(overlaps),
            file=sys.stderr,
        )
        return 1

    cmd = _build_ffmpeg_cmd(args.video, args.audio_dir, items, args.out)
    print(f"[mux] muxing {len(items)} narration cues into {args.out}", flush=True)
    result = subprocess.run(cmd, check=False)
    if result.returncode != 0:
        print(f"[mux] ffmpeg exited {result.returncode}", file=sys.stderr)
        return result.returncode
    if args.replace:
        args.out.replace(args.video)
        print(f"[mux] replaced {args.video} with narrated copy", flush=True)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())