# BUILD SPEC — Universal Sidecar System

**Source:** `consultations/recoil/universal-sidecar-system/SYNTHESIS.md`
**Target:** `recoil/workspace/` + `recoil/execution/step_runner.py`
**Detail Level:** MAXIMUM (overnight unattended harness build)

---

## Conventions

- **RECOIL_ROOT** = `/Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil`
- **PROJECTS_ROOT** = `/Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/projects`
- All file paths in this spec are relative to RECOIL_ROOT unless noted
- "Sidecar" always means `{filename}.json` adjacent to the media file (NOT in `_meta/`)
- The existing `pipeline/lib/sidecar.py` writes to `_meta/{filename}.json` — that system continues unchanged for `_canonical/` refs. The new universal sidecar module (`workspace/sidecar.py`) writes to `{filename}.json` for all non-canonical files.

---

## Phase 1 — Core Sidecar Module (`workspace/sidecar.py`)

**Goal:** Create `workspace/sidecar.py` — the universal sidecar read/write module.

**Creates:** `workspace/sidecar.py` (NEW FILE — output complete)

### File: `workspace/sidecar.py`

```python
#!/usr/bin/env python3
"""Universal sidecar read/write module for the Recoil Workspace.

Every media file in output/ gets a companion .json sidecar containing its
provenance, status, and lineage. This module handles all sidecar CRUD.

Sidecar naming: {filename}.json — e.g., hero.jpg → hero.jpg.json
This is DIFFERENT from pipeline/lib/sidecar.py which writes to _meta/{filename}.json.
The _meta pattern is used for _canonical/ refs only. This module handles everything else.

Atomic writes: tempfile + os.replace (same pattern as workspace/state.py).
"""

import json
import os
import shutil
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# ── Constants ─────────────────────────────────────────────────

SCHEMA_VERSION = 1

VALID_STATUSES = frozenset({
    "candidate",
    "front-runner",
    "canonical",
    "demoted",
    "archived",
})

MEDIA_EXTENSIONS = frozenset({
    ".png", ".jpg", ".jpeg", ".webp",
    ".mp4", ".mov", ".webm",
})


# ── Helpers ───────────────────────────────────────────────────

def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _sidecar_path(media_path: Path) -> Path:
    """Return the sidecar path for a media file: same dir, {name}.json."""
    return media_path.parent / f"{media_path.name}.json"


def _is_sidecar_file(path: Path) -> bool:
    """Check if a path is itself a sidecar JSON file (to avoid recursion)."""
    if path.suffix != ".json":
        return False
    # A sidecar is {mediafile}.json — check if stripping .json yields a media ext
    stem_path = Path(path.stem)
    return stem_path.suffix.lower() in MEDIA_EXTENSIONS


def _atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON atomically using tempfile + os.replace."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(
        dir=str(path.parent),
        prefix=".sidecar_",
        suffix=".json",
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)
        os.replace(tmp, str(path))
    except Exception:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


# ── Core Read/Write ───────────────────────────────────────────

def read_sidecar(media_path: Path) -> Optional[dict]:
    """Read the sidecar JSON for a media file.

    Returns None if no sidecar exists or if the JSON is corrupt.
    """
    sc = _sidecar_path(media_path)
    if not sc.is_file():
        return None
    try:
        return json.loads(sc.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, IOError):
        return None


def write_sidecar(media_path: Path, data: dict) -> Path:
    """Write/update sidecar JSON for a media file. Atomic write.

    Always sets updated_at and ensures schema_version is present.
    Returns the sidecar file path.
    """
    data["schema_version"] = SCHEMA_VERSION
    data["updated_at"] = _now_iso()
    sc = _sidecar_path(media_path)
    _atomic_write_json(sc, data)
    return sc


def ensure_sidecar(media_path: Path) -> dict:
    """Read existing sidecar or create a stub for manual drops.

    Returns the sidecar data (either existing or newly created).
    """
    existing = read_sidecar(media_path)
    if existing is not None:
        return existing
    return create_stub_sidecar(media_path)


def create_stub_sidecar(media_path: Path) -> dict:
    """Create a minimal sidecar for a manually-dropped file.

    Returns the created sidecar data.
    """
    data = {
        "schema_version": SCHEMA_VERSION,
        "source": "manual_drop",
        "status": "candidate",
        "created_at": _now_iso(),
        "updated_at": _now_iso(),
        "provenance": {},
        "lineage": {},
        "notes": "",
        "tags": [],
    }
    write_sidecar(media_path, data)
    return data


# ── Status Management ─────────────────────────────────────────

def set_status(media_path: Path, status: str, **extra) -> dict:
    """Update the status field in a sidecar. Returns updated sidecar.

    Extra kwargs are merged into the sidecar dict (e.g., notes, promoted_to).
    Raises ValueError if status is not in VALID_STATUSES.
    """
    if status not in VALID_STATUSES:
        raise ValueError(
            f"Invalid status '{status}'. Must be one of: {', '.join(sorted(VALID_STATUSES))}"
        )
    data = ensure_sidecar(media_path)
    data["status"] = status
    data.update(extra)
    write_sidecar(media_path, data)
    return data


def get_status(media_path: Path) -> Optional[str]:
    """Get the status from a file's sidecar. Returns None if no sidecar."""
    data = read_sidecar(media_path)
    if data is None:
        return None
    return data.get("status")


# ── Canonical Promotion ───────────────────────────────────────

def promote_to_canonical(
    media_path: Path,
    asset_type: str,
    entity_id: str,
    project_dir: Path,
) -> dict:
    """Copy file + sidecar to _canonical/, update casting_state.json, set status.

    Steps:
    1. Copy media file to _canonical/{type}/{entity}/hero.{ext}
    2. Copy sidecar to _canonical/{type}/{entity}/_meta/hero.{ext}.json
       (uses the _meta convention for canonical compatibility)
    3. Update casting_state.json hero_path
    4. Set promoted_to on original file's sidecar
    5. Set status to "canonical" on original

    Returns the updated sidecar data for the original file.
    """
    type_folder_map = {
        "characters": "characters",
        "character": "characters",
        "locations": "locations",
        "location": "locations",
        "props": "props",
        "prop": "props",
    }
    folder = type_folder_map.get(asset_type)
    if folder is None:
        raise ValueError(f"Unknown asset_type: {asset_type}")

    output_dir = project_dir / "output"
    canonical_dir = output_dir / "refs" / "_canonical" / folder / entity_id
    canonical_dir.mkdir(parents=True, exist_ok=True)

    # 1. Copy media file
    ext = media_path.suffix
    hero_dest = canonical_dir / f"hero{ext}"
    shutil.copy2(str(media_path), str(hero_dest))

    # 2. Copy sidecar to _meta/ (canonical format)
    meta_dir = canonical_dir / "_meta"
    meta_dir.mkdir(parents=True, exist_ok=True)
    source_sidecar = read_sidecar(media_path)
    if source_sidecar is None:
        source_sidecar = create_stub_sidecar(media_path)

    # Write canonical sidecar in _meta/ format
    canonical_sidecar_data = dict(source_sidecar)
    canonical_sidecar_data["status"] = "canonical"
    canonical_sidecar_data["promoted_from"] = str(media_path)
    canonical_sidecar_path = meta_dir / f"hero{ext}.json"
    _atomic_write_json(canonical_sidecar_path, canonical_sidecar_data)

    # 3. Update casting_state.json
    # Check both legacy (state/starsend/) and flat (state/) locations
    casting_state_path = project_dir / "state" / "starsend" / "casting_state.json"
    if not casting_state_path.is_file():
        alt_path = project_dir / "state" / "casting_state.json"
        if alt_path.is_file():
            casting_state_path = alt_path
    if casting_state_path.is_file():
        try:
            casting = json.loads(casting_state_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            casting = {}
    else:
        casting_state_path.parent.mkdir(parents=True, exist_ok=True)
        casting = {}

    # Normalize asset_type for casting_state keys
    cast_type = folder  # "characters", "locations", "props"
    if cast_type not in casting:
        casting[cast_type] = {}
    if entity_id not in casting[cast_type]:
        casting[cast_type][entity_id] = {}

    hero_rel = str(hero_dest.relative_to(project_dir))
    casting[cast_type][entity_id]["hero_path"] = hero_rel
    casting[cast_type][entity_id]["updated_at"] = _now_iso()
    _atomic_write_json(casting_state_path, casting)

    # 4-5. Update original sidecar
    promoted_to_rel = str(hero_dest.relative_to(project_dir))
    data = set_status(media_path, "canonical", promoted_to=promoted_to_rel)
    return data


# ── Archive / Restore ─────────────────────────────────────────

def archive_with_sidecar(media_path: Path, project_dir: Path) -> Path:
    """Move file + sidecar to _archive/, preserving relative path.

    Returns the archive destination path for the media file.
    """
    output_dir = project_dir / "output"
    archive_dir = output_dir / "_archive"

    # Compute relative path from project dir
    try:
        rel = media_path.relative_to(project_dir)
    except ValueError:
        # Fallback: use just the filename
        rel = Path(media_path.name)

    archive_dest = archive_dir / rel
    archive_dest.parent.mkdir(parents=True, exist_ok=True)

    # Update sidecar status before moving
    data = ensure_sidecar(media_path)
    data["status"] = "archived"
    data["archived_at"] = _now_iso()
    data["archived_from"] = str(rel)
    write_sidecar(media_path, data)

    # Move media file
    shutil.move(str(media_path), str(archive_dest))

    # Move sidecar
    sc_source = _sidecar_path(media_path)
    if sc_source.is_file():
        sc_dest = _sidecar_path(archive_dest)
        sc_dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(sc_source), str(sc_dest))

    return archive_dest


def restore_from_archive(archive_path: Path, project_dir: Path) -> Path:
    """Move file + sidecar back from _archive/ to original location.

    Uses the archived_from field in the sidecar to determine the original path.
    Falls back to stripping the _archive prefix if no archived_from field.
    Returns the restored path.
    """
    # Read sidecar for original location
    data = read_sidecar(archive_path)
    if data and data.get("archived_from"):
        original_rel = data["archived_from"]
        restore_dest = project_dir / original_rel
    else:
        # Fallback: strip _archive/ prefix
        output_dir = project_dir / "output"
        archive_dir = output_dir / "_archive"
        try:
            rel = archive_path.relative_to(archive_dir)
        except ValueError:
            raise ValueError(f"Cannot determine restore path for {archive_path}")
        restore_dest = project_dir / rel

    restore_dest.parent.mkdir(parents=True, exist_ok=True)

    # Update sidecar status before moving
    if data:
        data["status"] = "candidate"
        data.pop("archived_at", None)
        data.pop("archived_from", None)
        write_sidecar(archive_path, data)

    # Move media file
    shutil.move(str(archive_path), str(restore_dest))

    # Move sidecar
    sc_source = _sidecar_path(archive_path)
    if sc_source.is_file():
        sc_dest = _sidecar_path(restore_dest)
        sc_dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(sc_source), str(sc_dest))

    return restore_dest


# ── Scanner ───────────────────────────────────────────────────

def scan_for_missing_sidecars(output_dir: Path) -> list[Path]:
    """Find media files without sidecars in an output directory.

    Skips _meta/ directories and sidecar JSON files themselves.
    Returns list of media file paths that lack a companion sidecar.
    """
    missing = []
    if not output_dir.is_dir():
        return missing

    for path in output_dir.rglob("*"):
        if not path.is_file():
            continue
        # Skip hidden files, _meta dirs, backup dirs
        parts = path.relative_to(output_dir).parts
        if any(p.startswith(".") or p == "_meta" or "backup" in p.lower() for p in parts):
            continue
        # Skip non-media files
        if path.suffix.lower() not in MEDIA_EXTENSIONS:
            continue
        # Skip files that are sidecar JSONs themselves
        if _is_sidecar_file(path):
            continue
        # Check if sidecar exists
        if not _sidecar_path(path).is_file():
            missing.append(path)

    return missing


def auto_stub_missing(output_dir: Path) -> int:
    """Create stub sidecars for all media files missing them.

    Returns count of stubs created.
    """
    missing = scan_for_missing_sidecars(output_dir)
    for path in missing:
        create_stub_sidecar(path)
    return len(missing)


# ── Pipeline Sidecar Writer ───────────────────────────────────

def write_pipeline_sidecar(
    media_path: Path,
    *,
    model: str,
    prompt: str,
    prompt_layers: Optional[dict] = None,
    refs_used: Optional[list[dict]] = None,
    cost: float = 0.0,
    gate_results: Optional[dict] = None,
    seed: Optional[int] = None,
    generation_params: Optional[dict] = None,
    inputs_snapshot: Optional[dict] = None,
    shot_id: Optional[str] = None,
    pipeline: str = "unknown",
) -> dict:
    """Write a full provenance sidecar for a pipeline-generated file.

    Called by StepRunner after generation completion.
    Returns the sidecar data.
    """
    data = {
        "schema_version": SCHEMA_VERSION,
        "source": "pipeline",
        "status": "candidate",
        "created_at": _now_iso(),
        "updated_at": _now_iso(),
        "provenance": {
            "model": model,
            "prompt": prompt,
            "prompt_layers": prompt_layers or {},
            "refs_used": refs_used or [],
            "cost": cost,
            "gate_results": gate_results or {},
            "seed": seed,
            "generation_params": generation_params or {},
            "pipeline": pipeline,
            "shot_id": shot_id,
        },
        "lineage": {},
        "notes": "",
        "tags": [],
    }

    # Extract useful metadata from inputs_snapshot if available
    if inputs_snapshot:
        # Preserve the full snapshot hash for traceability
        snapshot_hash = inputs_snapshot.get("inputs_snapshot_hash")
        if snapshot_hash:
            data["provenance"]["inputs_snapshot_hash"] = snapshot_hash

        # Extract character refs used
        characters = inputs_snapshot.get("characters", [])
        if characters and not refs_used:
            data["provenance"]["refs_used"] = [
                {
                    "role": "character",
                    "id": c.get("char_id", "unknown"),
                    "display_name": c.get("display_name", ""),
                }
                for c in characters
            ]

        # Extract location info
        location = inputs_snapshot.get("location_id")
        if location:
            data["provenance"]["location_id"] = location

    write_sidecar(media_path, data)
    return data
```

