# ==============================================================================
# PORTED FROM STARSEND: lib/execution_store.py
# DATE: 2026-03-29
# ==============================================================================
"""execution_store.py — Per-project JSON file-per-shot backend for execution state.

Single-machine safe (fcntl + atomic replace). Cross-machine safety via the
project write lease (state_lease.py) — fcntl does NOT span machines.
Reference: workspace/state.py:91-113 (canonical pattern).
Lock file: <shots_dir>/.shots.lock.

Each shot is stored as an individual JSON file under
    projects/{project}/state/visual/shots/{shot_id}.json.
"""

import contextlib
import fcntl
import json
import logging
import os
import re
import tempfile
import threading
import time
from collections import defaultdict
from pathlib import Path
from typing import Optional

from recoil.core.exceptions import ExecutionStoreCorruptError
from recoil.core.paths import projects_root, ProjectPaths
from recoil.execution.state_lease import ensure_write_lease
from recoil.pipeline._lib.schema_versions import EXECUTION_STORE_SCHEMA_VERSION

logger = logging.getLogger(__name__)

# ── State Machine ───────────────────────────────────────────────

# Maps each shot status to the set of statuses it may move to. Every status
# that appears as a transition target is also a key, so SHOT_VALID_STATUSES
# (derived from the keys) covers every reachable state.
VALID_TRANSITIONS: dict[str, frozenset[str]] = {
    # ── Previs layer ──
    "previs_pending":       frozenset({"previs_submitted", "previs_generating", "skipped", "failed"}),
    "previs_submitted":     frozenset({"previs_processing", "previs_generating", "failed"}),
    "previs_processing":    frozenset({"previs_complete", "previs_generated", "failed"}),
    "previs_generating":    frozenset({"previs_generated", "previs_failed", "previs_mechanical_failed", "previs_semantic_failed", "pending_qc", "failed"}),
    "previs_generated":     frozenset({"previs_approved", "previs_pending", "previs_generating", "previs_mechanical_failed", "previs_rejected", "failed"}),
    "previs_failed":        frozenset({"previs_pending", "previs_generating", "failed"}),
    # QC failure states for previs. These were reachable from previs_generating/
    # previs_generated but previously had no entry of their own. Their exits
    # mirror keyframe_mechanical_failed / keyframe_semantic_failed.
    "previs_mechanical_failed": frozenset({"previs_pending", "previs_generating", "failed"}),
    "previs_semantic_failed":   frozenset({"previs_pending", "previs_generating", "previs_approved", "failed"}),
    "previs_complete":      frozenset({"previs_approved", "previs_pending", "failed"}),
    "previs_rejected":      frozenset({"previs_pending", "previs_approved", "previs_generated"}),
    "previs_approved":      frozenset({"keyframe_pending", "keyframe_generating", "video_pending", "skipped", "previs_generated"}),

    # ── Keyframe layer ──
    "keyframe_pending":     frozenset({"keyframe_generating", "skipped", "failed"}),
    "keyframe_generating":  frozenset({"keyframe_generated", "keyframe_mechanical_failed", "keyframe_semantic_failed", "icu_escalated", "pending_qc", "failed"}),
    "keyframe_generated":   frozenset({"keyframe_approved", "keyframe_pending", "keyframe_generating", "keyframe_semantic_failed", "keyframe_rejected", "previs_approved", "failed"}),
    "keyframe_approved":    frozenset({"video_pending", "keyframe_pending", "keyframe_generated", "skipped"}),
    "keyframe_mechanical_failed": frozenset({"keyframe_pending", "keyframe_generating", "failed"}),
    "keyframe_semantic_failed":   frozenset({"keyframe_pending", "keyframe_generating", "keyframe_approved", "failed"}),
    "keyframe_rejected":    frozenset({"keyframe_pending", "keyframe_approved", "keyframe_generated"}),

    # ── Video layer ──
    "video_pending":        frozenset({"video_submitted", "skipped", "failed", "keyframe_approved", "previs_approved"}),
    "video_submitted":      frozenset({"video_processing", "video_failed", "failed"}),
    "video_processing":     frozenset({"video_ready", "video_failed", "video_mechanical_failed", "video_semantic_failed", "pending_qc", "failed"}),
    "video_ready":          frozenset({"video_complete", "video_downloading", "video_failed", "video_mechanical_failed", "video_semantic_failed", "pending_qc", "failed"}),
    "video_downloading":    frozenset({"video_complete", "failed"}),
    "video_complete":       frozenset({"approved", "rejected", "video_rejected", "video_pending", "video_submitted", "keyframe_pending", "previs_pending", "failed"}),
    "video_failed":         frozenset({"video_pending", "failed"}),
    "video_mechanical_failed": frozenset({"video_pending", "failed"}),
    "video_semantic_failed":   frozenset({"video_pending", "video_complete", "failed"}),
    "video_rejected":       frozenset({"video_pending", "keyframe_pending", "previs_pending", "video_complete"}),

    # ── Terminal / decision states ──
    "approved":             frozenset({"video_pending"}),
    "rejected":             frozenset({"video_pending", "keyframe_pending", "previs_pending"}),
    "abandoned":            frozenset(),  # terminal — no outgoing transitions
    "skipped":              frozenset({"previs_pending", "keyframe_pending", "video_pending"}),
    "failed":               frozenset({"previs_pending", "keyframe_pending", "keyframe_approved", "video_pending"}),

    # ── Legacy compat (existing data may have these) ──
    "qc_mechanical_failed": frozenset({"previs_pending", "keyframe_pending", "keyframe_approved", "video_pending", "video_complete", "failed"}),
    "qc_semantic_failed":   frozenset({"keyframe_rejected", "keyframe_approved", "video_pending", "video_complete"}),

    # ── Phase 2.5: ERROR routing states ──
    # pending_qc: critic ERRORed (vision API down, etc.) — recheck script promotes or escalates
    "pending_qc": frozenset({
        "keyframe_generated", "video_complete", "previs_generated",
        "needs_review", "icu_escalated",
        "keyframe_semantic_failed", "video_semantic_failed",
    }),
    # icu_escalated: feedback agent had no fix — JT manually reviews after autopsy
    "icu_escalated": frozenset({
        "needs_review", "keyframe_pending", "video_pending", "abandoned",
    }),
    # needs_review: holding state for shots requiring human action; exits only
    # via a human decision (requeue or abandon).
    "needs_review": frozenset({
        "keyframe_pending", "video_pending", "abandoned",
    }),
}

