#!/usr/bin/env python3
"""Recoil Workspace FastAPI Server.

Serves the workspace frontend (static files), media files from project output
directories, and state API endpoints. The frontend polls these endpoints;
the MCP server reads/writes the same state.json file.

# Port allocation: Pre-Prod=8420, Production Console=8430, Workspace=8450

Usage:
    python3 workspace/server.py --project tartarus
    python3 workspace/server.py --project tartarus --port 8450

Endpoints:
    GET  /                          — Redirect to /workspace
    GET  /workspace                 — Serve index.html
    GET  /api/health                — Health check
    GET  /api/state                 — Current viewer state + selection
    POST /api/state/selection       — Update selection (from frontend click)
    POST /api/state/viewer          — Update viewer state (from frontend)
    GET  /api/shot/{project}/{shot_id} — Single shot detail
    GET  /api/pass/{project}/{pass_id} — Coverage pass detail
    GET  /api/activity/{project}    — In-flight generations from ops log
    GET  /media/{path:path}         — Serve any media file from projects root
    GET  /static/{path:path}        — Serve static frontend files
"""

import argparse
import json
import logging
import os
import re
import subprocess
import sys
from pathlib import Path

# ── Path setup ──────────────────────────────────────────────────
# CP-1 (2026-06-01): centralized through core.paths.
# Both roots are needed: CLAUDE_PROJECTS so `from recoil.X` resolves
# (Build D Phase 18.2 import flip), recoil/ so `from workspace.X` still
# resolves for the not-yet-flipped imports below.
_RECOIL_ROOT = Path(__file__).resolve().parent.parent
_PROJECTS_ROOT = _RECOIL_ROOT.parent
for _p in (_PROJECTS_ROOT, _RECOIL_ROOT):
    if str(_p) not in sys.path:
        sys.path.insert(0, str(_p))

from recoil.core.paths import ensure_pipeline_importable  # noqa: E402
ensure_pipeline_importable()                     # injects recoil/pipeline/ onto sys.path

from recoil.core.paths import projects_root, ProjectPaths
from recoil.core.project import get_project

try:
    from fastapi import FastAPI, Request
    from fastapi.responses import (
        FileResponse,
        HTMLResponse,
        JSONResponse,
        RedirectResponse,
    )
    from fastapi.staticfiles import StaticFiles
except ImportError:
    print("FastAPI not installed. Run: pip install fastapi uvicorn", file=sys.stderr)
    sys.exit(1)

from workspace import state as ws_state
from recoil.workspace import board as ws_board
from recoil.workspace import board_comments as ws_board_comments
from recoil.workspace.helpers import (
    get_store as _get_store,
    get_ops_log_path as _get_ops_log_path,
    shot_status_color as _shot_status_color,
)
from workspace import sidecar as ws_sidecar
from recoil.pipeline.core.cost import read_cost_from_record_safe

# ── Extracted domain modules (MF-5, Phase B 2026-04-30) ──────────
# Tree assembly, metadata index, and shot grouping live in workspace.tree.
# Coverage reduction and best-status live in workspace.coverage.
# Routes in this file delegate to those modules; Console v2's v2_dispatch.py
# imports them directly.
from recoil.workspace.tree import (
    PASS_PATTERN,
    SHOT_PATTERN,
    MEDIA_EXTENSIONS,
    parse_pass_filename,
    parse_shot_filename,
    normalize_shot_num,
    group_by_pass_anchors,
    group_flat_by_stem,
    group_episode_files_by_shot,
    build_metadata_index,
    scan_output_dir,
    build_project_tree,
    _SHOT_TOKEN_RE,  # noqa: F401  used by _extract_pass_segments below
)
from recoil.workspace.coverage import (
    STATUS_PRIORITY,
    best_status,
    coverage_summary_for_episode,
    recent_activity_for_episode,
)
from recoil.workspace._project_validation import _SAFE_PROJECT_RE, _validate_project
from recoil.workspace.routes import board as board_routes
from recoil.core.exceptions import (
    ExecutionStoreUnavailableError,
    MediaProbeError,
    SidecarCorruptError,
)
from recoil.pipeline._lib.sanctioned_fallbacks import (
    FallbackRecord,
    fire_sanctioned_fallback,
    register_sanctioned_fallback,
)
from recoil.pipeline._lib.take_keys import read_take_number


# Tenet 6: register the ffprobe-missing fallback so "no probe binary → no
# duration" is named + observable per the registry. Quality-neutral because
# the substitution (None duration) flows only into UI/telemetry, never into
# generation bytes.
register_sanctioned_fallback(
    FallbackRecord(
        name="ffprobe_missing_no_duration",
        justification=(
            "ffprobe binary not found on PATH (e.g. dev shell without "
            "ffmpeg). The duration probe returns None and the workspace "
            "displays the take without a duration field."
        ),
        quality_neutrality_argument=(
            "The substitution (None duration) flows ONLY into the workspace "
            "UI display. Generation pipelines never read duration from this "
            "probe — they read from the provider's RunResult metadata. "
            "Missing ffprobe cannot affect generation bytes."
        ),
        expected_substitution="None (in place of the parsed ffprobe duration)",
        introduced_in="Phase E.debug-R4",
    )
)

logging.basicConfig(level=logging.INFO, format="[workspace] %(levelname)s %(message)s")
log = logging.getLogger("workspace")

# ── App ─────────────────────────────────────────────────────────

app = FastAPI(title="Recoil Workspace", version="0.1.0")
app.include_router(board_routes.router)

import json as _json_module  # alias to avoid collision with any local json imports


@app.exception_handler(_json_module.JSONDecodeError)
async def _json_decode_error_handler(request: Request, exc):
    return JSONResponse({"error": "Malformed JSON"}, status_code=400)


_STATIC_DIR = Path(__file__).parent / "static"
# v2 layout: project tree explicitly ignores _history/ (archives, migration
# tarballs, debug grids). The scan_output_dir function in workspace/tree.py
# enforces this. The /media/{path} endpoint serves any path under
# projects_root() — including _history/ if requested directly by path. That
# is intentional: it lets users link to archived artifacts when needed, but
# the UI never enumerates them.
_DEFAULT_PROJECT: str = "tartarus"

# ── Tree cache (avoid rglob on every 3s poll) ─────────────────
import time as _time

_tree_cache: dict[str, dict] = {}
_tree_cache_mtime: dict[str, float] = {}
_TREE_CACHE_TTL = 5.0  # seconds
_board_cache: dict[str, dict] = {}
_board_cache_time: dict[str, float] = {}
_board_cache_mtime_key: dict[str, int] = {}
_BOARD_CACHE_TTL = 5.0  # seconds

# ── Recent feed cache ─────────────────────────────────────────
_recent_cache: dict[str, dict] = {}
_recent_cache_mtime: dict[str, float] = {}
_RECENT_CACHE_TTL = 5.0  # seconds


def _invalidate_caches(project: str):
    """Invalidate tree and recent caches for a project."""
    _tree_cache.pop(project, None)
    for key in list(_board_cache.keys()):
        if key.startswith(f"{project}:"):
            _board_cache.pop(key, None)
            _board_cache_time.pop(key, None)
            _board_cache_mtime_key.pop(key, None)
    for key in list(_recent_cache.keys()):
        if key.startswith(f"{project}:"):
            _recent_cache.pop(key, None)
            _recent_cache_mtime.pop(key, None)


# ── Routes: Static + Root ──────────────────────────────────────


@app.get("/")
async def root():
    return RedirectResponse(url="/workspace")


@app.get("/workspace")
async def workspace_page():
    index = _STATIC_DIR / "index.html"
    if not index.is_file():
        return HTMLResponse(
            "<h1>Workspace not built yet</h1><p>Run the build phases first.</p>",
            status_code=404,
        )
    return FileResponse(index, media_type="text/html")


# Mount static files
if _STATIC_DIR.is_dir():
    app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static")


# ── Routes: Health ─────────────────────────────────────────────


@app.get("/api/health")
async def health():
    project = ws_state.get_project() or _DEFAULT_PROJECT
    project_dir = projects_root() / project
    return JSONResponse(
        {
            "status": "ok",
            "project": project,
            "projects_root": str(projects_root()),
            "project_exists": project_dir.is_dir(),
        }
    )


@app.get("/api/convergence-status")
def convergence_status():
    """Return convergence health metrics for the workspace dashboard."""
    import yaml

    result = {
        "path_count_delta": None,
        "total_sentinel_functions": None,
        "pending_converge_targets": [],
        "recent_verdicts": [],
        "orphaned_deprecated": [],
    }

    # Path count state
    state_file = Path("recoil/architecture/.path_count_state.json")
    if state_file.exists():
        try:
            state = json.loads(state_file.read_text())
            result["total_sentinel_functions"] = sum(state.get("counts", {}).values())
        except Exception:
            pass

    # Manifest pending targets
    manifest_file = Path("recoil/architecture/ssot_manifest.yaml")
    if manifest_file.exists():
        try:
            manifest = yaml.safe_load(manifest_file.read_text())
            for cap_name, cap in manifest.get("capabilities", {}).items():
                deps = [d for d in cap.get("deprecated_paths", []) if " " not in d]
                if deps and cap.get("state") not in ("tombstoned",):
                    result["pending_converge_targets"].append({
                        "capability": cap_name,
                        "deprecated_count": len(deps),
                        "state": cap.get("state", "deprecated"),
                    })
        except Exception:
            pass

    # Recent verdicts (last 5)
    log_file = Path("recoil/architecture/auditor_log.jsonl")
    if log_file.exists():
        try:
            rows = [json.loads(line) for line in log_file.read_text().splitlines() if line.strip()]
            result["recent_verdicts"] = [
                {"verdict": r["verdict"], "timestamp": r["timestamp"], "reason": r.get("reason", "")[:100]}
                for r in rows[-5:]
            ]
        except Exception:
            pass

    return result


# ── Routes: State ──────────────────────────────────────────────


@app.get("/api/state")
async def get_state():
    state = ws_state.read_state()
    return JSONResponse(state)


def _tree_shot_ids(node: dict) -> set[str]:
    """Collect shot ids exposed by the workspace tree."""
    ids: set[str] = set()
    if isinstance(node, dict):
        shot_id = node.get("shot_id")
        if isinstance(shot_id, str) and shot_id:
            ids.add(shot_id)
        for child in node.get("children", []) or []:
            ids.update(_tree_shot_ids(child))
    return ids


_SHOT_ID_SELECTION_RE = re.compile(r"^EP\d{3}_SH\d+[A-Za-z]?(?:[~_][A-Za-z0-9]+)*$")


def _is_shot_id_selection(item: str) -> bool:
    return bool(_SHOT_ID_SELECTION_RE.match(item))


def _accepted_selection_subset(shot_ids: list[str], project: str | None) -> list[str]:
    """Validate tree shot ids while allowing selected media paths.

    The state write below is flock-protected by ws_state.write_state(), but
    this route's read-modify-write sequence is not one single locked transaction.
    """
    if not project:
        return shot_ids
    if not any(isinstance(item, str) and "/" not in item for item in shot_ids):
        return [item for item in shot_ids if isinstance(item, str)]

    tree_payload = build_project_tree(project)
    valid_shot_ids = _tree_shot_ids(tree_payload.get("tree", {}))
    accepted: list[str] = []
    for item in shot_ids:
        if not isinstance(item, str):
            continue
        if "/" in item:
            accepted.append(item)
        elif _is_shot_id_selection(item):
            if item in valid_shot_ids:
                accepted.append(item)
        else:
            accepted.append(item)
    return accepted


@app.post("/api/state/selection")
async def update_selection(request: Request):
    body = await request.json()
    shot_ids = body.get("shot_ids", [])
    state = ws_state.read_state()
    project = body.get("project") or state.get("project")
    accepted = _accepted_selection_subset(shot_ids, project)
    state["selection"] = accepted
    if len(accepted) == 1:
        state["browse_tab_active"] = False
    ws_state.write_state(state)
    return JSONResponse({"selection": accepted})


@app.post("/api/state/viewer")
async def update_viewer(request: Request):
    body = await request.json()
    ws_state.set_viewer_state(
        shot_id=body.get("shot_id"),
        take_index=body.get("take_index"),
        file_path=body.get("file_path"),
        media_type=body.get("media_type"),
        context=body.get("context"),
    )
    return JSONResponse(ws_state.get_viewer_state())


# ── Routes: Shots (legacy, kept for compatibility) ────────────

# ── Routes: Filesystem Tree (source of truth) ─────────────────
# Filename parsing constants/patterns + grouping strategy registry live in
# workspace.tree (extracted in MF-5 / Phase B 2026-04-30). Backward-compat
# aliases for the old underscore names appear at the bottom of this module.

def _run_scene_detect(
    video_path: Path,
    threshold: float,
) -> list[float]:
    """Return detected scene-cut timestamps (seconds) via PySceneDetect.

    Only the cut *points* are returned — not full scene tuples.
    Returns an empty list if detection fails or finds zero scenes.
    """
    try:
        from scenedetect import open_video, SceneManager
        from scenedetect.detectors import ContentDetector
    except ImportError:
        log.warning("scenedetect not installed — install via pip install scenedetect")
        return []

    try:
        video = open_video(str(video_path))
        sm = SceneManager()
        sm.add_detector(ContentDetector(threshold=threshold))
        sm.detect_scenes(video)
        scenes = sm.get_scene_list()
    except Exception as e:
        log.warning(
            "PySceneDetect failed for %s at threshold %.1f: %s",
            video_path.name,
            threshold,
            e,
        )
        return []

    cut_points: list[float] = []
    for i, (start, _end) in enumerate(scenes):
        if i == 0:
            continue
        cut_points.append(float(start.get_seconds()))
    return cut_points


def _align_detected_to_expected(
    detected: list[float],
    intended_durations: list[float],
    pass_duration: float,
    tolerance_s: float = 0.5,
) -> dict:
    """Align detected cuts to planner-intended boundaries.

    Expected timestamps are cumulative sums of `intended_durations` EXCLUDING the
    final cumulative sum (which equals the pass end, not a cut). For durations
    [3, 5, 2, 4] the expected cuts are [3, 8, 10].

    Greedy matching: iterate expected_ts in order. For each expected boundary,
    find the nearest unclaimed detected cut within `tolerance_s`. Ties broken
    by earlier detected timestamp (deterministic).

    Args:
        detected:           Detected cut timestamps from PySceneDetect.
        intended_durations: Planner-intended per-segment durations.
        pass_duration:      Total pass duration (informational; used for clamping
                            if needed — currently unused beyond documentation).
        tolerance_s:        Maximum distance a detected cut can be from an
                            expected boundary and still count as aligned.

    Returns:
        {
            "aligned_cuts":      list[float],  # successfully aligned detected timestamps,
                                               # length = expected_count - len(missed_boundaries).
                                               # Callers may need to fall back to expected_ts
                                               # for missed slots when segmenting.
            "model_added_cuts":  list[float],  # detected cuts that did not align (sorted).
            "alignment_score":   float,        # aligned_count / expected_count, or 1.0
                                               # if expected_count == 0 (div-by-zero guard).
            "missed_boundaries": list[float],  # expected_ts values that had no detected
                                               # cut within tolerance_s.
        }
    """
    # expected_ts = cumulative sums excluding the final sum (N-1 expected cuts).
    expected_ts: list[float] = []
    running = 0.0
    for d in intended_durations:
        running += float(d)
        expected_ts.append(running)
    if expected_ts:
        expected_ts.pop()  # drop final sum = pass end
    expected_count = len(expected_ts)
    _ = pass_duration  # reserved for future clamping; keeps signature stable

    # Zero expected boundaries → everything detected is "model-added"; score is
    # vacuously 1.0 (prevents div-by-zero in callers).
    if expected_count == 0:
        return {
            "aligned_cuts": [],
            "model_added_cuts": sorted(detected),
            "alignment_score": 1.0,
            "missed_boundaries": [],
        }

    claimed = [False] * len(detected)
    aligned_cuts: list[float] = []
    missed_boundaries: list[float] = []

    for exp in expected_ts:
        best_idx = -1
        best_delta = tolerance_s + 1.0  # initialise outside the window
        best_ts = float("inf")
        for i, d in enumerate(detected):
            if claimed[i]:
                continue
            delta = abs(float(d) - exp)
            if delta > tolerance_s:
                continue
            # Tie-break: earlier detected timestamp wins when deltas are equal.
            if delta < best_delta or (delta == best_delta and float(d) < best_ts):
                best_idx = i
                best_delta = delta
                best_ts = float(d)
        if best_idx >= 0:
            claimed[best_idx] = True
            aligned_cuts.append(float(detected[best_idx]))
        else:
            missed_boundaries.append(exp)

    model_added_cuts = sorted(
        float(detected[i]) for i in range(len(detected)) if not claimed[i]
    )
    aligned_count = len(aligned_cuts)
    alignment_score = aligned_count / expected_count

    return {
        "aligned_cuts": aligned_cuts,
        "model_added_cuts": model_added_cuts,
        "alignment_score": alignment_score,
        "missed_boundaries": missed_boundaries,
    }