### Validation Gate: Phase 1

```
GATE_P1:
  Run: python3 -c "
import sys; sys.path.insert(0, '$RECOIL_ROOT')
from workspace.sidecar import (
    read_sidecar, write_sidecar, ensure_sidecar, create_stub_sidecar,
    set_status, get_status, promote_to_canonical, archive_with_sidecar,
    restore_from_archive, scan_for_missing_sidecars, auto_stub_missing,
    write_pipeline_sidecar, SCHEMA_VERSION, VALID_STATUSES, MEDIA_EXTENSIONS,
)
print('Phase 1 PASS: all exports verified')
"
  Expect: "Phase 1 PASS"
```

---

## Phase 2 — Server Endpoints

**Goal:** Add sidecar CRUD, promote, demote, and restore endpoints to `workspace/server.py`.

**Modifies:** `workspace/server.py`

### Step 2a — Add sidecar import

**Insert AFTER line 58** (after the `from workspace.helpers import ...` block):

```python
from workspace import sidecar as ws_sidecar
```

The existing import block on lines 54-59 looks like:

```python
from workspace import state as ws_state
from workspace.helpers import (
    get_store as _get_store,
    get_ops_log_path as _get_ops_log_path,
    shot_status_color as _shot_status_color,
)
```

After the edit, it becomes:

```python
from workspace import state as ws_state
from workspace.helpers import (
    get_store as _get_store,
    get_ops_log_path as _get_ops_log_path,
    shot_status_color as _shot_status_color,
)
from workspace import sidecar as ws_sidecar
```

### Step 2b — Add sidecar read/write endpoint

**Insert AFTER the `/api/archive/{project}` endpoint** (after line 539, before the `/api/shot/{project}/{shot_id}` route). The insertion point is after:

```python
    return JSONResponse({
        "archived": rel_path,
        "destination": str(archive_dest.relative_to(project_dir)),
    })
```

**New code to insert:**

```python


# ── Routes: Sidecar CRUD ─────────────────────────────────────

@app.post("/api/sidecar/{project}")
async def sidecar_crud(project: str, request: Request):
    """Read or write a sidecar for a media file.

    Body:
        {"path": "output/refs/characters/sadie/hero_v4.jpg"}
        — Returns the sidecar data (GET-style read via POST)

        {"path": "...", "data": {"notes": "Best framing", "tags": ["hero-candidate"]}}
        — Merges data into existing sidecar (write)
    """
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = PROJECTS_ROOT / project
    media_path = project_dir / rel_path

    # Security check
    try:
        media_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not media_path.is_file():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    write_data = body.get("data")
    if write_data:
        # Merge write: read existing, overlay new fields, write back
        existing = ws_sidecar.ensure_sidecar(media_path)
        existing.update(write_data)
        ws_sidecar.write_sidecar(media_path, existing)
        return JSONResponse({"path": rel_path, "sidecar": existing})
    else:
        # Read only
        data = ws_sidecar.read_sidecar(media_path)
        if data is None:
            return JSONResponse({"path": rel_path, "sidecar": None})
        return JSONResponse({"path": rel_path, "sidecar": data})


@app.post("/api/promote/{project}")
async def promote_item(project: str, request: Request):
    """Promote a file's sidecar status.

    Body:
        {"path": "output/refs/characters/sadie/hero_v4.jpg", "status": "front-runner"}
        — Sets status to front-runner

        {"path": "...", "status": "canonical", "asset_type": "characters", "entity_id": "sadie"}
        — Full canonical promotion (copies to _canonical/, updates casting_state.json)
    """
    body = await request.json()
    rel_path = body.get("path", "")
    status = body.get("status", "front-runner")

    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = PROJECTS_ROOT / project
    media_path = project_dir / rel_path

    # Security check
    try:
        media_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not media_path.is_file():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    if status == "canonical":
        # Full canonical promotion
        asset_type = body.get("asset_type", "")
        entity_id = body.get("entity_id", "")
        if not asset_type or not entity_id:
            return JSONResponse(
                {"error": "asset_type and entity_id required for canonical promotion"},
                status_code=400,
            )
        try:
            data = ws_sidecar.promote_to_canonical(
                media_path, asset_type, entity_id, project_dir
            )
        except Exception as e:
            return JSONResponse({"error": str(e)}, status_code=500)

        # Invalidate tree cache
        _tree_cache.pop(project, None)

        return JSONResponse({
            "path": rel_path,
            "status": "canonical",
            "promoted_to": data.get("promoted_to"),
            "sidecar": data,
        })
    else:
        # Simple status promotion (front-runner)
        try:
            data = ws_sidecar.set_status(media_path, status)
        except ValueError as e:
            return JSONResponse({"error": str(e)}, status_code=400)

        # Invalidate tree cache
        _tree_cache.pop(project, None)

        return JSONResponse({
            "path": rel_path,
            "status": status,
            "sidecar": data,
        })


@app.post("/api/demote/{project}")
async def demote_item(project: str, request: Request):
    """Set a file's status to demoted.

    Body: {"path": "output/refs/characters/sadie/hero_v4.jpg"}
    """
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = PROJECTS_ROOT / project
    media_path = project_dir / rel_path

    # Security check
    try:
        media_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not media_path.is_file():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    data = ws_sidecar.set_status(media_path, "demoted")

    # Invalidate tree cache
    _tree_cache.pop(project, None)

    return JSONResponse({
        "path": rel_path,
        "status": "demoted",
        "sidecar": data,
    })


@app.post("/api/restore/{project}")
async def restore_item(project: str, request: Request):
    """Restore a file from _archive/ back to its original location.

    Body: {"path": "output/_archive/output/refs/characters/sadie/hero_v4.jpg"}
    """
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = PROJECTS_ROOT / project
    archive_path = project_dir / rel_path

    # Security check
    try:
        archive_path.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not archive_path.is_file():
        return JSONResponse({"error": f"Not found: {rel_path}"}, status_code=404)

    try:
        restored = ws_sidecar.restore_from_archive(archive_path, project_dir)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)

    # Invalidate tree cache
    _tree_cache.pop(project, None)

    return JSONResponse({
        "original_path": rel_path,
        "restored_to": str(restored.relative_to(project_dir)),
    })
```

### Validation Gate: Phase 2

```
GATE_P2:
  Run: python3 -c "
import sys; sys.path.insert(0, '$RECOIL_ROOT')
from workspace.server import app
routes = [r.path for r in app.routes if hasattr(r, 'path')]
required = ['/api/sidecar/{project}', '/api/promote/{project}', '/api/demote/{project}', '/api/restore/{project}']
for r in required:
    assert r in routes, f'Missing route: {r}'
print('Phase 2 PASS: all endpoints registered')
"
  Expect: "Phase 2 PASS"
```

---

## Phase 3 — VAT Update (Sidecar-Aware Tree Scan)

**Goal:** Update the VAT scanner to read sidecars during tree scan, attach sidecar status/source/notes to file nodes, and auto-stub missing sidecars.

**Modifies:** `workspace/server.py`

### Step 3a — Update `_build_metadata_index` to read universal sidecars

The existing function at lines 191-244 reads ExecutionStore takes and canonical ref sidecars. We need to add a third source: universal sidecars from `{file}.json`.

**Replace the entire `_build_metadata_index` function** (lines 191-244):

Find this exact string to replace:

```python
def _build_metadata_index(project: str) -> dict[str, dict]:
    """Build a path-keyed metadata lookup from ExecutionStore + sidecars."""
    index = {}
    project_dir = PROJECTS_ROOT / project

    # Index ExecutionStore takes (keyed by relative file_path)
    try:
        store = _get_store(project)
        for shot in store.get_all_shots():
            shot_id = shot.get("shot_id", "")
            status = shot.get("status", "previs_pending")
            model = shot.get("model")
            for i, take in enumerate(shot.get("takes", [])):
                fp = take.get("file_path", "")
                if fp:
                    index[fp] = {
                        "shot_id": shot_id,
                        "status": status,
                        "status_color": _shot_status_color(status),
                        "model": model,
                        "cost": take.get("cost", 0),
                        "take_index": i,
                        "take_id": take.get("take_id", f"T{i+1}"),
                        "source": "pipeline",
                    }
        store.close()
    except Exception as e:
        log.warning("Could not read ExecutionStore for metadata index: %s", e)

    # Index canonical ref sidecars
    refs_dir = project_dir / "output" / "refs" / "_canonical"
    if refs_dir.is_dir():
        for meta_file in refs_dir.rglob("_meta/*.json"):
            try:
                sidecar = json.loads(meta_file.read_text())
                ref_file = sidecar.get("file", {})
                ref_path = ref_file.get("path", "")
                if ref_path:
                    # Make relative to project dir
                    try:
                        rel = str(Path(ref_path).relative_to(project_dir))
                    except ValueError:
                        rel = ref_path
                    index[rel] = {
                        "source": "canonical_ref",
                        "status": "hero",
                        "status_color": "blue",
                        "ref_type": sidecar.get("context", {}).get("ref_type", "unknown"),
                        "model": sidecar.get("model"),
                    }
            except Exception:
                continue

    return index
```

Replace with:

```python
def _build_metadata_index(project: str) -> dict[str, dict]:
    """Build a path-keyed metadata lookup from ExecutionStore + sidecars.

    Priority (highest to lowest):
    1. Universal sidecar ({file}.json) — status, source, notes
    2. ExecutionStore takes — shot_id, pipeline status, take info
    3. Canonical ref sidecars (_meta/{file}.json) — canonical ref metadata
    """
    index = {}
    project_dir = PROJECTS_ROOT / project

    # Index ExecutionStore takes (keyed by relative file_path)
    try:
        store = _get_store(project)
        for shot in store.get_all_shots():
            shot_id = shot.get("shot_id", "")
            status = shot.get("status", "previs_pending")
            model = shot.get("model")
            for i, take in enumerate(shot.get("takes", [])):
                fp = take.get("file_path", "")
                if fp:
                    index[fp] = {
                        "shot_id": shot_id,
                        "status": status,
                        "status_color": _shot_status_color(status),
                        "model": model,
                        "cost": take.get("cost", 0),
                        "take_index": i,
                        "take_id": take.get("take_id", f"T{i+1}"),
                        "source": "pipeline",
                    }
        store.close()
    except Exception as e:
        log.warning("Could not read ExecutionStore for metadata index: %s", e)

    # Index canonical ref sidecars
    refs_dir = project_dir / "output" / "refs" / "_canonical"
    if refs_dir.is_dir():
        for meta_file in refs_dir.rglob("_meta/*.json"):
            try:
                sidecar = json.loads(meta_file.read_text())
                ref_file = sidecar.get("file", {})
                ref_path = ref_file.get("path", "")
                if ref_path:
                    # Make relative to project dir
                    try:
                        rel = str(Path(ref_path).relative_to(project_dir))
                    except ValueError:
                        rel = ref_path
                    index[rel] = {
                        "source": "canonical_ref",
                        "status": "hero",
                        "status_color": "blue",
                        "ref_type": sidecar.get("context", {}).get("ref_type", "unknown"),
                        "model": sidecar.get("model"),
                    }
            except Exception:
                continue

    # Index universal sidecars ({file}.json) — overlay on top of existing entries
    output_dir = project_dir / "output"
    if output_dir.is_dir():
        for sc_path in output_dir.rglob("*.json"):
            # Skip _meta/ sidecars (handled above), hidden files, non-sidecar JSON
            if "_meta" in sc_path.parts:
                continue
            if sc_path.name.startswith("."):
                continue
            # A universal sidecar is {mediafile}.json — the stem must have a media extension
            stem_as_path = Path(sc_path.stem)
            if stem_as_path.suffix.lower() not in ws_sidecar.MEDIA_EXTENSIONS:
                continue
            # The media file this sidecar belongs to
            media_path = sc_path.parent / sc_path.stem
            if not media_path.is_file():
                continue

            try:
                sc_data = json.loads(sc_path.read_text(encoding="utf-8"))
            except (json.JSONDecodeError, IOError):
                continue

            try:
                rel = str(media_path.relative_to(project_dir))
            except ValueError:
                continue

            sc_status = sc_data.get("status", "candidate")
            sc_source = sc_data.get("source", "unknown")

            # Map sidecar status to display color
            status_color_map = {
                "candidate": "gray",
                "front-runner": "amber",
                "canonical": "blue",
                "demoted": "gray",
                "archived": "gray",
            }

            # Build sidecar overlay — merge with existing entry if present
            sc_overlay = {
                "sidecar_status": sc_status,
                "sidecar_source": sc_source,
                "sidecar_notes": sc_data.get("notes", ""),
            }

            # Only override status_color if this file is NOT tracked by ExecutionStore
            # (ExecutionStore pipeline status takes priority for color)
            if rel not in index:
                sc_overlay["status"] = sc_status
                sc_overlay["status_color"] = status_color_map.get(sc_status, "gray")
                sc_overlay["source"] = sc_source

            if rel in index:
                index[rel].update(sc_overlay)
            else:
                index[rel] = sc_overlay

    return index
```

### Step 3b — Auto-stub missing sidecars during tree scan

**In the `get_tree` endpoint**, add auto-stubbing after the tree is built. Find this exact block in the `get_tree` function (around line 497):

```python
    resp = {
        "project": project,
        "tree": {
            "name": "output", "type": "directory",
            "children": tree_children, "file_count": file_count,
        },
        "file_count": file_count,
    }
    _tree_cache[project] = resp
    _tree_cache_mtime[project] = _time.monotonic()
    return JSONResponse(resp)
```

Replace with:

```python
    # Auto-stub missing sidecars (cheap operation — only creates files that don't exist)
    try:
        stubbed = ws_sidecar.auto_stub_missing(output_dir)
        if stubbed > 0:
            log.info("Auto-stubbed %d missing sidecars for project %s", stubbed, project)
    except Exception as e:
        log.warning("Auto-stub failed for project %s: %s", project, e)

    resp = {
        "project": project,
        "tree": {
            "name": "output", "type": "directory",
            "children": tree_children, "file_count": file_count,
        },
        "file_count": file_count,
    }
    _tree_cache[project] = resp
    _tree_cache_mtime[project] = _time.monotonic()
    return JSONResponse(resp)
```

### Validation Gate: Phase 3

```
GATE_P3:
  Run: python3 -c "
import sys; sys.path.insert(0, '$RECOIL_ROOT')
import inspect
from workspace.server import _build_metadata_index, get_tree
src = inspect.getsource(_build_metadata_index)
assert 'sidecar_status' in src, 'Missing sidecar_status in metadata index'
assert 'universal sidecars' in src.lower() or 'Index universal sidecars' in src, 'Missing universal sidecar indexing'
tree_src = inspect.getsource(get_tree)
assert 'auto_stub_missing' in tree_src, 'Missing auto-stub in get_tree'
print('Phase 3 PASS: VAT reads sidecars and auto-stubs')
"
  Expect: "Phase 3 PASS"
```

---

## Phase 4 — Archive Update (Sidecar Travels with File)

**Goal:** Update the existing `/api/archive/{project}` endpoint to also move the sidecar alongside the media file. Add restore support.

**Modifies:** `workspace/server.py`

### Step 4a — Update archive endpoint to move sidecars

**Replace the entire `archive_item` function** (lines 505-539). Find this exact string:

```python
@app.post("/api/archive/{project}")
async def archive_item(project: str, request: Request):
    """Move a file or folder to _archive/, preserving relative path."""
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = PROJECTS_ROOT / project
    source = project_dir / rel_path
    archive_dest = project_dir / "output" / "_archive" / rel_path

    # Security check
    try:
        source.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not source.exists():
        return JSONResponse(
            {"error": f"Not found: {rel_path}"}, status_code=404
        )

    # Create archive directory and move
    import shutil
    archive_dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(source), str(archive_dest))

    # Invalidate tree cache
    _tree_cache.pop(project, None)

    return JSONResponse({
        "archived": rel_path,
        "destination": str(archive_dest.relative_to(project_dir)),
    })
```

Replace with:

```python
@app.post("/api/archive/{project}")
async def archive_item(project: str, request: Request):
    """Move a file or folder to _archive/, preserving relative path.

    Also moves the companion sidecar JSON if it exists.
    """
    body = await request.json()
    rel_path = body.get("path", "")
    if not rel_path:
        return JSONResponse({"error": "path required"}, status_code=400)

    project_dir = PROJECTS_ROOT / project
    source = project_dir / rel_path

    # Security check
    try:
        source.resolve().relative_to(project_dir.resolve())
    except ValueError:
        return JSONResponse({"error": "Access denied"}, status_code=403)

    if not source.exists():
        return JSONResponse(
            {"error": f"Not found: {rel_path}"}, status_code=404
        )

    if source.is_file():
        # Use sidecar-aware archive for individual files
        try:
            archive_dest = ws_sidecar.archive_with_sidecar(source, project_dir)
        except Exception as e:
            return JSONResponse({"error": str(e)}, status_code=500)
    else:
        # Directory archive — move the whole directory, then update sidecars within
        import shutil
        archive_dest = project_dir / "output" / "_archive" / rel_path
        archive_dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(source), str(archive_dest))

        # Recursively update all sidecar statuses inside the moved directory
        for sc_file in archive_dest.rglob("*.json"):
            if not ws_sidecar._is_sidecar_file(sc_file):
                continue
            try:
                sc_data = json.loads(sc_file.read_text(encoding="utf-8"))
                sc_data["status"] = "archived"
                sc_data["archived_at"] = ws_sidecar._now_iso()
                ws_sidecar._atomic_write_json(sc_file, sc_data)
            except Exception:
                continue  # Best effort — don't fail the archive operation

    # Invalidate tree cache
    _tree_cache.pop(project, None)

    return JSONResponse({
        "archived": rel_path,
        "destination": str(archive_dest.relative_to(project_dir)),
    })
```

### Validation Gate: Phase 4

```
GATE_P4:
  Run: python3 -c "
import sys; sys.path.insert(0, '$RECOIL_ROOT')
import inspect
from workspace.server import archive_item
src = inspect.getsource(archive_item)
assert 'archive_with_sidecar' in src, 'Archive endpoint does not use sidecar-aware archive'
assert 'ws_sidecar' in src, 'Archive endpoint does not import sidecar module'
assert '_is_sidecar_file' in src, 'Archive endpoint missing recursive sidecar status update for directories'
print('Phase 4 PASS: archive moves sidecars')
"
  Expect: "Phase 4 PASS"
```

---

## Phase 5 — MCP Tools

**Goal:** Add `get_file_provenance` tool and enhance `prime_project` with sidecar status counts.

