"""workspace/tree.py — tree assembly + metadata index for the Recoil workspace.

Extracted from workspace/server.py in MF-5 (Engine Fix Sprint Phase B,
2026-04-30). Phase B carved this module out so Console v2's v2_dispatch.py
can call tree-assembly logic directly without going through legacy
web-framework routes.

Public surface declared in __all__. Internal helpers stay underscored.

Frozen contract: every public function returns dicts with the same keys /
types as the pre-Phase-B inline implementation. The audit-2026-04-25
data-contracts.md tree-payload contract is byte-stable across this
extraction (verified by Phase 6 gate).

Tenet 6 (Errors Must Be Visible): silent fallback patterns transcribed
verbatim from server.py are flagged with a TODO-PHASE-E comment. Phase E
addresses Tenet 6 violations across the engine; Phase B does not modify
error semantics.
"""

from __future__ import annotations

import json
import logging
import re
import sys
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path

from recoil.core.naming import parse_filename
from recoil.core.paths import projects_root
from recoil.core.project import get_project
from recoil.execution.execution_store import ExecutionStore  # noqa: F401  (type hint surface)
from recoil.workspace import sidecar as ws_sidecar
from recoil.workspace.helpers import (
    get_store as _get_store,
    shot_status_color as _shot_status_color,
)

# ── Tenet 6: typed-exception import ───────────────────────────
# Bootstrap: put the directory two levels above this file at the front of
# sys.path (once) before importing the typed exceptions below.
# NOTE(review): other recoil.* modules are already imported earlier in this
# file, so this path tweak looks redundant — confirm before removing.
_RECOIL_ROOT = Path(__file__).resolve().parent.parent
if str(_RECOIL_ROOT) not in sys.path:
    sys.path.insert(0, str(_RECOIL_ROOT))
from recoil.core.exceptions import (  # noqa: E402,F401
    ExecutionStoreUnavailableError,
    SidecarCorruptError,  # re-exported for callers that catch from recoil.workspace.tree
)

log = logging.getLogger("recoil.workspace.tree")


# ── Module-level constants ──────────────────────────────────────

# Lower-case file suffixes (leading dot included) that scan_output_dir treats
# as media; it compares them against ``Path.suffix.lower()``.
# NOTE(review): build_metadata_index filters with ws_sidecar.MEDIA_EXTENSIONS
# instead of this set — confirm the two are meant to stay in sync.
MEDIA_EXTENSIONS: frozenset[str] = frozenset(
    {".png", ".jpg", ".jpeg", ".webp", ".mp4", ".mov", ".webm"}
)

# Matches: shot_001.png, shot_005a.png (keyframe/base — no take suffix)
# Matches: shot_001_take1.mp4, shot_001_take018.png, shot_005a_take3_31434.png
# Matches: shot_005_I2V_766B_take2.mp4, shot_005_R2V_V3_FE7E_take1.mp4
# The key: filename must start with "shot_" followed by digits (optionally a letter),
# then optionally model/hash tags, then optionally _take{N}[_{hash}]
# Capture groups: 1 = shot number (digits plus optional lowercase letter),
# 2 = take number digits, or None when the name has no _take suffix.
SHOT_PATTERN: re.Pattern[str] = re.compile(
    r"^shot_(\d{1,4}[a-z]?)"  # shot_NNN or shot_NNNa
    r"(?:_[A-Z0-9]+)*?"  # optional model/hash tags (non-greedy): _I2V_766B, _R2V_V3_FE7E
    r"(?:_take(\d+))?"  # optional _takeN (capture take number)
    r"(?:_\d+)?"  # optional trailing hash: _31434
    r"\.[a-zA-Z0-9]+$"  # extension
)

# Legacy semantic-tag coverage names predate recoil.core.naming's short grammar.
# New workspace code parses via parse_filename first and uses this only as the
# EP2/render-review compatibility fallback.
# Capture groups: 1 = episode token (EPnnn), 2 = three-digit pass counter,
# 3 = underscore-joined shot tokens, 4 = upper-case semantic tag,
# 5 = take number.
LEGACY_SEMANTIC_PASS_PATTERN: re.Pattern[str] = re.compile(
    r"^(EP\d{3})_PASS_(\d{3})_SH(\d{1,4}[a-z]?(?:_\d{1,4}[a-z]?)*)_"
    r"([A-Z][A-Z0-9]*(?:_[A-Z][A-Z0-9]*)*)_take(\d+)"
    r"(?:_\d+)?"
    r"\.[a-zA-Z0-9]+$"
)
# Alias of the legacy pattern under its shorter name.
# NOTE(review): presumably kept for existing importers of PASS_PATTERN — confirm.
PASS_PATTERN = LEGACY_SEMANTIC_PASS_PATTERN

# R5 B3 fix (2026-05-21, Phase 5 path b) — boundary frames are extracted by
# ffmpeg in step_runner._extract_boundary_frames, NOT by Gemini Flash image
# generation. The workspace status banner previously inherited stale parent
# state (status="video_failed" + model="gemini-3.1-flash-image-preview"
# from the prior viewer selection) when a user clicked a *_seg*.jpg file,
# because the seg file's tree node carried no authoritative kind/status
# stamp and the JS viewer-status DOM elements were not refreshed. This
# regex + overlay (see _boundary_frame_overlay) lets the metadata index
# unambiguously mark every boundary frame so the frontend can render
# kind="boundary_frame" instead of leaking parent-pass state.
# Group 1 = everything before "_seg" (used as the source video's stem by
# _boundary_frame_overlay); group 2 = the digits that follow "_seg".
_BOUNDARY_FRAME_RE: re.Pattern[str] = re.compile(r"^(.+)_seg(\d+)\.jpg$")