_THRESHOLD_LADDER_DOWN = [27.0, 24.0, 22.0, 20.0]
# Backward-compat fallback only. Fires ONLY when `intended_durations is None`
# in `_detect_cuts_with_retry` — callers that have planner intent use
# `_align_detected_to_expected` instead (preserves model "happy accidents"
# as additional coverage rather than suppressing them via higher thresholds).
_THRESHOLD_LADDER_UP = [27.0, 30.0, 35.0, 40.0]


def _detect_cuts_with_retry(
    video_path: Path,
    expected_cuts: int,
    intended_durations: list[float] | None = None,
) -> dict:
    """Run PySceneDetect and reconcile the detected count against expectation.

    Three branches:
      - detected == expected: return converged immediately.
      - detected <  expected: descend the DOWN ladder (existing behaviour).
      - detected >  expected: if `intended_durations` is provided, align via
        `_align_detected_to_expected` and preserve extras as "model-added".
        Otherwise fall back to the guarded UP ladder (backward compat).

    Returns:
        {
          "detected_cuts": N,
          "cut_points": [float, ...],       # aligned cuts on the aligned path;
                                            # best-effort picks on ladder paths.
          "threshold_used": float,
          "converged": bool,
          "history": [{"threshold": float, "cuts": int}, ...],
          "model_added_cuts": [float, ...],  # only on the aligned over-detection path
          "alignment_score":  float | None,  # only on the aligned over-detection path
        }
    """
    history: list[dict] = []
    initial = _run_scene_detect(video_path, 27.0)
    history.append({"threshold": 27.0, "cuts": len(initial)})
    if len(initial) == expected_cuts:
        return {
            "detected_cuts": len(initial),
            "cut_points": initial,
            "threshold_used": 27.0,
            "converged": True,
            "history": history,
            "model_added_cuts": [],
            "alignment_score": None,
        }

    # Over-detection path: if the planner told us the intended boundaries,
    # align rather than suppress. This preserves "happy accidents" the model
    # introduced as additional coverage.
    if len(initial) > expected_cuts and intended_durations is not None:
        pass_duration = sum(float(d) for d in intended_durations)
        alignment = _align_detected_to_expected(
            initial, intended_durations, pass_duration
        )

        # Zero-boundary special case (single-segment pass): expected_count == 0.
        if len(intended_durations) <= 1:
            return {
                "detected_cuts": 0,
                "cut_points": [],
                "threshold_used": 27.0,
                "converged": True,
                "history": history,
                "model_added_cuts": list(initial),
                "alignment_score": 1.0,
            }

        if alignment["alignment_score"] >= 0.8:
            return {
                "detected_cuts": len(alignment["aligned_cuts"]),
                "cut_points": alignment["aligned_cuts"],
                "threshold_used": 27.0,
                "converged": True,
                "history": history,
                "model_added_cuts": alignment["model_added_cuts"],
                "alignment_score": alignment["alignment_score"],
            }

        # Alignment below 0.8 → surface to the human-gate path via converged=False.
        return {
            "detected_cuts": len(initial),
            "cut_points": initial,
            "threshold_used": 27.0,
            "converged": False,
            "history": history,
            "model_added_cuts": alignment["model_added_cuts"],
            "alignment_score": alignment["alignment_score"],
        }

    if len(initial) > expected_cuts:
        # Backward-compat fallback: no planner intent was provided, so we
        # cannot align. Use the guarded UP ladder to try to suppress spurious
        # cuts. New callers should always pass `intended_durations` and take
        # the aligned path above — this branch is the single sanctioned
        # fallback for legacy callsites without planner intent.
        log.warning(
            "alignment skipped for %s — intended_durations not provided",
            video_path.name,
        )
        ladder = _THRESHOLD_LADDER_UP
    else:
        ladder = _THRESHOLD_LADDER_DOWN

    best = {"threshold": 27.0, "cut_points": initial}
    for thr in ladder[1:]:
        pts = _run_scene_detect(video_path, thr)
        history.append({"threshold": thr, "cuts": len(pts)})
        if len(pts) == expected_cuts:
            return {
                "detected_cuts": len(pts),
                "cut_points": pts,
                "threshold_used": thr,
                "converged": True,
                "history": history,
                "model_added_cuts": [],
                "alignment_score": None,
            }
        if abs(len(pts) - expected_cuts) < abs(len(best["cut_points"]) - expected_cuts):
            best = {"threshold": thr, "cut_points": pts}

    return {
        "detected_cuts": len(best["cut_points"]),
        "cut_points": best["cut_points"],
        "threshold_used": best["threshold"],
        "converged": False,
        "history": history,
        "model_added_cuts": [],
        "alignment_score": None,
    }


def _probe_duration_seconds(video_path: Path) -> float | None:
    """Use ffprobe to return the video duration in seconds.

    Tenet 6: distinguish "ffprobe binary missing" (sanctioned fallback to
    None — caller renders a "duration unknown" UI) from "ffprobe ran and
    failed on this media" (real probe failure → MediaProbeError so the
    caller can surface it instead of pretending the file is 0s long).
    """
    try:
        out = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                str(video_path),
            ],
            capture_output=True,
            timeout=15,
            text=True,
        )
    except FileNotFoundError:
        # ffprobe binary missing — sanctioned fallback (named, observable,
        # quality-neutral: no probe means no duration field, no generation
        # bytes affected). Registered below at module load.
        fire_sanctioned_fallback(
            "ffprobe_missing_no_duration",
            video_path=str(video_path),
        )
        return None
    except (subprocess.SubprocessError, OSError) as e:
        # PermissionError, "Exec format error", etc. all subclass OSError;
        # treat them as probe failure (Tenet 6 raise) rather than crashing.
        log.warning("ffprobe failed on %s (%s)", video_path, e)
        raise MediaProbeError(
            str(video_path), probe_kind="duration", message=str(e)
        ) from e
    if out.returncode == 0 and out.stdout.strip():
        try:
            return float(out.stdout.strip())
        except ValueError as e:
            log.warning(
                "ffprobe produced non-numeric duration for %s: %r",
                video_path, out.stdout,
            )
            raise MediaProbeError(
                str(video_path), probe_kind="duration", message=str(e)
            ) from e
    if out.returncode != 0:
        log.warning(
            "ffprobe non-zero (%d) on %s: %s",
            out.returncode, video_path, out.stderr.strip(),
        )
        raise MediaProbeError(
            str(video_path),
            probe_kind="duration",
            message=f"ffprobe rc={out.returncode}: {out.stderr.strip()}",
        )
    return None


def _extract_pass_segments(
    pass_video_path: Path,
    pass_id: str,
    segment_shot_ids: list[str],
    take_num: int,
    output_dir: Path,
    confirmed_timestamps: dict | None = None,
    intended_durations: list[float] | None = None,
) -> tuple[list[Path], dict]:
    """Extract per-shot segments via PySceneDetect auto-gate.

    If `confirmed_timestamps` is provided, skip detection and trust the map.

    When the model produces MORE cuts than expected and `intended_durations`
    is supplied, detected cuts are aligned to the planner-intended boundaries
    and the extras are preserved as `model-added` segments (alternate
    coverage). Planner-intended segments keep their natural bounds; each
    model-added cut at time `t` emits an additional segment from `t` to the
    next adjacent aligned boundary (or pass end). The two classes therefore
    OVERLAP — by design.

    Returns:
        (extracted_paths, meta)
      meta = {
        "extraction_method": "auto_verified" | "human_confirmed" | None,
        "scene_detect_threshold": float | None,
        "scene_detection_raw": [float, ...],
        "cuts_diverged": bool,
        "confirmed_timestamps": {...},
        "alignment_score": float | None,
        "model_added_count": int,
        "segment_classes": {key: "planner-intended" | "model-added"},
        "parent_shot_ids": {key: str | None},
        "alignment_deltas": {key: float | None},
      }

    On mismatch WITHOUT confirmed_timestamps, returns ([], meta) and the
    caller writes PassStore status = 'segmentation_review'.
    """
    import shutil as _shutil

    expected_cuts = max(0, len(segment_shot_ids) - 1)
    alignment_score: float | None = None
    model_added_cuts: list[float] = []
    segment_classes: dict[str, str] = {}
    parent_shot_ids: dict[str, str | None] = {}
    alignment_deltas: dict[str, float | None] = {}

    if confirmed_timestamps:
        cuts_diverged = False
        detection_raw: list[float] = []
        threshold_used = None
        timestamps_map = dict(confirmed_timestamps)
        extraction_method = "human_confirmed"
        # Classify every confirmed segment as planner-intended (the human
        # operator IS the planner at this point).
        for key in timestamps_map.keys():
            segment_classes[str(key)] = "planner-intended"
            parent_shot_ids[str(key)] = None
            alignment_deltas[str(key)] = None
    else:
        detect_result = _detect_cuts_with_retry(
            pass_video_path,
            expected_cuts,
            intended_durations=intended_durations,
        )
        detection_raw = detect_result["cut_points"]
        threshold_used = detect_result["threshold_used"]
        cuts_diverged = not detect_result["converged"]
        alignment_score = detect_result.get("alignment_score")
        model_added_cuts = list(detect_result.get("model_added_cuts") or [])

        if cuts_diverged:
            return [], {
                "extraction_method": None,
                "scene_detect_threshold": threshold_used,
                "scene_detection_raw": detection_raw,
                "cuts_diverged": True,
                "confirmed_timestamps": {},
                "alignment_score": alignment_score,
                "model_added_count": len(model_added_cuts),
                "segment_classes": {},
                "parent_shot_ids": {},
                "alignment_deltas": {},
            }

        try:
            duration = _probe_duration_seconds(pass_video_path) or 0.0
        except MediaProbeError:
            # Probe failed — extraction can't proceed without duration. Re-raise
            # so the outer caller (extraction route) surfaces the real failure
            # instead of silently emitting zero-length segments.
            raise

        # Build expected_ts (cumulative sums excluding final) to serve as fall-
        # back boundaries for any missed planner-intended slots and to compute
        # per-slot alignment deltas.
        expected_ts: list[float] = []
        if intended_durations is not None:
            running = 0.0
            for d in intended_durations:
                running += float(d)
                expected_ts.append(running)
            if expected_ts:
                expected_ts.pop()

        # Determine the ordered list of planner-intended boundaries that the
        # segment emission loop will use. `aligned_boundaries[i]` is the
        # boundary BETWEEN segment i and segment i+1. Length = expected_cuts.
        aligned_boundaries: list[float] = []
        if intended_durations is not None and expected_ts:
            # Re-run the alignment to recover per-slot fill/miss info — the
            # upstream `_detect_cuts_with_retry` return does not carry the
            # per-expected-boundary mapping we need for deltas here.
            alignment = _align_detected_to_expected(
                detection_raw, intended_durations, duration
            )
            missed = set(alignment["missed_boundaries"])
            aligned_idx = 0
            aligned_list = alignment["aligned_cuts"]
            for exp in expected_ts:
                if exp in missed:
                    aligned_boundaries.append(exp)  # fallback to planner intent
                    alignment_deltas[str(len(aligned_boundaries) - 1)] = None
                else:
                    if aligned_idx < len(aligned_list):
                        act = float(aligned_list[aligned_idx])
                        aligned_idx += 1
                        aligned_boundaries.append(act)
                        alignment_deltas[str(len(aligned_boundaries) - 1)] = abs(
                            act - exp
                        )
                    else:
                        aligned_boundaries.append(exp)
                        alignment_deltas[str(len(aligned_boundaries) - 1)] = None
        else:
            # No planner intent → detection_raw is already the cut list (count
            # matches expected_cuts per the converged path).
            aligned_boundaries = list(detection_raw)

        # Planner-intended segments.
        boundaries = [0.0] + aligned_boundaries + [duration]
        timestamps_map: dict = {}
        for i in range(len(segment_shot_ids)):
            start = boundaries[i] if i < len(boundaries) else 0.0
            end = boundaries[i + 1] if (i + 1) < len(boundaries) else duration
            timestamps_map[str(i)] = {"start": start, "end": end}
            segment_classes[str(i)] = "planner-intended"
            parent_shot_ids[str(i)] = None
            alignment_deltas.setdefault(str(i), None)

        # Model-added segments. Each model-added cut `t` emits an interval
        # [t, next_adjacent_boundary]. The next adjacent boundary is the
        # nearest aligned_boundary strictly greater than t, or `duration`.
        # The parent shot is the planner-intended segment that contains `t`.
        if model_added_cuts:
            # Reject extremely unusual passes rather than run out of letters.
            if len(model_added_cuts) > 26:
                return [], {
                    "extraction_method": None,
                    "scene_detect_threshold": threshold_used,
                    "scene_detection_raw": detection_raw,
                    "cuts_diverged": True,
                    "confirmed_timestamps": {},
                    "alignment_score": alignment_score,
                    "model_added_count": len(model_added_cuts),
                    "segment_classes": {},
                    "parent_shot_ids": {},
                    "alignment_deltas": {},
                }

            for j, t in enumerate(model_added_cuts):
                # Find parent planner-segment (the segment that contains t).
                parent_i = 0
                for i in range(len(segment_shot_ids)):
                    seg_start = boundaries[i]
                    seg_end = boundaries[i + 1]
                    if seg_start <= t < seg_end:
                        parent_i = i
                        break
                # Next adjacent aligned boundary after t, else duration.
                next_bound = duration
                for b in aligned_boundaries:
                    if b > t:
                        next_bound = b
                        break
                seg_start = float(t)
                seg_end = float(next_bound)
                if seg_end <= seg_start:
                    continue

                key = f"model_added_{j}"
                timestamps_map[key] = {"start": seg_start, "end": seg_end}
                segment_classes[key] = "model-added"
                parent_shot_id = segment_shot_ids[parent_i]
                parent_shot_ids[key] = parent_shot_id
                alignment_deltas[key] = None

        extraction_method = "auto_verified"

    # Parse counter + tag from pass_id for output naming.
    # `_SHOT_TOKEN_RE` is the module-level constant defined alongside
    # `_PASS_PATTERN` in server.py (added by Phase 1b).
    parts = pass_id.split("_")
    counter = parts[2] if len(parts) >= 3 else "000"
    tag_start = 3
    while tag_start < len(parts) and _SHOT_TOKEN_RE.match(parts[tag_start]):
        tag_start += 1
    semantic_tag = "_".join(parts[tag_start:]) or "UNTAGGED"

    def _run_ffmpeg_cut(start: float, duration_s: float, out_path: Path) -> bool:
        """Extract [start, start+duration_s] from the pass video to out_path.

        Returns True on success (or if the file already exists).
        """
        if out_path.exists():
            return True
        cmd = [
            "ffmpeg",
            "-y",
            "-ss",
            str(start),
            "-t",
            str(duration_s),
            "-i",
            str(pass_video_path),
            "-c:v",
            "libx264",
            "-crf",
            "18",
            "-c:a",
            "aac",
            str(out_path),
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=120)
            return result.returncode == 0 and out_path.exists()
        except Exception as e:
            log.warning("ffmpeg error for %s: %s", out_path.name, e)
            return False

    # Single-shot pass with zero expected cuts -> file copy the whole video as
    # the planner-intended segment, then emit every model-added cut as an
    # additional `~a` / `~b` / ... sub-segment. The spec requires:
    # "Single-segment pass with happy-accident cuts is always auto-accepted;
    # the whole pass body is one planner-intended segment and every detected
    # cut becomes a model-added sub-segment."
    if expected_cuts == 0 and len(segment_shot_ids) == 1:
        shot_id = segment_shot_ids[0]
        shot_num = _normalize_shot_num(shot_id)
        if not shot_num:
            return [], {
                "extraction_method": extraction_method,
                "scene_detect_threshold": threshold_used,
                "scene_detection_raw": detection_raw,
                "cuts_diverged": cuts_diverged,
                "confirmed_timestamps": timestamps_map,
                "alignment_score": alignment_score,
                "model_added_count": len(model_added_cuts),
                "segment_classes": segment_classes,
                "parent_shot_ids": parent_shot_ids,
                "alignment_deltas": alignment_deltas,
            }
        out_name = (
            f"shot_{shot_num}_FROM_PASS_{counter}_{semantic_tag}_take{take_num}.mp4"
        )
        out_path = output_dir / out_name
        if not out_path.exists():
            _shutil.copy2(str(pass_video_path), str(out_path))
        extracted_single: list[Path] = [out_path]

        # Emit model-added segments. Use existing `timestamps_map` /
        # `segment_classes` / `parent_shot_ids` entries populated upstream
        # (lines ~776-781). If >26 model-added cuts, fall through to
        # cuts_diverged per spec (defensive — upstream already rejects >26).
        if model_added_cuts:
            if len(model_added_cuts) > 26:
                return [], {
                    "extraction_method": None,
                    "scene_detect_threshold": threshold_used,
                    "scene_detection_raw": detection_raw,
                    "cuts_diverged": True,
                    "confirmed_timestamps": {},
                    "alignment_score": alignment_score,
                    "model_added_count": len(model_added_cuts),
                    "segment_classes": {},
                    "parent_shot_ids": {},
                    "alignment_deltas": {},
                }
            try:
                pass_duration = _probe_duration_seconds(pass_video_path)
            except MediaProbeError as e:
                log.warning(
                    "model-added segment path: probe failed for %s — "
                    "skipping emission (%s)",
                    pass_id, e,
                )
                pass_duration = None
            if pass_duration is None:
                log.warning(
                    "Could not probe duration for %s; skipping model-added "
                    "segment emission on single-shot pass",
                    pass_id,
                )
            else:
                sorted_cuts = sorted(float(t) for t in model_added_cuts)
                for letter_idx, t in enumerate(sorted_cuts):
                    # Next adjacent boundary is the next model-added cut,
                    # else pass duration.
                    if letter_idx + 1 < len(sorted_cuts):
                        next_bound = sorted_cuts[letter_idx + 1]
                    else:
                        next_bound = pass_duration
                    seg_start = float(t)
                    seg_end = float(next_bound)
                    if seg_end <= seg_start:
                        continue
                    letter = chr(ord("a") + letter_idx)
                    key = f"model_added_{letter_idx}"
                    # Populate sidecar-facing metadata for this segment. The
                    # general path writes these in the else branch (~776-781),
                    # but the confirmed_timestamps path leaves them empty —
                    # set them here unconditionally so _write_segment_sidecars
                    # sees the right class / parent / delta regardless of
                    # which upstream branch we came through.
                    timestamps_map[key] = {"start": seg_start, "end": seg_end}
                    segment_classes[key] = "model-added"
                    parent_shot_ids[key] = shot_id
                    alignment_deltas[key] = None
                    sub_name = (
                        f"shot_{shot_num}~{letter}_FROM_PASS_{counter}_"
                        f"{semantic_tag}_take{take_num}.mp4"
                    )
                    sub_path = output_dir / sub_name
                    if _run_ffmpeg_cut(seg_start, seg_end - seg_start, sub_path):
                        extracted_single.append(sub_path)
                    else:
                        log.warning(
                            "ffmpeg failed for model-added segment %d of %s",
                            letter_idx,
                            pass_id,
                        )

        return extracted_single, {
            "extraction_method": extraction_method,
            "scene_detect_threshold": threshold_used,
            "scene_detection_raw": detection_raw,
            "cuts_diverged": False,
            "confirmed_timestamps": timestamps_map,
            "alignment_score": alignment_score,
            "model_added_count": len(model_added_cuts),
            "segment_classes": segment_classes,
            "parent_shot_ids": parent_shot_ids,
            "alignment_deltas": alignment_deltas,
        }

    extracted: list[Path] = []

    # Planner-intended segments.
    for i, shot_id in enumerate(segment_shot_ids):
        ts = timestamps_map.get(str(i)) or timestamps_map.get(i) or {}
        start = float(ts.get("start", 0))
        end = float(ts.get("end", 0))
        duration = end - start
        if duration <= 0:
            log.warning("Zero-duration segment %d of %s", i, pass_id)
            continue
        shot_num = _normalize_shot_num(shot_id)
        if not shot_num:
            continue
        out_name = (
            f"shot_{shot_num}_FROM_PASS_{counter}_{semantic_tag}_take{take_num}.mp4"
        )
        out_path = output_dir / out_name
        if _run_ffmpeg_cut(start, duration, out_path):
            extracted.append(out_path)
        else:
            log.warning("ffmpeg failed for segment %d of %s", i, pass_id)

    # Model-added segments — emitted with a `~a` / `~b` / ... suffix on the
    # parent shot number (one letter per occurrence within the same parent
    # segment). The `~` prefix does NOT match `_SHOT_TOKEN_RE`, keeping
    # these files cleanly distinguishable from hand-authored `SH33A` takes.
    per_parent_letter: dict[int, int] = {}
    for j, t in enumerate(model_added_cuts):
        key = f"model_added_{j}"
        ts = timestamps_map.get(key)
        if not ts:
            continue
        parent_shot_id = parent_shot_ids.get(key)
        if not parent_shot_id:
            continue
        # Determine parent index from segment_shot_ids order.
        try:
            parent_i = segment_shot_ids.index(parent_shot_id)
        except ValueError:
            continue
        parent_shot_num = _normalize_shot_num(parent_shot_id)
        if not parent_shot_num:
            continue
        letter_idx = per_parent_letter.get(parent_i, 0)
        per_parent_letter[parent_i] = letter_idx + 1
        letter = chr(ord("a") + letter_idx)
        start = float(ts.get("start", 0))
        end = float(ts.get("end", 0))
        duration = end - start
        if duration <= 0:
            continue
        out_name = (
            f"shot_{parent_shot_num}~{letter}_FROM_PASS_{counter}_"
            f"{semantic_tag}_take{take_num}.mp4"
        )
        out_path = output_dir / out_name
        if _run_ffmpeg_cut(start, duration, out_path):
            extracted.append(out_path)
        else:
            log.warning("ffmpeg failed for model-added segment %d of %s", j, pass_id)

    return extracted, {
        "extraction_method": extraction_method,
        "scene_detect_threshold": threshold_used,
        "scene_detection_raw": detection_raw,
        "cuts_diverged": cuts_diverged,
        "confirmed_timestamps": timestamps_map,
        "alignment_score": alignment_score,
        "model_added_count": len(model_added_cuts),
        "segment_classes": segment_classes,
        "parent_shot_ids": parent_shot_ids,
        "alignment_deltas": alignment_deltas,
    }