**Modifies:** `workspace/mcp_server.py`

### Step 5a — Add sidecar import

**Insert AFTER line 29** (after `from workspace import session_log`):

```python
from workspace import sidecar as ws_sidecar
```

### Step 5b — Add `get_file_provenance` tool

**Insert AFTER the `tool_get_activity` function** (after line 1043, before the JSON-RPC protocol handler section). The insertion point is after the closing of tool_get_activity and before the `# JSON-RPC 2.0 PROTOCOL HANDLER` section header.

```python


# ── Tool 13: get_file_provenance ──────────────────────────────

@_register_tool(
    name="get_file_provenance",
    description=(
        "Read the universal sidecar for any media file. Returns full provenance "
        "(model, prompt, refs, cost, gates), status, lineage, and notes. "
        "Works for both pipeline-generated and manually-dropped files."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "path": {
                "type": "string",
                "description": (
                    "File path. Can be absolute or relative to project root "
                    "(e.g. 'output/refs/characters/sadie/hero_v4.jpg')."
                ),
            },
        },
        "required": ["path"],
    },
)
def tool_get_file_provenance(params: dict) -> dict:
    project = ws_state.get_project()
    if not project:
        return {"error": "No project active. Call prime_project first."}

    path = params["path"]

    # Resolve relative paths against project root
    if not path.startswith("/"):
        abs_path = PROJECTS_ROOT / project / path
    else:
        abs_path = Path(path)

    if not abs_path.is_file():
        return {"error": f"File not found: {abs_path}"}

    # Read the universal sidecar
    data = ws_sidecar.read_sidecar(abs_path)
    if data is None:
        # Try the _meta/ sidecar (for canonical refs)
        meta_sidecar = abs_path.parent / "_meta" / f"{abs_path.name}.json"
        if meta_sidecar.is_file():
            try:
                import json as _json
                data = _json.loads(meta_sidecar.read_text(encoding="utf-8"))
                data["_sidecar_format"] = "canonical_meta"
            except Exception:
                pass

    if data is None:
        return {
            "path": path,
            "sidecar": None,
            "note": "No sidecar found. File is untracked.",
        }

    return {
        "path": path,
        "sidecar": data,
        "status": data.get("status"),
        "source": data.get("source"),
        "provenance": data.get("provenance", {}),
        "lineage": data.get("lineage", {}),
        "notes": data.get("notes", ""),
        "tags": data.get("tags", []),
    }
```

### Step 5c — Enhance `prime_project` with sidecar status counts

**In the `tool_prime_project` function**, add sidecar counts after the canonical refs lookup. Find this exact block (around line 170):

```python
    # Get canonical refs (paths only, no images)
    refs = _get_canonical_refs(project)

    # Get last 10 ops log entries
```

Replace with:

```python
    # Get canonical refs (paths only, no images)
    refs = _get_canonical_refs(project)

    # Get sidecar status counts
    sidecar_counts = {"candidate": 0, "front-runner": 0, "canonical": 0, "demoted": 0, "archived": 0, "no_sidecar": 0}
    output_dir = project_dir / "output"
    if output_dir.is_dir():
        from workspace.sidecar import MEDIA_EXTENSIONS as _SC_MEDIA_EXT
        for media_file in output_dir.rglob("*"):
            if not media_file.is_file():
                continue
            if media_file.suffix.lower() not in _SC_MEDIA_EXT:
                continue
            parts = media_file.relative_to(output_dir).parts
            if any(p.startswith(".") or p == "_meta" for p in parts):
                continue
            sc = ws_sidecar.read_sidecar(media_file)
            if sc is None:
                sidecar_counts["no_sidecar"] += 1
            else:
                st = sc.get("status", "candidate")
                if st in sidecar_counts:
                    sidecar_counts[st] += 1

    # Get last 10 ops log entries
```

Then, in the result dict construction, find:

```python
    result = {
        "project": project,
        "session_status": session_status,
        "health": health,
        "conflicts": conflicts,
        "summary": {
            "total_shots": summary["total_shots"],
            "total_cost": summary["total_cost"],
            "by_status": summary["by_status"],
        },
        "episodes": episodes,
        "pending_review": pending_review,
        "characters": refs["characters"],
        "locations": refs["locations"],
        "recent_ops": recent_ops,
    }
```

Replace with:

```python
    result = {
        "project": project,
        "session_status": session_status,
        "health": health,
        "conflicts": conflicts,
        "summary": {
            "total_shots": summary["total_shots"],
            "total_cost": summary["total_cost"],
            "by_status": summary["by_status"],
        },
        "episodes": episodes,
        "pending_review": pending_review,
        "characters": refs["characters"],
        "locations": refs["locations"],
        "sidecar_counts": sidecar_counts,
        "recent_ops": recent_ops,
    }
```

### Step 5d — Enhance `get_shot_detail` with sidecar data

**In the `tool_get_shot_detail` function**, add sidecar data for takes. Find this exact block in the result construction (around line 274):

```python
    result = {
        "shot_id": shot_id,
        "project": project,
        "episode_id": shot.get("episode_id", ""),
        "status": shot.get("status", "previs_pending"),
        "status_color": _shot_status_color(shot.get("status", "previs_pending")),
        "pipeline": shot.get("pipeline"),
        "model": shot.get("model"),
        "cost_incurred": shot.get("cost_incurred", 0),
        "retry_waste_cost": shot.get("retry_waste_cost", 0),
        "attempts": shot.get("attempts", 0),
        "max_attempts": shot.get("max_attempts", 3),
        "gate_results": shot.get("gate_results", {}),
        "output_path": shot.get("output_path"),
        "error_message": shot.get("error_message"),
        "takes": takes,
        "take_count": len(takes),
        "is_coverage": shot.get("is_coverage", False),
        "coverage_of": shot.get("coverage_of"),
        "updated_at": shot.get("updated_at"),
    }
```

**Insert BEFORE this block** (after the Dropbox conflicts check and before result construction):

```python
    # Attach sidecar data to takes that have file paths
    for take in takes:
        fp = take.get("file_path", "")
        if fp:
            abs_fp = PROJECTS_ROOT / project / fp if not fp.startswith("/") else Path(fp)
            sc = ws_sidecar.read_sidecar(abs_fp)
            if sc:
                take["sidecar_status"] = sc.get("status")
                take["sidecar_notes"] = sc.get("notes", "")
                take["sidecar_source"] = sc.get("source", "")

```

### Validation Gate: Phase 5

```
GATE_P5:
  Run: python3 -c "
import sys; sys.path.insert(0, '$RECOIL_ROOT')
from workspace.mcp_server import _TOOLS
assert 'get_file_provenance' in _TOOLS, 'Missing get_file_provenance tool'
assert len(_TOOLS) >= 13, f'Expected 13+ tools, got {len(_TOOLS)}'
print('Phase 5 PASS: MCP tools registered ({} total)'.format(len(_TOOLS)))
"
  Expect: "Phase 5 PASS"
```

---

## Phase 6 — Frontend

**Goal:** Expand context menu (Promote/Demote/Front-runner), add status icons in navigator, make inspector read sidecar data.

**Modifies:** `workspace/static/workspace.js`, `workspace/static/workspace.css`

### Step 6a — CSS additions for sidecar status indicators

**Append to the end of `workspace/static/workspace.css`** (after line 709):

```css

/* ── Sidecar Status Icons ── */
.status-icon {
  font-size: 10px;
  margin-right: 2px;
  flex-shrink: 0;
}

.status-icon.front-runner {
  color: var(--accent-amber);
}

.status-icon.canonical {
  color: var(--accent-blue);
}

.status-icon.demoted {
  color: var(--text-dim);
}

/* Dimmed appearance for demoted files */
.shot-item.demoted {
  opacity: 0.45;
}

.shot-item.demoted .shot-id {
  text-decoration: line-through;
  color: var(--text-dim);
}

/* Front-runner highlight */
.shot-item.front-runner .shot-id {
  color: var(--accent-amber);
  font-weight: 600;
}

/* Canonical lock */
.shot-item.canonical .shot-id {
  color: var(--accent-blue);
}

/* ── Sidecar Inspector Sections ── */
.inspector-provenance-grid {
  display: grid;
  grid-template-columns: 90px 1fr;
  gap: 2px 4px;
  font-size: 11px;
}

.inspector-provenance-key {
  color: var(--text-secondary);
  font-family: var(--font-mono);
  font-size: 10px;
  text-align: right;
  padding-right: 6px;
}

.inspector-provenance-value {
  color: var(--text-primary);
  word-break: break-all;
  font-family: var(--font-mono);
  font-size: 10px;
}

.inspector-prompt-text {
  font-family: var(--font-mono);
  font-size: 10px;
  color: var(--text-secondary);
  background: var(--bg-overlay);
  padding: 6px 8px;
  border-radius: 3px;
  white-space: pre-wrap;
  word-break: break-word;
  max-height: 120px;
  overflow-y: auto;
  margin-top: 4px;
}

.inspector-tags {
  display: flex;
  flex-wrap: wrap;
  gap: 4px;
  margin-top: 4px;
}

.inspector-tag {
  font-family: var(--font-mono);
  font-size: 9px;
  background: var(--bg-overlay);
  color: var(--text-secondary);
  padding: 1px 6px;
  border-radius: 2px;
  border: 1px solid var(--border-dim);
}

.inspector-notes {
  font-family: var(--font-sans);
  font-size: 11px;
  color: var(--text-secondary);
  background: var(--bg-overlay);
  padding: 6px 8px;
  border-radius: 3px;
  margin-top: 4px;
  font-style: italic;
}

/* Context menu status indicators */
.context-menu-item .status-badge {
  font-family: var(--font-mono);
  font-size: 9px;
  padding: 0 4px;
  border-radius: 2px;
  margin-left: auto;
}

.context-menu-item .status-badge.current {
  background: var(--accent-blue);
  color: var(--bg-base);
}

/* ── Canonical Promotion Dialog ── */
.promo-dialog-overlay {
  position: fixed;
  inset: 0;
  background: rgba(0, 0, 0, 0.7);
  z-index: 2000;
  display: flex;
  align-items: center;
  justify-content: center;
}

.promo-dialog {
  background: var(--bg-raised);
  border: 1px solid var(--border-default);
  border-radius: 6px;
  padding: 20px;
  min-width: 320px;
  max-width: 440px;
  box-shadow: 0 8px 32px rgba(0, 0, 0, 0.5);
}

.promo-dialog h3 {
  font-family: var(--font-mono);
  font-size: 12px;
  font-weight: 700;
  letter-spacing: 1px;
  color: var(--text-bright);
  text-transform: uppercase;
  margin-bottom: 12px;
}

.promo-dialog label {
  display: block;
  font-family: var(--font-mono);
  font-size: 11px;
  color: var(--text-secondary);
  margin-bottom: 4px;
  margin-top: 8px;
}

.promo-dialog select,
.promo-dialog input {
  width: 100%;
  padding: 6px 8px;
  background: var(--bg-overlay);
  color: var(--text-primary);
  border: 1px solid var(--border-default);
  border-radius: 3px;
  font-family: var(--font-mono);
  font-size: 11px;
}

.promo-dialog select:focus,
.promo-dialog input:focus {
  border-color: var(--accent-blue);
  outline: none;
}

.promo-dialog-buttons {
  display: flex;
  justify-content: flex-end;
  gap: 8px;
  margin-top: 16px;
}

.promo-dialog-buttons button {
  padding: 6px 16px;
  border-radius: 3px;
  font-family: var(--font-mono);
  font-size: 11px;
  cursor: pointer;
  border: 1px solid var(--border-default);
}

.promo-dialog-buttons .btn-cancel {
  background: transparent;
  color: var(--text-secondary);
}

.promo-dialog-buttons .btn-cancel:hover {
  background: var(--bg-overlay);
  color: var(--text-primary);
}

.promo-dialog-buttons .btn-confirm {
  background: var(--accent-blue);
  color: var(--bg-base);
  border-color: var(--accent-blue);
  font-weight: 600;
}

.promo-dialog-buttons .btn-confirm:hover {
  opacity: 0.9;
}
```

