#!/usr/bin/env python3
"""Universal sidecar read/write module for the Recoil Workspace.

Every media file in the project tree (assets/, sequences/, renders/, state/)
gets a companion .json sidecar containing its provenance, status, and
lineage. This module handles all sidecar CRUD.

Sidecar naming: {filename}.json — e.g., hero.jpg → hero.jpg.json
This is DIFFERENT from pipeline/lib/sidecar.py which writes to _meta/{filename}.json.
The _meta pattern is used for _canonical/ refs only. This module handles everything else.

Atomic writes: tempfile + os.replace (same pattern as workspace/state.py).
"""

# Pass-video sidecars (video/ep_NNN/EP001_PASS_...take1.mp4.json):
#   Must include "format_type" in generation_params. Values: "A" | "B" | "C" |
#   other identifiers set by coverage_planner. This field is sidecar-only —
#   it is NOT present in the filename as of Phase 1 (SYNTHESIS §4).
#
# Extracted-segment sidecars (shot_NNN[~x]_FROM_PASS_...take1.mp4.json):
#   Include three alignment fields introduced by AMEND_SPEC_01 Phase 1:
#     segment_class:     "planner-intended" | "model-added" | None
#     parent_shot_id:    str | None   (populated only for model-added segments)
#     alignment_delta_s: float | None (distance from aligned detected cut to
#                                      the expected planner boundary; None for
#                                      model-added or missed boundaries)

import contextlib
import fcntl
import json
import logging
import os
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

_log = logging.getLogger(__name__)

# ── Tenet 6: typed-exception import ───────────────────────────
_RECOIL_ROOT = Path(__file__).resolve().parent.parent
if str(_RECOIL_ROOT) not in sys.path:
    sys.path.insert(0, str(_RECOIL_ROOT))
from recoil.core.exceptions import (  # noqa: E402
    CastingFragmentCorruptError,
    SidecarCorruptError,
)
from recoil.core.atomic_write import atomic_write_json  # noqa: E402

# ── Constants ─────────────────────────────────────────────────

# `as SCHEMA_VERSION` rebind keeps this module's existing public attribute
# name byte-stable for every existing import site.
from recoil.pipeline._lib.schema_versions import SIDECAR_SCHEMA_VERSION as SCHEMA_VERSION  # noqa: E402

# Values accepted for a sidecar's "status" field (checked by set_status).
SIDECAR_VALID_STATUSES = frozenset(
    ("candidate", "pinned", "canonical", "archived")
)

# Values accepted for a sidecar's "source" field (checked on write whenever
# the field is present).
SIDECAR_VALID_SOURCES = frozenset(
    ("manual_drop", "pipeline", "pass_extraction")
)
# FROZEN CONTRACT (data-contracts.md §1a): the value set MUST NOT change.


class SidecarFieldError(ValueError):
    """Signal a sidecar field outside the frozen-contract field set.

    Intended for an unknown extra kwarg reaching set_status, per
    data-contracts.md §1a.

    NOTE(review): nothing visible in this module raises this class —
    set_status has an explicit keyword-only signature, so an unknown kwarg
    surfaces as TypeError. Confirm whether external callers still rely on it.
    """


class PromoteRoundTripError(RuntimeError):
    """Raised when a freshly promoted hero fails the resolver round-trip.

    promote_to_canonical wrote a shelf hero, but the collapsed resolver did
    not return that file as a shelf ``is_hero`` asset.
    """


# Optional sidecar fields, beyond "status", named by the frozen contract.
_SIDECAR_EXTRA_ALLOWED = frozenset(
    (
        "notes",
        "replaced_by",
        "promoted_to",
        "promoted_from",
        "archived_at",
        "archived_from",
        "tags",
        "lineage",
        "provenance",  # populate_sidecar fills this dict
    )
)

# File suffixes (lower-case, leading dot included) treated as media.
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".webp")
_VIDEO_SUFFIXES = (".mp4", ".mov", ".webm")
MEDIA_EXTENSIONS = frozenset(_IMAGE_SUFFIXES + _VIDEO_SUFFIXES)


# ── Helpers ───────────────────────────────────────────────────


def _now_iso() -> str:
    """Return the current UTC time as ISO-8601 text with a ``Z`` suffix."""
    utc_stamp = datetime.now(tz=timezone.utc).isoformat()
    return utc_stamp.replace("+00:00", "Z")


def _sidecar_path(media_path: Path) -> Path:
    """Locate the companion sidecar: full media file name plus ``.json``, same directory."""
    sidecar_name = media_path.name + ".json"
    return media_path.parent / sidecar_name