def _write_segment_sidecars(
    extracted_paths: list[Path],
    pass_id: str,
    segment_shot_ids: list[str],
    segment_timestamps: dict,
    take_num: int,
    source_video_name: str,
    pass_record: dict,
    segment_classes: dict | None = None,
    parent_shot_ids: dict | None = None,
    alignment_deltas: dict | None = None,
):
    """Write sidecar JSON for each extracted segment.

    Recognises two filename shapes:
      - Planner-intended: ``shot_{num}_FROM_PASS_...`` → segment_index = i.
      - Model-added:      ``shot_{num}~{letter}_FROM_PASS_...`` → keyed by
        the model_added entry in the returned meta dicts.
    """
    total_segments = len(segment_shot_ids)
    pass_cost = read_cost_from_record_safe(pass_record)
    model = None
    for take in pass_record.get("takes", []):
        if read_take_number(take) == take_num:
            model = take.get("model")
            break

    segment_classes = segment_classes or {}
    parent_shot_ids = parent_shot_ids or {}
    alignment_deltas = alignment_deltas or {}

    for path in extracted_paths:
        # Model-added filenames contain `~<letter>_FROM_`. Find the matching
        # model_added_j entry in segment_classes by comparing parent_shot and
        # the letter-ordered occurrence.
        model_added_match = re.search(r"^shot_(\d{1,4}[a-z]?)~([a-z])_FROM_", path.name)
        if model_added_match:
            parent_num = model_added_match.group(1)
            letter = model_added_match.group(2)
            letter_idx = ord(letter) - ord("a")

            # Find the model_added_{j} key whose parent maps to this parent_num
            # and whose order within the parent matches letter_idx.
            seen_per_parent: dict[str, int] = {}
            matched_key: str | None = None
            for key, cls in segment_classes.items():
                if cls != "model-added":
                    continue
                parent_shot = parent_shot_ids.get(key)
                if not parent_shot:
                    continue
                parent_shot_num = _normalize_shot_num(parent_shot)
                if parent_shot_num != parent_num:
                    continue
                idx = seen_per_parent.get(parent_num, 0)
                seen_per_parent[parent_num] = idx + 1
                if idx == letter_idx:
                    matched_key = key
                    break
            if not matched_key:
                continue
            ts = segment_timestamps.get(matched_key) or {}
            start = float(ts.get("start", 0))
            end = float(ts.get("end", 0))
            parent_shot = parent_shot_ids.get(matched_key)

            sidecar_data = ws_sidecar.ensure_sidecar(path)
            sidecar_data.update(
                {
                    "source": "pass_extraction",
                    "source_type": "pass_segment",
                    "source_pass_id": pass_id,
                    "source_shot_id": parent_shot,
                    "source_take": take_num,
                    "segment_index": None,
                    "segment_count": total_segments,
                    "segment_timestamp_start": start,
                    "segment_timestamp_end": end,
                    "source_video": source_video_name,
                    "status": "candidate",
                    "segment_class": "model-added",
                    "parent_shot_id": parent_shot,
                    "alignment_delta_s": None,
                }
            )
            if model:
                sidecar_data["model"] = model
            # Model-added segments share the pass cost via the same per-segment
            # denominator as planner-intended for rough attribution.
            if pass_cost and total_segments > 0:
                sidecar_data["cost_usd_segment"] = round(pass_cost / total_segments, 4)
            ws_sidecar.write_sidecar(path, sidecar_data)
            continue

        # Planner-intended sidecar path.
        for i, shot_id in enumerate(segment_shot_ids):
            shot_num = _normalize_shot_num(shot_id)
            if shot_num and f"shot_{shot_num}_FROM_" in path.name:
                ts_key = str(i)
                ts = segment_timestamps.get(ts_key) or segment_timestamps.get(i, {})
                start = float(ts.get("start", 0))
                end = float(ts.get("end", 0))

                sidecar_data = ws_sidecar.ensure_sidecar(path)
                sidecar_data.update(
                    {
                        "source": "pass_extraction",
                        "source_type": "pass_segment",
                        "source_pass_id": pass_id,
                        "source_shot_id": shot_id,
                        "source_take": take_num,
                        "segment_index": i,
                        "segment_count": total_segments,
                        "segment_timestamp_start": start,
                        "segment_timestamp_end": end,
                        "source_video": source_video_name,
                        "status": "candidate",
                        "segment_class": segment_classes.get(
                            ts_key, "planner-intended"
                        ),
                        "parent_shot_id": None,
                        "alignment_delta_s": alignment_deltas.get(ts_key),
                    }
                )
                if model:
                    sidecar_data["model"] = model
                if pass_cost and total_segments > 0:
                    sidecar_data["cost_usd_segment"] = round(
                        pass_cost / total_segments,
                        4,
                    )

                ws_sidecar.write_sidecar(path, sidecar_data)
                break