### Step 6b — JavaScript: Expanded context menu

**Replace the `showContextMenu` function** in `workspace/static/workspace.js`. Find this exact string:

```javascript
function showContextMenu(event, filePath) {
  event.preventDefault();
  event.stopPropagation();

  // Remove any existing menu
  hideContextMenu();

  const menu = document.createElement('div');
  menu.className = 'context-menu';
  menu.id = 'context-menu';

  menu.innerHTML = ''
    + '<div class="context-menu-item" onclick="archiveItem(\'' + escapeAttr(filePath) + '\')">'
    + '\u{1F5C4} Archive</div>'
    + '<div class="context-menu-divider"></div>'
    + '<div class="context-menu-item" onclick="copyPath(\'' + escapeAttr(filePath) + '\')">'
    + '\u{1F4CB} Copy Path</div>';

  menu.style.left = event.clientX + 'px';
  menu.style.top = event.clientY + 'px';
  document.body.appendChild(menu);

  // Close on click elsewhere
  setTimeout(() => {
    document.addEventListener('click', hideContextMenu, { once: true });
  }, 0);
}
```

Replace with:

```javascript
function showContextMenu(event, filePath) {
  event.preventDefault();
  event.stopPropagation();

  // Remove any existing menu
  hideContextMenu();

  // Look up current sidecar status for this file
  const fileNode = _findFileNode(filePath);
  const currentStatus = (fileNode && fileNode.sidecar_status) || 'candidate';
  const isArchived = filePath.includes('_archive');

  const menu = document.createElement('div');
  menu.className = 'context-menu';
  menu.id = 'context-menu';

  let items = '';

  if (isArchived) {
    // Archive context: only Restore and Copy Path
    items += '<div class="context-menu-item" onclick="restoreItem(\'' + escapeAttr(filePath) + '\')">'
      + '\u21A9 Restore from Archive</div>';
  } else {
    // Status actions
    if (currentStatus !== 'front-runner') {
      items += '<div class="context-menu-item" onclick="setFileStatus(\'' + escapeAttr(filePath) + '\', \'front-runner\')">'
        + '\u2605 Set Front-runner</div>';
    }
    items += '<div class="context-menu-item" onclick="showCanonicalDialog(\'' + escapeAttr(filePath) + '\')">'
      + '\u2191 Promote to Canonical</div>';
    if (currentStatus !== 'demoted') {
      items += '<div class="context-menu-item" onclick="setFileStatus(\'' + escapeAttr(filePath) + '\', \'demoted\')">'
        + '\u2193 Demote</div>';
    }
    items += '<div class="context-menu-divider"></div>';
    items += '<div class="context-menu-item destructive" onclick="archiveItem(\'' + escapeAttr(filePath) + '\')">'
      + '\u{1F5C4} Archive</div>';
  }

  items += '<div class="context-menu-divider"></div>';
  items += '<div class="context-menu-item" onclick="copyPath(\'' + escapeAttr(filePath) + '\')">'
    + '\u{1F4CB} Copy Path</div>';

  menu.innerHTML = items;
  menu.style.left = event.clientX + 'px';
  menu.style.top = event.clientY + 'px';
  document.body.appendChild(menu);

  // Close on click elsewhere
  setTimeout(() => {
    document.addEventListener('click', hideContextMenu, { once: true });
  }, 0);
}
```

### Step 6c — JavaScript: Helper to find file node in tree

**Insert AFTER the `hideContextMenu` function** (after line 382 in the original file):

```javascript

function _findFileNode(filePath) {
  // Recursively search the tree for a file node by path
  if (!WS.tree || !WS.tree.children) return null;
  function search(nodes) {
    for (const node of nodes) {
      if (node.type === 'file' && node.path === filePath) return node;
      if (node.children) {
        const found = search(node.children);
        if (found) return found;
      }
    }
    return null;
  }
  return search(WS.tree.children);
}
```

### Step 6d — JavaScript: Status action functions

**Insert AFTER the `copyPath` function** (after line 399):

```javascript

async function setFileStatus(filePath, status) {
  hideContextMenu();
  if (!WS.project || !filePath) return;

  let endpoint = '/api/promote/' + WS.project;
  if (status === 'demoted') {
    endpoint = '/api/demote/' + WS.project;
  }

  const body = { path: filePath, status: status };
  const resp = await apiPost(endpoint, body);
  if (resp && !resp.error) {
    // Force tree refresh
    WS.treeHash = null;
    await pollShots();
    // Refresh inspector if this file is selected
    if (WS.selection.length === 1 && WS.selection[0] === filePath) {
      await loadSidecarInspector(filePath);
    }
  }
}

async function restoreItem(filePath) {
  hideContextMenu();
  if (!WS.project || !filePath) return;

  const resp = await apiPost('/api/restore/' + WS.project, { path: filePath });
  if (resp && resp.restored_to) {
    // Force tree refresh
    WS.treeHash = null;
    await pollShots();
  }
}

function showCanonicalDialog(filePath) {
  hideContextMenu();

  // Create overlay
  const overlay = document.createElement('div');
  overlay.className = 'promo-dialog-overlay';
  overlay.id = 'promo-dialog-overlay';
  overlay.onclick = function(e) { if (e.target === overlay) overlay.remove(); };

  overlay.innerHTML = ''
    + '<div class="promo-dialog">'
    + '<h3>Promote to Canonical</h3>'
    + '<label>Asset Type</label>'
    + '<select id="promo-asset-type">'
    + '<option value="characters">Character</option>'
    + '<option value="locations">Location</option>'
    + '<option value="props">Prop</option>'
    + '</select>'
    + '<label>Entity ID (e.g., sadie, apartment)</label>'
    + '<input type="text" id="promo-entity-id" placeholder="entity_id">'
    + '<div class="promo-dialog-buttons">'
    + '<button class="btn-cancel" onclick="document.getElementById(\'promo-dialog-overlay\').remove()">Cancel</button>'
    + '<button class="btn-confirm" onclick="confirmCanonicalPromotion(\'' + escapeAttr(filePath) + '\')">Promote</button>'
    + '</div>'
    + '</div>';

  document.body.appendChild(overlay);

  // Focus the entity ID input
  setTimeout(() => {
    const input = document.getElementById('promo-entity-id');
    if (input) input.focus();
  }, 50);
}

async function confirmCanonicalPromotion(filePath) {
  const assetType = document.getElementById('promo-asset-type').value;
  const entityId = document.getElementById('promo-entity-id').value.trim();

  if (!entityId) {
    alert('Entity ID is required');
    return;
  }

  // Remove dialog
  const overlay = document.getElementById('promo-dialog-overlay');
  if (overlay) overlay.remove();

  const resp = await apiPost('/api/promote/' + WS.project, {
    path: filePath,
    status: 'canonical',
    asset_type: assetType,
    entity_id: entityId,
  });

  if (resp && !resp.error) {
    WS.treeHash = null;
    await pollShots();
    if (WS.selection.length === 1 && WS.selection[0] === filePath) {
      await loadSidecarInspector(filePath);
    }
  } else if (resp && resp.error) {
    alert('Promotion failed: ' + resp.error);
  }
}
```

### Step 6e — JavaScript: Status icons in navigator file rendering

**Replace the file rendering block** in the `renderTreeNode` function. Find this exact string:

```javascript
    } else {
      const isSelected = WS.selection.includes(node.path);
      let cls = 'shot-item';
      if (isSelected) cls += ' selected';

      const statusColor = node.status_color || 'gray';
      let label = node.shot_id || node.name;
      label = label.replace(/\.(png|jpg|jpeg|webp|mp4|mov)$/i, '');
      label = label.replace(/^[A-Z_-]+EP\d+_/i, '');

      html += '<div class="' + cls + '" ';
      html += 'data-file-path="' + escapeAttr(node.path) + '" ';
      html += 'data-media-url="' + escapeAttr(node.media_url || '') + '" ';
      if (node.shot_id) html += 'data-shot-id="' + escapeAttr(node.shot_id) + '" ';
      html += 'onclick="selectFile(\'' + escapeAttr(node.path) + '\', this, event)" ';
      html += 'oncontextmenu="showContextMenu(event, \'' + escapeAttr(node.path) + '\')">';
      html += '<span class="status-dot ' + statusColor + '"></span>';
      html += '<span class="shot-id">' + escapeHtml(label) + '</span>';
      html += '</div>';
    }
```

Replace with:

```javascript
    } else {
      const isSelected = WS.selection.includes(node.path);
      const sidecarStatus = node.sidecar_status || '';
      let cls = 'shot-item';
      if (isSelected) cls += ' selected';
      if (sidecarStatus === 'demoted') cls += ' demoted';
      if (sidecarStatus === 'front-runner') cls += ' front-runner';
      if (sidecarStatus === 'canonical') cls += ' canonical';

      const statusColor = node.status_color || 'gray';
      let label = node.shot_id || node.name;
      label = label.replace(/\.(png|jpg|jpeg|webp|mp4|mov)$/i, '');
      label = label.replace(/^[A-Z_-]+EP\d+_/i, '');

      // Sidecar status icon
      let statusIcon = '';
      if (sidecarStatus === 'front-runner') {
        statusIcon = '<span class="status-icon front-runner">\u2605</span>';
      } else if (sidecarStatus === 'canonical') {
        statusIcon = '<span class="status-icon canonical">\u{1F512}</span>';
      }

      html += '<div class="' + cls + '" ';
      html += 'data-file-path="' + escapeAttr(node.path) + '" ';
      html += 'data-media-url="' + escapeAttr(node.media_url || '') + '" ';
      if (node.shot_id) html += 'data-shot-id="' + escapeAttr(node.shot_id) + '" ';
      html += 'onclick="selectFile(\'' + escapeAttr(node.path) + '\', this, event)" ';
      html += 'oncontextmenu="showContextMenu(event, \'' + escapeAttr(node.path) + '\')">';
      html += '<span class="status-dot ' + statusColor + '"></span>';
      html += statusIcon;
      html += '<span class="shot-id">' + escapeHtml(label) + '</span>';
      html += '</div>';
    }
```

