# recoil/pipeline/lib/sidecar.py
"""Sidecar manifest read protocol (Phase 1) + video sidecar populator (R4).

Two layers live here:

1. Asset-manifest read protocol (original)
   - mtime cache for fast reads (~10µs warm)
   - Lazy sha256 verification on `force=True` only
   - Drift surfaced via `SidecarRead.drifted`, never raised
   - Watcher does NOT invalidate directly; consumers pass `force=True` on events

2. Video sidecar populator (R4 Phase 3 — A4 leak fix)
   - `populate_sidecar(...)` builds the canonical .mp4.json dict
   - `write_sidecar_dict(...)` writes a sidecar dict to a path
   - Used by BOTH dispatch_cli (single-shot) and step_runner (r2v_multi)

See spec §A.4 for the canonical sketch in opus_round_3.md.
"""

from __future__ import annotations

import hashlib
import json
import threading
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


# Version string stamped into every video sidecar dict under "schema_version".
SIDECAR_SCHEMA_VERSION = "1.0"

# In-process manifest cache, keyed by the media file path handed to
# read_sidecar(). Entries are (re)stored on cache misses and forced reads and
# dropped by write_sidecar() / invalidate_cache().
_CACHE: dict[Path, "_CacheEntry"] = {}
# Re-entrant lock held around accesses to _CACHE.
_LOCK = threading.RLock()


@dataclass
class _CacheEntry:
    """One cached sidecar manifest, as stored in the module-level cache."""

    # Parsed sidecar JSON, exactly as returned by json.loads().
    manifest: dict
    # st_mtime of the media file (not of the sidecar JSON) observed when the
    # entry was stored; a warm read is served only while this still matches.
    cached_at_mtime: float
    # Digest computed from the media file on a forced read, otherwise the
    # value recorded under manifest["file"]["sha256"]; None if neither exists.
    sha256_at_cache: str | None = None


@dataclass
class SidecarRead:
    """Result of read_sidecar(): the manifest plus drift information."""

    # Parsed sidecar JSON.
    manifest: dict
    # True only when a forced read hashed the media file and the digest
    # differed from the one recorded in the manifest. Warm cache hits always
    # report False because no hashing takes place.
    drifted: bool
    # On a fresh read: the sha256 recorded in the manifest (may be None).
    # On a warm cache hit: the cache entry's `sha256_at_cache`.
    cached_sha256: str | None
    # Digest computed from the media file during a forced read; None otherwise.
    current_sha256: str | None = None
    # Location of the sidecar JSON that backs this result.
    sidecar_path: Path | None = None