def _read_video_sidecar(video_file: Path) -> dict:
    sidecar_path = video_file.parent / f"{video_file.name}.json"
    if not sidecar_path.is_file():
        return {}
    try:
        return json.loads(sidecar_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return {}


def _shot_num_set(shot_ids: list[str]) -> set[str]:
    nums = {normalize_shot_num(sid) for sid in shot_ids}
    nums.discard(None)
    return nums


def _legacy_pass_id_from_parsed(parsed: dict) -> str | None:
    tag = parsed.get("semantic_tag")
    if not tag:
        return None
    return (
        f"{parsed['episode']}_PASS_{parsed['counter']}_"
        f"SH{parsed['shot_list']}_{tag}"
    )


def _resolve_pass_record_for_video(
    *,
    project: str,
    video_file: Path,
    parsed: dict,
    records: dict[str, dict],
) -> tuple[str | None, dict | None]:
    sidecar = _read_video_sidecar(video_file)
    provenance = sidecar.get("provenance") if isinstance(sidecar, dict) else None
    grouping = provenance.get("grouping") if isinstance(provenance, dict) else None
    if isinstance(grouping, dict):
        source_pass_id = grouping.get("source_pass_id")
        if isinstance(source_pass_id, str) and source_pass_id in records:
            return source_pass_id, records[source_pass_id]
    sidecar_pass_id = sidecar.get("pass_id") if isinstance(sidecar, dict) else None
    if isinstance(sidecar_pass_id, str) and sidecar_pass_id in records:
        return sidecar_pass_id, records[sidecar_pass_id]

    legacy_pass_id = _legacy_pass_id_from_parsed(parsed)
    if legacy_pass_id and legacy_pass_id in records:
        return legacy_pass_id, records[legacy_pass_id]

    try:
        rel_path = str(video_file.relative_to(projects_root() / project))
    except ValueError:
        rel_path = None
    if rel_path:
        for pass_id, record in records.items():
            if record.get("video_path") == rel_path:
                return pass_id, record

    parsed_shots = _shot_num_set(parsed.get("shot_ids") or [])
    for pass_id, record in records.items():
        if parsed.get("counter"):
            m = re.match(r"^EP\d+_PASS_(\d{3})", pass_id)
            if m and m.group(1) != parsed["counter"]:
                continue
        if parsed_shots and _shot_num_set(record.get("segment_shot_ids") or []) == parsed_shots:
            return pass_id, record

    return None, None


def _maybe_extract_passes(project: str):
    """For every pass video on disk: run PySceneDetect (or trust confirmed
    timestamps if present), extract segments, update PassStore, mark done."""
    # Mode guard: coverage-segment extraction is a microdrama concept.
    if not get_project(project).auto_extracts_segments:
        return

    from recoil.execution.pass_store import PassStore

    project_dir = projects_root() / project
    # v2 layout: pass videos live under renders/ep_*/ (output/video/ deleted by
    # migration Step 3i). Per-episode iteration is otherwise unchanged.
    renders_root = project_dir / "renders"

    for ep_dir in renders_root.glob("ep_*"):
        if not ep_dir.is_dir():
            continue
        for video_file in ep_dir.iterdir():
            if not video_file.is_file():
                continue
            parsed = parse_pass_filename(video_file.name)
            if not parsed or parsed.get("strategy") != "coverage":
                continue

            marker = ep_dir / f".{video_file.name}.extracted"
            if marker.exists():
                continue

            ep_id = parsed["episode"]
            take_num = int(parsed["take_num"])

            store = PassStore(project)
            try:
                records = {p["pass_id"]: p for p in store.list_passes(ep_id)}
            finally:
                store.close()

            pass_id, record = _resolve_pass_record_for_video(
                project=project,
                video_file=video_file,
                parsed=parsed,
                records=records,
            )
            if not record:
                continue

            # Skip if already awaiting user confirmation (cuts diverged) — don't re-detect on every poll
            if record.get("status") == "segmentation_review" and not record.get(
                "confirmed_timestamps"
            ):
                continue

            shot_ids = record.get("segment_shot_ids", [])
            confirmed = record.get("confirmed_timestamps") or {}
            if not shot_ids:
                continue

            # Load intended_durations from the CoveragePass JSON (source of
            # truth for planner intent). Fallback to None engages the guarded
            # backward-compat ladder branch in _detect_cuts_with_retry.
            #
            # State path: state/<STATE_NAMESPACE>/coverage_passes (currently "visual").
            intended_durations: list[float] | None = None
            ep_m = re.match(r"ep_(\d+)", ep_dir.name)
            ep_num_padded = ep_m.group(1).zfill(3) if ep_m else ep_dir.name
            candidate_paths = [
                ProjectPaths.from_root(project_dir).coverage_passes_dir
                / f"ep_{ep_num_padded}_passes.json",
            ]
            for coverage_passes_path in candidate_paths:
                if not coverage_passes_path.exists():
                    continue
                try:
                    with open(coverage_passes_path) as f:
                        cp_data = json.load(f)
                    # File may be a dict {"passes": [...]} OR a top-level list.
                    passes_list = (
                        cp_data.get("passes") if isinstance(cp_data, dict) else cp_data
                    )
                    if isinstance(passes_list, list):
                        for p in passes_list:
                            if p.get("pass_id") == pass_id:
                                intended_durations = [
                                    float(s.get("duration_s", 0))
                                    for s in p.get("segments", [])
                                ]
                                break
                except (OSError, json.JSONDecodeError, ValueError, TypeError) as e:
                    log.warning("Failed to load coverage_passes for %s: %s", pass_id, e)
                    intended_durations = None
                if intended_durations is not None:
                    break

            extracted, meta = _extract_pass_segments(
                pass_video_path=video_file,
                pass_id=pass_id,
                segment_shot_ids=shot_ids,
                take_num=take_num,
                output_dir=ep_dir,
                confirmed_timestamps=confirmed if confirmed else None,
                intended_durations=intended_durations,
            )

            store = PassStore(project)
            try:
                if meta["cuts_diverged"] and not confirmed:
                    store.update_pass(
                        pass_id,
                        status="segmentation_review",
                        detected_cuts=len(meta["scene_detection_raw"]),
                        scene_detect_threshold=meta["scene_detect_threshold"],
                        scene_detection_raw=meta["scene_detection_raw"],
                        cuts_diverged=True,
                        alignment_score=meta.get("alignment_score"),
                        model_added_count=meta.get("model_added_count", 0),
                    )
                    continue  # do not mark .extracted — re-enter after human correction

                store.update_pass(
                    pass_id,
                    status="extracted",
                    detected_cuts=len(meta["scene_detection_raw"]),
                    scene_detect_threshold=meta["scene_detect_threshold"],
                    scene_detection_raw=meta["scene_detection_raw"],
                    cuts_diverged=False,
                    extraction_method=meta["extraction_method"],
                    confirmed_timestamps=meta["confirmed_timestamps"],
                    segment_timestamps=meta["confirmed_timestamps"],
                    alignment_score=meta.get("alignment_score"),
                    model_added_count=meta.get("model_added_count", 0),
                )
            finally:
                store.close()

            if extracted:
                _write_segment_sidecars(
                    extracted_paths=extracted,
                    pass_id=pass_id,
                    segment_shot_ids=shot_ids,
                    segment_timestamps=meta["confirmed_timestamps"],
                    take_num=take_num,
                    source_video_name=video_file.name,
                    pass_record=record,
                    segment_classes=meta.get("segment_classes"),
                    parent_shot_ids=meta.get("parent_shot_ids"),
                    alignment_deltas=meta.get("alignment_deltas"),
                )
                marker.write_text(f"extracted {len(extracted)} segments\n")


# Detects extracted-segment filenames regardless of naming era
_OLD_EXTRACT_LEGACY_RE = re.compile(r"^shot_\d{1,4}[a-z]?_FROM_")


def _move_with_companions(video: Path, orphans_dir: Path) -> None:
    """Move a video file plus its sidecar, marker, and boundary frames to orphans_dir."""
    import shutil

    parent = video.parent
    orphans_dir.mkdir(parents=True, exist_ok=True)
    shutil.move(str(video), str(orphans_dir / video.name))
    sc = parent / f"{video.name}.json"
    if sc.exists():
        shutil.move(str(sc), str(orphans_dir / sc.name))
    mk = parent / f".{video.name}.extracted"
    if mk.exists():
        shutil.move(str(mk), str(orphans_dir / mk.name))
    bf_dir = parent / "boundary_frames"
    if bf_dir.is_dir():
        target_bf = orphans_dir / "boundary_frames"
        for bf in bf_dir.glob(f"{video.stem}_seg*.jpg"):
            target_bf.mkdir(parents=True, exist_ok=True)
            shutil.move(str(bf), str(target_bf / bf.name))


def _sweep_orphans(project: str, episode_id: str | None = None) -> dict:
    """Move un-claimed pass/shot videos into ep_NNN/_orphans/.

    A file is orphaned iff ALL of the following are true:
      1. Its mtime is older than 5 seconds (race protection against
         StepRunner having written the video but not yet the sidecar).
      2. It is NOT already under an _orphans/ subdir.
      3. Either:
         a. Matches _SHOT_PATTERN (legacy shot file in the wrong place)   OR
         b. Parses as a legacy semantic PASS but has no backing PassStore record OR
         c. Matches neither pattern (stray file with unknown provenance)
         UNLESS (for 3a) the filename is a valid extracted-segment
         (shot_NNN_FROM_PASS_... .mp4), which always has a legit parent.

    Moved set includes companion sidecar, marker, and boundary frames.

    Args:
        project:    Project name (e.g. 'tartarus').
        episode_id: Optional 'EP001'-style ID to limit the sweep.

    Returns:
        {"moved": N, "orphan_dirs": [str, ...]}
    """
    # Mode guard: only microdrama projects get orphan-swept.
    if not get_project(project).sweeps_orphans:
        return {"moved": 0, "orphan_dirs": [], "skipped_reason": "mode_disabled"}

    from recoil.execution.pass_store import PassStore as _PS

    project_dir = projects_root() / project
    video_roots = [
        project_dir / "renders",
        project_dir / "output" / "video",
    ]
    video_roots = [root for root in video_roots if root.is_dir()]
    if not video_roots:
        return {"moved": 0, "orphan_dirs": []}

    ep_dirs: list[Path] = []
    if episode_id:
        ep_num_match = re.match(r"EP(\d+)", episode_id, re.IGNORECASE)
        if ep_num_match:
            ep_name = f"ep_{int(ep_num_match.group(1)):03d}"
            ep_dirs = [
                root / ep_name
                for root in video_roots
                if (root / ep_name).is_dir()
            ]
    else:
        ep_dirs = [
            d
            for root in video_roots
            for d in root.glob("ep_*")
            if d.is_dir()
        ]

    moved = 0
    orphan_dirs: list[str] = []
    now = _time.time()

    for ep_dir in ep_dirs:
        orphans_dir = ep_dir / "_orphans"
        ep_num_match = re.match(r"ep_(\d+)", ep_dir.name)
        if not ep_num_match:
            continue
        ep_id = f"EP{int(ep_num_match.group(1)):03d}"

        # Snapshot filesystem first so the set and list are consistent
        items = list(ep_dir.iterdir())
        store = _PS(project)
        try:
            known_pass_ids = {p["pass_id"] for p in store.list_passes(ep_id)}
        except Exception as e:
            log.warning(
                "Orphan sweep aborted for %s: PassStore read failed: %s", ep_id, e
            )
            continue  # skip this episode — do NOT mass-quarantine on transient read error
        finally:
            store.close()

        for item in items:
            if not item.is_file():
                continue
            if item.suffix.lower() != ".mp4":
                continue
            try:
                if now - item.stat().st_mtime < 5:
                    continue
            except FileNotFoundError:
                continue

            name = item.name
            is_extract = "_FROM_PASS_" in name or bool(
                _OLD_EXTRACT_LEGACY_RE.match(name)
            )
            if is_extract:
                continue  # extracted segments ride with their parent pass

            parsed_name = parse_pass_filename(name)
            shot_m = _SHOT_PATTERN.match(name)

            is_orphan = False
            if parsed_name:
                if parsed_name.get("semantic_tag"):
                    inferred_pass_id = _legacy_pass_id_from_parsed(parsed_name)
                    if inferred_pass_id not in known_pass_ids:
                        is_orphan = True
                else:
                    # Core short/new grammar is valid workspace output. Coverage
                    # semantics are resolved through sidecar/PassStore in tree and
                    # pass lookup paths; do not quarantine by filename inference.
                    is_orphan = False
            elif shot_m:
                is_orphan = True
            else:
                is_orphan = True

            if parsed_name and parsed_name.get("strategy") == "coverage":
                _, record = _resolve_pass_record_for_video(
                    project=project,
                    video_file=item,
                    parsed=parsed_name,
                    records={pid: {"pass_id": pid} for pid in known_pass_ids},
                )
                if parsed_name.get("semantic_tag") and not record:
                    is_orphan = True

            if not is_orphan:
                continue

            _move_with_companions(item, orphans_dir)
            moved += 1

        if orphans_dir.is_dir():
            orphan_dirs.append(str(orphans_dir))

    return {"moved": moved, "orphan_dirs": orphan_dirs}


def _collect_quarantine_node(project: str, episode_id: str) -> dict | None:
    """Build a tree node representing the quarantine banner for an episode.

    Returns None if no _orphans/ directory exists or it is empty.
    The returned node carries `episode_id` on the banner and on every child,
    so the frontend can POST Register/Delete operations without re-deriving it.
    """
    project_dir = projects_root() / project
    ep_num_match = re.match(r"EP(\d+)", episode_id, re.IGNORECASE)
    if not ep_num_match:
        return None
    orphans_dir = (
        project_dir
        / "output"
        / "video"
        / f"ep_{int(ep_num_match.group(1)):03d}"
        / "_orphans"
    )
    if not orphans_dir.is_dir():
        return None

    items: list[dict] = []
    newest_mtime = 0.0
    for f in sorted(orphans_dir.iterdir(), key=lambda p: p.name.lower()):
        if not f.is_file():
            continue
        if f.suffix.lower() not in MEDIA_EXTENSIONS:
            continue
        try:
            mtime = f.stat().st_mtime
        except FileNotFoundError:
            continue
        rel = str(f.relative_to(project_dir))
        items.append(
            {
                "name": f.name,
                "type": "quarantine_item",
                "path": rel,
                "media_url": f"/media/{project}/{rel}",
                "mtime": mtime,
                "episode_id": episode_id,
            }
        )
        newest_mtime = max(newest_mtime, mtime)

    if not items:
        return None

    now = _time.time()
    auto_expand = (now - newest_mtime) < 3600.0
    return {
        "name": f"⚠ {len(items)} quarantined — expand",
        "type": "quarantine",
        "count": len(items),
        "auto_expand": auto_expand,
        "episode_id": episode_id,
        "children": items,
    }


# Coverage reduction (`_STATUS_PRIORITY`, `_best_status`, `_coverage_summary_for_episode`,
# `_recent_activity_for_episode`) lives in workspace.coverage as of MF-5 / Phase B
# 2026-04-30. Backward-compat aliases for the underscore names appear at the
# bottom of this module.


@app.get("/api/projects")
async def list_projects():
    """List all available projects."""
    if not projects_root().is_dir():
        return JSONResponse({"projects": []})
    projects = sorted(
        [
            d.name
            for d in projects_root().iterdir()
            if d.is_dir() and not d.name.startswith("_") and not d.name.startswith(".")
        ]
    )
    current = ws_state.get_project() or _DEFAULT_PROJECT
    return JSONResponse({"projects": projects, "current": current})


# ── Generation Helpers ─────────────────────────────────────────


def _pid_is_alive(pid: int) -> bool:
    """Return True if the given PID is still running.

    Mirrors the lockfile liveness logic in recoil/pipeline/cli/generate.py
    (acquire_episode_lock). Keep the two in sync if either is updated.
    """
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # process exists but is foreign-owned
    except OSError:
        return False
    return True


def _build_generate_command(
    project: str,
    episode: int,
    pass_ids: list | None = None,
    dry_run: bool = False,
    validate: bool = False,
) -> list:
    """Construct the CLI argv for python3 recoil/pipeline/cli/generate.py.

    This is the single place in the workspace that maps HTTP request
    fields to CLI flags. All endpoint handlers invoking the CLI route
    through this function so a CLI flag change requires one edit, not many.
    """
    cli_path = _RECOIL_ROOT / "pipeline" / "cli" / "generate.py"
    argv = ["python3", str(cli_path), "--project", project, "--episode", str(episode)]

    if validate:
        # --validate: skip pass selector flags
        argv.append("--validate")
    elif pass_ids is None:
        # No pass_ids → run all
        argv.append("--all")
    elif len(pass_ids) == 1:
        argv += ["--pass", pass_ids[0]]
    else:
        # Multiple pass IDs → comma-joined
        argv += ["--passes", ",".join(pass_ids)]

    if dry_run:
        argv.append("--dry-run")

    return argv


@app.post("/api/project/switch")
async def switch_project(request: Request):
    """Switch the active project."""
    body = await request.json()
    project = body.get("project", "")
    project_dir = projects_root() / project
    if not project_dir.is_dir():
        return JSONResponse(
            {"error": f"Project '{project}' not found"}, status_code=404
        )
    state = ws_state.read_state()
    state["project"] = project
    state["selection"] = []
    state["viewer"] = {}
    ws_state.write_state(state)
    return JSONResponse({"project": project})


@app.get("/api/tree/{project}")
async def get_tree(project: str):
    """Virtual Aggregation Tree with 5s cache.

    Tree assembly is delegated to `workspace.tree.build_project_tree`. Side
    effects (orphan sweep, pass extraction, quarantine banner, auto-stub,
    cache management) stay in this route.
    """
    _validate_project(project)
    project_dir = projects_root() / project
    if not project_dir.is_dir():
        return JSONResponse(
            {"error": f"Project '{project}' not found"}, status_code=404
        )

    # Return cached tree if fresh (avoids rglob every 3s poll)
    now = _time.monotonic()
    if (
        project in _tree_cache
        and now - _tree_cache_mtime.get(project, 0) < _TREE_CACHE_TTL
    ):
        return JSONResponse(_tree_cache[project])

    # Sweep orphaned video files before extraction (move un-claimed files to _orphans/)
    try:
        _sweep_orphans(project)
    except Exception as e:
        log.warning("Orphan sweep failed for %s: %s", project, e)

    # Trigger pass extraction if needed (lightweight — checks marker files first)
    try:
        _maybe_extract_passes(project)
    except Exception as e:
        log.warning("Pass extraction check failed for %s: %s", project, e)

    # v2 layout: empty-project guard is "all four v2 media roots missing".
    # output/ no longer exists post-migration; checking it alone would fire
    # for every migrated project and return an empty tree.
    _V2_MEDIA_ROOTS = ("assets", "sequences", "renders", "state")
    if not any((project_dir / r).is_dir() for r in _V2_MEDIA_ROOTS):
        resp = {"project": project, "tree": {"children": []}, "file_count": 0}
        _tree_cache[project] = resp
        _tree_cache_mtime[project] = now
        return JSONResponse(resp)

    # Build the tree via the extracted module (workspace.tree).
    try:
        resp = build_project_tree(project)
    except ExecutionStoreUnavailableError as e:
        log.exception("get_tree: ExecutionStore unavailable for %s", project)
        return JSONResponse(
            {"error": "execution store unavailable", "detail": str(e)},
            status_code=503,
        )
    except SidecarCorruptError as e:
        log.exception("get_tree: corrupt sidecar encountered for %s", project)
        return JSONResponse(
            {"error": "corrupt sidecar", "path": e.path, "detail": str(e)},
            status_code=500,
        )

    # Prepend per-episode quarantine banners (orphan domain stays in server.py).
    # build_project_tree returns a tree WITHOUT quarantine banners; we walk the
    # Episodes category and prepend `_collect_quarantine_node` output to each
    # episode's children list.
    for cat in resp["tree"].get("children", []):
        if cat.get("name") == "Episodes":
            for ep_dir in cat.get("children", []):
                entity_name = ep_dir.get("name", "")
                ep_m = re.match(
                    r"^EP(\d+)",
                    entity_name.replace("Episode ", "EP").replace(" ", ""),
                    re.IGNORECASE,
                )
                if ep_m:
                    episode_id = f"EP{int(ep_m.group(1)):03d}"
                    q = _collect_quarantine_node(project, episode_id)
                    if q:
                        ep_dir["children"] = [q] + list(ep_dir.get("children", []))

    # Auto-stub missing sidecars (cheap operation — only creates files that don't exist).
    # v2 layout: walk each v2 media root since output/ no longer exists.
    try:
        stubbed = 0
        for _root_name in _V2_MEDIA_ROOTS:
            _root = project_dir / _root_name
            if _root.is_dir():
                stubbed += ws_sidecar.auto_stub_missing(_root)
        if stubbed > 0:
            log.info(
                "Auto-stubbed %d missing sidecars for project %s", stubbed, project
            )
    except Exception as e:
        log.warning("Auto-stub failed for project %s: %s", project, e)

    _tree_cache[project] = resp
    _tree_cache_mtime[project] = _time.monotonic()
    return JSONResponse(resp)


def _episode_board_mtime_key(project: str, ids: ws_board.EpisodeIds) -> int:
    """Max mtime_ns across scene files, matching storyboard sidecars, AND each
    batch's versions manifest.

    REC-231 Phase 4: a manifest-only conform/revert rewrites ``.versions.json`` but
    NOT the scene body's mtime, so folding the manifest mtime into the key busts the
    board cache on a pointer move (else the endpoint serves stale pointer state).
    Enumeration goes through the manifest-aware ``list_scenes`` (sidecar-safe), not a
    raw orchestration-dir glob.
    """
    from recoil.pipeline.core.persistence import (
        list_scenes,
        load_manifest,
        manifest_artifact_path,
        scene_manifest_path,
    )

    paths = ProjectPaths.for_project(project)
    scene_re = re.compile(rf"^{re.escape(ids.scene_token)}_BATCH_(\d+)\.json$")
    sidecar_template = r"^{}_v\d+\.png\.json$"
    mtimes: list[int] = []
    batch_ids: list[str] = []

    for scene_file in list_scenes(project, ids.prep_token):
        if not scene_re.match(scene_file.name):
            continue
        mtimes.append(scene_file.stat().st_mtime_ns)
        batch_id = scene_file.stem.removeprefix(f"{ids.scene_token}_")
        batch_ids.append(batch_id)
        manifest_path = scene_manifest_path(project, ids.prep_token, batch_id)
        if manifest_path.is_file():
            mtimes.append(manifest_path.stat().st_mtime_ns)
            manifest = load_manifest(project, ids.prep_token, batch_id)
            if manifest is not None:
                for entry in manifest.get("versions", []):
                    version = entry.get("version") if isinstance(entry, dict) else None
                    if not isinstance(version, int):
                        continue
                    body_path = manifest_artifact_path(
                        project, ids.prep_token, batch_id, manifest, version
                    )
                    if body_path.is_file() and re.search(r"\.v\d+\.json$", body_path.name):
                        mtimes.append(body_path.stat().st_mtime_ns)

    storyboards_dir = paths.project_root / "prep" / ids.prep_token / "storyboards"
    if storyboards_dir.is_dir():
        for batch_id in batch_ids:
            sidecar_re = re.compile(sidecar_template.format(re.escape(batch_id)))
            for sidecar_path in storyboards_dir.iterdir():
                if sidecar_path.is_file() and sidecar_re.match(sidecar_path.name):
                    mtimes.append(sidecar_path.stat().st_mtime_ns)

    return max(mtimes, default=0)


@app.get("/api/episode/{project}/{episode_id}/board")
async def get_episode_board(project: str, episode_id: str):
    """Episode board payload with a 5s cache keyed by source file mtimes."""
    _validate_project(project)
    project_dir = projects_root() / project
    if not project_dir.is_dir():
        return JSONResponse(
            {"error": f"Project '{project}' not found"}, status_code=404
        )

    try:
        ids = ws_board.normalize_episode(episode_id)
    except ValueError as e:
        return JSONResponse(
            {"error": str(e), "episode_id": episode_id},
            status_code=400,
        )

    cache_key = f"{project}:{ids.coverage_id}"
    now = _time.monotonic()
    # A single corrupt/cross-batch manifest must NOT 500 the whole episode wall: the
    # mtime-key computation can raise (load_manifest → KeyError on schema_version mismatch
    # / ValueError on batch_id identity; manifest_artifact_path → ValueError on an unsafe
    # artifact). Guard it BEFORE the cache check — serve a fresh cached payload if one
    # exists, else force a rebuild (which degrades to 400, not an unhandled 500).
    try:
        mtime_key = _episode_board_mtime_key(project, ids)
    except (KeyError, ValueError) as e:
        log.warning("episode board mtime-key failed for %s/%s: %s", project, episode_id, e)
        if (
            cache_key in _board_cache
            and now - _board_cache_time.get(cache_key, 0) < _BOARD_CACHE_TTL
        ):
            return JSONResponse(_board_cache[cache_key])
        mtime_key = None  # un-cacheable this round → fall through to the rebuild path
    if (
        mtime_key is not None
        and cache_key in _board_cache
        and _board_cache_mtime_key.get(cache_key) == mtime_key
        and now - _board_cache_time.get(cache_key, 0) < _BOARD_CACHE_TTL
    ):
        return JSONResponse(_board_cache[cache_key])

    try:
        payload = ws_board.build_episode_board(project, episode_id)
    except (KeyError, ValueError) as e:
        return JSONResponse(
            {"error": str(e), "episode_id": episode_id},
            status_code=400,
        )
    _board_cache[cache_key] = payload
    _board_cache_mtime_key[cache_key] = mtime_key
    _board_cache_time[cache_key] = _time.monotonic()
    return JSONResponse(payload)


@app.get("/api/episode/{project}/{episode_id}/comments")
async def get_episode_comments(project: str, episode_id: str):
    _validate_project(project)
    if not (projects_root() / project).exists():
        return JSONResponse({"error": f"unknown project '{project}'"}, status_code=404)
    try:
        payload = ws_board_comments.load_comments(project, episode_id)
    except ValueError as e:
        return JSONResponse({"error": str(e), "episode_id": episode_id}, status_code=400)
    return JSONResponse(payload)


def _board_segments_by_batch(project: str, episode_id: str) -> dict[str, set[str]]:
    """Map of batch_id -> set of its panel segment_ids for the current episode board.

    Raises ValueError on an unresolvable episode id (propagated from
    build_episode_board / normalize_episode).
    """
    board = ws_board.build_episode_board(project, episode_id)
    return {
        b["batch_id"]: {
            p["segment_id"] for p in b["panels"] if p.get("segment_id")
        }
        for b in board["batches"]
    }


@app.post("/api/episode/{project}/{episode_id}/comments")
async def post_episode_comment(project: str, episode_id: str, request: Request):
    _validate_project(project)
    if not (projects_root() / project).exists():
        return JSONResponse({"error": f"unknown project '{project}'"}, status_code=404)
    body = await request.json()
    # Validate the target exists in the LIVE board BEFORE writing (no orphan comments).
    try:
        segments_by_batch = _board_segments_by_batch(project, episode_id)
    except ValueError as e:
        return JSONResponse({"error": str(e), "episode_id": episode_id}, status_code=400)
    target_batch = body.get("batch_id")
    if target_batch not in segments_by_batch:
        return JSONResponse(
            {"error": f"unknown batch_id '{target_batch}' for this episode board"},
            status_code=400,
        )
    if body.get("target_type") == "panel" and body.get("segment_id") not in segments_by_batch[target_batch]:
        # Pair-validate: the segment_id must belong to THIS batch, not just exist somewhere.
        return JSONResponse(
            {"error": f"unknown segment_id '{body.get('segment_id')}' for batch '{target_batch}'"},
            status_code=400,
        )
    try:
        comment = ws_board_comments.add_comment(
            project, episode_id,
            target_type=body.get("target_type"),
            batch_id=body.get("batch_id"),
            segment_id=body.get("segment_id"),
            body=body.get("body", ""),
            tag=body.get("tag", "note"),
            author=body.get("author", "JT"),
        )
    except ValueError as e:
        return JSONResponse({"error": str(e)}, status_code=400)
    return JSONResponse(comment, status_code=201)


@app.post("/api/episode/{project}/{episode_id}/comments/{comment_id}/resolve")
async def post_resolve_comment(project: str, episode_id: str, comment_id: str, request: Request):
    _validate_project(project)
    if not (projects_root() / project).exists():
        return JSONResponse({"error": f"unknown project '{project}'"}, status_code=404)
    body = await request.json() if await request.body() else {}
    resolved = bool(body.get("resolved", True))
    updated = ws_board_comments.resolve_comment(project, episode_id, comment_id, resolved=resolved)
    if updated is None:
        return JSONResponse({"error": "comment not found", "comment_id": comment_id}, status_code=404)
    return JSONResponse(updated)


@app.delete("/api/episode/{project}/{episode_id}/comments/{comment_id}")
async def delete_comment(project: str, episode_id: str, comment_id: str):
    _validate_project(project)
    if not (projects_root() / project).exists():
        return JSONResponse({"error": f"unknown project '{project}'"}, status_code=404)
    removed = ws_board_comments.delete_comment(project, episode_id, comment_id)
    if removed is None:
        return JSONResponse({"error": "comment not found", "comment_id": comment_id}, status_code=404)
    return JSONResponse(removed)


# ── Routes: Archive ────────────────────────────────────────────


@app.post("/api/archive/{project}")
async def archive_item(project: str, request: Request):
    """Move a file or folder to _archive/, preserving relative path.

    Also moves the companion sidecar JSON if it exists.
    """
    _validate_project(project)
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = projects_root() / project
    source = project_dir / rel_path

    # Security check
    try:
        source.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not source.exists():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    if source.is_file():
        # Use sidecar-aware archive for individual files
        try:
            archive_dest = ws_sidecar.archive_with_sidecar(source, project_dir)
        except Exception as e:
            return JSONResponse({"error": str(e)}, status_code=500)
    else:
        # Directory archive — move the whole directory, then update sidecars within
        import shutil

        # R9.1: v2 layout writes archives to _history/archives/ — NOT the deprecated
        # output/_archive/ tree (which is removed by migrate_v2_layout.py Step 3i).
        # archive_with_sidecar in sidecar.py was already fixed in Phase 6; this
        # parallel directory branch in server.py was missed.
        #
        # If rel_path itself carries a stale v1 "output/" prefix (e.g. a UI client
        # cached the pre-migration tree), refuse rather than re-create the deprecated
        # tree under _history/archives/.
        # R10.2: parts-based check catches bare "output" (no trailing slash) too —
        # the old startswith("output/") check let a bare "output" rel_path slip
        # through and re-create _history/archives/output.
        rel_parts = Path(rel_path).parts
        if rel_parts and rel_parts[0] == "output":
            return JSONResponse(
                {"error": f"Refusing to archive stale v1 path: {rel_path}"},
                status_code=400,
            )
        archive_dest = project_dir / "_history" / "archives" / rel_path
        archive_dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source), str(archive_dest))

        # Recursively update all sidecar statuses inside the moved directory
        for sc_file in archive_dest.rglob("*.json"):
            if not ws_sidecar._is_sidecar_file(sc_file):
                continue
            try:
                sc_data = json.loads(sc_file.read_text(encoding="utf-8"))
                sc_data["status"] = "archived"
                sc_data["archived_at"] = ws_sidecar._now_iso()
                # Compute archived_from as original pre-move path for restore support
                media_file = sc_file.parent / Path(sc_file.stem)
                try:
                    sub_path = media_file.relative_to(archive_dest)
                    sc_data["archived_from"] = str(Path(rel_path) / sub_path)
                except ValueError:
                    pass
                ws_sidecar._atomic_write_json(sc_file, sc_data)
            except Exception:
                continue  # Best effort — don't fail the archive operation

    # Invalidate tree cache
    _invalidate_caches(project)

    return JSONResponse(
        {
            "archived": rel_path,
            "destination": str(archive_dest.relative_to(project_dir)),
        }
    )


