"""Beat / Take adapter — bridges projects/{slug}/state/visual/shots/{beat_id}.json
to the HTTP shape.

Phase 16 contract:
  list_beats(project_id, episode_id, scene_id) → list[Beat]
  list_takes(beat_id, project_id) → list[Take]  # project_id required (Phase 3)

Phase 19 contract (mutations — first writes from this codebase):
  set_primary(take_id, project_id=None)
  toggle_circled(take_id, project_id=None)
  reject_take(take_id, project_id=None)

CP-7 Beat/Take/Scene live in-memory only — there is no disk persistence yet.
This adapter reads the existing per-shot state files JT's pipeline already
produces and exposes them under the new HTTP shape. When CP-N+ ships
disk-backed CP-7 we swap the read path; the HTTP contract is unchanged.

The Phase 19 mutation helpers ARE the first writes from this codebase to
projects/{slug}/state/visual/shots/*.json — JT-curated artifacts. We use
atomic write-and-rename (write to .tmp + fsync + os.replace) so a partial
write cannot corrupt shot state.
"""

from __future__ import annotations

import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

from recoil.api.adapters._ids import (
    validate_hierarchy_id,
    validate_project_id,
)
from recoil.api.fallback_bridge import emit_fallback
from recoil.api.schemas.engine import (
    SCHEMA_VERSION,
    Beat,
    BeatStatus,
    Episode,
    EvalState,
    MediaKind,
    MissingCanonicalFieldError,
    Scene,
    Take,
    TakeStatus,
)
from recoil.core.atomic_write import atomic_write_json
from recoil.core.paths import projects_root, ProjectPaths

logger = logging.getLogger(__name__)


def _shots_dir(project_id: str) -> Path:
    """Return the directory holding the per-shot JSON files of ``project_id``."""
    project_paths = ProjectPaths.for_project(project_id)
    return project_paths.shots_dir


# Beat files: JSON files in the shots directory whose name starts with an
# ASCII letter and otherwise contains only letters, digits and underscores
# before the ``.json`` suffix. Dot-files — and names containing '-', '.', or
# spaces — are therefore NOT matched.
# Older projects (e.g. driver-beware) use free-form names like
# V4_CAUTIONARY_SH01.json, SEEDANCE_I2V_1778026133.json, SEQ11_SH01.json.
# We read all matching files and synthesize episode_id from the filename when
# the field is empty (see _derive_episode_id).
_BEAT_FILE_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_]*\.json$")


def _list_beat_files(project_id: str) -> list[Path]:
    """Return the sorted beat-file paths of ``project_id``.

    A missing shots directory yields an empty list. Only entries whose file
    name matches ``_BEAT_FILE_RE`` are returned.
    """
    shots = _shots_dir(project_id)
    if not shots.exists():
        return []
    beat_files = [entry for entry in shots.iterdir() if _BEAT_FILE_RE.match(entry.name)]
    beat_files.sort()
    return beat_files


# Matches an episode directory segment such as /ep_001/, /ep-12/ or /ep7/
# inside a path string and captures its digits. Both surrounding slashes are
# required, so a path that merely starts with "ep_001/" does not match.
_EP_PATH_RE = re.compile(r"/ep[_-]?(\d+)/")


class EpisodeIdUnresolvable(ValueError):
    """Reserved episode-id resolution error.

    Originally reserved for the day the sanctioned filename fallback was
    removed. NOTE(review): nothing in the visible part of this module raises
    it — unresolved ids surface as MissingCanonicalFieldError via
    _require_episode_id; confirm whether external callers still catch this.
    """


def _derive_episode_id(shot: dict, path: Path, project_id: str = "") -> str | None:
    """Return episode_id, preferring structured sources only.

    Resolution ladder (Law 4 three-prong test):
      1. shot['episode_id'] (if non-empty string) — CANONICAL
      2. Parse from shot['output_path'] — contains /ep_NNN/ for driver-beware
      3. Parse from filename with _SH delimiter (microdrama convention)
      4. None for REF_-prefixed files (reference assets, not shots)

    Build A Phase 4: the filename-prefix sanctioned fallback was retired.
    Callers that encounter a shot with no resolvable episode_id MUST raise
    MissingCanonicalFieldError at the route boundary; this helper itself
    returns None and the caller decides whether absence is fatal (microdrama
    list_beats / _synthesize_episodes) or expected (REF_ files).
    """
    ep = shot.get("episode_id")
    if isinstance(ep, str) and ep.strip():
        return ep.strip()

    output_path = shot.get("output_path") or ""
    ep_match = _EP_PATH_RE.search(output_path)
    if ep_match:
        return f"ep_{ep_match.group(1).zfill(3)}"

    stem = path.stem
    if stem.startswith("REF_"):
        return None

    sh_idx = stem.find("_SH")
    if sh_idx > 0:
        return stem[:sh_idx].split("_")[0]

    # Build A Phase 4: filename-prefix fallback retired — return None so the
    # caller can either skip (synthesis) or raise (route boundary).
    return None


def _is_ref_file(path: Path) -> bool:
    """REF_-prefixed files are reference assets, not shots — a None
    episode_id for them is EXPECTED, not a missing canonical field."""
    return path.stem.startswith("REF_")