### Step 6f — JavaScript: Sidecar-aware inspector

**Insert AFTER the `loadShotDetail` function** (after line 554):

```javascript

async function loadSidecarInspector(filePath) {
  // Load sidecar data for a file that may not have a shot_id
  if (!WS.project || !filePath) return;

  const resp = await apiPost('/api/sidecar/' + WS.project, { path: filePath });
  if (!resp) return;

  const inspector = document.getElementById('inspector');
  const sidecar = resp.sidecar;

  if (!sidecar) {
    // No sidecar — show basic file info
    inspector.innerHTML = '<div class="inspector-section"><div class="inspector-section-header">FILE</div>'
      + '<div class="inspector-section-body">'
      + inspectorRow('Path', filePath)
      + inspectorRow('Status', 'untracked')
      + '</div></div>';
    return;
  }

  let html = '';

  // ── Status Section ──
  html += '<div class="inspector-section">';
  html += '<div class="inspector-section-header" onclick="toggleInspectorSection(this)">';
  html += '<span class="inspector-chevron">\u25BC</span> STATUS';
  html += '</div>';
  html += '<div class="inspector-section-body">';
  html += inspectorRow('Path', filePath);

  const statusColorMap = {
    'candidate': 'gray', 'front-runner': 'amber', 'canonical': 'blue',
    'demoted': 'gray', 'archived': 'gray',
  };
  const sc = sidecar.status || 'unknown';
  const scColor = statusColorMap[sc] || 'gray';
  html += inspectorRow('Status', '<span class="inspector-value status-' + scColor + '">' + escapeHtml(sc) + '</span>', true);
  html += inspectorRow('Source', sidecar.source || 'unknown');
  html += inspectorRow('Created', sidecar.created_at || '');
  html += inspectorRow('Updated', sidecar.updated_at || '');

  if (sidecar.promoted_to) {
    html += inspectorRow('Promoted To', sidecar.promoted_to);
  }
  html += '</div></div>';

  // ── Provenance Section ──
  const prov = sidecar.provenance;
  if (prov && Object.keys(prov).length > 0) {
    html += '<div class="inspector-section">';
    html += '<div class="inspector-section-header" onclick="toggleInspectorSection(this)">';
    html += '<span class="inspector-chevron">\u25BC</span> PROVENANCE';
    html += '</div>';
    html += '<div class="inspector-section-body">';

    if (prov.model) html += inspectorRow('Model', prov.model);
    if (prov.pipeline) html += inspectorRow('Pipeline', prov.pipeline);
    if (prov.shot_id) html += inspectorRow('Shot', prov.shot_id);
    if (prov.cost != null) html += inspectorRow('Cost', '$' + Number(prov.cost).toFixed(4));

    if (prov.prompt) {
      html += '<div style="margin-top:4px"><span class="inspector-key" style="display:block;text-align:left;margin-bottom:2px">Prompt</span>';
      html += '<div class="inspector-prompt-text">' + escapeHtml(prov.prompt) + '</div></div>';
    }

    // Refs used
    const refs = prov.refs_used;
    if (refs && refs.length > 0) {
      html += '<div style="margin-top:6px"><span class="inspector-key" style="display:block;text-align:left;margin-bottom:2px">Refs Used</span>';
      for (const ref of refs) {
        const refLabel = (ref.role || '') + ': ' + (ref.id || ref.display_name || ref.path || '?');
        html += '<div style="font-family:var(--font-mono);font-size:10px;color:var(--text-secondary);padding:1px 0">' + escapeHtml(refLabel) + '</div>';
      }
      html += '</div>';
    }

    // Gate results
    const gates = prov.gate_results;
    if (gates && Object.keys(gates).length > 0) {
      html += '<div style="margin-top:6px"><span class="inspector-key" style="display:block;text-align:left;margin-bottom:2px">Gate Results</span>';
      for (const [key, val] of Object.entries(gates)) {
        const gateColor = val === 'pass' ? 'var(--accent-green)' : (val === 'fail' ? 'var(--accent-red)' : 'var(--text-secondary)');
        html += '<div style="font-family:var(--font-mono);font-size:10px;color:' + gateColor + ';padding:1px 0">' + escapeHtml(key) + ': ' + escapeHtml(String(val)) + '</div>';
      }
      html += '</div>';
    }

    if (prov.generation_params && Object.keys(prov.generation_params).length > 0) {
      html += '<div style="margin-top:6px"><span class="inspector-key" style="display:block;text-align:left;margin-bottom:2px">Params</span>';
      for (const [key, val] of Object.entries(prov.generation_params)) {
        html += '<div style="font-family:var(--font-mono);font-size:10px;color:var(--text-secondary);padding:1px 0">' + escapeHtml(key) + ': ' + escapeHtml(String(val)) + '</div>';
      }
      html += '</div>';
    }

    html += '</div></div>';
  }

  // ── Lineage Section ──
  const lineage = sidecar.lineage;
  if (lineage && Object.keys(lineage).length > 0) {
    html += '<div class="inspector-section">';
    html += '<div class="inspector-section-header" onclick="toggleInspectorSection(this)">';
    html += '<span class="inspector-chevron collapsed">\u25BC</span> LINEAGE';
    html += '</div>';
    html += '<div class="inspector-section-body collapsed">';
    if (lineage.derived_from) html += inspectorRow('Derived From', lineage.derived_from);
    if (lineage.method) html += inspectorRow('Method', lineage.method);
    if (lineage.parent_hash) html += inspectorRow('Parent Hash', lineage.parent_hash);
    html += '</div></div>';
  }

  // ── Notes Section ──
  if (sidecar.notes) {
    html += '<div class="inspector-section">';
    html += '<div class="inspector-section-header" onclick="toggleInspectorSection(this)">';
    html += '<span class="inspector-chevron">\u25BC</span> NOTES';
    html += '</div>';
    html += '<div class="inspector-section-body">';
    html += '<div class="inspector-notes">' + escapeHtml(sidecar.notes) + '</div>';
    html += '</div></div>';
  }

  // ── Tags Section ──
  const tags = sidecar.tags;
  if (tags && tags.length > 0) {
    html += '<div class="inspector-section">';
    html += '<div class="inspector-section-header" onclick="toggleInspectorSection(this)">';
    html += '<span class="inspector-chevron">\u25BC</span> TAGS';
    html += '</div>';
    html += '<div class="inspector-section-body">';
    html += '<div class="inspector-tags">';
    for (const tag of tags) {
      html += '<span class="inspector-tag">' + escapeHtml(tag) + '</span>';
    }
    html += '</div>';
    html += '</div></div>';
  }

  inspector.innerHTML = html;
}
```

### Step 6g — JavaScript: Update `selectFile` to use sidecar inspector

**In the `selectFile` function**, update the fallback inspector display (for files without shot IDs). Find this exact block:

```javascript
    } else {
      // No shot state — show basic info in inspector
      WS.shotDetail = null;
      const inspector = document.getElementById('inspector');
      inspector.innerHTML = '<div class="inspector-section"><div class="inspector-section-header">FILE</div>'
        + '<div class="inspector-row"><span class="inspector-key">Path</span><span class="inspector-value">' + escapeHtml(selectedPath) + '</span></div>'
        + '<div class="inspector-row"><span class="inspector-key">Status</span><span class="inspector-value">untracked</span></div>'
        + '</div>';
    }
```

Replace with:

```javascript
    } else {
      // No shot state — load sidecar inspector
      WS.shotDetail = null;
      await loadSidecarInspector(selectedPath);
    }
```

### Validation Gate: Phase 6

```
GATE_P6:
  Run: |
    # Check JS has the new functions
    grep -c 'function showCanonicalDialog' $RECOIL_ROOT/workspace/static/workspace.js
    grep -c 'function loadSidecarInspector' $RECOIL_ROOT/workspace/static/workspace.js
    grep -c 'function setFileStatus' $RECOIL_ROOT/workspace/static/workspace.js
    grep -c 'function restoreItem' $RECOIL_ROOT/workspace/static/workspace.js
    grep -c '_findFileNode' $RECOIL_ROOT/workspace/static/workspace.js
    # Check CSS has sidecar styles
    grep -c 'status-icon' $RECOIL_ROOT/workspace/static/workspace.css
    grep -c 'promo-dialog' $RECOIL_ROOT/workspace/static/workspace.css
    grep -c 'inspector-provenance' $RECOIL_ROOT/workspace/static/workspace.css
  Expect: All counts >= 1
```

---

## Phase 7 — Pipeline Integration (StepRunner Sidecar Writes)

**Goal:** StepRunner writes a sidecar with full provenance after each successful generation.

**Modifies:** `execution/step_runner.py`

### Step 7b — Add sidecar write to execute_video success path

In `execute_video`, find the successful completion block (around line 671 in the original file). Find this exact sequence:

```python
            logger.info("Shot %s: video complete → %s ($%.3f)", shot_id, rel_path, cost)

            return StepResult(
                shot_id=shot_id,
                success=True,
                final_state="video_complete",
                output_path=rel_path,
                cost_usd=cost,
                error=None,
                take_index=take_idx,
                gate_verdict=gate_verdict,
                model=model,
                pipeline="video",
            )
```

Replace with:

```python
            logger.info("Shot %s: video complete → %s ($%.3f)", shot_id, rel_path, cost)

            # Write universal sidecar with full provenance
            try:
                from workspace.sidecar import write_pipeline_sidecar
                write_pipeline_sidecar(
                    video_path,
                    model=model,
                    prompt=prompt,
                    refs_used=[],
                    cost=cost,
                    gate_results={
                        g.gate_name: ("pass" if g.passed else "fail")
                        for g in [gate_verdict] if g
                    } if gate_verdict else {},
                    generation_params={
                        "duration": duration,
                        "aspect_ratio": aspect_ratio,
                        "mode": video_mode,
                    },
                    inputs_snapshot=inputs_snapshot,
                    shot_id=shot_id,
                    pipeline="video",
                )
            except Exception as e:
                logger.warning("Sidecar write failed for %s: %s", shot_id, e)

            return StepResult(
                shot_id=shot_id,
                success=True,
                final_state="video_complete",
                output_path=rel_path,
                cost_usd=cost,
                error=None,
                take_index=take_idx,
                gate_verdict=gate_verdict,
                model=model,
                pipeline="video",
            )
```

### Step 7c — Add sidecar write to execute_keyframe success path

In `execute_keyframe`, find the successful completion block (around line 1385 in the original file). Find this exact sequence:

```python
                logger.info("Shot %s: keyframe generated → %s ($%.3f, %d attempts)",
                           shot_id, rel_path, total_cost, attempts)

                return StepResult(
                    shot_id=shot_id,
                    success=True,
                    final_state="keyframe_generated",
                    output_path=rel_path,
                    cost_usd=total_cost,
                    error=None,
                    take_index=take_idx,
                    gate_verdict=gate_verdict,
                    model=model,
```