def parse_pass_filename(filename: str) -> dict | None:
    """Parse workspace pass/group filenames into structured fields.

    ``recoil.core.naming.parse_filename`` (the short grammar) is tried first;
    ``LEGACY_SEMANTIC_PASS_PATTERN`` is only consulted when that parser
    returns a falsy result.

    Returns:
        A dict with these keys, or None when neither grammar matches:

        {
            "episode": "EP001",
            "counter": "017",                     # zero-padded ordinal, or None
            "shot_list": "33_33a_34_35",          # raw string form from filename
            "shot_tokens": ["33", "33a", "34", "35"],  # parsed list form
            "semantic_tag": "A_WREN",             # always None for the short grammar
            "take_num": 1,
            "strategy": "coverage",               # always "coverage" for legacy names
            "ordinal": 17,                        # int form of counter, or None
            "grouping_token": "PASS",             # always "PASS" for legacy names
            "legacy_grouping": True,              # always True for legacy names
            "shot_ids": ["EP001_SH33", ...],
        }
    """
    parsed = parse_filename(filename)
    if parsed:
        # The ordinal may be absent. Convert it once so "counter" and
        # "ordinal" agree, instead of guarding one and crashing on the other.
        raw_ordinal = parsed.get("ordinal")
        ordinal = int(raw_ordinal) if raw_ordinal is not None else None
        return {
            "episode": parsed["episode_token"],
            "counter": f"{ordinal:03d}" if ordinal is not None else None,
            "shot_list": parsed["shot_list"],
            "shot_tokens": list(parsed["shot_tokens"]),
            "semantic_tag": None,
            "take_num": int(parsed["take"]),
            "strategy": parsed["strategy"],
            "ordinal": ordinal,
            "grouping_token": parsed.get("grouping_token"),
            "legacy_grouping": bool(parsed.get("legacy_grouping")),
            "shot_ids": list(parsed.get("shot_ids") or []),
        }

    m = LEGACY_SEMANTIC_PASS_PATTERN.match(filename)
    if not m:
        return None
    episode, counter, shot_list, semantic_tag, take = m.groups()
    shot_tokens = shot_list.split("_")
    return {
        "episode": episode,
        "counter": counter,
        "shot_list": shot_list,
        "shot_tokens": shot_tokens,
        "semantic_tag": semantic_tag,
        "take_num": int(take),
        "strategy": "coverage",
        "ordinal": int(counter),
        "grouping_token": "PASS",
        "legacy_grouping": True,
        "shot_ids": [f"{episode}_SH{tok}" for tok in shot_tokens],
    }

# Single shot-token matcher — used by extraction and other consumers that need
# to walk a pass_id's tokens and find the boundary between shot tokens and
# semantic-tag tokens. Module scope so the regex is compiled once.
# Accepts a whole token such as "33", "33a" or "SH005": optional "SH" prefix,
# one to four digits, optional single lowercase letter.
_SHOT_TOKEN_RE: re.Pattern[str] = re.compile(r"^(?:SH)?\d{1,4}[a-z]?$")

# Grouping strategy registry — populated below after function defs.
# Maps a strategy name to its grouping callable.
_GROUPERS: dict[str, Callable] = {}


# ── Pure regex parsers ───────────────────────────────────────────


def parse_shot_filename(filename: str) -> dict | None:
    """Split a ``shot_*`` filename into its shot number and take number.

    Matching is delegated to ``SHOT_PATTERN``.

    Returns:
        ``{"shot_num": "001", "take_num": int | None}`` when the name matches,
        otherwise None. ``take_num`` is None for keyframe/base files, i.e.
        names without a ``_take`` suffix.
    """
    hit = SHOT_PATTERN.match(filename)
    if hit is None:
        return None
    shot_num, take_digits = hit.groups()
    take_num = int(take_digits) if take_digits else None
    return {"shot_num": shot_num, "take_num": take_num}


def normalize_shot_num(pass_shot_id: str) -> str | None:
    """Extract base shot number from PassStore shot ID and zero-pad.

    The id may carry an optional ``EP<digits>_`` prefix. Only the digits after
    ``SH`` and at most one directly following letter are used; anything after
    that is ignored. The number is left-padded to three digits and the letter
    is lower-cased.

    'EP001_SH05A_HATCH_MS' -> '005a'
    'EP001_SH12' -> '012'
    'SH3' -> '003'

    Returns:
        The normalised shot number, or None when the id does not start with
        an ``SH<digits>`` token (after the optional episode prefix).
    """
    m = re.match(r"(?:EP\d+_)?SH(\d+)([a-zA-Z]?)", pass_shot_id)
    if m is None:
        return None
    digits, letter = m.groups()
    return digits.zfill(3) + letter.lower()


def _pass_id_parts(pass_id: str) -> dict | None:
    m = re.match(
        r"^EP\d+_PASS_(\d{3})_SH([\d_a-zA-Z]+)_"
        r"([A-Z][A-Z0-9]*(?:_[A-Z][A-Z0-9]*)*)$",
        pass_id,
    )
    if not m:
        return None
    return {
        "counter": m.group(1),
        "shot_list": m.group(2),
        "shot_tokens": m.group(2).split("_"),
        "semantic_tag": m.group(3),
    }


def _pass_anchor_key(pass_id: str) -> str | None:
    """Return ``"<counter>:<semantic_tag>"`` for a pass id.

    Returns None when ``_pass_id_parts`` cannot parse the id.
    """
    pieces = _pass_id_parts(pass_id)
    if pieces:
        return "{}:{}".format(pieces["counter"], pieces["semantic_tag"])
    return None


def _grouping_from_sidecar(project: str, node: dict) -> dict | None:
    prov = node.get("provenance")
    if isinstance(prov, dict) and isinstance(prov.get("grouping"), dict):
        return prov["grouping"]
    prov = node.get("sidecar_provenance")
    if isinstance(prov, dict) and isinstance(prov.get("grouping"), dict):
        return prov["grouping"]

    rel_path = node.get("path")
    if not project or not isinstance(rel_path, str):
        return None
    media_path = projects_root() / project / rel_path
    try:
        data = ws_sidecar.read_sidecar(media_path)
    except SidecarCorruptError:
        raise
    except Exception:
        return None
    if not isinstance(data, dict):
        return None
    provenance = data.get("provenance")
    if not isinstance(provenance, dict):
        return None
    grouping = provenance.get("grouping")
    return grouping if isinstance(grouping, dict) else None


def _pass_record_for_file(
    *,
    parsed: dict,
    node: dict,
    grouping: dict | None,
    passes_by_id: dict[str, dict],
) -> tuple[str, dict] | tuple[None, None]:
    source_pass_id = None
    if isinstance(grouping, dict):
        source_pass_id = grouping.get("source_pass_id")
    if isinstance(source_pass_id, str) and source_pass_id in passes_by_id:
        return source_pass_id, passes_by_id[source_pass_id]

    if parsed.get("semantic_tag"):
        pass_id = (
            f"{parsed['episode']}_PASS_{parsed['counter']}_"
            f"SH{parsed['shot_list']}_{parsed['semantic_tag']}"
        )
        if pass_id in passes_by_id:
            return pass_id, passes_by_id[pass_id]

    rel_path = node.get("path")
    if isinstance(rel_path, str):
        for pass_id, record in passes_by_id.items():
            if record.get("video_path") == rel_path:
                return pass_id, record

    parsed_shots = {normalize_shot_num(sid) for sid in parsed.get("shot_ids") or []}
    parsed_shots.discard(None)
    for pass_id, record in passes_by_id.items():
        parts = _pass_id_parts(pass_id)
        if parts and parts["counter"] != parsed.get("counter"):
            continue
        record_shots = {
            normalize_shot_num(sid)
            for sid in (record.get("segment_shot_ids") or [])
        }
        record_shots.discard(None)
        if parsed_shots and record_shots == parsed_shots:
            return pass_id, record

    return None, None