def _require_episode_id(shot: dict, path: Path, project_id: str) -> str:
    """Return the shot's episode_id, raising when none can be derived.

    Wraps ``_derive_episode_id``: a ``None`` result becomes a
    ``MissingCanonicalFieldError(field="episode_id", project_id=...)`` so the
    shot fails loudly instead of being dropped silently. Callers are expected
    to filter out REF_ files first, since those legitimately have no
    episode_id.
    """
    resolved = _derive_episode_id(shot, path, project_id)
    if resolved is None:
        raise MissingCanonicalFieldError(
            field="episode_id",
            project_id=project_id,
        )
    return resolved


def _load_shot(path: Path) -> Optional[dict]:
    try:
        with path.open("r", encoding="utf-8") as fh:
            return json.load(fh)
    except (json.JSONDecodeError, OSError) as exc:
        emit_fallback(
            "shot_file_unreadable_drop",
            scope="api/adapters/beats",
            payload={"path": str(path), "error": str(exc)},
        )
        return None


def _shot_status_to_beat_status(raw: Optional[str]) -> BeatStatus:
    """Coerce free-form shot.status to the canonical BeatStatus enum.

    Empirical statuses observed: pending_qc, video_failed, complete,
    locked, draft, blocked, queued, running. The canonical Beat enum is
    pending|running|blocked|locked|draft.
    """
    if not raw:
        return "pending"
    s = str(raw).lower()
    if "lock" in s:
        return "locked"
    if "block" in s or "fail" in s:
        return "blocked"
    if "draft" in s:
        return "draft"
    if s in ("running", "in_progress", "in-progress", "queued"):
        return "running"
    return "pending"


def _take_status(raw_take: dict) -> TakeStatus:
    if raw_take.get("rejected"):
        return "failed"
    if raw_take.get("error") or raw_take.get("error_message"):
        return "failed"
    if raw_take.get("file_path") or raw_take.get("output_path"):
        return "succeeded"
    emit_fallback(
        "take_status_unsignaled_default_queued",
        scope="api/adapters/beats",
        payload={"take_id": raw_take.get("take_id"), "keys": sorted(raw_take.keys())},
    )
    return "queued"


def _take_eval_state(raw_take: dict) -> EvalState:
    g1 = raw_take.get("gate_1") or raw_take.get("gate_2") or {}
    if isinstance(g1, dict):
        passed = g1.get("passed")
        if passed is True:
            return "pass"
        if passed is False:
            return "fail"
    if raw_take.get("rejected"):
        return "fail"
    if raw_take.get("disposition") == "approved":
        return "pass"
    if raw_take.get("disposition") == "rejected":
        return "fail"
    emit_fallback(
        "take_eval_state_unsignaled_default_pending",
        scope="api/adapters/beats",
        payload={"take_id": raw_take.get("take_id"), "keys": sorted(raw_take.keys())},
    )
    return "pending"


def _take_media(raw_shot: dict, raw_take: dict) -> Optional[MediaKind]:
    fp = raw_take.get("file_path") or raw_take.get("output_path") or ""
    fp = str(fp).lower()
    if fp.endswith(".mp4") or fp.endswith(".mov"):
        return "video"
    if fp.endswith((".png", ".jpg", ".jpeg", ".webp")):
        return "still"
    if fp.endswith((".mp3", ".wav", ".flac", ".m4a")):
        return "audio"
    pipeline = (raw_shot.get("pipeline") or "").lower()
    if "video" in pipeline:
        return "video"
    if pipeline in ("still", "previz", "image"):
        return "still"
    return None


def _ts_to_iso(ts: Any) -> datetime:
    """Coerce a take timestamp (epoch float, ISO string) to datetime."""
    if isinstance(ts, (int, float)):
        try:
            return datetime.fromtimestamp(float(ts), tz=timezone.utc)
        except (OSError, ValueError):
            return datetime.now(tz=timezone.utc)
    if isinstance(ts, str):
        try:
            return datetime.fromisoformat(ts.replace("Z", "+00:00"))
        except ValueError:
            pass
    return datetime.now(tz=timezone.utc)


def _build_beat(shot: dict) -> Beat:
    """Project a raw shot dict onto the HTTP ``Beat`` shape.

    The beat id (also used as the name) is ``shot_id``, else ``id``, else
    "UNKNOWN". ``takes`` is the number of dict entries in ``shot["takes"]``.

    ``primary`` is resolved in two steps:
      1. ``shot["primary_take_id"]`` — the value ``set_primary`` persists —
         when it still names a non-rejected take of this shot.
      2. Otherwise the heuristic: the first non-rejected take that has a
         ``file_path``/``output_path``.
    """
    beat_id = shot.get("shot_id") or shot.get("id") or "UNKNOWN"
    raw_takes = shot.get("takes") or []
    # Keep each take's position in the raw list: synthetic ids depend on it.
    indexed = [(pos, take) for pos, take in enumerate(raw_takes) if isinstance(take, dict)]

    def _id_of(pos: int, take: dict) -> str:
        return take.get("take_id") or _synthetic_take_id(beat_id, take, pos)

    primary = None
    explicit = shot.get("primary_take_id")
    if isinstance(explicit, str) and explicit:
        for pos, take in indexed:
            if not take.get("rejected") and _id_of(pos, take) == explicit:
                primary = explicit
                break
    if primary is None:
        for pos, take in indexed:
            if take.get("rejected"):
                continue
            if take.get("file_path") or take.get("output_path"):
                primary = _id_of(pos, take)
                break
    return Beat(
        schema_version=SCHEMA_VERSION,
        id=beat_id,
        name=beat_id,
        status=_shot_status_to_beat_status(shot.get("status")),
        takes=len(indexed),
        primary=primary,
        score=None,
        focused=False,
    )