Replace with:

```python
                logger.info("Shot %s: keyframe generated → %s ($%.3f, %d attempts)",
                           shot_id, rel_path, total_cost, attempts)

                # Write universal sidecar with full provenance
                try:
                    from workspace.sidecar import write_pipeline_sidecar
                    write_pipeline_sidecar(
                        keyframe_path,
                        model=model,
                        prompt=prompt,
                        refs_used=[],
                        cost=total_cost,
                        gate_results={
                            g.gate_name: ("pass" if g.passed else "fail")
                            for g in [gate_verdict] if g
                        } if gate_verdict else {},
                        generation_params={
                            "aspect_ratio": aspect_ratio,
                            "attempts": attempts,
                        },
                        inputs_snapshot=inputs_snapshot,
                        shot_id=shot_id,
                        pipeline="still",
                    )
                except Exception as e:
                    logger.warning("Sidecar write failed for %s: %s", shot_id, e)

                return StepResult(
                    shot_id=shot_id,
                    success=True,
                    final_state="keyframe_generated",
                    output_path=rel_path,
                    cost_usd=total_cost,
                    error=None,
                    take_index=take_idx,
                    gate_verdict=gate_verdict,
                    model=model,
```

### Validation Gate: Phase 7

```
GATE_P7:
  Run: |
    grep -c 'write_pipeline_sidecar' $RECOIL_ROOT/execution/step_runner.py
  Expect: >= 2 (one in execute_video, one in execute_keyframe)
```

---

## Phase 8 — Tests

**Goal:** Create a test suite for the sidecar module.

**Creates:** `workspace/tests/test_sidecar.py` (NEW FILE — output complete)

First, ensure the test directory exists:

```bash
mkdir -p $RECOIL_ROOT/workspace/tests
touch $RECOIL_ROOT/workspace/tests/__init__.py
```

### File: `workspace/tests/test_sidecar.py`

```python
#!/usr/bin/env python3
"""Tests for workspace/sidecar.py — Universal Sidecar Module.

Run: python3 -m pytest workspace/tests/test_sidecar.py -v
"""

import json
import os
import sys
import tempfile
from pathlib import Path

import pytest

# ── Path setup ────────────────────────────────────────────────
_RECOIL_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_RECOIL_ROOT) not in sys.path:
    sys.path.insert(0, str(_RECOIL_ROOT))

from workspace.sidecar import (
    MEDIA_EXTENSIONS,
    SCHEMA_VERSION,
    VALID_STATUSES,
    _sidecar_path,
    archive_with_sidecar,
    auto_stub_missing,
    create_stub_sidecar,
    ensure_sidecar,
    get_status,
    promote_to_canonical,
    read_sidecar,
    restore_from_archive,
    scan_for_missing_sidecars,
    set_status,
    write_pipeline_sidecar,
    write_sidecar,
)


# ── Fixtures ──────────────────────────────────────────────────

@pytest.fixture
def tmp_project(tmp_path):
    """Create a minimal project directory structure."""
    project_dir = tmp_path / "test_project"
    output_dir = project_dir / "output"
    refs_dir = output_dir / "refs" / "characters" / "sadie"
    refs_dir.mkdir(parents=True)
    canonical_dir = output_dir / "refs" / "_canonical" / "characters" / "sadie"
    canonical_dir.mkdir(parents=True)
    (canonical_dir / "_meta").mkdir()
    archive_dir = output_dir / "_archive"
    archive_dir.mkdir()
    state_dir = project_dir / "state" / "starsend"
    state_dir.mkdir(parents=True)
    # Also create flat state dir for fallback path tests
    (project_dir / "state").mkdir(parents=True, exist_ok=True)
    return project_dir


@pytest.fixture
def sample_image(tmp_project):
    """Create a sample image file."""
    img = tmp_project / "output" / "refs" / "characters" / "sadie" / "hero_v4.jpg"
    img.write_bytes(b"\xff\xd8\xff\xe0" + b"\x00" * 100)  # minimal JPEG header
    return img


# ── Core Read/Write Tests ─────────────────────────────────────

class TestSidecarPath:
    def test_sidecar_path_jpg(self, sample_image):
        sc = _sidecar_path(sample_image)
        assert sc.name == "hero_v4.jpg.json"
        assert sc.parent == sample_image.parent

    def test_sidecar_path_mp4(self, tmp_path):
        video = tmp_path / "scene_01.mp4"
        sc = _sidecar_path(video)
        assert sc.name == "scene_01.mp4.json"


class TestReadWrite:
    def test_write_and_read(self, sample_image):
        data = {"status": "candidate", "notes": "test"}
        write_sidecar(sample_image, data)
        result = read_sidecar(sample_image)
        assert result is not None
        assert result["status"] == "candidate"
        assert result["notes"] == "test"
        assert result["schema_version"] == SCHEMA_VERSION
        assert "updated_at" in result

    def test_read_missing_returns_none(self, sample_image):
        assert read_sidecar(sample_image) is None

    def test_write_is_atomic(self, sample_image):
        """Verify atomic write creates the file and it's valid JSON."""
        write_sidecar(sample_image, {"test": True})
        sc = _sidecar_path(sample_image)
        assert sc.is_file()
        data = json.loads(sc.read_text())
        assert data["test"] is True

    def test_write_overwrites(self, sample_image):
        write_sidecar(sample_image, {"version": 1})
        write_sidecar(sample_image, {"version": 2})
        result = read_sidecar(sample_image)
        assert result["version"] == 2

    def test_read_corrupt_json_returns_none(self, sample_image):
        sc = _sidecar_path(sample_image)
        sc.write_text("not json {{{")
        assert read_sidecar(sample_image) is None


class TestEnsureSidecar:
    def test_creates_stub_when_missing(self, sample_image):
        data = ensure_sidecar(sample_image)
        assert data["source"] == "manual_drop"
        assert data["status"] == "candidate"
        # Verify file was actually written
        assert _sidecar_path(sample_image).is_file()

    def test_returns_existing_when_present(self, sample_image):
        write_sidecar(sample_image, {"source": "pipeline", "status": "front-runner"})
        data = ensure_sidecar(sample_image)
        assert data["source"] == "pipeline"
        assert data["status"] == "front-runner"


class TestCreateStub:
    def test_stub_has_required_fields(self, sample_image):
        data = create_stub_sidecar(sample_image)
        assert data["schema_version"] == SCHEMA_VERSION
        assert data["source"] == "manual_drop"
        assert data["status"] == "candidate"
        assert "created_at" in data
        assert "updated_at" in data
        assert data["provenance"] == {}
        assert data["lineage"] == {}


# ── Status Management Tests ───────────────────────────────────

class TestSetStatus:
    def test_set_valid_status(self, sample_image):
        for status in VALID_STATUSES:
            data = set_status(sample_image, status)
            assert data["status"] == status

    def test_set_invalid_status_raises(self, sample_image):
        with pytest.raises(ValueError, match="Invalid status"):
            set_status(sample_image, "bogus")

    def test_extra_kwargs_merged(self, sample_image):
        data = set_status(sample_image, "front-runner", notes="Best one yet")
        assert data["notes"] == "Best one yet"

    def test_creates_sidecar_if_missing(self, sample_image):
        data = set_status(sample_image, "front-runner")
        assert data["source"] == "manual_drop"  # auto-stubbed
        assert data["status"] == "front-runner"


class TestGetStatus:
    def test_returns_none_when_no_sidecar(self, sample_image):
        assert get_status(sample_image) is None

    def test_returns_correct_status(self, sample_image):
        set_status(sample_image, "demoted")
        assert get_status(sample_image) == "demoted"


# ── Canonical Promotion Tests ─────────────────────────────────

class TestPromoteToCanonical:
    def test_promotion_copies_file(self, sample_image, tmp_project):
        promote_to_canonical(
            sample_image,
            asset_type="characters",
            entity_id="sadie",
            project_dir=tmp_project,
        )
        hero = tmp_project / "output" / "refs" / "_canonical" / "characters" / "sadie" / "hero.jpg"
        assert hero.is_file()

    def test_promotion_writes_canonical_sidecar(self, sample_image, tmp_project):
        promote_to_canonical(
            sample_image,
            asset_type="characters",
            entity_id="sadie",
            project_dir=tmp_project,
        )
        meta_sc = tmp_project / "output" / "refs" / "_canonical" / "characters" / "sadie" / "_meta" / "hero.jpg.json"
        assert meta_sc.is_file()
        data = json.loads(meta_sc.read_text())
        assert data["status"] == "canonical"

    def test_promotion_updates_casting_state(self, sample_image, tmp_project):
        promote_to_canonical(
            sample_image,
            asset_type="characters",
            entity_id="sadie",
            project_dir=tmp_project,
        )
        # Check both legacy and flat locations
        casting_path = tmp_project / "state" / "starsend" / "casting_state.json"
        alt_path = tmp_project / "state" / "casting_state.json"
        assert casting_path.is_file() or alt_path.is_file()
        actual_path = casting_path if casting_path.is_file() else alt_path
        casting = json.loads(actual_path.read_text())
        assert "characters" in casting
        assert "sadie" in casting["characters"]
        assert "hero_path" in casting["characters"]["sadie"]

    def test_promotion_uses_flat_casting_state(self, sample_image, tmp_project):
        """When state/casting_state.json exists (no starsend/ subfolder), use it."""
        # Create a flat casting_state and remove the starsend one
        import shutil
        starsend_dir = tmp_project / "state" / "starsend"
        if starsend_dir.is_dir():
            shutil.rmtree(str(starsend_dir))
        flat_casting = tmp_project / "state" / "casting_state.json"
        flat_casting.write_text(json.dumps({"characters": {}}))

        promote_to_canonical(
            sample_image,
            asset_type="characters",
            entity_id="sadie",
            project_dir=tmp_project,
        )
        casting = json.loads(flat_casting.read_text())
        assert "sadie" in casting["characters"]

    def test_promotion_sets_original_status(self, sample_image, tmp_project):
        data = promote_to_canonical(
            sample_image,
            asset_type="characters",
            entity_id="sadie",
            project_dir=tmp_project,
        )
        assert data["status"] == "canonical"
        assert "promoted_to" in data

    def test_promotion_invalid_type_raises(self, sample_image, tmp_project):
        with pytest.raises(ValueError, match="Unknown asset_type"):
            promote_to_canonical(
                sample_image,
                asset_type="widgets",
                entity_id="foo",
                project_dir=tmp_project,
            )

    def test_promotion_singular_type_works(self, sample_image, tmp_project):
        """Test that 'character' (singular) is accepted as well as 'characters'."""
        promote_to_canonical(
            sample_image,
            asset_type="character",
            entity_id="sadie",
            project_dir=tmp_project,
        )
        hero = tmp_project / "output" / "refs" / "_canonical" / "characters" / "sadie" / "hero.jpg"
        assert hero.is_file()


# ── Archive / Restore Tests ───────────────────────────────────

class TestArchive:
    def test_archive_moves_file(self, sample_image, tmp_project):
        original = Path(str(sample_image))  # copy path before move
        dest = archive_with_sidecar(sample_image, tmp_project)
        assert dest.is_file()
        assert not original.is_file()
        assert "_archive" in str(dest)

    def test_archive_moves_sidecar(self, sample_image, tmp_project):
        write_sidecar(sample_image, {"notes": "test archive"})
        dest = archive_with_sidecar(sample_image, tmp_project)
        sc = _sidecar_path(dest)
        assert sc.is_file()
        data = json.loads(sc.read_text())
        assert data["status"] == "archived"
        assert data["notes"] == "test archive"

    def test_archive_sets_archived_from(self, sample_image, tmp_project):
        dest = archive_with_sidecar(sample_image, tmp_project)
        sc_data = read_sidecar(dest)
        assert sc_data is not None
        assert "archived_from" in sc_data


class TestRestore:
    def test_restore_from_archive(self, sample_image, tmp_project):
        # Archive first
        dest = archive_with_sidecar(sample_image, tmp_project)
        # Then restore
        restored = restore_from_archive(dest, tmp_project)
        assert restored.is_file()
        assert "_archive" not in str(restored)

    def test_restore_sets_candidate_status(self, sample_image, tmp_project):
        dest = archive_with_sidecar(sample_image, tmp_project)
        restored = restore_from_archive(dest, tmp_project)
        sc_data = read_sidecar(restored)
        assert sc_data is not None
        assert sc_data["status"] == "candidate"
        assert "archived_from" not in sc_data

    def test_restore_moves_sidecar(self, sample_image, tmp_project):
        write_sidecar(sample_image, {"notes": "round trip"})
        dest = archive_with_sidecar(sample_image, tmp_project)
        restored = restore_from_archive(dest, tmp_project)
        sc_data = read_sidecar(restored)
        assert sc_data is not None
        assert sc_data["notes"] == "round trip"


# ── Scanner Tests ─────────────────────────────────────────────

class TestScanner:
    def test_finds_missing_sidecars(self, tmp_project):
        # Create some media files without sidecars
        output = tmp_project / "output"
        (output / "test1.jpg").write_bytes(b"\xff" * 10)
        (output / "test2.png").write_bytes(b"\x89PNG" + b"\x00" * 10)
        (output / "test3.mp4").write_bytes(b"\x00" * 10)

        missing = scan_for_missing_sidecars(output)
        assert len(missing) == 3

    def test_ignores_files_with_sidecars(self, sample_image, tmp_project):
        write_sidecar(sample_image, {"status": "candidate"})
        output = tmp_project / "output"
        missing = scan_for_missing_sidecars(output)
        paths = [str(p) for p in missing]
        assert str(sample_image) not in paths

    def test_ignores_meta_directories(self, tmp_project):
        output = tmp_project / "output"
        meta_img = output / "refs" / "_canonical" / "characters" / "sadie" / "_meta" / "hero.jpg"
        meta_img.parent.mkdir(parents=True, exist_ok=True)
        meta_img.write_bytes(b"\xff" * 10)
        missing = scan_for_missing_sidecars(output)
        paths = [str(p) for p in missing]
        assert str(meta_img) not in paths

    def test_auto_stub_creates_sidecars(self, tmp_project):
        output = tmp_project / "output"
        (output / "a.jpg").write_bytes(b"\xff" * 10)
        (output / "b.png").write_bytes(b"\x89" * 10)

        count = auto_stub_missing(output)
        assert count == 2

        # Verify sidecars were created
        assert (output / "a.jpg.json").is_file()
        assert (output / "b.png.json").is_file()

    def test_auto_stub_idempotent(self, tmp_project):
        output = tmp_project / "output"
        (output / "a.jpg").write_bytes(b"\xff" * 10)

        count1 = auto_stub_missing(output)
        count2 = auto_stub_missing(output)
        assert count1 == 1
        assert count2 == 0  # Already has sidecar


# ── Pipeline Sidecar Tests ────────────────────────────────────

class TestPipelineSidecar:
    def test_write_pipeline_sidecar(self, sample_image):
        data = write_pipeline_sidecar(
            sample_image,
            model="seedream-v4.5",
            prompt="Medium close-up of Sadie",
            cost=0.039,
            gate_results={"black_frame": "pass"},
            generation_params={"aspect_ratio": "9:16"},
            shot_id="EP001_SH03",
            pipeline="video",
        )
        assert data["source"] == "pipeline"
        assert data["status"] == "candidate"
        assert data["provenance"]["model"] == "seedream-v4.5"
        assert data["provenance"]["cost"] == 0.039
        assert data["provenance"]["shot_id"] == "EP001_SH03"

    def test_pipeline_sidecar_with_inputs_snapshot(self, sample_image):
        snapshot = {
            "inputs_snapshot_hash": "abc123",
            "characters": [
                {"char_id": "sadie", "display_name": "Sadie"},
            ],
            "location_id": "int_apartment",
        }
        data = write_pipeline_sidecar(
            sample_image,
            model="kling-v3",
            prompt="Test prompt",
            inputs_snapshot=snapshot,
            pipeline="video",
        )
        assert data["provenance"]["inputs_snapshot_hash"] == "abc123"
        assert data["provenance"]["location_id"] == "int_apartment"
        assert len(data["provenance"]["refs_used"]) == 1
        assert data["provenance"]["refs_used"][0]["id"] == "sadie"

    def test_pipeline_sidecar_persists(self, sample_image):
        write_pipeline_sidecar(
            sample_image,
            model="test",
            prompt="test",
            pipeline="still",
        )
        result = read_sidecar(sample_image)
        assert result is not None
        assert result["source"] == "pipeline"


# ── Edge Cases ────────────────────────────────────────────────

class TestEdgeCases:
    def test_nonexistent_directory_scanner(self):
        missing = scan_for_missing_sidecars(Path("/tmp/does_not_exist_abc123"))
        assert missing == []

    def test_hidden_files_ignored(self, tmp_project):
        output = tmp_project / "output"
        (output / ".hidden.jpg").write_bytes(b"\xff" * 10)
        missing = scan_for_missing_sidecars(output)
        assert len(missing) == 0

    def test_schema_version_always_set(self, sample_image):
        write_sidecar(sample_image, {"custom": "data"})
        result = read_sidecar(sample_image)
        assert result["schema_version"] == SCHEMA_VERSION

    def test_updated_at_always_set(self, sample_image):
        write_sidecar(sample_image, {})
        result = read_sidecar(sample_image)
        assert "updated_at" in result
        assert result["updated_at"].endswith("Z")
```