def _pass_bucket_key(
    *,
    parsed: dict,
    node: dict,
    grouping: dict | None,
    passes_by_id: dict[str, dict],
) -> tuple[str, str | None, dict | None]:
    """Compute the bucket key under which a pass file is grouped.

    When the file resolves to a pass whose id yields an anchor key, that
    anchor is the bucket key. Otherwise the key is
    ``"<strategy>:<ordinal as 3 digits>:<shot_list>"``, where strategy and
    ordinal come from the sidecar grouping first and the parsed filename
    second (defaults: ``"coverage"`` and ``"000"``).

    Returns ``(bucket_key, pass_id, record)``; ``pass_id`` and ``record`` are
    None when no pass record was resolved.
    """
    pass_id, record = _pass_record_for_file(
        parsed=parsed,
        node=node,
        grouping=grouping,
        passes_by_id=passes_by_id,
    )
    if pass_id:
        anchor = _pass_anchor_key(pass_id)
        if anchor:
            return anchor, pass_id, record

    # Treat a missing or non-dict grouping as "no sidecar hints".
    hints = grouping if isinstance(grouping, dict) else {}

    strategy = hints.get("strategy") or parsed.get("strategy") or "coverage"

    ordinal = hints.get("ordinal")
    if ordinal is None:
        ordinal = parsed.get("ordinal")
    ordinal_token = "000" if ordinal is None else f"{int(ordinal):03d}"

    bucket_key = f"{strategy}:{ordinal_token}:{parsed.get('shot_list', '')}"
    return bucket_key, pass_id, record


# ── Private helpers ──────────────────────────────────────────────


def _natural_sort_key(s: str) -> list:
    # Split into digit and non-digit chunks so V2 sorts before V10.
    return [int(c) if c.isdigit() else c.lower() for c in re.split(r"(\d+)", s)]


def _clean_location_name(slug: str) -> str:
    """Turn 'int_sadie_apartment' into 'Sadie Apartment'."""
    name = slug.replace("_", " ").title()
    for prefix in ("Int ", "Ext "):
        if name.startswith(prefix):
            name = name[len(prefix) :]
    return name


def _clean_episode_name(slug: str) -> str:
    name = slug.replace("ep_", "EP").replace("_", " ").upper()
    if name.startswith("EP") and len(name) <= 6:
        name = "Episode " + name[2:]
    return name


# ── Metadata index + scanning ────────────────────────────────────


def _boundary_frame_overlay(
    media_path: Path, project_dir: Path
) -> dict | None:
    """R5 B3 fix (Phase 5 path b) — neutral overlay for ffmpeg-extracted seg
    frames.

    Returns a meta_index entry for a *_seg*.jpg boundary frame that:
      - stamps kind="boundary_frame" so the frontend can route around the
        generic shot-detail status path (which was leaking
        "video_failed | gemini-3.1-flash-image-preview" from stale parent
        state),
      - reads model from the SOURCE VIDEO's sidecar (if present) so the
        provenance line in the inspector is accurate — NOT inferred from
        previs_model or from whatever shot the viewer last had loaded,
      - reports source video status as a separate field
        ("source_video_status") so the frontend can show
        "boundary_frame_from_rejected_source" when the parent pass was
        rejected, without misclassifying the frame itself as failed.

    The source video and its sidecar are looked up two directories above the
    frame, as ``<stem>.mp4`` and ``<stem>.mp4.json``.

    Args:
        media_path: Path of the candidate boundary frame.
        project_dir: Project root; ``source_video`` is reported relative to it.

    Returns:
        The overlay dict, or None if media_path is not a boundary frame.
        An unreadable, undecodable or non-object source sidecar is logged at
        WARNING and otherwise ignored, so the neutral overlay is still
        returned.
    """
    m = _BOUNDARY_FRAME_RE.match(media_path.name)
    if not m:
        return None
    source_stem = m.group(1)
    parent_video_dir = media_path.parent.parent  # boundary_frames -> ep_NNN
    source_mp4 = parent_video_dir / f"{source_stem}.mp4"
    source_sidecar = parent_video_dir / f"{source_stem}.mp4.json"
    overlay: dict = {
        "kind": "boundary_frame",
        "role": "boundary_frame",
        "status": "boundary_frame",
        "status_color": "gray",
        "model": None,
        "source": "ffmpeg_extraction",
        "source_video": (
            str(source_mp4.relative_to(project_dir))
            if source_mp4.exists()
            else None
        ),
        "source_video_status": None,
    }
    if not source_sidecar.exists():
        return overlay

    try:
        sc = json.loads(source_sidecar.read_text(encoding="utf-8"))
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        # Best-effort: keep the neutral overlay so the seg frame still
        # renders correctly, but make the skip visible in the log.
        log.warning(
            "tree: ignoring unreadable source sidecar %s for boundary frame %s (%s: %s)",
            source_sidecar,
            media_path.name,
            e.__class__.__name__,
            e,
        )
        return overlay

    if not isinstance(sc, dict):
        # Valid JSON but not an object: there is nothing to read from it.
        log.warning(
            "tree: ignoring non-object source sidecar %s for boundary frame %s",
            source_sidecar,
            media_path.name,
        )
        return overlay

    prov = sc.get("provenance")
    if isinstance(prov, dict):
        overlay["model"] = prov.get("model")
    src_status = sc.get("status")
    overlay["source_video_status"] = src_status
    # If the source video was rejected, report that as the frame's SOURCE
    # state while its OWN kind stays boundary_frame, so the status banner
    # does not present the frame itself as a failed render.
    if src_status == "rejected":
        overlay["status"] = "boundary_frame_from_rejected_source"
    return overlay