def _synthetic_take_id(beat_id: str, raw_take: dict, idx: int) -> str:
    """Stable synthetic take id when the shot file omits one."""
    n = raw_take.get("take_number") or raw_take.get("take_num")
    if n is not None:
        return f"{beat_id}_T{int(n):03d}"
    return f"{beat_id}_T{idx:03d}"


def _resolve_aspect_for_take(
    raw_take: dict, project_id: Optional[str]
) -> Optional[str]:
    """Resolve aspect_ratio for a take. Order: take.aspect_ratio (explicit) →
    project.aspect_ratio (via Project(slug)). Returns None if neither resolves."""
    if raw_take.get("aspect_ratio"):
        return str(raw_take["aspect_ratio"])
    if not project_id:
        return None
    try:
        from recoil.core.project import get_project
        from recoil.core.project import AspectUnresolvable

        return get_project(project_id).aspect_ratio
    except (FileNotFoundError, AspectUnresolvable, json.JSONDecodeError):
        return None


def _build_take(
    beat_id: str, raw_take: dict, idx: int, project_id: Optional[str] = None
) -> Take:
    """Project one raw take dict onto the HTTP ``Take`` shape.

    ``idx`` is the take's position in the shot's takes list; it feeds the
    synthetic id when the entry has no ``take_id``.

    Notes:
      * ``cost_gen`` comes from ``cost`` (or ``cost_usd`` when ``cost`` is
        missing/empty). A value that cannot be converted to float is reported
        as ``None`` instead of failing the whole listing.
      * ``primary`` / ``circled`` / ``hidden`` mirror the flags persisted in
        the shot file by this module's mutation helpers (``set_primary``,
        ``toggle_circled`` / ``set_circled``, ``reject_take``).
      * ``url`` is only built when both a project id and a file path exist.
      * Media kind is derived from the take's file extension only — no shot
        dict is available here, so the pipeline-based fallback is not used.
    """
    take_id = raw_take.get("take_id") or _synthetic_take_id(beat_id, raw_take, idx)

    raw_cost = raw_take.get("cost")
    if raw_cost in (None, ""):
        raw_cost = raw_take.get("cost_usd")
    try:
        cost = None if raw_cost in (None, "") else float(raw_cost)
    except (TypeError, ValueError):
        # Malformed cost in a curated file must not break list_takes.
        cost = None

    model = raw_take.get("model") or raw_take.get("model_id")
    ts = _ts_to_iso(raw_take.get("timestamp") or raw_take.get("ts"))
    media = _take_media({}, raw_take)
    warnings_raw = raw_take.get("warnings") or []
    warnings = (
        [str(w) for w in warnings_raw if w] if isinstance(warnings_raw, list) else []
    )
    rel_path = raw_take.get("file_path") or raw_take.get("output_path") or ""
    url = f"/api/media/{project_id}/{rel_path}" if project_id and rel_path else None
    return Take(
        schema_version=SCHEMA_VERSION,
        id=take_id,
        beat_id=beat_id,
        idx=idx,
        status=_take_status(raw_take),
        eval_state=_take_eval_state(raw_take),
        score=None,
        model=model,
        cost_gen=cost,
        cost_eval=0.0,
        ts=ts,
        eta=None,
        progress=None,
        primary=bool(raw_take.get("primary")),
        circled=bool(raw_take.get("circled")),
        hidden=bool(raw_take.get("hidden")),
        media=media,
        failure_mode=raw_take.get("failure_mode"),
        warnings=warnings,
        aspect=_resolve_aspect_for_take(raw_take, project_id),
        url=url,
    )


# ── Public API ────────────────────────────────────────────────────────────


def _synthesize_episodes(project_id: str) -> list[Episode]:
    """Synthesize the episode list of a project from its shot files.

    One ``Episode`` is produced per distinct ``episode_id`` found across the
    project's beat files, ordered by id, flagged ``synthesized=True`` and
    carrying ``scenes=[]`` (lazy — clients call ``list_scenes`` for the next
    level).

    This is the single source of truth for the P3 synthesis; both
    ``adapters.projects._build_project`` and the public ``list_episodes``
    delegate here so the projection cannot drift.

    A project whose ``supports_episodes`` is falsy yields ``[]``. If the
    project itself cannot be loaded, a fallback event is emitted and the shot
    files are scanned anyway. Unreadable files and REF_ files are skipped; any
    other shot without a resolvable episode_id raises
    ``MissingCanonicalFieldError``.
    """
    validate_project_id(project_id)

    # Build B Phase 7: gate via Project SSOT.
    project = None
    try:
        from recoil.core.project import get_project as _core_get_project

        project = _core_get_project(project_id)
    except Exception as exc:
        emit_fallback(
            "project_load_failure_isolated",
            scope="api/adapters/beats._synthesize_episodes",
            payload={
                "project_id": project_id,
                "error": str(exc),
                "error_type": type(exc).__name__,
            },
        )
    if project is not None and not project.supports_episodes:
        return []

    episode_ids: set[str] = set()
    for shot_path in _list_beat_files(project_id):
        shot = _load_shot(shot_path)
        # Unreadable file, or a reference asset that legitimately has no
        # episode_id: neither contributes an episode.
        if shot is None or _is_ref_file(shot_path):
            continue
        # Build A Phase 4: an unresolvable episode_id on a real shot raises
        # (422) rather than silently dropping the shot.
        episode_ids.add(_require_episode_id(shot, shot_path, project_id))

    episodes: list[Episode] = []
    for ep_id in sorted(episode_ids):
        episodes.append(
            Episode(
                schema_version=SCHEMA_VERSION,
                id=ep_id,
                name=ep_id,
                status="pending",
                score=None,
                scenes=[],
                synthesized=True,
            )
        )
    return episodes