def _is_sidecar_file(path: Path) -> bool:
    """Tell whether *path* is itself a sidecar JSON (used to avoid recursion).

    A sidecar is named ``{mediafile}.json``, so dropping the trailing
    ``.json`` must leave a name that ends in a media extension.
    """
    inner_suffix = Path(path.stem).suffix.lower()
    return path.suffix == ".json" and inner_suffix in MEDIA_EXTENSIONS


# Phase 20 SSOT: atomic_write_json (recoil.core.atomic_write) replaces the
# local _atomic_write_json_locked impl. Caller is still responsible for
# holding _with_sidecar_lock — the SSOT call is structurally identical
# (tempfile + EINVAL-tolerant fsync + os.replace) plus a stricter fsync the
# old impl skipped.

# Backward-compat aliases: both legacy names now point at the shared
# atomic_write_json.
# - workspace/server.py:1767 (archive scan loop) still calls an alias
#   directly, which bypasses _with_sidecar_lock — a Tenet 6 violation flagged
#   in the original alias docstring. That callsite must migrate to
#   write_sidecar() / set_status().
# - _atomic_write_json_locked is also called inside this module (by
#   _write_sidecar_locked and promote_to_canonical); switch those calls to
#   atomic_write_json before deleting the aliases.
_atomic_write_json_locked = atomic_write_json
_atomic_write_json = atomic_write_json


def _sidecar_lock_path(media_path: Path) -> Path:
    """Build the per-sidecar lock path: hidden sibling ``.{media_name}.json.lock``.

    Only computes the path; no lock is acquired. Kept as a thin wrapper for
    callers that just want the location (e.g., diagnostic tools).
    """
    lock_name = "." + media_path.name + ".json.lock"
    return media_path.parent / lock_name


# R6 Phase 7: lock helper migrated to the shared SSOT in
# recoil/core/atomic_write.py. The local wrapper preserves the existing
# `_with_sidecar_lock(media_path)` call signature so workspace internals
# (set_status, archive_with_sidecar, restore_from_archive, promote_to_canonical,
# ensure_sidecar, _create_stub_sidecar_locked callers) keep working byte-stable.
from recoil.core.atomic_write import with_sidecar_lock as _with_sidecar_lock  # noqa: E402


@contextlib.contextmanager
def _with_casting_state_lock(state_path: Path):
    """Hold an exclusive fcntl.flock around a casting_state.json read-modify-write.

    casting_state.json is one shared per-project file that
    promote_to_canonical mutates; without this lock, concurrent promotes for
    different entities would race on it. The lock lives in a hidden sibling
    file ``.{state file name}.lock``; its directory is created on demand.
    """
    lock_dir = state_path.parent
    lock_dir.mkdir(parents=True, exist_ok=True)
    lock_file = lock_dir / ("." + state_path.name + ".lock")
    fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        yield
    finally:
        # Always release and close, even when the guarded body raised.
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)


# ── Core Read/Write ───────────────────────────────────────────