def build_metadata_index(project: str) -> dict[str, dict]:
    """Build a path-keyed metadata lookup for a project's media files.

    Keys are file paths relative to the project directory (or, for
    ExecutionStore takes, whatever ``file_path`` the take records). Sources
    are applied in this order, each layered on top of the previous ones:

    1. ExecutionStore takes — shot_id, pipeline status, model, take info.
       Any failure here is logged and the remaining sources are still read.
    2. Asset-ref sidecars (``assets/{kind}/**/_meta/*.json``) — replace any
       existing entry for the referenced path.
    3. Universal sidecars (``{mediafile}.json`` under assets/, sequences/,
       renders/, state/) — always add the ``sidecar_*`` keys; they set
       ``status`` / ``status_color`` / ``source`` only for paths that no
       earlier source indexed.
    4. Boundary frames (``renders/ep_*/boundary_frames/*_seg*.jpg``) — the
       neutral boundary-frame overlay is forced on top of everything else.

    Sidecars that cannot be read, decoded, or that do not contain a JSON
    object are logged at WARNING and skipped.

    Args:
        project: Project name, resolved under ``projects_root()``.

    Returns:
        Mapping of path -> metadata dict.
    """
    index: dict[str, dict] = {}
    project_dir = projects_root() / project

    # Index ExecutionStore takes (keyed by relative file_path)
    try:
        store = _get_store(project)
        try:
            for shot in store.get_all_shots():
                shot_id = shot.get("shot_id", "")
                status = shot.get("status", "previs_pending")
                model = shot.get("model")
                for i, take in enumerate(shot.get("takes", [])):
                    fp = take.get("file_path", "")
                    if fp:
                        index[fp] = {
                            "shot_id": shot_id,
                            "status": status,
                            "status_color": _shot_status_color(status),
                            "model": model,
                            "cost": take.get("cost", 0),
                            "take_index": i,
                            "take_id": take.get("take_id", f"T{i + 1}"),
                            "source": "pipeline",
                        }
        finally:
            # Release the store even when iteration fails part-way.
            store.close()
    except Exception as e:
        log.warning("Could not read ExecutionStore for metadata index: %s", e)
        # TODO-PHASE-E (MF-22 / Tenet 6): currently swallows failure. Phase E
        # determines whether this is a sanctioned fallback or a silent-swallow
        # violation. DO NOT modify error semantics in Phase B.

    # Index ref sidecars (v2 layout: assets/{kind}/{subject}/_meta/*.json)
    # Replaces the v1 _canonical/ scan deleted in project-paths-refactor-v2.
    from recoil.core.paths import VALID_ASSET_KINDS
    assets_dir = project_dir / "assets"
    if assets_dir.is_dir():
        for kind in VALID_ASSET_KINDS:
            kind_dir = assets_dir / kind
            if not kind_dir.is_dir():
                continue
            for meta_file in kind_dir.rglob("_meta/*.json"):
                try:
                    sidecar = json.loads(meta_file.read_text(encoding="utf-8"))
                except (ValueError, OSError) as e:
                    # ValueError covers JSONDecodeError and UnicodeDecodeError.
                    log.warning(
                        "tree: skipping corrupt asset-ref sidecar %s (%s: %s)",
                        meta_file, e.__class__.__name__, e,
                    )
                    continue
                if not isinstance(sidecar, dict):
                    log.warning(
                        "tree: skipping non-object asset-ref sidecar %s",
                        meta_file,
                    )
                    continue

                ref_file = sidecar.get("file")
                ref_path = ref_file.get("path", "") if isinstance(ref_file, dict) else ""
                if not ref_path or not isinstance(ref_path, str):
                    continue
                try:
                    rel = str(Path(ref_path).relative_to(project_dir))
                except ValueError:
                    # Not under project_dir (e.g. already relative): use as-is.
                    rel = ref_path
                context = sidecar.get("context")
                index[rel] = {
                    "source": "asset_ref",
                    "status": "hero",
                    "status_color": "blue",
                    "ref_type": (
                        context.get("ref_type", kind)
                        if isinstance(context, dict)
                        else kind
                    ),
                    "model": sidecar.get("model"),
                }

    # Map sidecar status to display color (loop-invariant, built once).
    status_color_map = {
        "candidate": "gray",
        "pinned": "amber",
        "canonical": "blue",
        "archived": "gray",
    }

    # Index universal sidecars ({file}.json) — overlay on top of existing entries.
    # v2 layout: walk every v2 media root (assets/, sequences/, renders/, state/)
    # instead of the deleted v1 output/ tree. Mirrors the mcp_server.py pattern.
    for root_name in ("assets", "sequences", "renders", "state"):
        root = project_dir / root_name
        if not root.is_dir():
            continue
        for sc_path in root.rglob("*.json"):
            # Skip _meta/ sidecars (handled above), hidden files, non-sidecar JSON
            if "_meta" in sc_path.parts:
                continue
            # v2: never walk _history/ archives. Exact match (R10.3) — startswith
            # would false-positive sibling dirs like "_history_backup".
            if "_history" in sc_path.parts:
                continue
            if sc_path.name.startswith("."):
                continue
            # A universal sidecar is {mediafile}.json — the stem must have a media extension
            stem_as_path = Path(sc_path.stem)
            if stem_as_path.suffix.lower() not in ws_sidecar.MEDIA_EXTENSIONS:
                continue
            # The media file this sidecar belongs to
            media_path = sc_path.parent / sc_path.stem
            if not media_path.is_file():
                continue

            try:
                sc_data = json.loads(sc_path.read_text(encoding="utf-8"))
            except FileNotFoundError:
                # File deleted between rglob discovery and read_text (TOCTOU).
                # Skip silently — not corruption.
                continue
            except (ValueError, OSError) as e:
                # Tenet 6: skip-with-visibility — corrupt take/universal sidecar
                # logged at WARNING and dropped from tree (not silent continue).
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                log.warning(
                    "tree: skipping corrupt take metadata at %s — %s: %s",
                    sc_path,
                    e.__class__.__name__,
                    e,
                )
                continue
            if not isinstance(sc_data, dict):
                log.warning(
                    "tree: skipping non-object take metadata at %s",
                    sc_path,
                )
                continue

            try:
                rel = str(media_path.relative_to(project_dir))
            except ValueError:
                continue

            sc_status = sc_data.get("status", "candidate")
            sc_source = sc_data.get("source", "unknown")

            # Build sidecar overlay — merge with existing entry if present
            sc_overlay = {
                "sidecar_status": sc_status,
                "sidecar_source": sc_source,
                "sidecar_notes": sc_data.get("notes", ""),
                "replaced_by": sc_data.get("replaced_by"),
            }
            if isinstance(sc_data.get("provenance"), dict):
                sc_overlay["sidecar_provenance"] = sc_data["provenance"]

            existing = index.get(rel)
            if existing is None:
                # Not indexed by an earlier source: the sidecar also supplies
                # the display status. Earlier sources keep priority for
                # status/status_color/source otherwise.
                sc_overlay["status"] = sc_status
                sc_overlay["status_color"] = (
                    status_color_map.get(sc_status, "gray")
                    if isinstance(sc_status, str)
                    else "gray"
                )
                sc_overlay["source"] = sc_source
                index[rel] = sc_overlay
            else:
                existing.update(sc_overlay)

    # R5 B3 fix (Phase 5 path b) — boundary-frame neutral overlay. Walk every
    # renders/ep_*/boundary_frames/ dir and stamp each *_seg*.jpg with the
    # boundary_frame overlay. This runs LAST so it overrides any incorrect
    # upstream stamp (e.g. an ExecutionStore take row that happens to key on
    # a boundary_frames path, or a universal sidecar that picked up a stale
    # model id).
    # v2 layout: boundary frames live at renders/ep_*/boundary_frames/
    # (post-migration output/video/ is removed; renders/ replaces it).
    video_root = project_dir / "renders"
    if video_root.is_dir():
        for ep_dir in video_root.glob("ep_*"):
            bf_dir = ep_dir / "boundary_frames"
            if not bf_dir.is_dir():
                continue
            for seg in bf_dir.glob("*_seg*.jpg"):
                overlay = _boundary_frame_overlay(seg, project_dir)
                if overlay is None:
                    continue
                try:
                    rel = str(seg.relative_to(project_dir))
                except ValueError:
                    continue
                # OVERRIDE semantics: boundary frames are always
                # boundary_frame kind regardless of any prior stamp.
                # Merge so we keep any sidecar_* keys already present
                # (sidecar_status="candidate" from the auto-stub etc.)
                # but force status/status_color/model/kind/role to the
                # neutral overlay values.
                index[rel] = {**index.get(rel, {}), **overlay}

    return index