SYNTHETIC_SCENE_ID_SUFFIX = "__synthetic_scene_1"


def synthetic_scene_id(episode_id: str) -> str:
    """Return the id of the one synthetic scene that belongs to ``episode_id``.

    Shot files do not carry ``scene_id`` yet, so each episode collapses to a
    single synthetic scene. The id format is load-bearing: list_beats'
    equality check, list_scenes' synthesis and the frontend node decoration
    all derive from this helper, which keeps a suffix rename a one-line
    change.
    """
    return "{}{}".format(episode_id, SYNTHETIC_SCENE_ID_SUFFIX)


def list_episodes(project_id: str) -> list[Episode]:
    """Return the synthesized episodes of ``project_id`` (public surface).

    Pure delegation to ``_synthesize_episodes`` so there is exactly one
    implementation of the projection (Anti-Pattern 5). Backs the
    ``/api/projects/{pid}/episodes`` route.
    """
    episodes = _synthesize_episodes(project_id)
    return episodes


def list_scenes(project_id: str, episode_id: str) -> list[Scene]:
    """Return the one synthetic scene of ``episode_id``.

    Build A Phase 4 retired the ``scenes_synthesized_one_per_episode``
    sanctioned fallback: until shot files learn ``scene_id``, a single
    synthetic scene per episode IS the canonical route convention. The
    ``synthesized=True`` flag on the Scene is the operator-visible signal;
    no FALLBACK_FIRED bus event is emitted.

    Both ids are validated first. An ``episode_id`` that no shot file
    references raises ``KeyError``, which the route handler maps to 404.
    """
    validate_project_id(project_id)
    validate_hierarchy_id("episode_id", episode_id)

    known_ids = {episode.id for episode in _synthesize_episodes(project_id)}
    if episode_id not in known_ids:
        raise KeyError(f"episode_id not found: {episode_id}")

    scene = Scene(
        schema_version=SCHEMA_VERSION,
        id=synthetic_scene_id(episode_id),
        name="(synthetic scene)",
        status="pending",
        score=None,
        beat_list=[],
        synthesized=True,
    )
    return [scene]


def list_beats(project_id: str, episode_id: str, scene_id: str) -> list[Beat]:
    """List the beats of a (project, episode, scene).

    Under the P3 synthesis model every episode has exactly ONE synthetic
    scene (``<episode_id>__synthetic_scene_1``). Shot files do not carry
    ``scene_id`` yet, so beats are filtered by ``episode_id`` only. Real
    scene-level filtering arrives with the engine-side migration (see
    ``.out-of-scope/hierarchy-enumeration-engine-side.md``).

    Sentinel mode: when both ``episode_id`` and ``scene_id`` are the literal
    ``"_all"`` and the project loads with a falsy ``supports_episodes``,
    every readable beat file is returned unfiltered. In any other ``"_all"``
    case (project failed to load, or it supports episodes) ``KeyError`` is
    raised.

    Errors:
      * ``KeyError`` when ``scene_id`` is not the synthetic scene id of
        ``episode_id``, or when no shot references ``episode_id``; the route
        handler maps ``KeyError`` to ``HTTPException(404)``.
      * ``MissingCanonicalFieldError`` when a non-REF shot has no resolvable
        episode_id.

    Path-traversal guard (Debug R1): ``project_id`` is always validated
    before filesystem access. ``episode_id`` and ``scene_id`` are validated
    too, except in sentinel mode where they are only compared against the
    literal ``"_all"``.
    """
    validate_project_id(project_id)

    if episode_id == "_all" and scene_id == "_all":
        project = None
        try:
            from recoil.core.project import get_project as _core_get_project

            project = _core_get_project(project_id)
        except Exception as exc:
            emit_fallback(
                "project_load_failure_isolated",
                scope="api/adapters/beats.list_beats",
                payload={
                    "project_id": project_id,
                    "error": str(exc),
                    "error_type": type(exc).__name__,
                },
            )
        if project is None or project.supports_episodes:
            raise KeyError(f"scene_id not found: {scene_id}")
        everything: list[Beat] = []
        for shot_path in _list_beat_files(project_id):
            loaded = _load_shot(shot_path)
            if loaded is not None:
                everything.append(_build_beat(loaded))
        return everything

    validate_hierarchy_id("episode_id", episode_id)
    validate_hierarchy_id("scene_id", scene_id)
    if scene_id != synthetic_scene_id(episode_id):
        raise KeyError(f"scene_id not found: {scene_id}")

    # The episode must be real, i.e. referenced by at least one shot file.
    known_episodes = _synthesize_episodes(project_id)
    if all(episode.id != episode_id for episode in known_episodes):
        raise KeyError(f"episode_id not found: {episode_id}")

    matching: list[Beat] = []
    for shot_path in _list_beat_files(project_id):
        shot = _load_shot(shot_path)
        # Skip unreadable files and reference assets (no episode_id expected).
        if shot is None or _is_ref_file(shot_path):
            continue
        # Build A Phase 4: an unresolvable episode_id raises (422); a
        # resolvable-but-different one is just a filter miss.
        if _require_episode_id(shot, shot_path, project_id) == episode_id:
            matching.append(_build_beat(shot))
    return matching