# Every status the state machine knows about (the keys of VALID_TRANSITIONS).
SHOT_VALID_STATUSES = frozenset(VALID_TRANSITIONS.keys())


class InvalidTransitionError(Exception):
    """Signals a shot status change that VALID_TRANSITIONS does not permit.

    Attributes:
        shot_id: Shot whose transition was rejected.
        from_state: Status the shot currently has.
        to_state: Status that was requested.
    """

    def __init__(self, shot_id: str, from_state: str, to_state: str):
        self.shot_id, self.from_state, self.to_state = shot_id, from_state, to_state
        # Unknown source states simply report an empty list of options.
        permitted = sorted(VALID_TRANSITIONS.get(from_state, ()))
        message = (
            f"Shot {shot_id}: transition {from_state} -> {to_state} is not allowed. "
            f"Valid from {from_state}: {permitted}"
        )
        super().__init__(message)


class ShotExistsError(Exception):
    """Signals that a shot record is already stored under the requested id.

    Raised by ExecutionStore.create_shot().

    Attributes:
        shot_id: The identifier that collided.
    """

    def __init__(self, shot_id: str):
        message = f"Shot {shot_id} already exists"
        super().__init__(message)
        self.shot_id = shot_id

# Legacy paths — kept for migration
# TENET_6_DEFERRED_TO_PHASE_E: SQLite migration legacy; remove after no project still has the legacy DB.
# Within this module both constants below are used only by
# ExecutionStore._try_migrate_from_sqlite() as candidate locations for a
# pre-JSON execution.db.
from recoil.core.paths import PIPELINE_ROOT  # noqa: E402 — transitional: needed for legacy DB migration only
# Pipeline-wide legacy SQLite DB; it is the last migration candidate checked.
LEGACY_DB_PATH = PIPELINE_ROOT / "data" / "execution_plans" / "execution.db"
# Per-user legacy cache root; migration checks <this>/<project>/execution.db first.
_LEGACY_CACHE_DIR = Path.home() / ".cache" / "starsend"

# Canonical field set of a shot record. New records are built by projecting
# input onto these keys in this order, so the tuple order is also the key
# order of freshly written shot JSON files. Keys outside this tuple are
# dropped when a record is created (updates of existing records keep them).
_SHOT_FIELDS = (
    "schema_version",
    "shot_id", "episode_id", "pipeline", "model", "status", "job_id",
    "session_id", "gate_results", "cost_incurred", "retry_waste_cost",
    "output_path", "error_message", "attempts", "max_attempts", "takes",
    "updated_at", "is_coverage", "coverage_of", "deferred", "deferred_reason",
)