def scan_output_dir(project: str) -> dict:
    """Walk the project's media roots and return them as one nested tree.

    v2 layout: the four media roots (assets/, sequences/, renders/, state/)
    are scanned instead of the deleted v1 output/ tree. The function name and
    the synthetic root label "output" are preserved so the response shape
    stays byte-stable for tree consumers.

    Directory nodes are ``{"name", "type": "directory", "children"}`` and are
    only emitted when they contain something. File nodes are emitted for
    media extensions only and start as ``status="untracked"`` / gray before
    the entry from ``build_metadata_index`` (if any) is laid over them.

    Returns:
        ``{"name": "output", "type": "directory", "children": [...],
        "file_count": <number of media files found>}``.
    """
    project_dir = projects_root() / project
    media_roots = ("assets", "sequences", "renders", "state")

    # Early-out: none of the v2 roots exists as a directory.
    if not any((project_dir / root_name).is_dir() for root_name in media_roots):
        return {"name": "output", "type": "directory", "children": [], "file_count": 0}

    meta_index = build_metadata_index(project)

    # Names never surfaced in the tree:
    #   _meta      — sidecar storage
    #   _history   — archives, migration tarballs, debug grids. Exact match
    #                (R10.3) so siblings like "_history_backup" stay visible.
    #                Direct /media/{path} access remains possible.
    #   _canonical — legacy v1; should not exist post-migration
    suppressed_names = frozenset({"_meta", "_history", "_canonical"})
    media_total = 0

    def walk(directory: Path) -> list:
        nonlocal media_total
        try:
            # Directories first, then files; case-insensitive by name.
            entries = sorted(
                directory.iterdir(),
                key=lambda entry: (not entry.is_dir(), entry.name.lower()),
            )
        except PermissionError:
            return []

        nodes: list = []
        for entry in entries:
            entry_name = entry.name
            # Hidden files (including .DS_Store) and suppressed names.
            if entry_name.startswith(".") or entry_name in suppressed_names:
                continue

            if entry.is_dir():
                nested = walk(entry)
                if nested:  # empty directories are omitted
                    nodes.append(
                        {
                            "name": entry_name,
                            "type": "directory",
                            "children": nested,
                        }
                    )
                continue

            if entry.suffix.lower() not in MEDIA_EXTENSIONS:
                continue

            media_total += 1
            # Relative to the project root (matches ExecutionStore take paths).
            rel_path = str(entry.relative_to(project_dir))
            nodes.append(
                {
                    "name": entry_name,
                    "type": "file",
                    "path": rel_path,
                    "media_url": f"/media/{project}/{rel_path}",
                    "status": "untracked",
                    "status_color": "gray",
                    # Indexed metadata, when present, wins over the defaults.
                    **meta_index.get(rel_path, {}),
                }
            )
        return nodes

    # Each existing, non-empty media root becomes a top-level child of the
    # synthetic "output" node.
    top_level: list = []
    for root_name in media_roots:
        root_dir = project_dir / root_name
        if not root_dir.is_dir():
            continue
        listing = walk(root_dir)
        if listing:
            top_level.append(
                {
                    "name": root_name,
                    "type": "directory",
                    "children": listing,
                }
            )

    return {
        "name": "output",
        "type": "directory",
        "children": top_level,
        "file_count": media_total,
    }


# ── Grouping strategies ──────────────────────────────────────────


def group_flat_by_stem(
    file_nodes: list[dict],
    episode_name: str,
    project: str = "",
) -> list[dict]:
    """Flat grouping: turn every image/video file node into a deliverable.

    The text before the last dot of the file name becomes the ``shot_id``;
    there is no take parsing and no pass-anchor grouping. Nodes whose name
    has no dot or a non-media extension are dropped. ``episode_name`` and
    ``project`` are accepted but not used by this strategy.

    Returns:
        ``type="deliverable"`` nodes in natural order of ``shot_id``.
        ``status`` defaults to ``"candidate"`` and ``status_color`` to
        ``"gray"`` when the input node lacks them.
    """
    media_suffixes = {"mp4", "mov", "webm", "png", "jpg", "jpeg", "webp"}
    collected: list[dict] = []

    for node in file_nodes:
        name = node.get("name", "")
        if "." not in name:
            continue
        stem, extension = name.rsplit(".", 1)
        if extension.lower() not in media_suffixes:
            continue
        collected.append(
            {
                "type": "deliverable",
                "name": name,
                "shot_id": stem,
                "path": node.get("path"),
                "media_url": node.get("media_url"),
                "status": node.get("status", "candidate"),
                "status_color": node.get("status_color", "gray"),
            }
        )

    return sorted(collected, key=lambda item: _natural_sort_key(item["shot_id"]))