def list_takes(beat_id: str, project_id: str) -> list[Take]:
    """Return the takes recorded in a beat's shot file.

    ``project_id`` is mandatory — callers must pass the project scope. Both
    ids are validated before any filesystem access (path-traversal guard,
    Debug R1). A missing or unreadable shot file yields ``[]``; non-dict
    entries in the takes list are skipped but still consume an index.
    """
    validate_hierarchy_id("beat_id", beat_id)
    validate_project_id(project_id)

    shot_path = _shots_dir(project_id) / f"{beat_id}.json"
    shot = _load_shot(shot_path) if shot_path.exists() else None
    if shot is None:
        return []
    return [
        _build_take(beat_id, entry, position, project_id)
        for position, entry in enumerate(shot.get("takes") or [])
        if isinstance(entry, dict)
    ]


def get_beat(beat_id: str, project_id: str) -> Optional[Beat]:
    """Look up a single beat; ``None`` if its shot file is missing/unreadable.

    Used by the lineage adapter. Ids are validated before touching disk.
    """
    validate_hierarchy_id("beat_id", beat_id)
    validate_project_id(project_id)

    shot_path = _shots_dir(project_id) / f"{beat_id}.json"
    shot = _load_shot(shot_path) if shot_path.exists() else None
    return None if shot is None else _build_beat(shot)


def get_shot_dict(beat_id: str, project_id: str) -> Optional[dict]:
    """Return the raw shot dict of a beat, or ``None`` when unavailable.

    Exposed for the lineage adapter. Ids are validated before touching disk.
    """
    validate_hierarchy_id("beat_id", beat_id)
    validate_project_id(project_id)

    shot_path = _shots_dir(project_id) / f"{beat_id}.json"
    return _load_shot(shot_path) if shot_path.exists() else None


def get_episode_id_for_beat(beat_id: str, project_id: str) -> str | None:
    """Resolve the episode_id of a beat from its shot file.

    Public wrapper around ``_derive_episode_id`` for external callers.
    Returns ``None`` when the shot file is missing or unreadable, or when no
    episode_id can be derived from it.
    """
    validate_hierarchy_id("beat_id", beat_id)
    validate_project_id(project_id)

    shot_path = _shots_dir(project_id) / f"{beat_id}.json"
    shot = _load_shot(shot_path) if shot_path.exists() else None
    if shot is None:
        return None
    return _derive_episode_id(shot, shot_path, project_id)


# ── Phase 19 mutation helpers ─────────────────────────────────────────────
# (post-Phase-20: atomic shot writes go directly through atomic_write_json
# from recoil.core.atomic_write — no wrapper.)


def _find_beat(beat_id: str, project_id: Optional[str] = None) -> tuple[Path, dict]:
    """Locate the shot file for ``beat_id`` and return ``(path, shot_dict)``.

    With a ``project_id`` only that project is searched; without one every
    directory under the projects root is tried in iteration order and the
    first readable match wins. Ids are validated before any filesystem
    access.

    Raises:
        KeyError: no readable shot file exists for ``beat_id``.
    """
    validate_hierarchy_id("beat_id", beat_id)
    if project_id is not None:
        validate_project_id(project_id)
    candidates: list[str]
    if project_id:
        candidates = [project_id]
    else:
        root = projects_root()
        candidates = (
            [p.name for p in root.iterdir() if p.is_dir()] if root.exists() else []
        )
    for slug in candidates:
        path = _shots_dir(slug) / f"{beat_id}.json"
        # A project that simply does not contain this beat is not an
        # "unreadable shot file": skip it without firing the fallback event
        # that _load_shot emits on open() failure.
        if not path.exists():
            continue
        shot = _load_shot(path)
        if shot is not None:
            return path, shot
    raise KeyError(f"beat {beat_id!r} not found in any project")


def _next_beat_id(episode_id: str, project_id: str) -> str:
    """Return the next free ``<episode_id>_SHnn`` beat id in the project.

    Existing beat files named ``<episode_id>_SH<digits>`` (case-insensitive)
    are scanned; the result is one past the highest number found, or
    ``_SH01`` when there is none.
    """
    matcher = re.compile(rf"^{re.escape(episode_id)}_SH(\d+)$", re.IGNORECASE)
    hits = (matcher.match(beat_file.stem) for beat_file in _list_beat_files(project_id))
    highest = max((int(hit.group(1)) for hit in hits if hit), default=0)
    return f"{episode_id}_SH{highest + 1:02d}"