def read_sidecar(media_path: Path) -> Optional[dict]:
    """Read the sidecar JSON for a media file.

    Returns None if no sidecar exists (including when it disappears between
    the existence check and the read). Raises ``SidecarCorruptError`` per
    Tenet 6 if the sidecar exists but cannot be read or parsed: corrupt JSON,
    bytes that are not valid UTF-8, or an I/O error. The two conditions are
    no longer collapsed — silent corruption used to mask provenance loss.
    """
    sc = _sidecar_path(media_path)
    if not sc.is_file():
        return None
    try:
        return json.loads(sc.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # Race: file existed at .is_file() check, gone before read.
        return None
    except (json.JSONDecodeError, UnicodeDecodeError, OSError) as e:
        # UnicodeDecodeError is neither a JSONDecodeError nor an OSError, so
        # it must be listed explicitly or a non-UTF-8 sidecar would escape
        # as an untyped exception instead of SidecarCorruptError.
        _log.warning(
            "sidecar.read_sidecar: corrupt %s (%s)",
            sc, e.__class__.__name__,
        )
        raise SidecarCorruptError(str(sc), message=str(e)) from e


def _write_sidecar_locked(media_path: Path, data: dict) -> Path:
    """Stamp, validate and atomically write a sidecar. Caller must hold the lock.

    Mutates *data* in place: fills ``schema_version`` when absent and always
    refreshes ``updated_at``. Raises ValueError when a ``source`` key is
    present with a value outside SIDECAR_VALID_SOURCES. Returns the sidecar
    file path.
    """
    # Only stamp when missing: an on-disk version from an older schema must
    # survive a future constant bump rather than be silently rewritten.
    if "schema_version" not in data:
        data["schema_version"] = SCHEMA_VERSION
    data["updated_at"] = _now_iso()

    if "source" in data:
        declared = data["source"]
        if declared not in SIDECAR_VALID_SOURCES:
            allowed = ", ".join(sorted(SIDECAR_VALID_SOURCES))
            raise ValueError(
                f"Invalid sidecar source {declared!r}. Must be one of: {allowed}"
            )

    target = _sidecar_path(media_path)
    _atomic_write_json_locked(target, data)
    return target


def write_sidecar(media_path: Path, data: dict) -> Path:
    """Write or update the sidecar JSON for a media file.

    Takes the per-sidecar lock, then stamps, validates and writes *data*:
    ``updated_at`` is always refreshed, ``schema_version`` is filled in when
    missing, and ``source`` is checked against the allowed enum when present.
    Returns the path of the sidecar file.
    """
    with _with_sidecar_lock(media_path):
        sidecar_file = _write_sidecar_locked(media_path, data)
    return sidecar_file


def _create_stub_sidecar_locked(media_path: Path) -> dict:
    """Write a fresh manual-drop stub sidecar. Caller must hold the per-sidecar lock.

    The stub is written unconditionally (no existence check) and returned.
    """
    stub = dict(
        schema_version=SCHEMA_VERSION,
        source="manual_drop",
        status="candidate",
        created_at=_now_iso(),
        updated_at=_now_iso(),
        provenance={},
        lineage={},
        notes="",
        tags=[],
        # Alignment fields stay None until a pass-segment writer fills them.
        segment_class=None,
        parent_shot_id=None,
        alignment_delta_s=None,
    )
    _write_sidecar_locked(media_path, stub)
    return stub


def _ensure_sidecar_locked(media_path: Path) -> dict:
    """Return the existing sidecar, or write and return a stub. Caller must hold the lock."""
    found = read_sidecar(media_path)
    return _create_stub_sidecar_locked(media_path) if found is None else found


def ensure_sidecar(media_path: Path) -> dict:
    """Return the sidecar for *media_path*, creating a manual-drop stub if absent.

    The first read happens without the lock; the lock is only taken when a
    stub may have to be created. Returns the existing or newly created data.
    """
    current = read_sidecar(media_path)
    if current is None:
        with _with_sidecar_lock(media_path):
            # Look again now that the lock is held: another writer may have
            # created the sidecar since the unlocked read above.
            current = read_sidecar(media_path)
            if current is None:
                current = _create_stub_sidecar_locked(media_path)
    return current


def create_stub_sidecar(media_path: Path) -> dict:
    """Write a minimal sidecar for a manually-dropped file and return it.

    Runs under the per-sidecar lock. The stub is written without checking
    for an existing sidecar; use ensure_sidecar to keep one that is already
    there.

    The alignment fields (segment_class, parent_shot_id, alignment_delta_s)
    start as None and are populated by the pass-segment writer when the stub
    belongs to an extracted segment.
    """
    with _with_sidecar_lock(media_path):
        stub = _create_stub_sidecar_locked(media_path)
    return stub


# ── Status Management ─────────────────────────────────────────


def set_status(
    media_path: Path,
    status: str,
    *,
    notes: Optional[str] = None,
    replaced_by: Optional[str] = None,
    promoted_to: Optional[str] = None,
    promoted_from: Optional[str] = None,
    archived_at: Optional[str] = None,
    archived_from: Optional[str] = None,
    tags: Optional[list[str]] = None,
    lineage: Optional[dict] = None,
) -> dict:
    """Set a sidecar's status (and optional companion fields); return the result.

    Frozen-contract fields per data-contracts.md §1a:
      status, notes, replaced_by, promoted_to, promoted_from,
      archived_at, archived_from, tags, lineage, provenance.

    A keyword left at None is not written, so an existing value stays as it
    is. A missing sidecar is stubbed first. The read, update and write all
    happen under the per-sidecar lock.

    `source` is deliberately not a parameter: it is fixed when the sidecar
    is created, by `create_stub_sidecar` (manual_drop),
    `recoil.pipeline._lib.sidecar.populate_sidecar` (pipeline) or
    `_write_segment_sidecars` (pass_extraction).

    Raises ValueError for a status outside SIDECAR_VALID_STATUSES. Unknown
    kwargs raise TypeError (Tenet 6 fail-loud).
    """
    if status not in SIDECAR_VALID_STATUSES:
        allowed = ", ".join(sorted(SIDECAR_VALID_STATUSES))
        raise ValueError(f"Invalid status '{status}'. Must be one of: {allowed}")

    optional_fields = (
        ("notes", notes),
        ("replaced_by", replaced_by),
        ("promoted_to", promoted_to),
        ("promoted_from", promoted_from),
        ("archived_at", archived_at),
        ("archived_from", archived_from),
        ("tags", tags),
        ("lineage", lineage),
    )
    supplied = {name: value for name, value in optional_fields if value is not None}

    with _with_sidecar_lock(media_path):
        data = _ensure_sidecar_locked(media_path)
        data["status"] = status
        data.update(supplied)
        _write_sidecar_locked(media_path, data)
    return data


def get_status(media_path: Path) -> Optional[str]:
    """Return the sidecar's ``status`` value, or None when there is no sidecar.

    Also returns None when the sidecar exists but has no ``status`` key.
    """
    sidecar = read_sidecar(media_path)
    return None if sidecar is None else sidecar.get("status")


# ── Canonical Promotion ───────────────────────────────────────


def promote_to_canonical(
    media_path: Path,
    asset_type: str,
    entity_id: str,
    project_dir: Path,
    *,
    kind: str = "identity",
    look: str = "base",
) -> dict:
    """v3 promote: pool-first, content-hash lineage, single-hero, byte-copy.

    v3 layout (post layout-v3 + ref-taxonomy migration): canonical assets
    live at ``assets/{class}/{subject}/{look}/`` with hero refs named
    ``{subject}_{kind}.{ext}`` at the look root and pool variants under
    ``pool/{kind}/``.

    The legacy ``asset_type`` argument (characters/locations/props) is
    translated to its v3 class ("char"/"loc"/"prop") via the taxonomy SSOT
    before path construction. Raw v3 class names are accepted too.

    Steps:
      1. Pool-first: move source to pool if not already there.
      2. Compute source SHA-256 (frozen in hero sidecar) and resolve the
         hero destination, before anything is demoted or deleted.
      3. Demote current hero in pool (is_hero → false).
      4. Delete old hero at look root (single-hero invariant).
      5. Byte-copy pool source → hero at look root. NEVER re-encode.
      6. Assert copy integrity (SHA-256 match) and resolver round-trip.
      7. Write hero sidecar with derived_from lineage.
      8. Flag pool source as canonical + is_hero.
      9. Update casting_state.json (convenience index).
     10. Run reconcile on the look dir (failure is logged, not raised).

    Returns the updated sidecar data for the pool source file.

    Raises:
        ValueError: unknown ``asset_type``, ``entity_id`` empty after
            normalization, invalid ``kind``, or a hero destination that is
            not located under ``project_dir``.
        RuntimeError: the hero copy's SHA-256 differs from the pool source.
        PromoteRoundTripError: the resolver does not return the new hero as
            a shelf ``is_hero`` asset.
        CastingFragmentCorruptError: casting_state.json exists but cannot be
            read or parsed.
    """
    import hashlib
    import re

    from recoil.core.paths import ProjectPaths, VALID_ASSET_CLASSES, VALID_REF_TYPES
    from recoil.core.ref_resolver import resolve_reference_bundle
    from recoil.core.ref_stem import ref_filename, subject_stem
    from recoil.core.reconcile_assets import reconcile_subject
    from recoil.pipeline._lib.taxonomy import (
        EXTENSION_NORMALIZE,
    )

    # ── 0. Resolve v3 asset class from legacy asset_type ──────────
    _LEGACY_PLURAL_TO_CLASS = {
        "characters": "char",
        "character": "char",
        "locations": "loc",
        "location": "loc",
        "props": "prop",
        "prop": "prop",
    }
    asset_class = _LEGACY_PLURAL_TO_CLASS.get(asset_type)
    if asset_class is None:
        # Accept raw v3 class names too
        if asset_type in VALID_ASSET_CLASSES:
            asset_class = asset_type
        else:
            raise ValueError(f"Unknown asset_type: {asset_type}")

    # Also compute legacy folder key for casting_state.json backward compat
    _CLASS_TO_LEGACY_PLURAL = {
        "char": "characters",
        "loc": "locations",
        "prop": "props",
    }
    cast_type = _CLASS_TO_LEGACY_PLURAL[asset_class]

    # Normalize entity_id: lowercase, spaces/hyphens → underscores, strip non-alnum
    entity_id = re.sub(r"[\s\-]+", "_", entity_id.strip().lower())
    entity_id = re.sub(r"[^a-z0-9_]", "", entity_id)
    if not entity_id:
        raise ValueError("Entity ID is empty after normalization")

    # Validate kind
    if kind not in VALID_REF_TYPES:
        raise ValueError(
            f"Invalid ref kind: {kind!r}. Must be one of {sorted(VALID_REF_TYPES)}"
        )

    # ── Build paths via ProjectPaths ──────────────────────────────
    ppaths = ProjectPaths.from_root(project_dir)
    look_dir = ppaths.asset_look_dir(asset_class, entity_id, look)
    look_dir.mkdir(parents=True, exist_ok=True)
    pool_kind_dir = ppaths.pool_dir(asset_class, entity_id, kind, look)
    pool_kind_dir.mkdir(parents=True, exist_ok=True)

    # ── Helper: compute SHA-256 of a file ─────────────────────────
    def _sha256(p: Path) -> str:
        h = hashlib.sha256()
        with open(p, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                h.update(chunk)
        return h.hexdigest()

    # ── 1. Pool-first: ensure source lives in pool/{kind}/ ────────
    # Normalize extension: .jpg → .jpeg
    ext = media_path.suffix
    norm_ext = EXTENSION_NORMALIZE.get(ext.lower(), ext)

    # Determine the next version number for pool naming. The pattern accepts
    # two OR MORE digits: names are written with `:02d`, which yields "v100"
    # once 99 is passed. An exactly-two-digit pattern would ignore such
    # files, hand out 100 again, and let shutil.move overwrite the existing
    # pool file.
    version_re = re.compile(
        rf"^{re.escape(entity_id)}_{re.escape(kind)}_[a-z0-9\-]+_v(\d{{2,}})\.\w+$"
    )
    existing_versions = [
        int(m.group(1))
        for m in (version_re.match(f.name) for f in pool_kind_dir.iterdir())
        if m
    ]
    next_version = max(existing_versions, default=0) + 1

    # Check if source is already in the pool dir
    try:
        media_path.relative_to(pool_kind_dir)
        pool_source = media_path
    except ValueError:
        # Source is NOT in pool — move it there with canonical name
        # Use "hero" as the variant for promoted files
        pool_name = f"{entity_id}_{kind}_hero_v{next_version:02d}{norm_ext}"
        pool_source = pool_kind_dir / pool_name
        shutil.move(str(media_path), str(pool_source))
        # Move sidecar too if it exists
        old_sc = _sidecar_path(media_path)
        if old_sc.is_file():
            new_sc = _sidecar_path(pool_source)
            shutil.move(str(old_sc), str(new_sc))

    # ── 2. Compute source hash ────────────────────────────────────
    source_sha256 = _sha256(pool_source)

    # Resolve the hero destination and its project-relative form once, and
    # before any demotion or deletion: relative_to raises ValueError when the
    # hero is not under project_dir, and that must not surface halfway
    # through the destructive steps below.
    pool_ext = pool_source.suffix
    hero_dest = look_dir / ref_filename(entity_id, kind, pool_ext)
    hero_rel = str(hero_dest.relative_to(project_dir))

    # ── 3. Demote current hero in pool ────────────────────────────
    # Find all pool files with is_hero: true in their sidecars, set false
    for pool_file in pool_kind_dir.iterdir():
        if pool_file.suffix.lower() not in MEDIA_EXTENSIONS:
            continue
        if pool_file == pool_source:
            continue
        pool_sc = read_sidecar(pool_file)
        if pool_sc and pool_sc.get("is_hero"):
            with _with_sidecar_lock(pool_file):
                sc = _ensure_sidecar_locked(pool_file)
                sc["is_hero"] = False
                # NOTE(review): "approved" is not a member of
                # SIDECAR_VALID_STATUSES, so set_status() would reject this
                # value. Left unchanged because readers of pool sidecars may
                # depend on it — confirm the intended demoted status.
                sc["status"] = "approved"
                _write_sidecar_locked(pool_file, sc)

    # ── 4. Delete old hero at look root (single-hero invariant) ───
    hero_stems = {
        f"{entity_id}_{kind}",
        f"{subject_stem(entity_id)}-{kind}",
    }
    for hero_stem in hero_stems:
        for old_hero in look_dir.glob(f"{hero_stem}.*"):
            if old_hero.suffix.lower() in MEDIA_EXTENSIONS:
                # Also remove its sidecar
                old_hero_sc = _sidecar_path(old_hero)
                if old_hero_sc.is_file():
                    old_hero_sc.unlink()
                old_hero.unlink()
                _log.info("Removed old hero: %s", old_hero)

    # ── 5. Byte-copy pool source → hero at look root ─────────────
    shutil.copyfile(str(pool_source), str(hero_dest))

    # ── 6. Assert copy integrity ──────────────────────────────────
    hero_sha256 = _sha256(hero_dest)
    if hero_sha256 != source_sha256:
        # Clean up the corrupt copy before raising
        hero_dest.unlink(missing_ok=True)
        raise RuntimeError(
            f"Byte-copy integrity failure: pool source SHA-256 {source_sha256} "
            f"!= hero copy SHA-256 {hero_sha256}"
        )

    # The new hero must be discoverable through the resolver as a shelf hero.
    bundle = resolve_reference_bundle(ppaths, asset_class, entity_id, kind)
    if not any(
        a.source == "shelf" and a.is_hero and a.path == hero_dest
        for a in bundle.assets
    ):
        raise PromoteRoundTripError(
            f"Promoted hero did not round-trip through resolver: {hero_dest}"
        )

    # ── 7. Write hero sidecar ─────────────────────────────────────
    # Read pool sidecar as base
    with _with_sidecar_lock(pool_source):
        pool_sidecar = read_sidecar(pool_source)
        if pool_sidecar is None:
            pool_sidecar = _create_stub_sidecar_locked(pool_source)

    hero_sidecar_data = dict(pool_sidecar)
    hero_sidecar_data["status"] = "canonical"
    hero_sidecar_data["is_hero"] = True
    hero_sidecar_data["content_sha256"] = hero_sha256
    # NB: do NOT overwrite "source" — it is a strict enum (SIDECAR_VALID_SOURCES).
    # The hero inherits the pool image's valid source; pool→canonical provenance
    # is tracked via casting_state / "promoted_to", not "source". (Writing the
    # pool path here failed _write_sidecar_locked's enum validation — crashing
    # every promotion.)

    # Add derived_from for derivative kinds (turn, closeup, expr, fullbody)
    _DERIVATIVE_KINDS = frozenset({"turn", "closeup", "expr", "fullbody"})
    if kind in _DERIVATIVE_KINDS:
        hero_sidecar_data["derived_from"] = {
            "source_stem": pool_source.stem,
            "source_kind": kind,
            "source_sha256": source_sha256,
        }
    else:
        # Primary kind (identity) — keep any inherited derived_from entries
        # and record the source hash. Copy the inherited mapping so the pool
        # sidecar dict is not mutated through the shallow copy above, and
        # start fresh when the inherited value is not a dict (e.g. null),
        # which would otherwise raise TypeError on item assignment.
        inherited = hero_sidecar_data.get("derived_from")
        derived = dict(inherited) if isinstance(inherited, dict) else {}
        derived["source_sha256"] = source_sha256
        hero_sidecar_data["derived_from"] = derived

    with _with_sidecar_lock(hero_dest):
        _write_sidecar_locked(hero_dest, hero_sidecar_data)

    # ── 8. Flag pool source as canonical + is_hero ────────────────
    with _with_sidecar_lock(pool_source):
        pool_sc = _ensure_sidecar_locked(pool_source)
        pool_sc["status"] = "canonical"
        pool_sc["is_hero"] = True
        pool_sc["content_sha256"] = source_sha256
        pool_sc["promoted_to"] = hero_rel
        _write_sidecar_locked(pool_source, pool_sc)

    # ── 9. Update casting_state.json (convenience index) ──────────
    casting_state_path = ppaths.casting_state_path
    with _with_casting_state_lock(casting_state_path):
        if casting_state_path.is_file():
            try:
                casting = json.loads(casting_state_path.read_text(encoding="utf-8"))
            except (json.JSONDecodeError, OSError) as e:
                _log.warning(
                    "sidecar: corrupt casting fragment at %s (%s)",
                    casting_state_path, e,
                )
                raise CastingFragmentCorruptError(
                    str(casting_state_path), message=str(e)
                ) from e
        else:
            casting = {}

        entry = casting.setdefault(cast_type, {}).setdefault(entity_id, {})
        entry["hero_path"] = hero_rel
        entry["updated_at"] = _now_iso()
        entry["source_sha256"] = source_sha256
        _atomic_write_json_locked(casting_state_path, casting)

    # ── 10. Run reconcile on the look dir ─────────────────────────
    try:
        reconcile_subject(look_dir)
    except Exception as e:
        # Reconcile failure is non-fatal for promotion — log and continue
        _log.warning(
            "promote_to_canonical: reconcile_subject failed for %s: %s",
            look_dir, e,
        )

    return pool_sc


# ── Archive / Restore ─────────────────────────────────────────


def archive_with_sidecar(media_path: Path, project_dir: Path) -> Path:
    """Move file + sidecar to v3 archive root, preserving relative path.

    v3 layout: archives live at ``_history/archives/`` via
    ``ProjectPaths.history_archives_dir``. The v1 output/ tree was destroyed
    in the v2 migration; writing to output/_archive/ would re-create a stray
    directory alongside the v3 tree (assets/char/, sequences/, renders/).

    The sidecar (stubbed if missing) is marked ``archived`` and stamped with
    ``archived_at`` / ``archived_from`` before the move. When *media_path* is
    not under *project_dir*, only its file name is used as the relative
    path. Afterwards, source directories left without content are removed,
    walking upward and stopping at *project_dir*.

    Returns the archive destination path for the media file.
    """
    from recoil.core.paths import ProjectPaths

    ppaths = ProjectPaths.from_root(project_dir)
    archive_dir = ppaths.history_archives_dir

    # Compute relative path from project dir
    try:
        rel = media_path.relative_to(project_dir)
    except ValueError:
        # Fallback: use just the filename
        rel = Path(media_path.name)

    archive_dest = archive_dir / rel
    archive_dest.parent.mkdir(parents=True, exist_ok=True)

    # Update sidecar status + move under one critical section so a concurrent
    # writer can't slip a write in between read-status and the file move.
    with _with_sidecar_lock(media_path):
        data = _ensure_sidecar_locked(media_path)
        data["status"] = "archived"
        data["archived_at"] = _now_iso()
        data["archived_from"] = str(rel)
        _write_sidecar_locked(media_path, data)

        # Move media file
        shutil.move(str(media_path), str(archive_dest))

        # Move sidecar
        sc_source = _sidecar_path(media_path)
        if sc_source.is_file():
            sc_dest = _sidecar_path(archive_dest)
            sc_dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(sc_source), str(sc_dest))

    def _is_lock_file(entry: Path) -> bool:
        # Per-sidecar lock files are hidden siblings named .<name>.json.lock.
        return entry.name.startswith(".") and entry.name.endswith(".lock")

    # Clean up empty parent directories up to the project root (v2: stop at
    # project_dir — v1 stopped at output/ which no longer exists).
    parent = media_path.parent
    while parent != project_dir and parent.is_dir():
        entries = list(parent.iterdir())
        # _meta and lock files don't count as "still has content".
        if any(p.name != "_meta" and not _is_lock_file(p) for p in entries):
            break

        meta_dir = parent / "_meta"
        if meta_dir.is_dir():
            if any(meta_dir.iterdir()):
                break  # _meta still holds data — keep the directory
            meta_dir.rmdir()
        elif meta_dir.exists():
            break  # "_meta" is not a directory; leave it alone

        # Only lock files remain. They were filtered out above precisely so
        # they would not block the cleanup, but rmdir needs a truly empty
        # directory — and the lock taken above may itself have left one
        # behind — so remove them first. With no media left here they guard
        # nothing.
        try:
            for stale in entries:
                if _is_lock_file(stale):
                    stale.unlink(missing_ok=True)
            parent.rmdir()
        except OSError:
            # Something reappeared concurrently or could not be removed;
            # cleanup is best-effort, so stop quietly.
            break
        parent = parent.parent

    return archive_dest


def restore_from_archive(archive_path: Path, project_dir: Path) -> Path:
    """Move file + sidecar back from the archive root to the original location.

    Uses the archived_from field in the sidecar to determine the original path.
    Falls back to stripping the archive root prefix (``_history/archives/``,
    then the legacy ``output/_archive/``) if there is no archived_from field.
    On restore the sidecar's status is reset to ``candidate`` and the
    archived_at / archived_from fields are removed.
    Returns the restored path.

    Security (R7.1): `archived_from` comes from a JSON file that may be older
    than the current code. We refuse absolute paths or `..` traversal, and
    belt-and-suspenders verify that the resolved target lives under
    `project_dir` before doing anything destructive.

    Compatibility (R7.2): pre-v3 archives store `archived_from` strings like
    `output/refs/characters/...` (v1) or `assets/identity/...` (v2).
    Translate those legacy prefixes to their v3 equivalents (e.g.
    `assets/char/`) before building the restore target — otherwise restoring
    recreates a deprecated tree alongside the live v3 layout.

    Raises:
        ValueError: archived_from is absolute, contains ``..``, resolves
            outside project_dir, or no restore path can be determined.
    """
    # Read sidecar for original location
    data = read_sidecar(archive_path)
    if data and data.get("archived_from"):
        original_rel = data["archived_from"]

        # R7.2: translate v1/v2 prefixes → v3 (assets/char/, etc.) before
        # building the target. The SSOT for all prefix translations lives in
        # path_migration.py; the inline _LEGACY_PREFIX_MAP was consolidated
        # there in Phase 3.
        from recoil.core.path_migration import translate_legacy_path

        original_rel = translate_legacy_path(original_rel)

        # R7.1: reject absolute paths and `..` traversal up front.
        candidate = Path(original_rel)
        if candidate.is_absolute():
            _log.warning(
                "Refusing to restore — archived_from is absolute: %s",
                original_rel,
            )
            raise ValueError(
                f"Refusing to restore {archive_path}: archived_from is absolute"
            )
        if ".." in candidate.parts:
            _log.warning(
                "Refusing to restore — archived_from contains '..': %s",
                original_rel,
            )
            raise ValueError(
                f"Refusing to restore {archive_path}: archived_from contains '..'"
            )

        restore_dest = project_dir / candidate

        # Belt-and-suspenders: verify the resolved target stays under project_dir.
        try:
            restore_dest.resolve().relative_to(project_dir.resolve())
        except ValueError as exc:
            _log.warning(
                "Refusing to restore — resolved path escapes project: %s",
                restore_dest,
            )
            raise ValueError(
                f"Refusing to restore {archive_path}: resolved path escapes project_dir"
            ) from exc
    else:
        # Fallback: strip the archive root prefix.
        # v3 layout: archives live under _history/archives/ via ProjectPaths.
        # The legacy output/_archive/ root is retained as a secondary
        # fallback for straggler files that survived migration.
        from recoil.core.paths import ProjectPaths

        ppaths = ProjectPaths.from_root(project_dir)
        archive_dir = ppaths.history_archives_dir
        try:
            rel = archive_path.relative_to(archive_dir)
        except ValueError:
            legacy_archive_dir = project_dir / "output" / "_archive"
            try:
                rel = archive_path.relative_to(legacy_archive_dir)
            except ValueError as exc:
                raise ValueError(
                    f"Cannot determine restore path for {archive_path}"
                ) from exc
        restore_dest = project_dir / rel

    restore_dest.parent.mkdir(parents=True, exist_ok=True)

    # Update sidecar status + move under one critical section so a concurrent
    # writer can't slip a write in between the status update and the file move.
    with _with_sidecar_lock(archive_path):
        # Re-read now that the lock is held. The copy read above (needed to
        # pick the destination) was taken outside the lock; writing it back
        # would silently discard any update made in between.
        current = read_sidecar(archive_path)
        if current:
            current["status"] = "candidate"
            current.pop("archived_at", None)
            current.pop("archived_from", None)
            _write_sidecar_locked(archive_path, current)

        # Move media file
        shutil.move(str(archive_path), str(restore_dest))

        # Move sidecar
        sc_source = _sidecar_path(archive_path)
        if sc_source.is_file():
            sc_dest = _sidecar_path(restore_dest)
            sc_dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(sc_source), str(sc_dest))

    return restore_dest