def group_by_pass_anchors(
    file_nodes: list[dict],
    episode_name: str,
    project: str = "",
) -> list[dict]:
    """Build an interleaved episode tree per SYNTHESIS §8.

    Layout:
      1. (quarantine banner prepended by get_tree — Phase 3)
      2. coverage_summary node
      3. recent_activity node (if any passes in last 24h)
      4. Interleaved: pass_anchor nodes at first covered shot, shot nodes,
         awaiting_shot placeholders — all sorted by shot number.
      5. Ungrouped trailing files.

    Args:
        file_nodes: file node dicts for one episode. 'name' is required;
            'path' and 'media_url' are required for files that end up as
            shot takes. Optional metadata keys (status, model, cost, ...)
            are copied onto take entries when present.
        episode_name: display name or directory name of the episode; it is
            normalised to an ``EP...`` episode id below.
        project: project slug handed to the pass store, the execution store
            and the coverage helpers.

    Returns:
        List of node dicts in the layout order described above.

    Raises:
        ExecutionStoreUnavailableError: when the execution store cannot be
            opened, read or closed while collecting uncovered shots.
    """
    from recoil.execution.pass_store import PassStore
    from recoil.workspace.coverage import (  # local import to avoid circular at module load
        best_status,
        coverage_summary_for_episode,
        recent_activity_for_episode,
    )

    ep_prefix = episode_name.replace("Episode ", "EP").replace(" ", "")
    # Normalise ep_NNN -> EP001
    ep_num_match = re.match(r"ep_(\d+)", ep_prefix)
    if ep_num_match:
        episode_id = f"EP{int(ep_num_match.group(1)):03d}"
    else:
        episode_id = ep_prefix if ep_prefix.startswith("EP") else f"EP{ep_prefix}"

    # ── Bucket raw files ──
    shot_buckets: dict[str, list[dict]] = defaultdict(list)
    pass_bucket: dict[
        str, dict
    ] = {}  # anchor_key -> {"takes":[], "extracted":{shot_num:[nodes]}}
    ungrouped: list[dict] = []

    store = PassStore(project)
    try:
        passes_by_id = {p["pass_id"]: p for p in store.list_passes(episode_id)}
    finally:
        store.close()

    # Compiled once instead of per file: matches segments extracted from a
    # pass take, e.g. shot_<num>_FROM_PASS_<counter>_<TAG>_take<N>.
    from_pass_re = re.compile(
        r"^shot_(\d{1,4}[a-z]?)_FROM_PASS_(\d{3})_"
        r"([A-Z][A-Z0-9]*(?:_[A-Z][A-Z0-9]*)*)_take(\d+)"
    )

    for node in file_nodes:
        name = node["name"]
        parsed_shot = parse_shot_filename(name)
        if parsed_shot:
            shot_num = parsed_shot["shot_num"]
            if "_FROM_PASS_" in name:
                mm = from_pass_re.match(name)
                if mm:
                    shot_num_m, counter, tag, _take_n = mm.groups()
                    anchor_key = f"{counter}:{tag}"
                    pb = pass_bucket.setdefault(
                        anchor_key,
                        {"takes": [], "extracted": defaultdict(list)},
                    )
                    pb["extracted"][shot_num_m].append(node)
            # An extracted segment is listed under its pass anchor AND as a
            # take of its shot.
            shot_buckets[shot_num].append({"parsed": parsed_shot, "node": node})
        else:
            parsed_pass = parse_pass_filename(name)
            if parsed_pass:
                if parsed_pass.get("strategy") == "solo":
                    # Solo passes cover a single shot, so they are filed as a
                    # take of that shot rather than under a pass anchor.
                    shot_num = normalize_shot_num(parsed_pass["shot_ids"][0])
                    if shot_num:
                        shot_buckets[shot_num].append(
                            {"parsed": parsed_pass, "node": node}
                        )
                        continue
                    ungrouped.append(node)
                    continue
                grouping = _grouping_from_sidecar(project, node)
                anchor_key, pass_id, record = _pass_bucket_key(
                    parsed=parsed_pass,
                    node=node,
                    grouping=grouping,
                    passes_by_id=passes_by_id,
                )
                pb = pass_bucket.setdefault(
                    anchor_key,
                    {
                        "takes": [],
                        "extracted": defaultdict(list),
                        "pass_id": pass_id,
                        "record": record,
                        "strategy": (
                            grouping.get("strategy")
                            if isinstance(grouping, dict) and grouping.get("strategy")
                            else parsed_pass.get("strategy")
                        ),
                    },
                )
                # The bucket may already exist (created by an extracted
                # segment or an earlier take) without these fields.
                if pass_id and not pb.get("pass_id"):
                    pb["pass_id"] = pass_id
                if record and not pb.get("record"):
                    pb["record"] = record
                pb["takes"].append({"node": node, "parsed": parsed_pass})
            else:
                ungrouped.append(node)

    # Reverse lookup: anchor key derived from a stored pass id -> pass id.
    anchor_to_pass_id: dict[str, str] = {}
    for pid in passes_by_id:
        anchor = _pass_anchor_key(pid)
        if anchor:
            anchor_to_pass_id[anchor] = pid

    # Build shot nodes (real takes)
    shot_nodes_by_num: dict[str, dict] = {}
    for shot_num, entries in shot_buckets.items():
        takes: list[dict] = []
        statuses: list[str] = []
        models: list[str] = []
        # Order: files without a take number (keyframes) first, then plain
        # takes, then "_FROM_" extractions; ties broken by take number and
        # lower-cased name.
        for entry in sorted(
            entries,
            key=lambda e: (
                0
                if e["parsed"]["take_num"] is None
                else (2 if "_FROM_" in e["node"]["name"] else 1),
                e["parsed"]["take_num"] or 0,
                e["node"]["name"].lower(),
            ),
        ):
            node = entry["node"]
            take_name = node["name"]
            if entry["parsed"]["take_num"] is None:
                take_name = f"keyframe ({node['name']})"
            take_entry = {
                "name": take_name,
                "path": node["path"],
                "media_url": node["media_url"],
            }
            # Copy optional metadata only when the node actually carries it.
            for key in (
                "status",
                "status_color",
                "model",
                "shot_id",
                "cost",
                "take_index",
                "take_id",
                "source",
                "sidecar_status",
                "sidecar_source",
                "sidecar_notes",
                "replaced_by",
            ):
                if key in node:
                    take_entry[key] = node[key]
            takes.append(take_entry)
            if node.get("status"):
                statuses.append(node["status"])
            if node.get("model"):
                models.append(node["model"])
        best = best_status(statuses)
        shot_nodes_by_num[shot_num] = {
            "name": f"SH{shot_num.upper()}",
            "type": "shot",
            "shot_id": f"{episode_id}_SH{shot_num.upper()}",
            "path": None,
            "takes": takes,
            "take_count": len(takes),
            "status": best,
            "status_color": _shot_status_color(best),
            "model": models[0] if models else None,
        }

    def _anchor_shot_num(anchor_key: str) -> str:
        """Return the first covered shot number for a pass anchor, if known."""
        pid = anchor_to_pass_id.get(anchor_key)
        if pid and pid in passes_by_id:
            seg_ids = passes_by_id[pid].get("segment_shot_ids") or []
            if seg_ids:
                sn = normalize_shot_num(seg_ids[0])
                if sn:
                    return sn
        return "\uffff"  # sort to end if unknown

    # Build pass_anchor nodes
    pass_nodes: list[dict] = []
    for anchor_key, bucket in pass_bucket.items():
        # Fall back to a synthetic id when neither the bucket nor the pass
        # store knows this anchor.
        pid = bucket.get("pass_id") or anchor_to_pass_id.get(anchor_key) or f"PASS_{anchor_key}"
        record = bucket.get("record") or passes_by_id.get(pid, {})
        extracted_children: list[dict] = []
        for shot_num in sorted(bucket["extracted"].keys()):
            for n in bucket["extracted"][shot_num]:
                extracted_children.append(
                    {
                        "name": n["name"],
                        "type": "file",
                        "path": n.get("path"),
                        "media_url": n.get("media_url"),
                        "shot_num": shot_num,
                        "role": "extracted_segment",
                    }
                )
        take_nodes: list[dict] = []
        # `or 0` keeps the sort total when a take number is missing, matching
        # the shot-take ordering above.
        for t in sorted(bucket["takes"], key=lambda t: t["parsed"]["take_num"] or 0):
            take_nodes.append(
                {
                    "name": t["node"]["name"],
                    "path": t["node"].get("path"),
                    "media_url": t["node"].get("media_url"),
                    "take_number": t["parsed"]["take_num"],
                }
            )
        if ":" in anchor_key and len(anchor_key.split(":")) == 2:
            # "<counter>:<tag>" keys.
            counter_part = anchor_key.split(":")[0]
            tag_part = anchor_key.split(":")[1]
            name_prefix = f"PASS_{counter_part}"
        else:
            parts = anchor_key.split(":")
            # NOTE(review): str.split always returns at least one element, so
            # the bucket-strategy / "group" fallback below is never taken.
            # Left as-is to keep the tree payload unchanged.
            strategy = (parts[0] if parts else bucket.get("strategy") or "group").upper()
            counter_part = parts[1] if len(parts) > 1 else ""
            tag_part = strategy
            name_prefix = f"{strategy}_{counter_part}" if counter_part else strategy
        status = record.get("status") or (
            "segmentation_review" if record.get("cuts_diverged") else "extracted"
        )
        shot_tokens_display = (
            "_".join(bucket["takes"][0]["parsed"]["shot_tokens"])
            if bucket["takes"]
            else ""
        )
        pass_nodes.append(
            {
                "name": f"{name_prefix} [{shot_tokens_display}] {tag_part}".strip(),
                "type": "pass_anchor",
                "pass_id": pid,
                "anchor_shot_num": _anchor_shot_num(anchor_key),
                "takes": take_nodes,
                "take_count": len(take_nodes),
                "extracted": extracted_children,
                "status": status,
            }
        )

    # Uncovered shot placeholders
    try:
        store_exec = _get_store(project)
        try:
            all_shot_ids = [
                s.get("shot_id", "")
                for s in store_exec.get_all_shots()
                if s.get("shot_id", "").startswith(episode_id + "_")
            ]
        finally:
            # Always release the store, including when the read above fails
            # (same pattern as the PassStore usage earlier in this function).
            store_exec.close()
    except Exception as e:
        # Tenet 6: store failure must not silently empty the uncovered list
        # (which would hide unrendered shots from review). Callers (HTTP
        # routes) surface this as HTTP 503.
        log.exception("tree.uncovered: store failure for %s", project)
        raise ExecutionStoreUnavailableError(
            f"could not read uncovered shots for project={project}: {e}"
        ) from e
    # A shot counts as covered when a stored pass lists it as a segment or
    # when at least one take file exists for it.
    covered_nums: set[str] = set()
    for rec in passes_by_id.values():
        for sid in rec.get("segment_shot_ids") or []:
            sn = normalize_shot_num(sid)
            if sn:
                covered_nums.add(sn)
    covered_nums.update(shot_nodes_by_num.keys())

    awaiting_nodes: list[dict] = []
    for sid in all_shot_ids:
        sn = normalize_shot_num(sid)
        if sn and sn not in covered_nums:
            awaiting_nodes.append(
                {
                    "type": "awaiting_shot",
                    "name": f"SH{sn.upper()} — Awaiting Coverage",
                    "shot_id": sid,
                    "shot_num": sn,
                }
            )

    # Interleave by shot number. The integer is a tie-breaker so that, for
    # the same shot number, pass anchors (0) precede shots (1), which precede
    # awaiting placeholders (2).
    timeline: list[tuple[str, int, dict]] = []
    for pn in pass_nodes:
        timeline.append((pn["anchor_shot_num"], 0, pn))
    for sn, node in shot_nodes_by_num.items():
        timeline.append((sn, 1, node))
    for aw in awaiting_nodes:
        timeline.append((aw["shot_num"], 2, aw))
    timeline.sort(key=lambda t: (t[0], t[1]))

    result: list[dict] = []
    summary = coverage_summary_for_episode(project, episode_id)
    result.append(summary)
    recent = recent_activity_for_episode(project, episode_id)
    if recent:
        result.append(recent)
    for _, _, node in timeline:
        result.append(node)
    result.extend(sorted(ungrouped, key=lambda f: f["name"].lower()))
    return result