def _find_shot_for_take(
    take_id: str, project_id: Optional[str]
) -> tuple[Path, dict, dict, int]:
    """Locate the shot file containing ``take_id``.

    Returns ``(path, shot_dict, raw_take_dict, index_in_takes_list)``.

    A take matches when its stored ``take_id`` equals ``take_id``. A take
    without a stored id matches on its synthetic id, built from either the
    shot's ``shot_id`` or the file stem. ``list_takes`` derives synthetic ids
    from the file stem, so both spellings must resolve for ids handed to
    clients to be usable here.

    Path-traversal guard (Debug R1): ids are validated before any filesystem
    access. ``project_id=None`` triggers an O(N) cross-project scan; a
    warning event is emitted so callers know to thread project_id through.

    Raises:
        KeyError: no shot has a take with that id.
    """
    validate_hierarchy_id("take_id", take_id)
    if project_id is not None:
        validate_project_id(project_id)
    candidates: list[str]
    if project_id:
        candidates = [project_id]
    else:
        # Pragmatic fix (Debug R1 finding 8): full contract change deferred.
        # Surface the slow path as a warning event so we can grep for it
        # in production and prioritize the threading work later.
        try:
            from recoil.api.eventbus import (
                BUS,
            )  # local import — avoid cycle on cold start

            BUS.emit_sync(
                severity="warning",
                scope="adapter",
                summary="O(N) take lookup; pass project_id",
                detail=f"_find_shot_for_take({take_id!r}, project_id=None) — full project scan",
            )
        except Exception:  # noqa: BLE001 — never let logging break the call
            # Best-effort by design, but leave a trace for debugging.
            logger.debug("could not emit O(N) take-lookup warning", exc_info=True)
        root = projects_root()
        candidates = (
            [p.name for p in root.iterdir() if p.is_dir()] if root.exists() else []
        )

    for slug in candidates:
        # _list_beat_files already returns [] for a missing shots directory.
        for path in _list_beat_files(slug):
            shot = _load_shot(path)
            if shot is None:
                continue
            beat_names = [shot.get("shot_id") or path.stem]
            if path.stem not in beat_names:
                beat_names.append(path.stem)
            for idx, raw in enumerate(shot.get("takes") or []):
                if not isinstance(raw, dict):
                    continue
                stored_id = raw.get("take_id")
                if stored_id:
                    if stored_id == take_id:
                        return path, shot, raw, idx
                    continue
                if any(
                    _synthetic_take_id(name, raw, idx) == take_id
                    for name in beat_names
                ):
                    return path, shot, raw, idx
    raise KeyError(f"take {take_id!r} not found in any shot")


def set_primary(take_id: str, project_id: Optional[str] = None) -> dict:
    """Mark ``take_id`` as the primary take on its beat.

    Sets ``shot["primary_take_id"]`` and rewrites the ``primary`` flag of
    every take in the shot: True for the located take, False for all others.
    The target is identified by its position in the takes list, so a take
    addressed through a synthetic id (no stored ``take_id``) is flagged
    correctly as well. The shot file is written atomically.

    Returns ``{"shot_id": ..., "primary": take_id}``.

    Raises:
        KeyError: ``take_id`` was not found.
    """
    path, shot, _take, target_idx = _find_shot_for_take(take_id, project_id)
    shot["primary_take_id"] = take_id
    for idx, raw in enumerate(shot.get("takes") or []):
        if isinstance(raw, dict):
            raw["primary"] = idx == target_idx
    atomic_write_json(path, shot)
    return {"shot_id": shot.get("shot_id") or path.stem, "primary": take_id}


def toggle_circled(take_id: str, project_id: Optional[str] = None) -> dict:
    """Invert the ``circled`` flag of ``take_id`` and persist the shot file.

    A take without the flag counts as not circled. Returns the shot id, the
    take id and the new flag value. Raises KeyError if the take is not found.
    """
    path, shot, raw_take, _ = _find_shot_for_take(take_id, project_id)
    flipped = not raw_take.get("circled")
    raw_take["circled"] = flipped
    atomic_write_json(path, shot)
    shot_id = shot.get("shot_id") or path.stem
    return {"shot_id": shot_id, "take_id": take_id, "circled": flipped}


def set_circled(take_id: str, value: bool, project_id: Optional[str] = None) -> dict:
    """Set the ``circled`` flag of ``take_id`` to ``value`` (Debug R1 fix).

    Unlike ``toggle_circled`` the outcome is idempotent: repeating the call
    with the same ``value`` leaves the same state, so it is safe to retry on
    network blips. The shot file is rewritten on every call.
    Raises KeyError if the take is not found.
    """
    path, shot, raw_take, _ = _find_shot_for_take(take_id, project_id)
    flag = bool(value)
    raw_take["circled"] = flag
    atomic_write_json(path, shot)
    shot_id = shot.get("shot_id") or path.stem
    return {"shot_id": shot_id, "take_id": take_id, "circled": flag}