# ── Routes: Generate ──────────────────────────────────────────


@app.post("/api/generate/{project}")
async def generate_video(project: str, request: Request):
    """Dispatch the CLI generation subprocess for a coverage pass.

    Request body (JSON):
        episode   — required; int or string like "EP001" or "1"
        pass_ids  — optional list of pass IDs; omit/null for all; [] → 422
        dry_run   — optional bool, default false
                    If true: runs CLI synchronously (max 30s) and returns
                    the JSON cost estimate in an HTTP 200 response.
                    If false: spawns CLI detached, returns 202 immediately.
        validate  — optional bool, default false

    Returns:
        200  — dry-run result (parsed JSON from CLI stdout)
        202  — live run spawned: {"ok": True, "pid": N, "log_path": ..., "argv": [...]}
        409  — generation already running: {"error": "generation_running", "pid": N}
        422  — validation error: {"error": "empty_pass_ids"}
        500  — spawn failure: {"error": "spawn_failed", "message": ...}
    """
    body = await request.json()

    # ── Parse episode (accept int or "EP001" or "1") ──
    raw_episode = body.get("episode")
    if raw_episode is None:
        return JSONResponse({"error": "episode required"}, status_code=422)
    if isinstance(raw_episode, int):
        episode = raw_episode
    else:
        ep_str = str(raw_episode).strip()
        if ep_str.upper().startswith("EP"):
            try:
                episode = int(ep_str[2:])
            except ValueError:
                return JSONResponse(
                    {"error": f"Cannot parse episode: {raw_episode}"}, status_code=422
                )
        else:
            try:
                episode = int(ep_str)
            except ValueError:
                return JSONResponse(
                    {"error": f"Cannot parse episode: {raw_episode}"}, status_code=422
                )

    # ── Parse pass_ids ──
    pass_ids_raw = body.get("pass_ids")
    if pass_ids_raw is not None:
        if not isinstance(pass_ids_raw, list):
            return JSONResponse({"error": "pass_ids must be a list"}, status_code=422)
        if len(pass_ids_raw) == 0:
            return JSONResponse({"error": "empty_pass_ids"}, status_code=422)
        pass_ids = [str(p) for p in pass_ids_raw]
    else:
        pass_ids = None  # → --all

    dry_run = bool(body.get("dry_run", False))
    validate = bool(body.get("validate", False))

    # ── Lockfile liveness check (skip for dry-run and validate-only) ──
    if not dry_run and not validate:
        lock_path = (
            ProjectPaths.for_project(project).passes_dir
            / f"ep_{episode:03d}_pass_state.lock"
        )
        if lock_path.exists():
            try:
                pid_text = lock_path.read_text(encoding="utf-8").strip()
                if pid_text and pid_text.isdigit():
                    pid = int(pid_text)
                    if _pid_is_alive(pid):
                        return JSONResponse(
                            {"error": "generation_running", "pid": pid},
                            status_code=409,
                        )
            except Exception as e:
                log.warning("Error reading lockfile %s: %s", lock_path, e)

    # ── Build argv ──
    argv = _build_generate_command(project, episode, pass_ids, dry_run, validate)
    repo_root = _RECOIL_ROOT.parent

    # ── Dry-run: synchronous with 30s timeout ──
    if dry_run:
        try:
            result = subprocess.run(
                argv,
                cwd=str(repo_root),
                capture_output=True,
                text=True,
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            return JSONResponse(
                {"error": "spawn_failed", "message": "dry-run timed out after 30s"},
                status_code=500,
            )
        except Exception as e:
            return JSONResponse(
                {"error": "spawn_failed", "message": str(e)},
                status_code=500,
            )
        try:
            parsed = json.loads(result.stdout)
        except json.JSONDecodeError:
            parsed = {
                "success": False,
                "error": "invalid_json",
                "raw": result.stdout[:500],
            }
        return JSONResponse(parsed)

    # ── Live run: spawn detached subprocess ──
    log_path = (
        ProjectPaths.for_project(project).passes_dir
        / f"ep_{episode:03d}_generate.log"
    )
    log_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        log_file = open(log_path, "wb")  # truncate per run; child inherits the fd
        proc = subprocess.Popen(
            argv,
            cwd=str(repo_root),
            stdout=subprocess.DEVNULL,  # CLI stdout is only a final JSON; workspace reads PassStore instead
            stderr=log_file,
            start_new_session=True,  # detach from workspace server's process group
        )
        # Parent closes its copy immediately — child process holds the fd independently.
        # Without this close the server leaks one file descriptor per generation.
        log_file.close()
    except Exception as e:
        return JSONResponse(
            {"error": "spawn_failed", "message": str(e)},
            status_code=500,
        )

    return JSONResponse(
        {"ok": True, "pid": proc.pid, "log_path": str(log_path), "argv": argv},
        status_code=202,
    )


# ── Routes: Sidecar CRUD ─────────────────────────────────────


@app.post("/api/sidecar/{project}")
async def sidecar_crud(project: str, request: Request):
    """Read or write a sidecar for a media file.

    Body:
        {"path": "assets/char/sadie/base/sadie_identity_hero_v04.jpeg"}
        — Returns the sidecar data (GET-style read via POST)

        {"path": "...", "data": {"notes": "Best framing", "tags": ["hero-candidate"]}}
        — Merges data into existing sidecar (write)
    """
    _validate_project(project)
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = projects_root() / project
    media_path = project_dir / rel_path

    # Security check
    try:
        media_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not media_path.is_file():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    write_data = body.get("data")
    if write_data:
        # Merge write: read existing, overlay new fields, write back
        existing = ws_sidecar.ensure_sidecar(media_path)
        existing.update(write_data)
        ws_sidecar.write_sidecar(media_path, existing)
        # Invalidate tree cache so sidecar changes appear immediately
        _invalidate_caches(project)
        return JSONResponse({"path": rel_path, "sidecar": existing})
    else:
        # Read only
        try:
            data = ws_sidecar.read_sidecar(media_path)
        except SidecarCorruptError as e:
            log.exception("read sidecar: corrupt at %s", media_path)
            return JSONResponse(
                {"error": "corrupt sidecar", "path": rel_path, "detail": str(e)},
                status_code=500,
            )
        if data is None:
            return JSONResponse({"path": rel_path, "sidecar": None})
        return JSONResponse({"path": rel_path, "sidecar": data})


@app.post("/api/promote/{project}")
async def promote_item(project: str, request: Request):
    """Promote a file's sidecar status.

    Body:
        {"path": "assets/char/sadie/base/sadie_identity_hero_v04.jpeg", "status": "pinned"}
        — Sets status to pinned

        {"path": "...", "status": "canonical", "asset_type": "characters", "entity_id": "sadie"}
        — Full canonical promotion (pool-first + hero copy, updates casting_state.json)
    """
    _validate_project(project)
    body = await request.json()
    rel_path = body.get("path", "")
    status = body.get("status", "pinned")

    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = projects_root() / project
    media_path = project_dir / rel_path

    # Security check
    try:
        media_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not media_path.is_file():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    if status == "canonical":
        # Full canonical promotion
        asset_type = body.get("asset_type", "")
        entity_id = body.get("entity_id", "")
        if not asset_type or not entity_id:
            return JSONResponse(
                {"error": "asset_type and entity_id required for canonical promotion"},
                status_code=400,
            )
        try:
            data = ws_sidecar.promote_to_canonical(
                media_path, asset_type, entity_id, project_dir
            )
        except Exception as e:
            return JSONResponse({"error": str(e)}, status_code=500)

        # Invalidate tree cache
        _invalidate_caches(project)

        return JSONResponse(
            {
                "path": rel_path,
                "status": "canonical",
                "promoted_to": data.get("promoted_to"),
                "sidecar": data,
            }
        )
    else:
        # Simple status change (pinned, candidate, etc.)
        try:
            data = ws_sidecar.set_status(media_path, status)
        except ValueError as e:
            return JSONResponse({"error": str(e)}, status_code=400)

        # Invalidate tree cache
        _invalidate_caches(project)

        return JSONResponse(
            {
                "path": rel_path,
                "status": status,
                "sidecar": data,
            }
        )


@app.post("/api/reject/{project}")
async def reject_shot(project: str, request: Request):
    """Reject a shot take with failure mode classification.

    Body:
        {"shot_id": "EP001_SH03", "failure_mode": "identity_drift", "reason": "...", "take_id": "..."}
    """
    _validate_project(project)
    body = await request.json()
    shot_id = body.get("shot_id", "")
    failure_mode = body.get("failure_mode", "unknown")
    reason = body.get("reason", "")
    take_id = body.get("take_id")

    if not shot_id:
        return JSONResponse({"error": "shot_id required"}, status_code=400)

    store = _get_store(project)
    shot = store.get_shot(shot_id)
    if shot is None:
        store.close()
        return JSONResponse({"error": f"Shot '{shot_id}' not found"}, status_code=404)

    current_status = shot.get("status", "previs_pending")

    # Determine target rejected status
    if "previs" in current_status:
        target_status = "previs_rejected"
    elif "keyframe" in current_status:
        target_status = "keyframe_rejected"
    elif "video" in current_status:
        target_status = "video_rejected"
    else:
        target_status = "rejected"

    # Find and mark the take
    takes = shot.get("takes", [])
    if takes:
        if take_id:
            for t in takes:
                if t.get("take_id") == take_id:
                    t["rejected"] = True
                    t["rejection_reason"] = reason
                    t["failure_mode"] = failure_mode
                    break
        else:
            take = takes[-1]
            take["rejected"] = True
            take["rejection_reason"] = reason
            take["failure_mode"] = failure_mode
            take_id = take.get("take_id", f"take_{len(takes)}")

    try:
        store.update_shot(shot_id, status=target_status, takes=takes)
    except Exception as e:
        store.close()
        return JSONResponse({"error": f"Failed to update shot: {e}"}, status_code=500)
    store.close()

    # Ingest into LearningEngine if available
    try:
        from recoil.pipeline.orchestrator.learning_engine import LearningEngine

        le = LearningEngine(project=project)
        le.ingest_retry(
            shot_id=shot_id,
            failure_mode=failure_mode,
            source="human_reject",
            notes=reason,
        )
        le.flush()
    except Exception as e:
        log.warning("Could not log reject to LearningEngine: %s", e)

    # Invalidate tree cache
    _invalidate_caches(project)

    return JSONResponse(
        {
            "shot_id": shot_id,
            "take_id": take_id,
            "previous_status": current_status,
            "new_status": target_status,
            "failure_mode": failure_mode,
            "reason": reason,
        }
    )


@app.post("/api/restore/{project}")
async def restore_item(project: str, request: Request):
    """Restore a file from the archive root back to its original location.

    v2 layout: archives live at _history/archives/ (NOT the deprecated
    output/_archive/ tree which was removed by migrate_v2_layout.py Step 3i).

    Body: {"path": "_history/archives/assets/char/sadie/base/sadie_identity_hero_v04.jpeg"}
    """
    _validate_project(project)
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = projects_root() / project
    archive_path = project_dir / rel_path

    # Security check
    try:
        archive_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not archive_path.is_file():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    try:
        restored = ws_sidecar.restore_from_archive(archive_path, project_dir)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)

    # Invalidate tree cache
    _invalidate_caches(project)

    return JSONResponse(
        {
            "original_path": rel_path,
            "restored_to": str(restored.relative_to(project_dir)),
        }
    )