# Fallback values used for fields absent from the input when a new record is
# built (looked up via `_SHOT_DEFAULTS.get(k)`; "shot_id" intentionally has no
# default and is always set explicitly).
# NOTE(review): the `{}` / `[]` values are single shared module-level objects
# handed out by reference; callers must copy rather than mutate them in place.
_SHOT_DEFAULTS = {
    "schema_version": EXECUTION_STORE_SCHEMA_VERSION,
    "episode_id": "",
    "pipeline": None,
    "model": None,
    "status": "previs_pending",
    "job_id": None,
    "session_id": None,
    "gate_results": {},
    "cost_incurred": 0.0,
    "retry_waste_cost": 0.0,
    "output_path": None,
    "error_message": None,
    "attempts": 0,
    "max_attempts": 3,
    "takes": [],
    "updated_at": None,
    "is_coverage": False,
    "coverage_of": None,
    "deferred": False,
    "deferred_reason": None,
}


def _project_shots_dir(project: str) -> Path:
    """Resolve the directory holding *project*'s per-shot JSON files.

    The location is owned by ``ProjectPaths`` (it lives inside Dropbox).
    """
    paths = ProjectPaths.for_project(project)
    return paths.shots_dir


def _is_under(path: Path, root: Path) -> bool:
    try:
        path.resolve(strict=False).relative_to(root.resolve(strict=False))
    except (OSError, ValueError):
        return False
    return True