### Validation Gate: Phase 8

```
GATE_P8:
  Run: cd $RECOIL_ROOT && python3 -m pytest workspace/tests/test_sidecar.py -v --tb=short 2>&1 | tail -5
  Expect: "passed" in output, no "FAILED"
```

---

## Post-Build Verification

After all 8 phases complete, run this final validation:

```bash
cd $RECOIL_ROOT

# 1. Import check — all modules load without error
python3 -c "
import sys; sys.path.insert(0, '.')
from workspace.sidecar import read_sidecar, write_sidecar, ensure_sidecar, set_status
from workspace.sidecar import promote_to_canonical, archive_with_sidecar, restore_from_archive
from workspace.sidecar import scan_for_missing_sidecars, auto_stub_missing, write_pipeline_sidecar
from workspace.server import app
from workspace.mcp_server import _TOOLS
print('Import check: PASS')
"

# 2. Server route check
python3 -c "
import sys; sys.path.insert(0, '.')
from workspace.server import app
routes = [r.path for r in app.routes if hasattr(r, 'path')]
required = [
    '/api/sidecar/{project}',
    '/api/promote/{project}',
    '/api/demote/{project}',
    '/api/restore/{project}',
    '/api/archive/{project}',
    '/api/tree/{project}',
]
for r in required:
    assert r in routes, f'Missing: {r}'
print('Route check: PASS')
"

# 3. MCP tool count
python3 -c "
import sys; sys.path.insert(0, '.')
from workspace.mcp_server import _TOOLS
assert 'get_file_provenance' in _TOOLS
assert len(_TOOLS) >= 13
print(f'MCP tool check: PASS ({len(_TOOLS)} tools)')
"

# 4. Test suite
python3 -m pytest workspace/tests/test_sidecar.py -v --tb=short

# 5. StepRunner sidecar writes
grep -c 'write_pipeline_sidecar' execution/step_runner.py

# 6. Frontend functions
grep -c 'loadSidecarInspector' workspace/static/workspace.js
grep -c 'showCanonicalDialog' workspace/static/workspace.js
grep -c 'promo-dialog' workspace/static/workspace.css
```

---

## Summary of Changes by File

| File | Action | LOC (est.) |
|------|--------|-----------|
| `workspace/sidecar.py` | **NEW** | ~380 |
| `workspace/server.py` | MODIFY — 4 new endpoints, VAT update, archive update | ~250 |
| `workspace/mcp_server.py` | MODIFY — 1 new tool, enhanced prime_project + get_shot_detail | ~120 |
| `workspace/static/workspace.js` | MODIFY — context menu, status icons, sidecar inspector | ~280 |
| `workspace/static/workspace.css` | MODIFY — append sidecar styles | ~200 |
| `execution/step_runner.py` | MODIFY — 2 sidecar write calls (video + keyframe) | ~30 |
| `workspace/tests/__init__.py` | **NEW** | 0 |
| `workspace/tests/test_sidecar.py` | **NEW** | ~340 |
| **Total** | | **~1,600** |

---

## Hazard Notes for Sub-Agent

1. **Do NOT modify `pipeline/lib/sidecar.py`** — that module handles `_meta/` sidecars for `_canonical/` refs. Leave it completely untouched.

2. **The `workspace/sidecar.py` module writes to `{file}.json`** (adjacent to media), NOT to `_meta/{file}.json`. The only exception is `promote_to_canonical()` which writes to `_meta/` for canonical compatibility.

3. **The `_build_metadata_index` function has 3 data sources** with priority: universal sidecar > ExecutionStore > canonical `_meta/` sidecar. ExecutionStore pipeline status_color takes precedence for files tracked by the pipeline. Universal sidecar status is exposed as `sidecar_status` in the tree node.

4. **The archive endpoint already exists** at `/api/archive/{project}`. Phase 4 REPLACES it — do not create a second endpoint.

5. **The context menu in workspace.js already exists** with Archive and Copy Path. Phase 6 REPLACES the `showContextMenu` function — do not create a second function.

6. **StepRunner sidecar writes are wrapped in try/except** because the workspace module may not always be importable (e.g., in test environments without full workspace setup). The sidecar write is non-critical — generation must never fail because of a sidecar error.

7. **Auto-stub runs on every tree scan** but is cheap — `scan_for_missing_sidecars` only does `rglob` + existence checks, and `create_stub_sidecar` only writes files that don't exist. The tree scan already does `rglob`, so the overhead is minimal.

8. **CSS uses existing design system variables** — `var(--accent-blue)`, `var(--accent-amber)`, `var(--text-dim)`, etc. Do not introduce new color variables.