@app.get("/api/shot/{project}/{shot_id}")
async def get_shot(project: str, shot_id: str):
    _validate_project(project)
    store = _get_store(project)
    shot = store.get_shot(shot_id)
    store.close()

    if shot is None:
        return JSONResponse({"error": f"Shot '{shot_id}' not found"}, status_code=404)

    # Resolve take paths
    takes = shot.get("takes", [])
    for take in takes:
        fp = take.get("file_path", "")
        if fp and not fp.startswith("/"):
            take["media_url"] = f"/media/{project}/{fp}"

    # Check for coverage passes from PassStore
    coverage_passes = []
    episode_id = shot.get("episode_id", "")
    if episode_id:
        try:
            from recoil.execution.pass_store import PassStore

            pass_store = PassStore(project)
            state_data = pass_store._read_episode_file(episode_id)
            # Check shot_links for this shot_id
            shot_links = state_data.get("shot_links", {})
            if shot_id in shot_links:
                link = shot_links[shot_id]
                coverage_pass_id = link.get("coverage_pass_id", "")
                if coverage_pass_id:
                    pass_record = pass_store.get_pass(coverage_pass_id)
                    if pass_record:
                        seg_ids = pass_record.get("segment_shot_ids", [])
                        seg_idx = seg_ids.index(shot_id) if shot_id in seg_ids else -1
                        seg_count = len(seg_ids)
                        seg_ts = pass_record.get("segment_timestamps", {})
                        ts = seg_ts.get(str(seg_idx), {}) if seg_idx >= 0 else {}
                        coverage_passes.append(
                            {
                                "pass_id": coverage_pass_id,
                                "segment_index": seg_idx,
                                "segment_count": seg_count,
                                "pass_status": pass_record.get("status", "unknown"),
                                "timestamp_start": ts.get("start"),
                                "timestamp_end": ts.get("end"),
                            }
                        )

            # Also scan pass segment_shot_ids for this shot
            all_passes = state_data.get("passes", {})
            for pid, prec in all_passes.items():
                seg_ids = prec.get("segment_shot_ids", [])
                seen = {cp["pass_id"] for cp in coverage_passes}
                if shot_id in seg_ids and pid not in seen:
                    seg_idx = seg_ids.index(shot_id)
                    seg_ts = prec.get("segment_timestamps", {})
                    ts = seg_ts.get(str(seg_idx), {})
                    coverage_passes.append(
                        {
                            "pass_id": pid,
                            "segment_index": seg_idx,
                            "segment_count": len(seg_ids),
                            "pass_status": prec.get("status", "unknown"),
                            "timestamp_start": ts.get("start"),
                            "timestamp_end": ts.get("end"),
                        }
                    )
            pass_store.close()
        except Exception as e:
            log.warning("Could not check coverage passes for %s: %s", shot_id, e)

    # FIX (Opus C3): Include attempts, max_attempts, coverage_of in response
    # so the inspector can display them correctly.
    return JSONResponse(
        {
            "shot_id": shot_id,
            "project": project,
            "episode_id": shot.get("episode_id", ""),
            "status": shot.get("status", "previs_pending"),
            "status_color": _shot_status_color(shot.get("status", "previs_pending")),
            "pipeline": shot.get("pipeline"),
            "model": shot.get("model"),
            "cost_incurred": shot.get("cost_incurred", 0),
            "attempts": shot.get("attempts", 0),
            "max_attempts": shot.get("max_attempts", 3),
            "gate_results": shot.get("gate_results", {}),
            "output_path": shot.get("output_path"),
            "error_message": shot.get("error_message"),
            "takes": takes,
            "take_count": len(takes),
            "is_coverage": shot.get("is_coverage", False),
            "coverage_of": shot.get("coverage_of"),
            "coverage_passes": coverage_passes,
            "updated_at": shot.get("updated_at"),
        }
    )


# ── Routes: Pass Detail ────────────────────────────────────────


@app.get("/api/pass/{project}/{pass_id}")
async def get_pass_detail(project: str, pass_id: str):
    """Return full PassStore record for a coverage pass."""
    _validate_project(project)
    from recoil.execution.pass_store import PassStore

    try:
        store = PassStore(project)
        record = store.get_pass(pass_id)
        store.close()
    except ValueError:
        return JSONResponse({"error": "pass not found"}, status_code=404)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)

    if not record:
        return JSONResponse({"error": f"Pass '{pass_id}' not found"}, status_code=404)

    return JSONResponse(
        {
            "pass_id": pass_id,
            "project": project,
            "status": record.get("status", "unknown"),
            "segment_shot_ids": record.get("segment_shot_ids", []),
            "segment_timestamps": record.get("segment_timestamps", {}),
            "cost_usd": read_cost_from_record_safe(record),
            "expected_cuts": record.get("expected_cuts", 0),
            "detected_cuts": record.get("detected_cuts", 0),
            "takes": record.get("takes", []),
            "created_at": record.get("created_at"),
            "updated_at": record.get("updated_at"),
        }
    )


# ── Routes: Recent Feed ───────────────────────────────────────


@app.get("/api/recent/{project}")
async def get_recent(project: str, limit: int = 50, offset: int = 0):
    """Return recent media files sorted by modification time (newest first)."""
    _validate_project(project)
    project_dir = projects_root() / project
    # v2 layout: scan the four v2 media roots instead of the deleted output/ tree.
    _MEDIA_ROOTS = ("assets", "sequences", "renders", "state")

    if not any((project_dir / r).is_dir() for r in _MEDIA_ROOTS):
        return JSONResponse({"files": [], "total": 0})

    # Cache check
    now = _time.monotonic()
    cache_key = f"{project}:{limit}:{offset}"
    if (
        cache_key in _recent_cache
        and now - _recent_cache_mtime.get(cache_key, 0) < _RECENT_CACHE_TTL
    ):
        return JSONResponse(_recent_cache[cache_key])

    # Build sidecar index for metadata overlay
    meta_index = _build_metadata_index(project)

    files = []
    for root_name in _MEDIA_ROOTS:
        root = project_dir / root_name
        if not root.is_dir():
            continue
        for path in root.rglob("*"):
            if not path.is_file():
                continue
            if path.suffix.lower() not in MEDIA_EXTENSIONS:
                continue
            if path.name.startswith("."):
                continue
            # Skip _history (v2 archive root), _meta, boundary_frames, backup dirs.
            # (_archive retained for any legacy stragglers that escaped migration.)
            # _history match is exact (R10.3) — startswith would false-positive
            # sibling dirs like "_history_backup".
            parts = path.relative_to(project_dir).parts
            skip_dirs = ("_archive", "_meta", "boundary_frames", "_history")
            if any(
                p in skip_dirs
                or p.startswith(".")
                or "backup" in p.lower()
                for p in parts
            ):
                continue

            try:
                mtime = path.stat().st_mtime
            except OSError:
                continue

            rel_path = str(path.relative_to(project_dir))
            ext = path.suffix.lower()
            media_type = "video" if ext in (".mp4", ".mov", ".webm") else "image"

            entry = {
                "name": path.name,
                "path": rel_path,
                "media_url": f"/media/{project}/{rel_path}",
                "type": media_type,
                "mtime": mtime,
                "status": "untracked",
                "status_color": "gray",
                "model": None,
                "cost": None,
            }

            # Overlay metadata
            if rel_path in meta_index:
                meta = meta_index[rel_path]
                entry["status"] = meta.get("status", entry["status"])
                entry["status_color"] = meta.get("status_color", entry["status_color"])
                entry["model"] = meta.get("model")
                entry["cost"] = meta.get("cost")
                if meta.get("sidecar_source") == "pass_extraction":
                    entry["source_type"] = "pass_segment"

            files.append(entry)

    # Sort by mtime descending (newest first)
    files.sort(key=lambda f: f["mtime"], reverse=True)
    total = len(files)

    # Apply pagination
    paginated = files[offset : offset + limit]

    resp = {"files": paginated, "total": total}
    _recent_cache[cache_key] = resp
    _recent_cache_mtime[cache_key] = now
    return JSONResponse(resp)


# ── Routes: Activity ───────────────────────────────────────────


@app.get("/api/activity/{project}")
async def get_activity(project: str):
    _validate_project(project)
    from recoil.pipeline._lib.ops_log import scan_for_dangling_ops

    ops_log_path = _get_ops_log_path(project)
    in_flight = scan_for_dangling_ops(ops_log_path)

    recent = []
    if ops_log_path.is_file():
        lines = ops_log_path.read_text(encoding="utf-8").strip().split("\n")
        for line in reversed(lines[-40:]):
            try:
                record = json.loads(line)
                if record.get("status") in ("completed", "failed", "crashed"):
                    recent.append(record)
                    if len(recent) >= 10:
                        break
            except (json.JSONDecodeError, TypeError):
                continue

    return JSONResponse(
        {
            "project": project,
            "in_flight": in_flight,
            "in_flight_count": len(in_flight),
            "recent": recent,
        }
    )


# ── Routes: Action Analytics ───────────────────────────────────

_ANALYTICS_PATH = Path.home() / ".recoil-workspace" / "action_counts.json"