class ExecutionStore:
    """Per-project JSON file-per-shot store for Layer 2 execution state.

    Each shot is stored as an individual JSON file. Writes are single-machine
    safe (fcntl + atomic replace). Cross-machine safety is via the project
    write lease (state_lease.py) — fcntl does NOT span machines.

    Explicit db_path stores outside ``<projects_root>/<project>`` are exempt
    from lease interaction, which keeps tests and scratch stores local.

    Args:
        project: Project name (e.g. "leviathan"). Determines storage location.
        db_path: Explicit override for the shots directory — used by tests.
    """

    def __init__(
        self,
        project: str = "leviathan",
        db_path: Optional[Path] = None,
        *,
        migrate: bool = True,
    ):
        self.project = project
        if db_path is not None:
            # db_path override: if it points to a .db file, use sibling shots/ dir
            db_path = Path(db_path)
            if db_path.suffix == ".db":
                self._shots_dir = db_path.parent / "shots"
            else:
                self._shots_dir = db_path
        else:
            self._shots_dir = _project_shots_dir(project)
        self._lease_enabled = (
            True if db_path is None else self._is_under_project_tree(self._shots_dir)
        )
        self._lock = threading.Lock()
        self._flock_path = self._shots_dir / ".shots.lock"

        if migrate:
            self._shots_dir.mkdir(parents=True, exist_ok=True)
        # Auto-migrate from SQLite if shots dir is empty.
        if migrate and not any(self._shots_dir.glob("*.json")):
            self._try_migrate_from_sqlite()

    @property
    def shots_dir(self) -> Path:
        return self._shots_dir

    def _is_under_project_tree(self, path: Path) -> bool:
        try:
            root = projects_root() / self.project.lower()
        except Exception:
            return False
        return _is_under(path, root)

    def _ensure_write_lease(self) -> None:
        if self._lease_enabled:
            ensure_write_lease(self.project)

    # ── Atomic I/O ────────────────────────────────────────────────

    def _shot_path(self, shot_id: str) -> Path:
        """Return the JSON file path for a shot."""
        return self._shots_dir / f"{shot_id}.json"

    def _read_shot_file(self, shot_id: str) -> Optional[dict]:
        """Read a single shot JSON file. Returns None if not found."""
        path = self._shot_path(shot_id)
        if not path.is_file():
            return None
        try:
            return json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as e:
            raise ExecutionStoreCorruptError(str(path), str(e)) from e
        except IOError as e:
            logger.warning("Failed to read shot %s: %s", shot_id, e)
            return None

    def _write_shot_file_locked(self, shot_id: str, data: dict) -> None:
        """Atomic write via tempfile + os.replace(). Caller must hold the lock."""
        # Backfill schema_version on update paths that bypass _SHOT_DEFAULTS
        # (e.g. update_shot's existing-record merge branch reading pre-MF-6
        # records). setdefault preserves any prior on-disk version so a future
        # constant bump does not silently rewrite legacy records.
        data.setdefault("schema_version", EXECUTION_STORE_SCHEMA_VERSION)
        path = self._shot_path(shot_id)
        fd, tmp = tempfile.mkstemp(
            dir=str(self._shots_dir), prefix=f".tmp_{shot_id}_", suffix=".json"
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, default=str)
            os.replace(tmp, str(path))
        except Exception:
            # Clean up temp file on failure
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise

    @contextlib.contextmanager
    def _locked(self):
        """Acquire intra-process + cross-process locks for a critical section.

        Order matches the canonical workspace/state.py pattern: threading.Lock
        outer (cheap intra-process bounce), fcntl.flock inner. Releases both
        on exit, including exceptions.
        """
        self._shots_dir.mkdir(parents=True, exist_ok=True)
        with self._lock:
            lock_fd = os.open(str(self._flock_path), os.O_CREAT | os.O_RDWR)
            try:
                fcntl.flock(lock_fd, fcntl.LOCK_EX)
                yield
            finally:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
                os.close(lock_fd)

    def _load_all(self) -> list[dict]:
        """Load all shot files. Raises ExecutionStoreCorruptError on corrupt JSON."""
        if not self._shots_dir.is_dir():
            return []
        shots = []
        for path in self._shots_dir.iterdir():
            if not path.is_file() or not path.suffix == ".json":
                continue
            if path.name.startswith(".tmp"):
                continue
            try:
                shots.append(json.loads(path.read_text(encoding="utf-8")))
            except json.JSONDecodeError as e:
                raise ExecutionStoreCorruptError(str(path), str(e)) from e
            except IOError as e:
                logger.warning("Skipping unreadable shot file %s: %s", path.name, e)
        return shots

    def close(self):
        """No-op. JSON files don't need connection management."""
        pass

    # ── Shot CRUD ──────────────────────────────────────────────────

    def insert_shot(self, shot: dict) -> None:
        """Insert a new shot record."""
        self._ensure_write_lease()
        data = {k: shot.get(k, _SHOT_DEFAULTS.get(k)) for k in _SHOT_FIELDS}
        data["shot_id"] = shot["shot_id"]
        if isinstance(data.get("gate_results"), str):
            try:
                data["gate_results"] = json.loads(data["gate_results"])
            except (json.JSONDecodeError, TypeError):
                data["gate_results"] = {}
        if isinstance(data.get("takes"), str):
            try:
                data["takes"] = json.loads(data["takes"])
            except (json.JSONDecodeError, TypeError):
                data["takes"] = []
        data["updated_at"] = time.time()
        with self._locked():
            self._write_shot_file_locked(shot["shot_id"], data)

    def insert_shots_batch(self, shots: list[dict]) -> None:
        """Insert multiple shots."""
        self._ensure_write_lease()
        now = time.time()
        with self._locked():
            for shot in shots:
                data = {k: shot.get(k, _SHOT_DEFAULTS.get(k)) for k in _SHOT_FIELDS}
                data["shot_id"] = shot["shot_id"]
                if isinstance(data.get("gate_results"), str):
                    try:
                        data["gate_results"] = json.loads(data["gate_results"])
                    except (json.JSONDecodeError, TypeError):
                        data["gate_results"] = {}
                if isinstance(data.get("takes"), str):
                    try:
                        data["takes"] = json.loads(data["takes"])
                    except (json.JSONDecodeError, TypeError):
                        data["takes"] = []
                data["updated_at"] = now
                self._write_shot_file_locked(shot["shot_id"], data)

    def get_shot(self, shot_id: str) -> Optional[dict]:
        """Get full shot record as dict."""
        data = self._read_shot_file(shot_id)
        if data is None:
            return None
        # Ensure complex fields are proper types
        if not isinstance(data.get("gate_results"), dict):
            data["gate_results"] = {}
        if not isinstance(data.get("takes"), list):
            data["takes"] = []
        return data

    def get_shot_status(self, shot_id: str) -> str:
        """Get the current status of a shot."""
        data = self._read_shot_file(shot_id)
        if data is None:
            return "previs_pending"
        return data.get("status", "previs_pending")

    def update_shot(self, shot_id: str, **fields) -> None:
        """Update specific fields on a shot.

        Enforces state machine transitions when 'status' is included.
        Raises InvalidTransitionError for invalid transitions.
        Allows no-op (same state) transitions silently.

        Handles special semantics:
          - gate_results: merged with existing
          - cost_incurred: accumulated (added to existing)
          - retry_waste_cost: accumulated (added to existing)
          - takes: replaced wholesale
          - append_take: appends a single take to the list
        """
        self._ensure_write_lease()
        with self._locked():
            existing = self._read_shot_file(shot_id)
            if existing is None:
                # Auto-create if not found — parse episode_id from shot_id
                episode_id = fields.get("episode_id", "")
                if not episode_id:
                    ep_match = re.match(r"(EP\d+)", shot_id)
                    if ep_match:
                        episode_id = ep_match.group(1)
                new_shot = {"shot_id": shot_id, "episode_id": episode_id}
                new_shot.update(fields)
                data = {k: new_shot.get(k, _SHOT_DEFAULTS.get(k)) for k in _SHOT_FIELDS}
                data["shot_id"] = shot_id
                data["updated_at"] = time.time()
                self._write_shot_file_locked(shot_id, data)
                return

            # ── Enforce state machine transitions ──
            if "status" in fields:
                new_status = fields["status"]
                old_status = existing.get("status", "previs_pending")

                # Allow no-op (same state) transitions
                if new_status != old_status:
                    if old_status in VALID_TRANSITIONS:
                        allowed = VALID_TRANSITIONS[old_status]
                        if new_status not in allowed:
                            # Permissive: log but don't block
                            logger.warning(
                                "Shot %s: non-standard transition %s -> %s (allowing)",
                                shot_id, old_status, new_status,
                            )
                    else:
                        # Unknown current state — legacy data compat
                        logger.warning(
                            "Shot %s: unknown current state '%s' — allowing transition to '%s'",
                            shot_id, old_status, new_status,
                        )

            # Ensure complex fields are proper types
            if not isinstance(existing.get("gate_results"), dict):
                existing["gate_results"] = {}
            if not isinstance(existing.get("takes"), list):
                existing["takes"] = []

            # Extract takes and append_take first to ensure deterministic ordering:
            # 'takes' replaces wholesale, then 'append_take' appends to the list.
            has_takes = "takes" in fields
            takes_value = fields.pop("takes", None)
            has_append = "append_take" in fields
            append_value = fields.pop("append_take", None)

            if has_takes:
                existing["takes"] = takes_value if takes_value else []
            if has_append:
                existing["takes"].append(append_value)

            for key, value in fields.items():
                if key == "gate_results" and isinstance(value, dict):
                    existing["gate_results"].update(value)
                elif key == "cost_incurred" and value is not None:
                    existing["cost_incurred"] = (existing.get("cost_incurred") or 0) + value
                elif key == "retry_waste_cost" and value is not None:
                    existing["retry_waste_cost"] = (existing.get("retry_waste_cost") or 0) + value
                else:
                    existing[key] = value

            existing["updated_at"] = time.time()
            self._write_shot_file_locked(shot_id, existing)

    def create_shot(self, shot_id: str, initial_data: dict | None = None) -> None:
        """Create a new shot. Raises ShotExistsError if it already exists.

        Args:
            shot_id: Unique shot identifier.
            initial_data: Optional dict of initial field values.
        """
        # Build data dict outside the lock (pure local computation)
        data = {"shot_id": shot_id}
        if initial_data:
            data.update(initial_data)
        data = {k: data.get(k, _SHOT_DEFAULTS.get(k)) for k in _SHOT_FIELDS}
        data["shot_id"] = shot_id
        data["updated_at"] = time.time()
        # Check-and-write under a single lock to avoid TOCTOU race
        self._ensure_write_lease()
        with self._locked():
            if self._read_shot_file(shot_id) is not None:
                raise ShotExistsError(shot_id)
            self._write_shot_file_locked(shot_id, data)

    def append_take(self, shot_id: str, take: dict) -> int:
        """Append a take record to a shot's takes list.

        Args:
            shot_id: Shot identifier.
            take: Take record dict.

        Returns:
            0-based index of the appended take.
        """
        self._ensure_write_lease()
        with self._locked():
            existing = self._read_shot_file(shot_id)
            if existing is None:
                raise KeyError(f"Shot {shot_id} not found")
            if not isinstance(existing.get("takes"), list):
                existing["takes"] = []
            existing["takes"].append(take)
            idx = len(existing["takes"]) - 1
            existing["updated_at"] = time.time()
            self._write_shot_file_locked(shot_id, existing)
            return idx

    def acquire_next_take_number(self, shot_id: str) -> int:
        """Atomically claim the next take number for a shot.

        Holds the store's internal lock for the full read-and-claim cycle so
        concurrent callers cannot receive the same number. Use this in
        StepRunner._next_take_number when available.

        The returned number is `len(takes) + 1` (1-based). Callers that want
        to reserve the slot should follow up with append_take().
        """
        with self._locked():
            existing = self._read_shot_file(shot_id)
            if existing is None or not isinstance(existing.get("takes"), list):
                return 1
            return len(existing["takes"]) + 1

    def force_reset_status(self, shot_id: str, to_state: str, reason: str) -> None:
        """Emergency bypass: set status without state machine validation.

        Logs an audit trail warning. Use only for manual recovery.

        Args:
            shot_id: Shot identifier.
            to_state: Target status (must be in SHOT_VALID_STATUSES).
            reason: Human-readable reason for the forced reset.
        """
        if to_state not in SHOT_VALID_STATUSES:
            raise ValueError(f"Invalid target state: {to_state}")
        self._ensure_write_lease()
        with self._locked():
            existing = self._read_shot_file(shot_id)
            if existing is None:
                raise KeyError(f"Shot {shot_id} not found")
            old_state = existing.get("status", "previs_pending")
            logger.warning(
                "FORCE RESET: Shot %s: %s -> %s (reason: %s)",
                shot_id, old_state, to_state, reason,
            )
            existing["status"] = to_state
            existing["updated_at"] = time.time()
            self._write_shot_file_locked(shot_id, existing)

    def atomic_transition(self, shot_id: str, allowed_from: set[str], to_state: str) -> bool:
        """Thread-safe compare-and-swap transition.

        Only transitions if the current status is in `allowed_from`.
        Returns True if the transition was applied, False otherwise.

        Args:
            shot_id: Shot identifier.
            allowed_from: Set of statuses that permit this transition.
            to_state: Target status.

        Returns:
            True if transitioned, False if current state was not in allowed_from.
        """
        self._ensure_write_lease()
        with self._locked():
            existing = self._read_shot_file(shot_id)
            if existing is None:
                return False
            current = existing.get("status", "previs_pending")
            if current not in allowed_from:
                return False
            # Log non-standard transitions but allow them
            if current in VALID_TRANSITIONS:
                allowed = VALID_TRANSITIONS[current]
                if to_state not in allowed:
                    logger.warning(
                        "Shot %s: non-standard transition %s -> %s (allowing)",
                        shot_id, current, to_state,
                    )
            existing["status"] = to_state
            existing["updated_at"] = time.time()
            self._write_shot_file_locked(shot_id, existing)
            return True

    def delete_shot(self, shot_id: str) -> bool:
        """Delete a shot record. Returns True if deleted, False if not found."""
        self._ensure_write_lease()
        path = self._shot_path(shot_id)
        with self._locked():
            if path.is_file():
                path.unlink()
                logger.info("Deleted shot %s", shot_id)
                return True
            return False

    def get_shots_by_status(self, *statuses: str) -> list[dict]:
        """Get all shots matching any of the given statuses."""
        status_set = set(statuses)
        return [
            s for s in self._load_all()
            if s.get("status") in status_set
        ]

    def get_shots_by_episode(self, episode_id: str, include_coverage: bool = True) -> list[dict]:
        """Get all shots for a specific episode.

        Args:
            episode_id: Episode identifier to filter by.
            include_coverage: If False, exclude coverage shots (is_coverage=True).
        """
        shots = [
            s for s in self._load_all()
            if s.get("episode_id") == episode_id
            and (include_coverage or not s.get("is_coverage", False))
        ]
        return sorted(shots, key=lambda s: s.get("shot_id", ""))

    def get_all_shots(self) -> list[dict]:
        """Get all shots across all episodes."""
        shots = self._load_all()
        return sorted(shots, key=lambda s: (s.get("episode_id", ""), s.get("shot_id", "")))

    def get_approved_neighbors(
        self,
        shot_id: str,
        episode_id: str,
        scene_index: str | None = None,
    ) -> dict[str, dict | None]:
        """Return the previous and next approved shots for a given shot.

        Only returns shots in the same episode AND same scene (if scene_index
        provided). Each neighbor dict contains {shot_id, output_path, status}.
        """
        episode_shots = self.get_shots_by_episode(episode_id, include_coverage=False)

        def _sort_key(s):
            m = re.search(r'(\d+)$', s.get("shot_id", "0"))
            return int(m.group(1)) if m else 0
        episode_shots.sort(key=_sort_key)

        current_idx = None
        for i, s in enumerate(episode_shots):
            if s.get("shot_id") == shot_id:
                current_idx = i
                break

        if current_idx is None:
            return {"previous": None, "next": None}

        approved_statuses = {"approved", "video_complete", "keyframe_approved"}

        def _same_scene(s):
            if scene_index is None:
                return True
            neighbor_scene = s.get("scene_index") or s.get("scene_id") or s.get("location")
            return neighbor_scene is not None and str(neighbor_scene) == str(scene_index)

        previous = None
        for i in range(current_idx - 1, -1, -1):
            s = episode_shots[i]
            if not _same_scene(s):
                continue
            if s.get("status") in approved_statuses and s.get("output_path"):
                previous = {
                    "shot_id": s["shot_id"],
                    "output_path": s["output_path"],
                    "status": s["status"],
                }
                break

        next_shot = None
        for i in range(current_idx + 1, len(episode_shots)):
            s = episode_shots[i]
            if not _same_scene(s):
                continue
            if s.get("status") in approved_statuses and s.get("output_path"):
                next_shot = {
                    "shot_id": s["shot_id"],
                    "output_path": s["output_path"],
                    "status": s["status"],
                }
                break

        return {"previous": previous, "next": next_shot}

    # ── Crash Recovery ─────────────────────────────────────────────

    def detect_orphans(self, session_id: str) -> list[dict]:
        """Find in-flight shots from a different session (crashed pipeline)."""
        in_flight = {
            "previs_generating", "keyframe_generating",
            "video_submitted", "video_processing", "video_downloading",
        }
        orphans = []
        for s in self._load_all():
            if s.get("status") in in_flight and s.get("session_id") != session_id:
                orphans.append({
                    "shot_id": s["shot_id"],
                    "status": s["status"],
                    "old_session_id": s.get("session_id"),
                    "job_id": s.get("job_id"),
                })
        return orphans

    def recover_orphan(self, shot_id: str, new_session_id: str) -> None:
        """Reclaim an orphaned shot with a new session_id."""
        self.update_shot(
            shot_id,
            session_id=new_session_id,
            attempts=(self.get_shot(shot_id) or {}).get("attempts", 0) + 1,
        )

    # ── Aggregation ────────────────────────────────────────────────

    def total_cost(self, episode_id: Optional[str] = None) -> float:
        """Sum of costs, optionally filtered by episode."""
        total = 0.0
        for s in self._load_all():
            if episode_id and s.get("episode_id") != episode_id:
                continue
            total += s.get("cost_incurred", 0) or 0
        return total

    def summary(self, episode_id: Optional[str] = None) -> dict:
        """Status summary with counts per status."""
        by_status = defaultdict(int)
        count = 0
        for s in self._load_all():
            if episode_id and s.get("episode_id") != episode_id:
                continue
            by_status[s.get("status", "previs_pending")] += 1
            count += 1
        return {
            "episode_id": episode_id or "all",
            "total_shots": count,
            "total_cost": round(self.total_cost(episode_id), 4),
            "by_status": dict(by_status),
        }

    def budget_summary(self) -> dict:
        """Season-level budget summary for the Board tab."""
        episodes = defaultdict(lambda: {
            "total_shots": 0, "total_cost": 0.0,
            "prep": 0, "needs_approval": 0, "shooting": 0,
            "review_dailies": 0, "in_the_can": 0,
        })

        needs_approval = {"previs_generated", "keyframe_generated"}
        shooting = {
            "previs_generating", "keyframe_generating",
            "video_submitted", "video_processing", "video_downloading",
        }
        review = {"previs_approved", "keyframe_approved", "video_ready"}

        for s in self._load_all():
            ep = s.get("episode_id", "")
            status = s.get("status", "previs_pending")
            cost = s.get("cost_incurred", 0) or 0
            ep_data = episodes[ep]
            ep_data["total_shots"] += 1
            ep_data["total_cost"] += cost
            if status == "previs_pending":
                ep_data["prep"] += 1
            elif status in needs_approval:
                ep_data["needs_approval"] += 1
            elif status in shooting:
                ep_data["shooting"] += 1
            elif status in review:
                ep_data["review_dailies"] += 1
            elif status in ("video_complete", "approved"):
                ep_data["in_the_can"] += 1

        return {
            "episodes": [
                {
                    "episode_id": ep_id,
                    "total_shots": d["total_shots"],
                    "total_cost": round(d["total_cost"], 4),
                    "prep": d["prep"],
                    "needs_approval": d["needs_approval"],
                    "shooting": d["shooting"],
                    "review_dailies": d["review_dailies"],
                    "in_the_can": d["in_the_can"],
                }
                for ep_id, d in sorted(episodes.items())
            ],
            "season_total_cost": round(self.total_cost(), 4),
        }

    def checkpoint(self) -> None:
        """No-op. JSON files are immediately persisted."""
        pass

    # ── SQLite Migration ──────────────────────────────────────────

    def _try_migrate_from_sqlite(self) -> None:
        """One-time migration: read shots from SQLite, write as JSON files.

        Checks both the local cache (~/.cache/starsend/) and Dropbox locations.
        """
        import sqlite3

        candidates = [
            _LEGACY_CACHE_DIR / self.project / "execution.db",
            LEGACY_DB_PATH,
        ]
        try:
            candidates.insert(
                1, ProjectPaths.for_project(self.project).visual_state_dir / "execution.db"
            )
        except FileNotFoundError:
            logger.debug("ProjectPaths unavailable for %s — skipping Dropbox migration candidate", self.project)

        for db_path in candidates:
            if not db_path.is_file():
                continue
            try:
                conn = sqlite3.connect(str(db_path), timeout=5.0)
                conn.row_factory = sqlite3.Row
                rows = conn.execute("SELECT * FROM shots").fetchall()
                if not rows:
                    conn.close()
                    continue

                count = 0
                self._ensure_write_lease()
                with self._locked():
                    for row in rows:
                        d = dict(row)
                        # Parse JSON blobs
                        for field in ("gate_results", "takes"):
                            if isinstance(d.get(field), str):
                                try:
                                    d[field] = json.loads(d[field])
                                except (json.JSONDecodeError, TypeError):
                                    d[field] = {} if field == "gate_results" else []
                            elif d.get(field) is None:
                                d[field] = {} if field == "gate_results" else []

                        self._write_shot_file_locked(d["shot_id"], d)
                        count += 1

                conn.close()
                logger.info(
                    "Migrated %d shots from SQLite (%s) to JSON (%s)",
                    count, db_path, self._shots_dir,
                )
                return  # Stop after first successful migration
            except (sqlite3.Error, IOError) as e:
                logger.warning("SQLite migration failed for %s: %s", db_path, e)

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass


# ── Cross-project aggregation ─────────────────────────────────────


def global_budget_summary() -> dict:
    """Aggregate ExecutionStore.budget_summary() across every project directory.

    Directories whose names start with ``_`` or ``.`` are skipped, as are
    projects that have neither a shots directory nor a legacy
    ``execution.db``. Any failure for a single project (path resolution,
    store construction, or summarising) is logged and that project is
    omitted; the rest of the aggregation continues.

    Returns:
        ``{"projects": {name: budget_summary, ...}, "grand_total_cost": float}``
        with the grand total rounded to 4 decimal places.
    """
    results: dict[str, dict] = {}
    grand_total = 0.0

    root = projects_root()
    if not root.exists():
        return {"projects": results, "grand_total_cost": 0.0}

    for proj_dir in sorted(root.iterdir()):
        if not proj_dir.is_dir() or proj_dir.name.startswith(("_", ".")):
            continue
        name = proj_dir.name
        try:
            pp = ProjectPaths.for_project(name)
            # Also check for legacy SQLite that would trigger migration
            legacy_db = pp.visual_state_dir / "execution.db"
            if not pp.shots_dir.exists() and not legacy_db.exists():
                continue
            store = ExecutionStore(project=name)
            try:
                summary = store.budget_summary()
            finally:
                store.close()
        except Exception as e:  # best-effort: one bad project must not hide the rest
            logger.warning("Failed to read budget for %s: %s", name, e)
            continue
        results[name] = summary
        grand_total += summary.get("season_total_cost", 0.0)

    return {
        "projects": results,
        "grand_total_cost": round(grand_total, 4),
    }


# Public API of this module; consulted by `from ... import *`. Private helpers
# (leading underscore) and the legacy migration constants are intentionally
# not listed.
__all__ = [
    # Public symbols (Phase D — MF-3 + DEBT-9).
    # Core class.
    "ExecutionStore",
    # Constants / state machine.
    "SHOT_VALID_STATUSES",
    "VALID_TRANSITIONS",
    # Exceptions.
    "InvalidTransitionError",
    "ShotExistsError",
    # Module-level helpers.
    "global_budget_summary",
]