def reject_take(take_id: str, project_id: Optional[str] = None) -> dict:
    """Mark ``take_id`` rejected (also flags it hidden).

    If the take was the beat's primary it is demoted: the shot-level
    ``primary_take_id`` is cleared, and so is the take-level ``primary`` flag
    that ``set_primary`` writes, so the two never disagree. The shot file is
    written atomically.

    Returns ``{"shot_id": ..., "take_id": ..., "rejected": True}``.

    Raises:
        KeyError: ``take_id`` was not found.
    """
    path, shot, raw_take, _idx = _find_shot_for_take(take_id, project_id)
    raw_take["rejected"] = True
    raw_take["hidden"] = True
    # If this was the primary, demote it at both levels.
    if shot.get("primary_take_id") == take_id:
        shot["primary_take_id"] = None
    if raw_take.get("primary"):
        raw_take["primary"] = False
    atomic_write_json(path, shot)
    return {
        "shot_id": shot.get("shot_id") or path.stem,
        "take_id": take_id,
        "rejected": True,
    }


def set_prompt_override(
    beat_id: str, new_text: str, project_id: Optional[str] = None
) -> dict:
    """Store ``new_text`` as ``shot["prompt_override"]`` on the given beat.

    The beat is located by id (shot file stem) through ``_find_beat`` — which
    validates the ids and, without a ``project_id``, scans every project —
    and the shot file is then rewritten atomically, like set_primary /
    toggle_circled.

    Raises KeyError if no shot file exists for beat_id.
    """
    path, shot = _find_beat(beat_id, project_id)
    shot["prompt_override"] = new_text
    atomic_write_json(path, shot)
    shot_id = shot.get("shot_id") or path.stem
    return {"shot_id": shot_id, "prompt_override_set": True}


def insert_beat(
    episode_id: str,
    text: str,
    project_id: str,
    after_beat_id: Optional[str] = None,
) -> dict:
    """Create a new shot JSON file for a brand-new beat in the episode.

    The new beat gets the next free ``<episode_id>_SHnn`` id, status
    "pending", no takes, and ``text`` as its ``prompt_override``.
    ``after_beat_id`` is recorded as ``inserted_after``; it does not affect
    the generated id.

    Path-traversal guard: ``episode_id`` becomes part of the new file name,
    so it is validated (like ``project_id``) before any filesystem access.
    NOTE(review): the exception type raised by the validators is defined in
    ``recoil.api.adapters._ids`` — confirm the route maps it as for the
    other entry points.

    Raises:
        KeyError: the project's shots directory does not exist.
    """
    validate_project_id(project_id)
    validate_hierarchy_id("episode_id", episode_id)
    shots_dir = _shots_dir(project_id)
    if not shots_dir.exists():
        raise KeyError(f"shots directory does not exist for project {project_id!r}")
    new_beat_id = _next_beat_id(episode_id, project_id)
    shot = {
        "schema_version": 1,
        "shot_id": new_beat_id,
        "episode_id": episode_id,
        "status": "pending",
        "takes": [],
        "prompt_override": text,
        "inserted_after": after_beat_id,
        "created_by": "BeatInsertionProposal",
    }
    path = shots_dir / f"{new_beat_id}.json"
    atomic_write_json(path, shot)
    return {
        "shot_id": new_beat_id,
        "episode_id": episode_id,
        "beat_insertion_applied": True,
    }


def update_take_params(
    take_id: str, params_delta: dict, project_id: Optional[str] = None
) -> dict:
    """Merge ``params_delta`` into the stored dict of ``take_id``.

    The take is located with ``_find_shot_for_take``; its dict is updated in
    place (keys in ``params_delta`` overwrite existing ones) and the shot
    file is rewritten atomically. The result lists the keys that were sent.

    Raises KeyError if take_id is not found.
    """
    path, shot, raw_take, _ = _find_shot_for_take(take_id, project_id)
    raw_take.update(params_delta)
    atomic_write_json(path, shot)
    touched_keys = [key for key in params_delta.keys()]
    shot_id = shot.get("shot_id") or path.stem
    return {"shot_id": shot_id, "take_id": take_id, "params_updated": touched_keys}


def add_directive_to_beats(
    beat_ids: list[str],
    note: str,
    project_id: Optional[str] = None,
) -> dict:
    """Append a directive note to each beat's shot JSON.

    Beats are processed in order and each shot file is written atomically
    before moving on. A missing or null ``directives`` entry is treated as an
    empty list.

    Raises:
        KeyError: a beat_id was not found (fails fast on the first miss).
            Beats processed before the miss are already mutated, and
            re-running the call appends ``note`` to them again — directives
            are append-only and not de-duplicated here.
    """
    updated: list[str] = []
    for bid in beat_ids:
        path, shot = _find_beat(bid, project_id)
        # `or []` (not a .get default) so an explicit JSON null does not
        # crash on .append — same idiom as shot.get("takes") or [].
        directives = shot.get("directives") or []
        directives.append(note)
        shot["directives"] = directives
        atomic_write_json(path, shot)
        updated.append(bid)
    return {"beat_ids_updated": updated, "directive_applied": True}


def _next_cutaway_id(source_beat_id: str, project_id: str) -> str:
    """Return the next free ``<source_beat_id>_CUTnn`` id in the project.

    Existing beat files named ``<source_beat_id>_CUT<digits>``
    (case-insensitive) are scanned; the result is one past the highest
    number found, or ``_CUT01`` when there is none.
    """
    matcher = re.compile(rf"^{re.escape(source_beat_id)}_CUT(\d+)$", re.IGNORECASE)
    hits = (matcher.match(beat_file.stem) for beat_file in _list_beat_files(project_id))
    highest = max((int(hit.group(1)) for hit in hits if hit), default=0)
    return f"{source_beat_id}_CUT{highest + 1:02d}"