@app.post("/api/track-action")
async def track_action(request: Request):
    """Increment usage counter for a context menu action."""
    body = await request.json()
    action = body.get("action", "")
    if not action:
        return JSONResponse({"error": "action required"}, status_code=400)

    _ANALYTICS_PATH.parent.mkdir(parents=True, exist_ok=True)
    counts = {}
    if _ANALYTICS_PATH.is_file():
        try:
            counts = json.loads(_ANALYTICS_PATH.read_text())
        except (json.JSONDecodeError, OSError) as e:
            # Corrupt analytics file — DO NOT overwrite it. Writing fresh
            # data on top of a corrupt read silently destroys the prior
            # counters. Return early with the in-memory current action
            # incremented (caller sees their action counted; persisted
            # state stays as-is for the operator to inspect).
            log.warning(
                "action_stats: refusing to overwrite corrupt %s — %s",
                _ANALYTICS_PATH, e,
            )
            return JSONResponse({action: 1, "_corrupt": True}, status_code=200)
    counts[action] = counts.get(action, 0) + 1
    _ANALYTICS_PATH.write_text(json.dumps(counts, indent=2))
    return JSONResponse(counts)


@app.get("/api/action-stats")
async def action_stats():
    """Get usage counts for all context menu actions."""
    if not _ANALYTICS_PATH.is_file():
        return JSONResponse({})
    try:
        return JSONResponse(json.loads(_ANALYTICS_PATH.read_text()))
    except Exception:
        return JSONResponse({})


# ── Routes: Finder / External Apps ─────────────────────────────


@app.post("/api/open-in-finder/{project}")
async def open_in_finder(project: str, request: Request):
    """Reveal a file in macOS Finder."""
    _validate_project(project)
    body = await request.json()
    rel_path = body.get("path", "")
    full_path = projects_root() / project / rel_path
    try:
        full_path.resolve().relative_to(projects_root().resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)
    if not full_path.exists():
        return JSONResponse({"error": "Not found"}, status_code=404)

    import subprocess

    subprocess.Popen(["open", "-R", str(full_path)])
    return JSONResponse({"opened": rel_path})


def _read_orphan_sidecar_for_reclaim(old_sidecar: Path) -> tuple[dict, bool]:
    """Read the source sidecar for an orphan reclaim, distinguishing
    "no sidecar" from "corrupt sidecar" per Tenet 6.

    Returns ``(sidecar_dict, existed)``. Raises ``SidecarCorruptError`` when
    the file exists but is malformed — orphan reclaim must surface that
    instead of silently re-creating an empty ``{}`` sidecar (which would
    obliterate provenance).
    """
    if not old_sidecar.exists():
        return {}, False
    try:
        return json.loads(old_sidecar.read_text(encoding="utf-8")), True
    except (json.JSONDecodeError, OSError) as e:
        raise SidecarCorruptError(str(old_sidecar), message=str(e)) from e


@app.post("/api/passes/register_orphan/{project}")
async def register_orphan(project: str, request: Request):
    """Promote a quarantined orphan video into a first-class pass.

    Request body:
        {
          "orphan_filename": "wan27_atlas_test.mp4",
          "target_episode": "EP001",
          "segment_shot_ids": ["EP001_SH33", "EP001_SH33A"],
          "semantic_tag": "A_WREN",
          "reconstructed_provenance": { "model": "...", "notes": "..." }
        }

    Actions:
      1. Compute next pass counter.
      2. Build new pass_id and canonical filename.
      3. Create PassStore record.
      4. Rename file (and sidecar) out of _orphans/ into ep_NNN/.
      5. Update sidecar JSON to set pass_id + format_type + provenance.
      6. Trigger _maybe_extract_passes(project) so segments populate.
    """
    _validate_project(project)
    from recoil.execution.pass_store import PassStore
    import shutil as _shutil

    body = await request.json()
    orphan_filename = body.get("orphan_filename", "")
    if not isinstance(orphan_filename, str):
        return JSONResponse(
            {"error": "orphan_filename must be a string"}, status_code=400
        )
    target_episode = body.get("target_episode", "")
    if not isinstance(target_episode, str):
        return JSONResponse(
            {"error": "target_episode must be a string"}, status_code=400
        )
    segment_shot_ids = body.get("segment_shot_ids", []) or []
    if not isinstance(segment_shot_ids, list) or any(
        not isinstance(x, str) for x in segment_shot_ids
    ):
        return JSONResponse(
            {"error": "segment_shot_ids must be a list of strings"}, status_code=400
        )
    raw_tag = body.get("semantic_tag")
    if not isinstance(raw_tag, str):
        return JSONResponse({"error": "semantic_tag must be a string"}, status_code=400)
    semantic_tag = raw_tag.strip().upper()
    provenance = body.get("reconstructed_provenance") or {}

    if (
        not orphan_filename
        or not target_episode
        or not segment_shot_ids
        or not semantic_tag
    ):
        return JSONResponse(
            {
                "error": "orphan_filename, target_episode, segment_shot_ids, and semantic_tag are all required"
            },
            status_code=400,
        )
    if not re.match(r"^[A-Z][A-Z0-9]*(?:_[A-Z][A-Z0-9]*)*$", semantic_tag):
        return JSONResponse(
            {
                "error": f"semantic_tag must match [A-Z][A-Z0-9]*(_[A-Z][A-Z0-9]*)*, got {semantic_tag!r}"
            },
            status_code=400,
        )

    ep_match = re.match(r"EP(\d+)", target_episode, re.IGNORECASE)
    if not ep_match:
        return JSONResponse(
            {"error": "target_episode must look like 'EP001'"}, status_code=400
        )
    ep_num = int(ep_match.group(1))
    episode_id = f"EP{ep_num:03d}"

    project_dir = projects_root() / project
    candidate_ep_dirs = [
        project_dir / "renders" / f"ep_{ep_num:03d}",
        project_dir / "output" / "video" / f"ep_{ep_num:03d}",
    ]
    ep_dir = next(
        (
            candidate
            for candidate in candidate_ep_dirs
            if (candidate / "_orphans" / orphan_filename).is_file()
        ),
        candidate_ep_dirs[0],
    )
    orphan_path = ep_dir / "_orphans" / orphan_filename
    try:
        orphan_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "access denied"}, status_code=403)
    if not orphan_path.is_file():
        return JSONResponse(
            {"error": f"Orphan not found: {orphan_path}"}, status_code=404
        )

    from recoil.pipeline.orchestrator.coverage_planner import shot_list_from_ids

    try:
        shot_list = shot_list_from_ids(segment_shot_ids)
    except ValueError as ve:
        return JSONResponse({"error": f"Bad segment_shot_ids: {ve}"}, status_code=400)

    # Phase 3 naming-reset (Bug H fix): provenance MUST carry a model_id —
    # no "unknown" fallback. Reject the request rather than write a
    # non-reproducible filename.
    provenance_model = (provenance or {}).get("model")
    if not provenance_model or not isinstance(provenance_model, str):
        return JSONResponse(
            {
                "error": (
                    "reconstructed_provenance.model is required (non-empty string); "
                    "Phase 3 naming-reset eliminates the 'unknown' default."
                )
            },
            status_code=400,
        )

    store = PassStore(project)
    try:
        counter = store.next_pass_counter(episode_id)
        new_pass_id = f"{episode_id}_PASS_{counter:03d}_SH{shot_list}_{semantic_tag}"
        m_take = re.search(r"_take(\d+)", orphan_filename)
        take_num = int(m_take.group(1)) if m_take else 1
        # Canonical filename grammar (R4 SHORT — project/tag/model live in sidecar).
        from recoil.core.naming import build_filename
        new_video_name = build_filename(
            episode=ep_num,
            pass_counter=counter,
            shot_ids=segment_shot_ids,
            take=take_num,
        )
        new_video_path = ep_dir / new_video_name
        if new_video_path.exists():
            return JSONResponse(
                {"error": f"Destination already exists: {new_video_name}"},
                status_code=409,
            )

        store.create_pass(new_pass_id, segment_shot_ids)
        store.update_pass(
            new_pass_id,
            status="registered",
            video_path=str(new_video_path.relative_to(project_dir)),
            registered_from_orphan=orphan_filename,
            reconstructed_provenance=provenance,
        )
    finally:
        store.close()

    _shutil.move(str(orphan_path), str(new_video_path))

    old_sidecar = orphan_path.parent / f"{orphan_filename}.json"
    new_sidecar_path = ep_dir / f"{new_video_name}.json"
    try:
        sc_data, sidecar_existed = _read_orphan_sidecar_for_reclaim(old_sidecar)
    except SidecarCorruptError as e:
        log.exception(
            "orphan reclaim: corrupt sidecar at %s — refusing to overwrite",
            old_sidecar,
        )
        return JSONResponse(
            {
                "error": "corrupt sidecar — cannot reclaim without "
                         "preserving provenance",
                "path": str(old_sidecar),
                "detail": str(e),
            },
            status_code=500,
        )
    if sidecar_existed:
        _shutil.move(str(old_sidecar), str(new_sidecar_path))
    else:
        new_sidecar_path.write_text("{}", encoding="utf-8")

    sc_data.update(
        {
            "pass_id": new_pass_id,
            "video_path": str(new_video_path.relative_to(project_dir)),
            "format_type": provenance.get("format_type")
            or sc_data.get("format_type")
            or "B",
            "source": "registered_orphan",
            "status": "registered",
            "reconstructed_provenance": provenance,
        }
    )
    new_sidecar_path.write_text(json.dumps(sc_data, indent=2), encoding="utf-8")

    try:
        _maybe_extract_passes(project)
    except Exception as e:
        log.warning("Extraction trigger after register failed: %s", e)

    _invalidate_caches(project)
    return JSONResponse(
        {
            "pass_id": new_pass_id,
            "video_path": str(new_video_path.relative_to(project_dir)),
        }
    )


@app.post("/api/passes/delete_orphan/{project}")
async def delete_orphan(project: str, request: Request):
    """Soft-delete a quarantined orphan to projects/{project}/.trash/{YYYY-MM-DD}/."""
    _validate_project(project)
    import shutil as _shutil
    import datetime as _dt

    body = await request.json()
    orphan_filename = body.get("orphan_filename", "")
    target_episode = body.get("target_episode", "")
    ep_match = re.match(r"EP(\d+)", target_episode, re.IGNORECASE)
    if not orphan_filename or not ep_match:
        return JSONResponse(
            {"error": "orphan_filename and target_episode required"}, status_code=400
        )
    ep_num = int(ep_match.group(1))
    project_dir = projects_root() / project
    orphan_path = (
        project_dir
        / "output"
        / "video"
        / f"ep_{ep_num:03d}"
        / "_orphans"
        / orphan_filename
    )
    try:
        orphan_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "access denied"}, status_code=403)
    if not orphan_path.is_file():
        return JSONResponse({"error": "not found"}, status_code=404)
    today = _dt.date.today().isoformat()
    trash_dir = project_dir / ".trash" / today
    trash_dir.mkdir(parents=True, exist_ok=True)
    dst = trash_dir / orphan_filename
    _shutil.move(str(orphan_path), str(dst))
    old_sc = orphan_path.parent / f"{orphan_filename}.json"
    if old_sc.exists():
        _shutil.move(str(old_sc), str(trash_dir / old_sc.name))
    # R5 B2 fix (2026-05-21)—glob and unlink the orphan's boundary frames.
    # _extract_boundary_frames writes {video_stem}_seg{NN}.jpg into a
    # sibling boundary_frames/ directory; the previous delete_orphan only
    # moved the .mp4 + .mp4.json, leaving the seg jpgs in the SHOTS tree.
    # JT visual review 2026-05-21: "All these segXX shots are still here."
    video_stem = Path(orphan_filename).stem  # "EP001_SH10_take1.mp4" -> "EP001_SH10_take1"
    boundary_dir = orphan_path.parent / "boundary_frames"
    if boundary_dir.exists():
        bf_trash = trash_dir / "boundary_frames"
        bf_trash.mkdir(parents=True, exist_ok=True)
        for seg in boundary_dir.glob(f"{video_stem}_seg*.jpg"):
            _shutil.move(str(seg), str(bf_trash / seg.name))
    # R6 Phase 9 (c2) — also trash the `.{orphan_filename}.extracted` marker file.
    # Markers are created by the extraction pipeline at
    # workspace/server.py:1200 (`marker = ep_dir / f".{video_file.name}.extracted"`)
    # and :1344. delete_orphan previously left them behind, so subsequent extraction
    # passes treated a re-uploaded shot as already-extracted (B2-class leak).
    marker = orphan_path.parent / f".{orphan_path.name}.extracted"
    if marker.exists():
        _shutil.move(str(marker), str(trash_dir / marker.name))
    _invalidate_caches(project)
    return JSONResponse({"deleted": orphan_filename, "trash_path": str(dst)})


@app.post("/api/open-external/{project}")
async def open_external(project: str, request: Request):
    """Open a file in an external macOS app (Preview, etc.)."""
    _validate_project(project)
    body = await request.json()
    rel_path = body.get("path", "")
    app_name = body.get("app", "Preview")
    full_path = projects_root() / project / rel_path
    try:
        full_path.resolve().relative_to(projects_root().resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)
    if not full_path.is_file():
        return JSONResponse({"error": "Not found"}, status_code=404)

    import subprocess

    subprocess.Popen(["open", "-a", app_name, str(full_path)])
    return JSONResponse({"opened": rel_path, "app": app_name})


@app.post("/api/set-start-frame/{project}")
async def set_start_frame(project: str, request: Request):
    """Set an image as the start frame for a specific shot."""
    _validate_project(project)
    body = await request.json()
    rel_path = body.get("path", "")
    shot_id = body.get("shot_id", "")
    if not rel_path or not shot_id:
        return JSONResponse({"error": "path and shot_id required"}, status_code=400)

    full_path = projects_root() / project / rel_path
    if not full_path.is_file():
        return JSONResponse({"error": "File not found"}, status_code=404)

    # Update the shot state with the start frame path
    store = _get_store(project)
    shot = store.get_shot(shot_id)
    if not shot:
        return JSONResponse({"error": f"Shot '{shot_id}' not found"}, status_code=404)

    store.update_shot(shot_id, start_frame=rel_path)

    # Update sidecar on the image
    ws_sidecar.set_status(full_path, "pinned", tags=["start-frame", f"shot:{shot_id}"])

    # Invalidate cache
    _invalidate_caches(project)

    return JSONResponse(
        {
            "set_start_frame": rel_path,
            "shot_id": shot_id,
        }
    )


def _find_pass_video(project: str, pass_id: str) -> Path | None:
    """Locate the canonical video file on disk for a pass_id (latest take)."""
    from recoil.execution.pass_store import PassStore

    project_dir = projects_root() / project
    ep_match = re.match(r"^(EP\d+)", pass_id)
    if not ep_match:
        return None
    ep_num = int(ep_match.group(1)[2:])
    candidate_dirs = [
        project_dir / "output" / "video" / f"ep_{ep_num:03d}",
        project_dir / "renders" / f"ep_{ep_num:03d}",
    ]

    def _take_key(p: Path) -> int:
        parsed = parse_pass_filename(p.name)
        if parsed:
            return int(parsed["take_num"])
        return read_take_number(p.name) or 0

    store = PassStore(project)
    try:
        record = store.get_pass(pass_id)
        records = {p["pass_id"]: p for p in store.list_passes(ep_match.group(1))}
    except ValueError:
        record = None
        records = {}
    finally:
        store.close()

    if record and record.get("video_path"):
        recorded_path = project_dir / record["video_path"]
        if recorded_path.is_file():
            return recorded_path

    candidates: list[Path] = []
    for ep_dir in candidate_dirs:
        if not ep_dir.is_dir():
            continue
        for video in ep_dir.glob("*.mp4"):
            parsed = parse_pass_filename(video.name)
            if not parsed or parsed.get("strategy") != "coverage":
                continue
            resolved_id, _record = _resolve_pass_record_for_video(
                project=project,
                video_file=video,
                parsed=parsed,
                records=records,
            )
            if resolved_id == pass_id:
                candidates.append(video)

    candidates = sorted(candidates, key=_take_key)
    return candidates[-1] if candidates else None