def group_episode_files_by_shot(
    file_nodes: list[dict],
    episode_name: str,
    project: str = "",
) -> list[dict]:
    """Route episode files to the grouper chosen by the project's capability.

    The project's ``ui_grouping_strategy`` is looked up in ``_GROUPERS``:
      - microdrama projects     -> group_by_pass_anchors (pass + shot taxonomy)
      - client_deliverable ones -> group_flat_by_stem (flat list)

    Two cases fall back to group_by_pass_anchors, the historical default:
      - ``project`` is empty (legacy callers that do not know the project);
      - the strategy has no registered grouper (defensive fallback).

    The signature is kept unchanged for backward compatibility with existing
    callers.
    """
    chosen = None
    if project:
        strategy = get_project(project).ui_grouping_strategy
        chosen = _GROUPERS.get(strategy)

    if chosen is None:
        chosen = group_by_pass_anchors

    return chosen(file_nodes, episode_name, project)


# ── Project tree builder ─────────────────────────────────────────


def build_project_tree(project: str) -> dict:
    """Assemble the categorized tree payload for one project.

    Carved out of the `get_tree` route body (server.py L2216-2432): this is
    the part that computes the tree. Per the extraction notes, the route in
    `server.py::get_tree` keeps the cache-related side effects — orphan
    sweeping (`_sweep_orphans`), pass extraction (`_maybe_extract_passes`),
    sidecar auto-stubbing (`ws_sidecar.auto_stub_missing`), tree-cache
    management (`_tree_cache`, `_TREE_CACHE_TTL`) and quarantine banners
    (`_collect_quarantine_node`) — and calls this function for the response
    body. The code in this function only reads the filesystem and the
    metadata index.

    Args:
        project: project slug (already validated by caller).

    Returns:
        dict with keys 'project', 'tree' and 'file_count'. 'tree' carries
        name/type/children/file_count (frozen contract — same shape
        `get_tree` returned pre-Phase-B). When none of the v2 media roots
        exists, 'tree' holds only an empty 'children' list and 'file_count'
        is 0.
    """
    project_dir = projects_root() / project

    # v2 layout: walk the v2 media roots (assets/, sequences/, renders/,
    # state/) instead of the deleted v1 output/ tree. Mirrors the
    # build_metadata_index + scan_output_dir + mcp_server.py policy (R6).
    media_roots = []
    for v2_root in ("assets", "sequences", "renders", "state"):
        candidate = project_dir / v2_root
        if candidate.is_dir():
            media_roots.append(candidate)
    if not media_roots:
        return {"project": project, "tree": {"children": []}, "file_count": 0}

    meta_index = build_metadata_index(project)

    # Display order doubles as the set of known categories.
    display_order = [
        "Episodes",
        "Characters",
        "Locations",
        "Props",
        "Exploration",
        "Uncategorized",
        "Archive",
    ]
    categories = {label: defaultdict(list) for label in display_order}

    # v2 → display-category mapping for assets/ subkinds.
    asset_kind_to_category = {
        "identity": "Characters",
        "turn": "Characters",
        "expr": "Characters",
        "loc": "Locations",
        "prop": "Props",
        "scene": "Exploration",
    }

    def _titled(segment):
        """Turn a path segment like 'main_hall' into 'Main Hall'."""
        return segment.replace("_", " ").title()

    def _bucket_for(parts):
        """Map project-relative path parts to (category, entity), or None to drop."""
        top = parts[0]
        nested = len(parts) > 1

        # Episodes — sequences/ep_NNN/ (previs+frames) + renders/ep_NNN/ (video)
        if nested and top in ("sequences", "renders"):
            return "Episodes", _clean_episode_name(parts[1])

        # Assets — dispatched by asset kind (identity/turn/expr/loc/prop/scene)
        if nested and top == "assets":
            kind = parts[1]
            category = asset_kind_to_category.get(kind)
            if category is None:
                # Unknown asset kind — keep it visible under Exploration
                # rather than silently dropping it.
                return "Exploration", _titled(kind)
            if len(parts) <= 2:
                # No entity segment: only Exploration has a "Root" bucket;
                # the other asset categories skip such files.
                return ("Exploration", "Root") if category == "Exploration" else None
            if category == "Locations":
                return category, _clean_location_name(parts[2])
            entity = _titled(parts[2])
            if category == "Characters" and (not entity.strip() or entity == "Test"):
                return None
            return category, entity

        # State — visual/manifests/bundles. Mostly JSON (already filtered by
        # MEDIA_EXTENSIONS); media that lands here (rare) is surfaced under
        # Uncategorized for visibility.
        if top == "state":
            return "Uncategorized", (_titled(parts[1]) if nested else "Root")

        # Catch-all (shouldn't happen given the media_roots iteration — but
        # defends against future v2 root additions).
        return "Uncategorized", ("Root" if len(parts) == 1 else _titled(parts[0]))

    file_count = 0
    for root in media_roots:
        for path in root.rglob("*"):
            if not path.is_file():
                continue
            filename = path.name
            if filename.startswith(".") or filename == "_meta":
                continue
            if path.suffix.lower() not in MEDIA_EXTENSIONS:
                continue

            # Relative to project_dir so the first segment is the v2 root
            # name (assets / sequences / renders / state).
            relative = path.relative_to(project_dir)
            parts = relative.parts
            if any("backup" in segment.lower() for segment in parts):
                continue
            # v2 layout: never walk _history/ (archives, migration tarballs,
            # debug snapshots). Exact match per R10.3 — startswith("_history")
            # false-positived "_history_backup" style dirs.
            if "_history" in parts:
                continue

            # Counted before routing, so files that routing drops still
            # contribute to file_count.
            file_count += 1
            rel_path = str(relative)

            node = {
                "name": filename,
                "type": "file",
                "path": rel_path,
                "media_url": f"/media/{project}/{rel_path}",
                "status": "untracked",
                "status_color": "gray",
            }
            if rel_path in meta_index:
                node.update(meta_index[rel_path])

            bucket = _bucket_for(parts)
            if bucket is not None:
                category, entity = bucket
                categories[category][entity].append(node)

    def _children_for(cat_name, entity_name, files):
        """Episodes are grouped by shot; everything else is name-sorted."""
        if cat_name == "Episodes":
            return group_episode_files_by_shot(files, entity_name, project)
        return sorted(files, key=lambda f: f["name"].lower())

    # Build the tree structure, skipping categories that stayed empty.
    tree_children = []
    for cat_name in display_order:
        entities = categories[cat_name]
        if not entities:
            continue
        entity_children = [
            {
                "name": entity_name,
                "type": "directory",
                "children": _children_for(cat_name, entity_name, entities[entity_name]),
            }
            for entity_name in sorted(entities)
        ]
        tree_children.append(
            {
                "name": cat_name,
                "type": "directory",
                "children": entity_children,
                "collapsed_default": cat_name in ("Archive", "Uncategorized"),
            }
        )

    tree = {
        "name": "output",
        "type": "directory",
        "children": tree_children,
        "file_count": file_count,
    }
    return {
        "project": project,
        "tree": tree,
        "file_count": file_count,
    }


# Strategy registration — populated at module load (mirrors server.py L1934-1935 placement).
# Placed after both grouper definitions so the names exist when this runs.
# group_episode_files_by_shot looks the project's ui_grouping_strategy up in
# _GROUPERS at call time and falls back to group_by_pass_anchors when the
# strategy has no entry here.
_GROUPERS["pass_anchors"] = group_by_pass_anchors
_GROUPERS["flat"] = group_flat_by_stem


# Public surface of this module (see the module docstring); helpers that
# are not listed here stay underscored and internal.
__all__ = [
    "PASS_PATTERN",
    "SHOT_PATTERN",
    "MEDIA_EXTENSIONS",
    "parse_pass_filename",
    "parse_shot_filename",
    "normalize_shot_num",
    "group_by_pass_anchors",
    "group_flat_by_stem",
    "group_episode_files_by_shot",
    "build_metadata_index",
    "scan_output_dir",
    "build_project_tree",
]