# ── Scanner ───────────────────────────────────────────────────


def scan_for_missing_sidecars(output_dir: Path) -> list[Path]:
    """Find media files without sidecars in an output directory.

    Walks *output_dir* recursively. A file is reported when its suffix is a
    media extension, no path component below *output_dir* is hidden, named
    ``_meta`` or contains "backup", and no companion ``{name}.json`` exists.
    Sidecar JSON files themselves never match, since ``.json`` is not a
    media extension.

    Returns list of media file paths that lack a companion sidecar (empty
    when *output_dir* is not a directory).
    """
    missing: list[Path] = []
    if not output_dir.is_dir():
        return missing

    for path in output_dir.rglob("*"):
        # Cheap suffix test first: it drops directories, sidecar JSONs and
        # every other non-media entry without touching the filesystem.
        if path.suffix.lower() not in MEDIA_EXTENSIONS:
            continue
        if not path.is_file():
            continue
        # Skip hidden files, _meta dirs, backup dirs
        # NOTE(review): the "backup" substring test also applies to the file
        # name itself, so e.g. "hero_backup.png" is skipped — confirm that
        # is intended and not only meant for directories.
        parts = path.relative_to(output_dir).parts
        if any(
            p.startswith(".") or p == "_meta" or "backup" in p.lower() for p in parts
        ):
            continue
        # Check if sidecar exists
        if not _sidecar_path(path).is_file():
            missing.append(path)

    return missing


def auto_stub_missing(output_dir: Path) -> int:
    """Create stub sidecars for all media files missing them.

    Each candidate is re-checked under its per-sidecar lock before the stub
    is written: a sidecar that appeared after the scan (for example one
    written by another process in the meantime) is left untouched rather
    than overwritten by a stub.

    Returns count of stubs actually created.
    """
    created = 0
    for path in scan_for_missing_sidecars(output_dir):
        with _with_sidecar_lock(path):
            if _sidecar_path(path).is_file():
                # Someone wrote a sidecar since the scan — keep theirs.
                continue
            _create_stub_sidecar_locked(path)
        created += 1
    return created


# ── Pipeline Sidecar Writer ───────────────────────────────────
# R6 Phase 7 (2026-05-22): `write_pipeline_sidecar` (_RETIRED) deleted. All
# production call sites migrated to
# `recoil.pipeline._lib.sidecar.populate_sidecar` + `write_sidecar_dict`. The
# previous re-export shim has been removed.