@app.get("/api/passes/{pass_id}/detected_cuts/{project}")
async def get_detected_cuts(pass_id: str, project: str):
    """Return detected + expected cut points for the segmentation scrubber."""
    _validate_project(project)
    from recoil.execution.pass_store import PassStore

    store = PassStore(project)
    try:
        record = store.get_pass(pass_id)
    except ValueError:
        record = None
    finally:
        store.close()
    if not record:
        return JSONResponse({"error": "pass not found"}, status_code=404)

    video = _find_pass_video(project, pass_id)
    if not video or not video.is_file():
        return JSONResponse({"error": "video not found"}, status_code=404)

    rel = str(video.relative_to(projects_root() / project))
    try:
        duration = _probe_duration_seconds(video) or 0.0
    except MediaProbeError as e:
        log.exception("detected_cuts: probe failed for %s", video)
        return JSONResponse(
            {"error": "media probe failed", "detail": str(e)}, status_code=500,
        )
    detected = list(record.get("scene_detection_raw") or [])
    num_segments = len(record.get("segment_shot_ids") or [])
    expected = []
    if num_segments > 1 and duration > 0:
        slice_s = duration / num_segments
        expected = [slice_s * (i + 1) for i in range(num_segments - 1)]

    return JSONResponse(
        {
            "pass_id": pass_id,
            "video_url": f"/media/{project}/{rel}",
            "duration_s": duration,
            "detected_timestamps": detected,
            "expected_timestamps": expected,
            "segment_shot_ids": record.get("segment_shot_ids") or [],
        }
    )


@app.post("/api/passes/{pass_id}/confirm_timestamps/{project}")
async def confirm_timestamps(pass_id: str, project: str, request: Request):
    """Write human-confirmed timestamps and trigger re-extraction."""
    _validate_project(project)
    from recoil.execution.pass_store import PassStore

    body = await request.json()
    cut_points = body.get("cut_points")
    confirmed_map = body.get("confirmed_timestamps")

    store = PassStore(project)
    try:
        record = store.get_pass(pass_id)
    except ValueError:
        record = None
    finally:
        store.close()
    if not record:
        return JSONResponse({"error": "pass not found"}, status_code=404)

    video = _find_pass_video(project, pass_id)
    if not video or not video.is_file():
        return JSONResponse({"error": "video not found"}, status_code=404)
    try:
        duration = _probe_duration_seconds(video) or 0.0
    except MediaProbeError as e:
        log.exception("confirm_cuts: probe failed for %s", video)
        return JSONResponse(
            {"error": "media probe failed", "detail": str(e)}, status_code=500,
        )

    if confirmed_map is not None:
        if not isinstance(confirmed_map, dict):
            return JSONResponse(
                {"error": "confirmed_timestamps must be an object"}, status_code=400
            )
        timestamps_map = confirmed_map
    elif isinstance(cut_points, list):
        try:
            pts = sorted(float(c) for c in cut_points)
        except (TypeError, ValueError) as e:
            return JSONResponse(
                {"error": f"cut_points must be numeric: {e}"}, status_code=400
            )
        boundaries = [0.0] + pts + [duration]
        seg_ids = record.get("segment_shot_ids") or []
        if len(boundaries) - 1 != len(seg_ids):
            return JSONResponse(
                {
                    "error": f"cut count {len(pts)} inconsistent with segment count {len(seg_ids)}"
                },
                status_code=400,
            )
        timestamps_map = {}
        for i in range(len(seg_ids)):
            timestamps_map[str(i)] = {
                "start": boundaries[i],
                "end": boundaries[i + 1],
            }
    else:
        return JSONResponse(
            {"error": "either cut_points or confirmed_timestamps required"},
            status_code=400,
        )

    # Delete existing extracted segments (they came from bad timestamps)
    ep_dir = video.parent
    counter_from_id = pass_id.split("_")[2] if pass_id.count("_") >= 2 else "000"
    for seg in ep_dir.glob(f"shot_*_FROM_PASS_{counter_from_id}_*_take*.mp4"):
        try:
            seg.unlink()
        except OSError:
            pass
        sc = ep_dir / f"{seg.name}.json"
        if sc.exists():
            sc.unlink(missing_ok=True)
    for mk in ep_dir.glob(f".{video.name}.extracted"):
        mk.unlink(missing_ok=True)

    # Set intermediate status so UI can surface in-flight state and we have a rollback target.
    store = PassStore(project)
    try:
        store.update_pass(
            pass_id,
            confirmed_timestamps=timestamps_map,
            cuts_diverged=False,
            extraction_method="human_confirmed",
            status="extracting",
        )
    finally:
        store.close()

    try:
        _maybe_extract_passes(project)
    except Exception as e:
        log.warning(
            "Extraction failed for %s after timestamp confirmation: %s", pass_id, e
        )
        # Revert so UI surfaces the review state again
        store2 = PassStore(project)
        try:
            store2.update_pass(pass_id, status="segmentation_review")
        finally:
            store2.close()
        _invalidate_caches(project)
        return JSONResponse({"error": f"extraction failed: {e}"}, status_code=500)

    # Success — flip to extracted
    store3 = PassStore(project)
    try:
        store3.update_pass(pass_id, status="extracted")
    finally:
        store3.close()

    _invalidate_caches(project)
    return JSONResponse({"confirmed": pass_id, "timestamps": timestamps_map})


# ── Reject / regenerate (Phase 8) ─────────────────────────────


def _build_generate_command(
    project: str,
    episode: int,
    pass_ids: list,
    dry_run: bool = False,
    validate: bool = False,
) -> list:
    """Build the argv list for recoil/pipeline/cli/generate.py."""
    generate_cli = _RECOIL_ROOT / "pipeline" / "cli" / "generate.py"
    argv = [
        sys.executable,
        str(generate_cli),
        "--project",
        project,
        "--episode",
        str(episode),
    ]
    if validate:
        argv.append("--validate")
    elif dry_run:
        argv.append("--dry-run")
    if pass_ids:
        if len(pass_ids) == 1:
            argv += ["--pass", pass_ids[0]]
        else:
            argv += ["--passes", ",".join(pass_ids)]
    return argv


@app.post("/api/shots/{shot_id}/reject_segment/{project}")
async def reject_segment(shot_id: str, project: str, request: Request):
    """Mark a single extracted segment as rejected and record the reason."""
    _validate_project(project)
    body = await request.json()
    segment_path_rel = body.get("path", "")
    reason = body.get("reason", "")
    if not isinstance(reason, str):
        return JSONResponse({"error": "reason must be a string"}, status_code=400)
    if not segment_path_rel:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = projects_root() / project
    full = project_dir / segment_path_rel
    try:
        full.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "access denied"}, status_code=403)
    if not full.is_file():
        return JSONResponse({"error": "segment file not found"}, status_code=404)

    try:
        sc = ws_sidecar.read_sidecar(full) or {}
    except SidecarCorruptError as e:
        log.exception(
            "reject_segment: corrupt sidecar at %s — refusing to overwrite",
            full,
        )
        return JSONResponse(
            {
                "error": "corrupt sidecar — cannot reject without "
                         "preserving provenance",
                "path": segment_path_rel,
                "detail": str(e),
            },
            status_code=500,
        )
    sc_path = full.parent / f"{full.name}.json"
    sc.update(
        {
            "status": "rejected",
            "rejection_reason": reason,
            "replaced_by": sc.get("replaced_by"),
            "shot_id": shot_id,
        }
    )
    sc_path.write_text(json.dumps(sc, indent=2), encoding="utf-8")
    _invalidate_caches(project)
    return JSONResponse({"rejected": segment_path_rel, "shot_id": shot_id})


def _patch_segment_replaced_by(
    project: str,
    episode_num: int,
    shot_num: str,
    original_pass_id: str,
    new_pass_id: str,
) -> None:
    """Stamp ``replaced_by`` on the rejected segment's sidecars under the v3
    renders dir.

    REC-238: this previously scanned the deprecated ``output/video`` dir, so on
    v3 projects (segments live under ``renders/ep_NNN``) the glob matched
    nothing and ``replaced_by`` was never written — a silent provenance gap.
    Raises ``SidecarCorruptError`` on a corrupt sidecar; the caller maps it to a
    500 (provenance must not be patched over corruption).
    """
    ep_dir = ProjectPaths.for_project(project).episode_renders_dir(episode_num)
    counter_orig = (
        original_pass_id.split("_")[2] if "_" in original_pass_id else "000"
    )
    for f in ep_dir.glob(f"shot_{shot_num}_FROM_PASS_{counter_orig}_*_take*.mp4"):
        sc_path = ep_dir / f"{f.name}.json"
        if sc_path.exists():
            sc = ws_sidecar.read_sidecar(f) or {}
            sc["status"] = sc.get("status", "rejected")
            sc["replaced_by"] = new_pass_id
            sc_path.write_text(json.dumps(sc, indent=2), encoding="utf-8")


@app.post("/api/shots/{shot_id}/regenerate/{project}")
async def regenerate_shot(shot_id: str, project: str, request: Request):
    """Create a new single-shot pass with lineage and dispatch StepRunner."""
    _validate_project(project)
    from recoil.execution.pass_store import PassStore
    import subprocess as _sp

    body = await request.json()
    original_pass_id = body.get("original_pass_id", "")
    if not isinstance(original_pass_id, str):
        return JSONResponse(
            {"error": "original_pass_id must be a string"}, status_code=400
        )
    # Client sends shot_id; server resolves segment_index from the original pass record.
    body_shot_id = body.get("shot_id") or shot_id
    if not isinstance(body_shot_id, str):
        return JSONResponse({"error": "shot_id must be a string"}, status_code=400)
    reason = body.get("reason", "")
    if not isinstance(reason, str):
        return JSONResponse({"error": "reason must be a string"}, status_code=400)
    raw_tag = body.get("semantic_tag")
    if raw_tag is not None and not isinstance(raw_tag, str):
        return JSONResponse({"error": "semantic_tag must be a string"}, status_code=400)
    semantic_tag = (raw_tag or "").strip().upper() or "REGEN"
    dispatch = bool(body.get("dispatch", True))

    if not shot_id or not original_pass_id:
        return JSONResponse(
            {"error": "shot_id and original_pass_id required"},
            status_code=400,
        )

    ep_match = re.match(r"^(EP\d+)", shot_id)
    if not ep_match:
        return JSONResponse({"error": "shot_id must start with EP..."}, status_code=400)
    episode_id = ep_match.group(1).upper()
    episode_num = int(episode_id[2:])

    from recoil.pipeline.orchestrator.coverage_planner import shot_list_from_ids

    try:
        shot_list = shot_list_from_ids([shot_id])
    except ValueError as ve:
        return JSONResponse({"error": str(ve)}, status_code=400)

    original_record = None
    store = PassStore(project)
    try:
        # Resolve the original pass record first so we can derive segment_index.
        try:
            original_record = store.get_pass(original_pass_id)
        except ValueError:
            original_record = None
        if original_record is None:
            # Accept a prefix-only ID for client convenience
            candidates = [
                p
                for p in store.list_passes(episode_id)
                if p.get("pass_id", "").startswith(original_pass_id)
            ]
            if candidates:
                original_record = candidates[0]
                original_pass_id = original_record["pass_id"]
        # Derive segment_index from the original record's segment_shot_ids.
        segment_index = 0
        if original_record:
            seg_ids = original_record.get("segment_shot_ids") or []
            try:
                segment_index = seg_ids.index(body_shot_id)
            except ValueError:
                segment_index = 0

        counter = store.next_pass_counter(episode_id)
        new_pass_id = f"{episode_id}_PASS_{counter:03d}_SH{shot_list}_{semantic_tag}"
        store.create_pass(new_pass_id, [shot_id])
        store.update_pass(
            new_pass_id,
            lineage_ref={
                "original_pass_id": original_pass_id,
                "segment_index": segment_index,
                "reason": reason,
            },
            status="pending",
        )
    finally:
        store.close()

    # Patch the rejected segment sidecar with replaced_by (v3 renders dir — REC-238)
    if original_record:
        shot_num = _normalize_shot_num(shot_id) or ""
        try:
            _patch_segment_replaced_by(
                project, episode_num, shot_num, original_pass_id, new_pass_id
            )
        except SidecarCorruptError as e:
            log.exception(
                "regenerate: corrupt sidecar at %s — skipping replaced_by patch",
                e.path,
            )
            return JSONResponse(
                {
                    "error": "corrupt sidecar — cannot patch "
                             "replaced_by without preserving provenance",
                    "path": e.path,
                    "detail": str(e),
                },
                status_code=500,
            )

    dispatched = False
    if dispatch:
        argv = _build_generate_command(
            project=project,
            episode=episode_num,
            pass_ids=[new_pass_id],
            dry_run=False,
            validate=False,
        )
        try:
            _sp.Popen(argv, cwd=str(_RECOIL_ROOT.parent))
            dispatched = True
        except Exception as e:
            log.warning("Regenerate dispatch failed: %s", e)

    _invalidate_caches(project)
    return JSONResponse(
        {
            "new_pass_id": new_pass_id,
            "dispatched": dispatched,
        }
    )


# ── Routes: Media ──────────────────────────────────────────────


@app.get("/media/{path:path}")
async def serve_media(path: str):
    """Serve media files from the projects root.

    Path format: {project}/renders/ep_001/shot_001.png
    or just: {project}/{relative_path}
    """
    full_path = projects_root() / path

    # BUG-6 fix: Security check BEFORE file existence check
    try:
        full_path.resolve().relative_to(projects_root().resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not full_path.is_file():
        return JSONResponse({"error": f"File not found: {path}"}, status_code=404)

    # Determine content type
    ext = full_path.suffix.lower()
    content_types = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".webp": "image/webp",
        ".mp4": "video/mp4",
        ".mov": "video/quicktime",
        ".webm": "video/webm",
    }
    ct = content_types.get(ext, "application/octet-stream")

    return FileResponse(
        full_path,
        media_type=ct,
        headers={"Cache-Control": "no-store"},
    )


# ── Backward-compat re-exports for legacy callers ──────────────────
# These names keep `workspace.server` importable from external tests + tools
# (migrate_pass_names.py, test_pass_naming.py, test_scene_alignment.py, etc.)
# that bound to the pre-MF-5 module layout. Console v2's v2_dispatch.py
# imports from recoil.workspace.tree / recoil.workspace.coverage directly and does NOT use
# these re-exports. Phase D's MF-3 proxy-removal CP drops these after callers
# migrate.
#
# Pattern aliases are retained for external tests/tools. In-module parsing now
# routes through parse_pass_filename so the workspace follows core.naming.
_PASS_PATTERN = PASS_PATTERN
_SHOT_PATTERN = SHOT_PATTERN
_parse_pass_filename = parse_pass_filename
_parse_shot_filename = parse_shot_filename
_normalize_shot_num = normalize_shot_num
_group_by_pass_anchors = group_by_pass_anchors
_group_flat_by_stem = group_flat_by_stem
_group_episode_files_by_shot = group_episode_files_by_shot
_build_metadata_index = build_metadata_index
_scan_output_dir = scan_output_dir
_best_status = best_status
_coverage_summary_for_episode = coverage_summary_for_episode
_recent_activity_for_episode = recent_activity_for_episode
_STATUS_PRIORITY = STATUS_PRIORITY


# ── CLI Entry Point ────────────────────────────────────────────


def main():
    global _DEFAULT_PROJECT

    parser = argparse.ArgumentParser(description="Recoil Workspace Server")
    parser.add_argument("--project", default="tartarus", help="Default project name")
    parser.add_argument("--port", type=int, default=8450, help="Server port")
    parser.add_argument("--host", default="127.0.0.1", help="Server host")
    args = parser.parse_args()

    _DEFAULT_PROJECT = args.project

    # Set the project in workspace state on startup
    ws_state.set_project(args.project)

    log.info(
        "Starting Recoil Workspace on %s:%d (project: %s)",
        args.host,
        args.port,
        args.project,
    )
    log.info("Projects root: %s", projects_root())

    import uvicorn

    uvicorn.run(app, host=args.host, port=args.port, log_level="info")


if __name__ == "__main__":
    main()