def compute_sha256(file_path: Path) -> str:
    """Return the hexadecimal sha256 digest of the file at *file_path*.

    The file is consumed in 64 KiB reads so large media files are never
    loaded into memory in one piece.
    """
    digest = hashlib.sha256()
    with file_path.open("rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                # An empty read marks end-of-file.
                break
            digest.update(block)
    return digest.hexdigest()


def _sidecar_path_for(file_path: Path) -> Path:
    return file_path.parent / "_meta" / f"{file_path.name}.json"


def read_sidecar(file_path: Path, *, force: bool = False) -> SidecarRead | None:
    """Read the sidecar manifest for a file.

    A warm read (cache entry present, media-file mtime unchanged, and
    ``force=False``) is answered from the in-process cache without parsing
    the sidecar JSON. Any other read re-parses the sidecar and replaces the
    cache entry.

    Args:
        file_path: The media file whose manifest lives at
            ``<dir>/_meta/<filename>.json``.
        force: Bypass the cache and, when the manifest records a
            ``file.sha256``, hash the media file to detect drift.

    Returns:
        None if the sidecar or the media file does not exist (including when
        either one disappears while being read). Otherwise a SidecarRead;
        ``drifted`` is True only when ``force=True`` and the recorded sha256
        differs from the freshly computed one.

    Raises:
        json.JSONDecodeError: If the sidecar does not contain valid JSON.
    """
    sidecar = _sidecar_path_for(file_path)
    if not sidecar.exists():
        return None

    with _LOCK:
        try:
            current_mtime = file_path.stat().st_mtime
        except FileNotFoundError:
            return None

        cached = _CACHE.get(file_path)
        if cached is not None and cached.cached_at_mtime == current_mtime and not force:
            return SidecarRead(
                manifest=cached.manifest,
                drifted=False,
                cached_sha256=cached.sha256_at_cache,
                sidecar_path=sidecar,
            )

        try:
            manifest = json.loads(sidecar.read_text())
        except FileNotFoundError:
            # Sidecar vanished between the exists() probe and the read.
            return None

        # A manifest may omit "file" or carry `"file": null`; both mean that
        # no digest was recorded, so neither may blow up the lookup.
        file_section = manifest.get("file")
        manifest_sha = (
            file_section.get("sha256") if isinstance(file_section, dict) else None
        )

        drifted = False
        current_sha = None
        if force and manifest_sha is not None:
            try:
                current_sha = compute_sha256(file_path)
            except FileNotFoundError:
                # Media file vanished after the stat() above; report it the
                # same way as a file that was missing from the start.
                return None
            drifted = current_sha != manifest_sha

        _CACHE[file_path] = _CacheEntry(
            manifest=manifest,
            cached_at_mtime=current_mtime,
            sha256_at_cache=current_sha or manifest_sha,
        )

        return SidecarRead(
            manifest=manifest,
            drifted=drifted,
            cached_sha256=manifest_sha,
            current_sha256=current_sha,
            sidecar_path=sidecar,
        )


def write_sidecar(file_path: Path, manifest: dict) -> Path:
    """Write a sidecar manifest atomically and drop its cache entry.

    The manifest is serialised to ``<sidecar>.tmp`` and then renamed over the
    sidecar, so readers never observe a half-written file.

    The write, the rename and the cache invalidation all run under the
    module lock. Every writer for a given file shares the same temp path;
    without the lock, one thread could rename the temp file while another
    was still writing it, publishing a truncated manifest and making the
    second rename fail. Holding the lock also prevents a reader in this
    process from being served the stale cached manifest between the rename
    and the invalidation.

    NOTE(review): the lock only serialises threads of this process; writers
    in other processes still share the same temp path.

    Args:
        file_path: The media file the manifest describes.
        manifest: JSON-serialisable manifest dict.

    Returns:
        The path of the sidecar that was written.

    Raises:
        TypeError: If *manifest* is not JSON-serialisable (raised before any
            file is touched).
    """
    sidecar = _sidecar_path_for(file_path)
    sidecar.parent.mkdir(parents=True, exist_ok=True)
    tmp = sidecar.with_suffix(".json.tmp")
    # Serialise first so a bad manifest fails before the lock is taken.
    serialized = json.dumps(manifest, indent=2)
    with _LOCK:
        tmp.write_text(serialized)
        tmp.replace(sidecar)
        _CACHE.pop(file_path, None)
    return sidecar


def invalidate_cache(file_path: Path | None = None) -> None:
    """Drop cached manifests.

    With a *file_path*, only that file's entry is removed (a missing entry is
    not an error); with no argument the whole cache is emptied.
    """
    with _LOCK:
        if file_path is not None:
            _CACHE.pop(file_path, None)
            return
        _CACHE.clear()


# ─────────────────────────────────────────────────────────────────────────────
# Video sidecar populator (R4 Phase 3 — A4 leak fix)
#
# Replaces ad-hoc `{"refs_used": [], "seed": null, ...}` literals scattered
# throughout dispatch_cli / step_runner. One helper, one schema, one source
# of truth. Phase 9 audit assertion #18 + #20 both depend on this contract.
#
# Schema, abridged (sidecar JSON written next to <video>.mp4; see
# populate_sidecar() for the full key set):
#   {
#     "schema_version": "1.0",
#     "video_path": "...",
#     "model": "seeddance-2.0",
#     "model_filename_id": "seeddance-2-0",
#     "modality": "video_i2v",
#     "tag": "SOLO_JADE",
#     "project": "tartarus",
#     "cost_usd": 1.21,
#     "duration_s": 5.0,
#     "prompt": "<final dispatched prompt>",
#     "provenance": {
#       "refs_used": [{"path": "jade_hero.png"}, {"path": "jade_front.png"}],
#       "seed": 2010003547,
#       "gate_results": {"hero_frame_ok": true, ...},
#       "prompt_layers": {"action": "...", "subject": "..."},
#       "prompt_engine_version": "875b9e2892e5"
#     }
#   }
# ─────────────────────────────────────────────────────────────────────────────


def _derive_filename_id(model: str | None) -> str | None:
    """Mirror naming._model_filename_id for sidecar parity."""
    if not model:
        return None
    try:
        from recoil.core.naming import _model_filename_id
        return _model_filename_id(model)
    except Exception:
        return model.lower().replace(".", "-").replace("_", "-")


def _normalize_refs(value) -> list[dict]:
    """Normalize a heterogeneous refs_used input to a list[dict] canonical shape.

    Accepts:
        None                                  → []
        list[str | Path | dict | None]        → list[dict]
            - str:  {"path": "<str>"}
            - Path: {"path": str(p)}
            - dict: passed through unchanged
            - None entries silently skipped

    Raises:
        TypeError if value is not a list (or None).
        TypeError if a list element is not str / Path / dict / None.
    """
    if value is None:
        return []
    if not isinstance(value, list):
        raise TypeError(
            f"_normalize_refs: expected list or None, got {type(value).__name__}"
        )
    out: list[dict] = []
    for item in value:
        if item is None:
            continue
        if isinstance(item, dict):
            out.append(item)
        elif isinstance(item, str):
            out.append({"path": item})
        elif hasattr(item, "__fspath__"):
            # pathlib.Path or any os.PathLike — step_runner passes Paths.
            out.append({"path": str(item)})
        else:
            raise TypeError(
                f"_normalize_refs: unsupported element type {type(item).__name__} "
                f"(value={item!r}); accepts str | Path | dict | None"
            )
    return out


def _provider_run_provenance(receipt: Any) -> dict[str, Any]:
    """Extract provider-native run ids from the canonical result metadata."""
    meta = (
        getattr(receipt, "metadata", None)
        or (getattr(receipt, "run_result", None)
            and getattr(receipt.run_result, "metadata", None))
        or {}
    )
    if not isinstance(meta, dict):
        return {}

    provider_run_id = (
        meta.get("provider_run_id")
        or meta.get("request_id")
        or meta.get("native_id")
        or meta.get("run_id")
    )
    provider = meta.get("provider")
    flora_run_id = meta.get("flora_run_id")
    if flora_run_id is None and provider == "flora":
        flora_run_id = provider_run_id

    out: dict[str, Any] = {}
    if provider_run_id is not None:
        out["provider_run_id"] = provider_run_id
    if flora_run_id is not None:
        out["flora_run_id"] = flora_run_id
    return out


def populate_sidecar(
    *,
    receipt: Any,
    payload: dict,
    refs_used: list | None = None,
    gate_results: dict | None = None,
    prompt_layers: dict | None = None,
    tag: str | None = None,
    project: str | None = None,
    # R6 Phase 2 — new explicit kwargs (legacy housekeeping migration).
    dispatch_path: str | None = None,
    provider_adapter: str | None = None,
    pipeline: str | None = None,
    shot_id: str | None = None,
    generation_params: dict | None = None,
    inputs_snapshot_hash: str | None = None,
    location_id: str | None = None,
    source: str = "pipeline",
    status: str = "candidate",
) -> dict:
    """Assemble the canonical video sidecar dict without touching the disk.

    R6 Phase 2 extended the signature with nine kwargs that absorb the fields
    of the legacy `write_pipeline_sidecar_RETIRED` writer. `model` and
    `prompt` are emitted twice: at top level and inside `provenance`. Cost is
    emitted twice as well, under the modern key `cost_usd` at top level and
    under the LEGACY key `cost` inside `provenance`.

    Seven housekeeping fields are always present at top level:
        `source` (default "pipeline"), `status` (default "candidate"),
        `created_at` and `updated_at` (the same UTC ISO timestamp taken at
        call time, "Z"-suffixed), `lineage` (always {}), `notes` (always ""),
        `tags` (always []).

    Args:
        receipt: GenerationReceipt (CP-5), a RunResult-shaped object, or
                 None. With None, cost and seed are read from `payload`.
        payload: The dispatched payload (the dict passed to dispatch()).
                 Keys read: model, model_filename_id, modality, duration /
                 duration_s, prompt, reference_images, video_path /
                 output_path, grouping, segment_shot_ids and, when receipt
                 is None, cost_usd and seed.
        refs_used: Reference list with str | Path | dict elements. When None,
                   `payload["reference_images"]` (or []) is used instead.
        gate_results: e.g. {"hero_frame_ok": True}. Defaults to {}.
        prompt_layers: e.g. {"action": "...", "subject": "..."}. Defaults
                       to {}.
        tag: The dropped-from-filename tag (SOLO_JADE / A_WREN / etc.).
        project: The dropped-from-filename project slug.
        dispatch_path: Stamped by `dispatch()` in CP-5 per
                       `recoil/pipeline/core/dispatch.py:268`. Callers pass
                       `self._dispatch_path or "unknown"` (step_runner) or
                       `getattr(ctx.step_runner, "_dispatch_path", None) or "unknown"`
                       (dispatch_cli). A falsy value is emitted as "unknown".
        provider_adapter: e.g. "fal_seeddance". Vestigial but kept (audit).
        pipeline: e.g. "video_i2v", "video_t2v", "r2v_multi". Workspace API
                  exposes it via /api/shot/. A falsy value is emitted as
                  "unknown".
        shot_id: e.g. "EP001_SH10". Primary key across 613+ uses.
        generation_params: e.g. {"duration": 5.0, "aspect_ratio": "9:16"}.
                           Read by audit tooling. Defaults to {}.
        inputs_snapshot_hash: sha or None.
        location_id: Location identifier or None.
        source: Housekeeping; default "pipeline". Validated by audit #27
                against {"pipeline", "manual_drop", "pass_extraction"}.
        status: Housekeeping; default "candidate". Workspace's frozen
                SIDECAR_VALID_STATUSES (recoil/workspace/sidecar.py:56-63) =
                {"candidate", "pinned", "canonical", "archived"}.
                Audit #27 accepts a forward-compatible superset
                {"candidate", "approved", "rejected", "pinned", "canonical",
                "archived"} — callers should only emit values inside the
                workspace frozen set.

    Returns:
        The canonical sidecar dict, ready to json.dumps().
    """
    # Imported lazily to avoid a circular import at module-load time.
    try:
        from recoil.pipeline._lib.prompt_engine import (
            PROMPT_ENGINE_SCHEMA_VERSION as engine_version,
        )
    except ImportError:
        engine_version = "unavailable"

    # Cost + seed: receipt shapes vary, so every attribute is probed softly.
    if receipt is None:
        # R6 Phase 2 — legacy callers (step_runner.execute_video,
        # execute_keyframe) hold cost+seed in local scope and hand them over
        # through the payload. (Q3 / Opus R3 — receipt-None landmine.)
        cost_usd = float(payload.get("cost_usd") or 0.0)
        seed = payload.get("seed")
    else:
        run_result = getattr(receipt, "run_result", None)

        raw_cost = getattr(receipt, "cost_usd", None)
        if not raw_cost:
            raw_cost = (run_result and getattr(run_result, "cost_usd", 0.0)) or 0.0
        cost_usd = float(raw_cost)

        meta = getattr(receipt, "metadata", None)
        if not meta:
            meta = (run_result and getattr(run_result, "metadata", None)) or {}
        seed = meta.get("seed") if isinstance(meta, dict) else None

    # The explicit refs argument wins; otherwise use the payload's list.
    if refs_used is None:
        refs_source = payload.get("reference_images") or []
    else:
        refs_source = refs_used
    refs_normalized = _normalize_refs(refs_source)
    run_ids = _provider_run_provenance(receipt)

    stamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    model = payload.get("model")
    prompt = payload.get("prompt")
    grouping = payload.get("grouping")
    segment_shot_ids = payload.get("segment_shot_ids")

    # Shallow-copy the payload containers so the sidecar owns its own objects.
    if isinstance(segment_shot_ids, (list, tuple)):
        segment_shot_ids = list(segment_shot_ids)
    if isinstance(grouping, dict):
        grouping = dict(grouping)

    # Key insertion order below is the order of the written JSON.
    sidecar = {
        # ── modern top-level (kept) ─────────────────────────────────
        "schema_version": SIDECAR_SCHEMA_VERSION,
        # ── legacy housekeeping (top-level, defaults baked in) ──────
        "source": source,
        "status": status,
        "created_at": stamp,
        "updated_at": stamp,
        # ── modern top-level (kept) ─────────────────────────────────
        "video_path": str(
            payload.get("video_path") or payload.get("output_path") or ""
        ),
        "model": model,
        "model_filename_id": (
            payload.get("model_filename_id") or _derive_filename_id(model)
        ),
        "modality": payload.get("modality"),
        "tag": tag,
        "project": project,
        "cost_usd": cost_usd,
        "duration_s": payload.get("duration") or payload.get("duration_s"),
        "prompt": prompt,
        # ── legacy housekeeping (top-level dicts/lists) ────────────
        "lineage": {},
        "notes": "",
        "tags": [],
    }

    # ── provenance bag (legacy fields + modern audit fields) ───────
    provenance = {
        "refs_used": refs_normalized,
        "seed": seed,
        "gate_results": gate_results or {},
        "prompt_layers": prompt_layers or {},
        "prompt_engine_version": engine_version,
        # ── dual-emit for legacy consumers (Opus R3 Q4) ────────────
        "model": model,
        "prompt": prompt,
        "cost": cost_usd,  # NOTE: legacy key "cost", not "cost_usd"
        # ── absorbed legacy provenance fields ──────────────────────
        "generation_params": generation_params or {},
        "inputs_snapshot_hash": inputs_snapshot_hash,
        "location_id": location_id,
        "pipeline": pipeline or "unknown",
        "shot_id": shot_id,
        "segment_shot_ids": segment_shot_ids,
        "project": project,
        "grouping": grouping,
    }
    # Provider run ids sit between the absorbed fields and the dispatch info.
    provenance.update(run_ids)
    provenance["dispatch_path"] = dispatch_path or "unknown"
    provenance["provider_adapter"] = provider_adapter

    sidecar["provenance"] = provenance
    return sidecar


def write_sidecar_dict(
    sidecar_path: Path,
    sidecar_dict: dict,
) -> None:
    """Persist a video sidecar dict as indented JSON at *sidecar_path*.

    R6 Phase 1: the write runs inside the per-sidecar fcntl.flock
    (`with_sidecar_lock`) and is delegated to `atomic_write_json`, which does
    the tmp → fsync → os.replace cycle. The flock is keyed on the MEDIA path
    (NOT the sidecar JSON path) so that this writer interlocks with
    `recoil/workspace/sidecar.py::set_status`, which keys on the media path
    via `_sidecar_lock_path`. The two writers may race; only one of them is
    inside the critical section at any time.

    Not to be confused with the asset-manifest
    `write_sidecar(file_path, manifest) -> Path` in this module: this helper
    takes the sidecar path itself (typically <video>.mp4.json sitting next to
    the video), does no `_meta/` indirection and returns nothing. It serves
    dispatch_cli + step_runner on the video-sidecar write path.

    Args:
        sidecar_path: Destination of the JSON file; parent directories are
            created when missing.
        sidecar_dict: The dict to serialise.
    """
    from recoil.core.atomic_write import atomic_write_json, with_sidecar_lock

    # A sidecar is named `<media_name>.json` and lives beside its media file,
    # so dropping the final suffix yields the media path the lock keys on.
    lock_target = sidecar_path.with_suffix("")
    sidecar_path.parent.mkdir(parents=True, exist_ok=True)
    with with_sidecar_lock(lock_target):
        atomic_write_json(sidecar_path, sidecar_dict, indent=2)


# Names exported by `from <this module> import *`. Underscore-prefixed
# helpers defined in this file are not listed.
__all__ = [
    # asset-manifest read protocol
    "SidecarRead",
    "read_sidecar",
    "write_sidecar",
    "compute_sha256",
    "invalidate_cache",
    # video sidecar populator (R4)
    "SIDECAR_SCHEMA_VERSION",
    "populate_sidecar",
    "write_sidecar_dict",
]