def extract_cutaway(
    from_beat_id: str,
    description: str,
    project_id: Optional[str] = None,
) -> dict:
    """Create a new cutaway shot derived from an existing beat.

    Writes the new ``<from_beat_id>_CUTnn`` shot file next to the source
    shot, then appends the cutaway id to the source shot's ``cutaways`` list
    (a missing or null list is treated as empty). The cutaway inherits the
    episode_id derived from the source shot, which may be ``None``.

    Raises:
        KeyError: ``from_beat_id`` was not found.
    """
    path, source_shot = _find_beat(from_beat_id, project_id)
    # Prefer the caller's project id. Only when the beat was found by a
    # cross-project scan do we infer it from the path, which relies on the
    # layout <projects_root>/<project>/state/visual/shots/<beat>.json.
    resolved_project = project_id or path.parent.parent.parent.parent.name
    episode_id = _derive_episode_id(source_shot, path, resolved_project)
    cutaway_id = _next_cutaway_id(from_beat_id, resolved_project)
    cutaway_shot = {
        "schema_version": 1,
        "shot_id": cutaway_id,
        "episode_id": episode_id,
        "status": "pending",
        "takes": [],
        "prompt_override": description,
        # cutaway_source is the editorial back-link; coverage_of is the
        # generation pipeline field for existing coverage tracking (both
        # refer to the same beat but serve different consumers).
        "cutaway_source": from_beat_id,
        "is_coverage": True,
        "coverage_of": from_beat_id,
        "created_by": "ExtractCutawayProposal",
    }
    cutaway_path = path.parent / f"{cutaway_id}.json"
    # Write cutaway first: if the second write fails, the orphaned cutaway
    # is still valid via its cutaway_source field and safe to re-run.
    atomic_write_json(cutaway_path, cutaway_shot)
    # `or []` so an explicit JSON null does not crash on .append.
    cutaways = source_shot.get("cutaways") or []
    cutaways.append(cutaway_id)
    source_shot["cutaways"] = cutaways
    atomic_write_json(path, source_shot)
    return {
        "shot_id": cutaway_id,
        "cutaway_source": from_beat_id,
        "episode_id": episode_id,
        "extract_cutaway_applied": True,
    }


def set_ref_overrides(
    beat_id: str,
    swap_before: str,
    swap_after: str,
    prompt_additions: Optional[list[str]] = None,
    project_id: Optional[str] = None,
) -> dict:
    """Record a reference swap (and optional prompt additions) on a beat.

    Appends ``{"before": swap_before, "after": swap_after}`` to
    ``shot["ref_overrides"]`` and, when given, extends
    ``shot["prompt_additions"]``. Missing or null lists are treated as
    empty. The shot file is written atomically.

    Scope boundary: ref_resolver.py does not read shot["ref_overrides"] yet —
    pipeline wiring is a separate follow-on.

    Raises:
        KeyError: ``beat_id`` was not found.
    """
    path, shot = _find_beat(beat_id, project_id)
    # `or []` so an explicit JSON null does not crash on .append/.extend.
    overrides = shot.get("ref_overrides") or []
    overrides.append({"before": swap_before, "after": swap_after})
    shot["ref_overrides"] = overrides
    if prompt_additions:
        existing_adds = shot.get("prompt_additions") or []
        existing_adds.extend(prompt_additions)
        shot["prompt_additions"] = existing_adds
    atomic_write_json(path, shot)
    return {
        "shot_id": shot.get("shot_id") or path.stem,
        "ref_swap_applied": True,
        "swap": {"before": swap_before, "after": swap_after},
        "prompt_additions_count": len(prompt_additions) if prompt_additions else 0,
    }


def pin_strategy(
    beat_id: str,
    strategy_name: str,
    rationale: str,
    project_id: Optional[str] = None,
) -> dict:
    """Pin a generation strategy (name + rationale) on a beat's shot file.

    Overwrites ``shot["pinned_strategy"]`` and rewrites the file atomically.

    Scope boundary: production_loop.py does not read shot["pinned_strategy"]
    yet — production loop wiring is a separate follow-on.
    Raises KeyError if beat_id is not found.
    """
    path, shot = _find_beat(beat_id, project_id)
    pinned = {"name": strategy_name, "rationale": rationale}
    shot["pinned_strategy"] = pinned
    atomic_write_json(path, shot)
    shot_id = shot.get("shot_id") or path.stem
    return {
        "shot_id": shot_id,
        "pinned_strategy_set": True,
        "strategy_name": strategy_name,
    }


# Public surface of this adapter: the synthetic-scene helpers, the read API
# (list_* / get_*), and the shot-file mutation helpers. Underscore-prefixed
# helpers are intentionally not exported.
__all__ = [
    "SYNTHETIC_SCENE_ID_SUFFIX",
    "synthetic_scene_id",
    "list_episodes",
    "list_scenes",
    "list_beats",
    "list_takes",
    "get_beat",
    "get_shot_dict",
    "get_episode_id_for_beat",
    "set_primary",
    "toggle_circled",
    "set_circled",
    "reject_take",
    "set_prompt_override",
    "update_take_params",
    "insert_beat",
    "add_directive_to_beats",
    "extract_cutaway",
    "set_ref_overrides",
    "pin_strategy",
]
