# BUILD_SPEC — Console v2 Phase 4: Remaining ProposalKind Executors

**Generated:** 2026-05-12 (from 1-round Opus consult after console-v2-prod verified green)
**Input:** `consultations/recoil/console-v2-phase-4-2026-05-12/opus_round_1.md`
**Detail level:** max
**Phases:** 5 (one per remaining ProposalKind)
**Predecessor:** `recoil/console-v2/BUILD_SPEC.md` (Phases 0/1/1.5b/2/3 — merged at e8790066)

---

## Dependency Graph

```
Phase A (BeatInsertion):         depends_on console-v2-prod merged (adds _find_beat)
Phase B (MultiBeatDirective):    depends_on A (uses _find_beat)
Phase C (ExtractCutaway):        depends_on A (uses _find_beat)
Phase D (RefSwap):               depends_on A (uses _find_beat)
Phase E (RetryStrategyEdit):     depends_on A (uses _find_beat)
```

The five phases can run in any order so long as the `_find_beat` shared helper (added once) is in place. Recommend serial dispatch: A first to add `_find_beat`, then B/C/D/E in any order.

---

## CODEBASE STATE AT SPEC TIME (shipped in console-v2-prod 2026-05-12)

**Already wired (do NOT re-implement):**
- `recoil/api/executors/__init__.py` — package docstring
- `recoil/api/executors/prompt_rewrite.py` — reference pattern for new executors
- `recoil/api/executors/param_tweak.py` — reference pattern for take-targeted executors
- `recoil/api/executors/script_edit.py` — reference pattern for file-replacement executors
- `recoil/api/adapters/beats.py` — has `set_prompt_override`, `update_take_params`, `get_episode_id_for_beat` in `__all__`; uses `_find_shot_for_take`, `_shots_dir`, `_load_shot`, `atomic_write_json` primitives
- `recoil/api/proposals_routes.py:approve_proposal` — dispatch block matches `kind` string; PromptRewrite, ParameterChange, ScriptEdit branches all wired
- `recoil/api/tests/test_smoke_integration.py` — TestClient + `_reset_acted_for_tests` + `BUS._reset_for_tests()` + `monkeypatch.setattr(_beats, "_shots_dir", ...)` test pattern established
- `recoil/core/prompt_compiler.py:compile()` — `shot["prompt_override"]` read-path (early-return bypass at top of function)

**Executor contract (per shipped pattern):**
- File: `recoil/api/executors/<kind_snake>.py`
- Exports `execute(**kwargs) -> dict`
- Raises `HTTPException(404)` if target not found, `HTTPException(422)` for unresolvable params
- Emits BUS event on both success (`severity="success"`) and error (`severity="error"`)
- Returns `{"<kind>_applied": True, ...details}` on success

**Dispatch contract (in `approve_proposal`):**
- Reads proposal from disk via `_read_modify_write`, captures `kind/target/diff/project`
- Each kind has a dedicated `if kind == "...":` branch BEFORE the 501 fallthrough
- Validates target prefix, extracts adapter params from `diff`, calls executor
- Returns `{ok, status: "executed", result, proposal_id}` on success

---

## Wire Format Reconciliation (applies to all 5 kinds)

The zod contracts in `recoil/console-v2/packages/contracts/src/proposals.ts` define structured sub-objects (`insert`, `directive`, `cutaway`, `swap`, `strategy`), but the on-disk proposal wire format uses a flat `diff: list[_DiffEntry]` where `_DiffEntry` has `{kind, before, after, text, key}`. Each dispatch branch extracts structured fields from the flat list using key-based lookup — the same approach the existing 3 kinds use. **No changes to `_ProposalCreate` or `_DiffEntry` are required.**

---

## Shared Adapter Primitive (add once, before any phase)

All 5 new kinds need beat-level lookup. Extract a private helper in `beats.py` (does NOT replace the existing inline lookup in `set_prompt_override`):

```python
# Add to recoil/api/adapters/beats.py (private, not in __all__)

def _find_beat(
    beat_id: str, project_id: Optional[str] = None
) -> tuple[Path, dict]:
    """Locate the shot file for beat_id. Returns (path, shot_dict).
    Raises KeyError if no shot file exists for beat_id."""
    validate_hierarchy_id("beat_id", beat_id)
    if project_id is not None:
        validate_project_id(project_id)
    candidates: list[str]
    if project_id:
        candidates = [project_id]
    else:
        root = projects_root()
        candidates = [p.name for p in root.iterdir() if p.is_dir()] if root.exists() else []
    for slug in candidates:
        path = _shots_dir(slug) / f"{beat_id}.json"
        shot = _load_shot(path)
        if shot is not None:
            return path, shot
    raise KeyError(f"beat {beat_id!r} not found in any project")
```

Add to Phase A's beats.py changes since Phase A runs first.

---
## Phase A: BeatInsertionProposal

depends_on: console-v2-prod merged


### Resolution
- **Target format:** `"episode:<episode_id>"` (e.g. `"episode:EP001"`)
- **Resolution:** `episode_id = target[len("episode:"):]`
- **Wire format:** diff entries use key-based lookup:
  ```
  [{kind: "insert", key: "sceneId", after: "EP001__synthetic_scene_1"},
   {kind: "insert", key: "afterBeatId", after: "EP001_SH05"},
   {kind: "insert", key: "text", text: "Visual description of the new beat"}]
  ```
- **404 condition:** episode has zero existing beats AND project_id doesn't resolve to a valid project directory (i.e., the shots dir doesn't exist)
- **422 conditions:** missing `text` in diff; empty episode_id; missing project_id

### Diff schema
The dispatch reads diff as a flat list. Extracts:
- `key == "sceneId"` → `entry["after"]` (the synthetic scene id; used to derive episode if needed)
- `key == "afterBeatId"` → `entry["after"]` (nullable; stored as metadata, does not affect ordering in MVP)
- `key == "text"` → `entry["after"]` or `entry["text"]` (the beat's visual description; required)

### Write target
**Creates a new file:** `projects/<project>/state/visual/shots/<new_beat_id>.json`

Beat ID naming: scan existing shots for the episode, find highest `_SH\d+` suffix, increment. E.g., if `EP001_SH36` is the highest, new beat = `EP001_SH37`.

Full JSON skeleton for the new file:
```json
{
    "schema_version": 1,
    "shot_id": "EP001_SH37",
    "episode_id": "EP001",
    "status": "pending",
    "takes": [],
    "prompt_override": "The visual description text from the proposal",
    "inserted_after": "EP001_SH05",
    "created_by": "BeatInsertionProposal"
}
```

`prompt_override` is set so that `prompt_compiler.py:compile()` will use this text directly at generation time — no separate script-to-prompt step needed.

### Adapter

```python
# In recoil/api/adapters/beats.py

def _next_beat_id(episode_id: str, project_id: str) -> str:
    """Generate the next sequential beat_id for an episode."""
    max_num = 0
    pattern = re.compile(
        rf'^{re.escape(episode_id)}_SH(\d+)$', re.IGNORECASE
    )
    for path in _list_beat_files(project_id):
        m = pattern.match(path.stem)
        if m:
            max_num = max(max_num, int(m.group(1)))
    return f"{episode_id}_SH{max_num + 1:02d}"


def insert_beat(
    episode_id: str,
    text: str,
    project_id: str,
    after_beat_id: Optional[str] = None,
) -> dict:
    """Create a new shot JSON file for a brand-new beat in the episode.

    Raises KeyError if project_id's shots directory does not exist.
    """
    validate_project_id(project_id)
    shots_dir = _shots_dir(project_id)
    if not shots_dir.exists():
        raise KeyError(
            f"shots directory does not exist for project {project_id!r}"
        )
    new_beat_id = _next_beat_id(episode_id, project_id)
    shot = {
        "schema_version": 1,
        "shot_id": new_beat_id,
        "episode_id": episode_id,
        "status": "pending",
        "takes": [],
        "prompt_override": text,
        "inserted_after": after_beat_id,
        "created_by": "BeatInsertionProposal",
    }
    path = shots_dir / f"{new_beat_id}.json"
    atomic_write_json(path, shot)
    return {
        "shot_id": new_beat_id,
        "episode_id": episode_id,
        "beat_insertion_applied": True,
    }
```

Add `"insert_beat"` to `__all__`.

### Executor

```python
# recoil/api/executors/beat_insertion.py
"""Executor for BeatInsertionProposal.

Creates a new shot JSON file with prompt_override set to the proposal text.
"""
from __future__ import annotations

import logging
from typing import Optional

from fastapi import HTTPException

from recoil.api.adapters import beats as beats_adapter
from recoil.api.eventbus import BUS

logger = logging.getLogger(__name__)

_SCOPE = "api/executors/beat_insertion"


def execute(
    episode_id: str,
    text: str,
    project_id: Optional[str] = None,
    after_beat_id: Optional[str] = None,
) -> dict:
    """Create a new beat (shot JSON file) in the given episode.

    Args:
        episode_id: Episode identifier (e.g. "EP001").
        text: Visual description for the new beat (becomes prompt_override).
        project_id: Required project slug.
        after_beat_id: Optional beat to insert after (stored as metadata).

    Returns:
        {"shot_id": str, "episode_id": str, "beat_insertion_applied": True}

    Raises:
        HTTPException(422): If project_id is absent or empty.
        HTTPException(404): If the project's shots directory does not exist.
    """
    if not project_id:
        raise HTTPException(
            status_code=422,
            detail={
                "error": "project_id_required",
                "message": "BeatInsertionProposal requires project_id",
            },
        )
    try:
        result = beats_adapter.insert_beat(
            episode_id=episode_id,
            text=text,
            project_id=project_id,
            after_beat_id=after_beat_id,
        )
    except KeyError as exc:
        BUS.emit_sync(
            severity="failure",
            scope=_SCOPE,
            summary=f"beat_insertion_target_not_found: {episode_id}",
            payload={
                "episode_id": episode_id,
                "project_id": project_id,
                "error": str(exc),
            },
        )
        raise HTTPException(
            status_code=404,
            detail={
                "error": "shots_dir_not_found",
                "episode_id": episode_id,
                "project_id": project_id,
                "message": str(exc),
            },
        )
    BUS.emit_sync(
        severity="success",
        scope=_SCOPE,
        summary=f"beat_insertion_applied: {result['shot_id']}",
        payload={
            **result,
            "episode_id": episode_id,
            "after_beat_id": after_beat_id,
            "text_length": len(text),
        },
    )
    return result
```

### Dispatch branch

```python
    # ── BeatInsertionProposal dispatch ─────────────────────────────────
    if kind == "BeatInsertionProposal":
        if not target.startswith("episode:"):
            return JSONResponse(
                status_code=422,
                content={
                    "error": "invalid_target",
                    "detail": f"BeatInsertionProposal target must start with 'episode:'; got {target!r}",
                    "proposal_id": proposal_id,
                },
            )
        episode_id = target[len("episode:"):]
        text = None
        after_beat_id = None
        for entry in diff:
            k = entry.get("key")
            if k == "text":
                text = entry.get("after") or entry.get("text")
            elif k == "afterBeatId":
                after_beat_id = entry.get("after")
        if not text:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "empty_text",
                    "detail": "BeatInsertionProposal diff contains no 'text' value",
                    "proposal_id": proposal_id,
                },
            )
        result = _executor_beat_insertion.execute(
            episode_id=episode_id,
            text=text,
            project_id=proposal_project if proposal_project != "default" else None,
            after_beat_id=after_beat_id,
        )
        BUS.emit_sync(
            severity="success",
            scope=_BUS_SCOPE,
            summary=f"proposal approved + executed: {captured.get('title')}",
            payload={"id": proposal_id, "kind": kind, "execution_result": result},
        )
        return {"ok": True, "status": "executed", "result": result, "proposal_id": proposal_id}
```

### Smoke test

```python
def test_beat_insertion_creates_new_shot(
    client: TestClient, tmp_path: Path, monkeypatch
) -> None:
    """Approve a BeatInsertionProposal → new shot JSON file created."""
    import json as _json
    from recoil.api.adapters import beats as _beats

    fake_project = "smoke_bi_project"
    shots_dir = tmp_path / fake_project / "state" / "visual" / "shots"
    shots_dir.mkdir(parents=True)
    # Seed one existing beat so _next_beat_id increments from it.
    existing = {"shot_id": "EP001_SH01", "episode_id": "EP001", "takes": [], "status": "pending"}
    (shots_dir / "EP001_SH01.json").write_text(_json.dumps(existing), encoding="utf-8")

    monkeypatch.setattr(_beats, "_shots_dir", lambda pid: tmp_path / pid / "state" / "visual" / "shots")

    create_body = {
        "target": "episode:EP001",
        "title": "Smoke: insert beat",
        "est_cost_usd": 0.0,
        "est_time": "instant",
        "kind": "BeatInsertionProposal",
        "project": fake_project,
        "diff": [
            {"kind": "insert", "key": "text", "text": "A new establishing shot of the city at dawn"},
            {"kind": "insert", "key": "afterBeatId", "after": "EP001_SH01"},
        ],
    }
    cr = client.post("/api/chat/proposals", json=create_body)
    assert cr.status_code == 200, cr.text
    proposal_id = cr.json()["id"]

    ar = client.post(
        f"/api/chat/proposals/{proposal_id}/approve",
        json={"project": fake_project},
    )
    assert ar.status_code == 200, ar.text
    result = ar.json()
    assert result.get("status") == "executed"
    assert result.get("ok") is True

    # New shot file should exist with the next sequential ID.
    new_path = shots_dir / "EP001_SH02.json"
    assert new_path.exists(), f"Expected {new_path} to exist"
    new_shot = _json.loads(new_path.read_text())
    assert new_shot["shot_id"] == "EP001_SH02"
    assert new_shot["episode_id"] == "EP001"
    assert new_shot["prompt_override"] == "A new establishing shot of the city at dawn"
    assert new_shot["inserted_after"] == "EP001_SH01"
    assert new_shot["status"] == "pending"

    history = BUS.history()
    insert_events = [e for e in history if "beat_insertion_applied" in e.summary]
    assert len(insert_events) >= 1
```

### Validation

```bash
# Grep targets
grep -r "beat_insertion" recoil/api/executors/ recoil/api/adapters/beats.py recoil/api/proposals_routes.py
grep -r "insert_beat" recoil/api/adapters/beats.py
grep -r "BeatInsertionProposal" recoil/api/proposals_routes.py recoil/api/tests/

# Pytest
python -m pytest recoil/api/tests/test_smoke_integration.py::test_beat_insertion_creates_new_shot -xvs
```

### Risks / open questions
- **Ordering is append-only in MVP.** `afterBeatId` is stored as metadata but doesn't affect file sort order. Beats in the UI are sorted by filename. True positional insertion would require renumbering downstream shot files or introducing an explicit ordering manifest — deferred.
- **Episode must have a shots directory.** If the project exists but has no `state/visual/shots/` yet, the executor returns 404. The harness should ensure the dir exists before creating BeatInsertion proposals.

---

## Phase B: MultiBeatDirectiveProposal

depends_on: A (uses _find_beat)


### Resolution
- **Target format:** `"episode:<episode_id>"` (e.g. `"episode:EP001"`)
- **Resolution:** episode_id extracted from target, but the REAL targets are the `beatIds` in the diff.
- **Wire format:**
  ```
  [{kind: "directive", key: "beatIds", after: ["EP001_SH01", "EP001_SH03", "EP001_SH05"]},
   {kind: "directive", key: "note", text: "Increase tension and reduce dialogue across these beats"}]
  ```
- **404 condition:** any beat_id in the list does not resolve to a shot file on disk
- **422 conditions:** empty beatIds list; missing note text

### Diff schema
Dispatch extracts:
- `key == "beatIds"` → `entry["after"]` (a JSON list of beat_id strings; required, min 1)
- `key == "note"` → `entry["after"]` or `entry["text"]` (the directive text; required)

### Write target
**Mutates existing files:** each beat's shot JSON at `projects/<project>/state/visual/shots/<beat_id>.json`.

Appends to `shot["directives"]` (a list of strings). If the field doesn't exist, it's created as `[note]`. Append-only — the field is a growing log of director notes.

**Scope boundary:** the generation pipeline does NOT read `shot["directives"]` today. This is a metadata field for editorial/UI visibility. Pipeline wiring is a future follow-on.

### Adapter

```python
# In recoil/api/adapters/beats.py

def add_directive_to_beats(
    beat_ids: list[str],
    note: str,
    project_id: Optional[str] = None,
) -> dict:
    """Append a directive note to each beat's shot JSON.

    Raises KeyError if any beat_id is not found (fails fast on first miss).
    """
    updated: list[str] = []
    for bid in beat_ids:
        path, shot = _find_beat(bid, project_id)
        directives = shot.get("directives", [])
        directives.append(note)
        shot["directives"] = directives
        atomic_write_json(path, shot)
        updated.append(bid)
    return {"beat_ids_updated": updated, "directive_applied": True}
```

Add `"add_directive_to_beats"` to `__all__`.

### Executor

```python
# recoil/api/executors/multi_beat_directive.py
"""Executor for MultiBeatDirectiveProposal.

Appends a directive note to each affected beat's shot JSON.
"""
from __future__ import annotations

import logging
from typing import Optional

from fastapi import HTTPException

from recoil.api.adapters import beats as beats_adapter
from recoil.api.eventbus import BUS

logger = logging.getLogger(__name__)

_SCOPE = "api/executors/multi_beat_directive"


def execute(
    beat_ids: list[str],
    note: str,
    project_id: Optional[str] = None,
) -> dict:
    """Append note to shot["directives"] for every beat in beat_ids.

    Args:
        beat_ids: List of beat identifiers to apply the directive to.
        note: The directive text.
        project_id: Optional project slug.

    Returns:
        {"beat_ids_updated": list[str], "directive_applied": True}

    Raises:
        HTTPException(404): If any beat_id not found on disk.
    """
    try:
        result = beats_adapter.add_directive_to_beats(
            beat_ids=beat_ids, note=note, project_id=project_id
        )
    except KeyError as exc:
        BUS.emit_sync(
            severity="failure",
            scope=_SCOPE,
            summary=f"multi_beat_directive_target_not_found",
            payload={"beat_ids": beat_ids, "error": str(exc)},
        )
        raise HTTPException(
            status_code=404,
            detail={
                "error": "beat_not_found",
                "beat_ids": beat_ids,
                "message": str(exc),
            },
        )
    BUS.emit_sync(
        severity="success",
        scope=_SCOPE,
        summary=f"multi_beat_directive_applied: {len(beat_ids)} beats",
        payload={**result, "note_length": len(note)},
    )
    return result
```

### Dispatch branch

```python
    # ── MultiBeatDirectiveProposal dispatch ────────────────────────────
    if kind == "MultiBeatDirectiveProposal":
        if not target.startswith("episode:"):
            return JSONResponse(
                status_code=422,
                content={
                    "error": "invalid_target",
                    "detail": f"MultiBeatDirectiveProposal target must start with 'episode:'; got {target!r}",
                    "proposal_id": proposal_id,
                },
            )
        beat_ids = None
        note = None
        for entry in diff:
            k = entry.get("key")
            if k == "beatIds":
                beat_ids = entry.get("after")
            elif k == "note":
                note = entry.get("after") or entry.get("text")
        if not beat_ids or not isinstance(beat_ids, list) or len(beat_ids) == 0:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "empty_beat_ids",
                    "detail": "MultiBeatDirectiveProposal diff contains no beatIds list",
                    "proposal_id": proposal_id,
                },
            )
        if not note:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "empty_note",
                    "detail": "MultiBeatDirectiveProposal diff contains no note",
                    "proposal_id": proposal_id,
                },
            )
        result = _executor_multi_beat_directive.execute(
            beat_ids=beat_ids,
            note=note,
            project_id=proposal_project if proposal_project != "default" else None,
        )
        BUS.emit_sync(
            severity="success",
            scope=_BUS_SCOPE,
            summary=f"proposal approved + executed: {captured.get('title')}",
            payload={"id": proposal_id, "kind": kind, "execution_result": result},
        )
        return {"ok": True, "status": "executed", "result": result, "proposal_id": proposal_id}
```

### Smoke test

```python
def test_multi_beat_directive_applies_to_all(
    client: TestClient, tmp_path: Path, monkeypatch
) -> None:
    """Approve a MultiBeatDirectiveProposal → all target beats get the directive."""
    import json as _json
    from recoil.api.adapters import beats as _beats

    fake_project = "smoke_mbd_project"
    shots_dir = tmp_path / fake_project / "state" / "visual" / "shots"
    shots_dir.mkdir(parents=True)
    for i in (1, 3, 5):
        bid = f"EP001_SH{i:02d}"
        data = {"shot_id": bid, "episode_id": "EP001", "takes": [], "status": "pending"}
        (shots_dir / f"{bid}.json").write_text(_json.dumps(data), encoding="utf-8")

    monkeypatch.setattr(_beats, "_shots_dir", lambda pid: tmp_path / pid / "state" / "visual" / "shots")

    create_body = {
        "target": "episode:EP001",
        "title": "Smoke: multi-beat directive",
        "est_cost_usd": 0.0,
        "est_time": "instant",
        "kind": "MultiBeatDirectiveProposal",
        "project": fake_project,
        "diff": [
            {"kind": "directive", "key": "beatIds", "after": ["EP001_SH01", "EP001_SH03", "EP001_SH05"]},
            {"kind": "directive", "key": "note", "text": "Increase visual tension"},
        ],
    }
    cr = client.post("/api/chat/proposals", json=create_body)
    assert cr.status_code == 200, cr.text
    proposal_id = cr.json()["id"]

    ar = client.post(
        f"/api/chat/proposals/{proposal_id}/approve",
        json={"project": fake_project},
    )
    assert ar.status_code == 200, ar.text
    result = ar.json()
    assert result.get("status") == "executed"

    for i in (1, 3, 5):
        bid = f"EP001_SH{i:02d}"
        updated = _json.loads((shots_dir / f"{bid}.json").read_text())
        assert "Increase visual tension" in updated.get("directives", [])

    history = BUS.history()
    dir_events = [e for e in history if "multi_beat_directive_applied" in e.summary]
    assert len(dir_events) >= 1
```

### Validation

```bash
grep -r "multi_beat_directive" recoil/api/executors/ recoil/api/adapters/beats.py recoil/api/proposals_routes.py
grep -r "add_directive_to_beats" recoil/api/adapters/beats.py
grep -r "MultiBeatDirectiveProposal" recoil/api/proposals_routes.py recoil/api/tests/

python -m pytest recoil/api/tests/test_smoke_integration.py::test_multi_beat_directive_applies_to_all -xvs
```

### Risks / open questions
- **Pipeline does not read `shot["directives"]` today.** This is a write-only field until the generation pipeline is taught to incorporate director notes into prompt compilation. Flag for follow-on.
- **Partial failure semantics.** If beat 3 of 5 is not found, beats 1-2 have already been mutated. The executor aborts with 404 for the missing beat. This is acceptable for single-operator MVP — no concurrent writers, and the directive is append-only so re-running after fixing the missing beat is safe.

---

## Phase C: ExtractCutawayProposal

depends_on: A (uses _find_beat)


### Resolution
- **Target format:** `"beat:<from_beat_id>"` (e.g. `"beat:EP001_SH05"`)
- **Resolution:** `from_beat_id = target[len("beat:"):]`. The cutaway is extracted FROM this beat.
- **Wire format:**
  ```
  [{kind: "cutaway", text: "Close-up of the envelope on the table, seal visible"}]
  ```
  `fromBeatId` is the target itself — no need to repeat it in the diff.
- **404 condition:** `from_beat_id` does not resolve to a shot file on disk
- **422 conditions:** missing description text

### Diff schema
Dispatch extracts:
- First diff entry with `kind == "cutaway"` → `entry["text"]` or `entry["after"]` (the cutaway description; required)

### Write target
**Creates a new file** AND **mutates the source file.**

New file: `projects/<project>/state/visual/shots/<from_beat_id>_CUT<NN>.json`

Cutaway ID naming: scan for existing `<from_beat_id>_CUT\d+` files, find max, increment. First cutaway from EP001_SH05 → `EP001_SH05_CUT01`.

New file JSON skeleton:
```json
{
    "schema_version": 1,
    "shot_id": "EP001_SH05_CUT01",
    "episode_id": "EP001",
    "status": "pending",
    "takes": [],
    "prompt_override": "Close-up of the envelope on the table, seal visible",
    "cutaway_source": "EP001_SH05",
    "is_coverage": true,
    "coverage_of": "EP001_SH05",
    "created_by": "ExtractCutawayProposal"
}
```

Source file mutation: appends the new cutaway ID to `shot["cutaways"]` list.

`is_coverage` and `coverage_of` are set to leverage the existing coverage tracking fields already present in some shot dicts (seen in the tartarus data).

### Adapter

```python
# In recoil/api/adapters/beats.py

def _next_cutaway_id(source_beat_id: str, project_id: str) -> str:
    """Generate the next cutaway beat_id for a source beat."""
    max_num = 0
    pattern = re.compile(
        rf'^{re.escape(source_beat_id)}_CUT(\d+)$', re.IGNORECASE
    )
    for path in _list_beat_files(project_id):
        m = pattern.match(path.stem)
        if m:
            max_num = max(max_num, int(m.group(1)))
    return f"{source_beat_id}_CUT{max_num + 1:02d}"


def extract_cutaway(
    from_beat_id: str,
    description: str,
    project_id: Optional[str] = None,
) -> dict:
    """Create a new cutaway shot derived from an existing beat.

    Creates the new shot file AND marks the source beat with a cutaways list.
    Raises KeyError if from_beat_id is not found.
    """
    path, source_shot = _find_beat(from_beat_id, project_id)
    resolved_project = path.parent.parent.parent.parent.name
    episode_id = _derive_episode_id(source_shot, path, resolved_project)
    cutaway_id = _next_cutaway_id(from_beat_id, resolved_project)
    cutaway_shot = {
        "schema_version": 1,
        "shot_id": cutaway_id,
        "episode_id": episode_id,
        "status": "pending",
        "takes": [],
        "prompt_override": description,
        "cutaway_source": from_beat_id,
        "is_coverage": True,
        "coverage_of": from_beat_id,
        "created_by": "ExtractCutawayProposal",
    }
    cutaway_path = path.parent / f"{cutaway_id}.json"
    atomic_write_json(cutaway_path, cutaway_shot)
    cutaways = source_shot.get("cutaways", [])
    cutaways.append(cutaway_id)
    source_shot["cutaways"] = cutaways
    atomic_write_json(path, source_shot)
    return {
        "shot_id": cutaway_id,
        "cutaway_source": from_beat_id,
        "episode_id": episode_id,
        "extract_cutaway_applied": True,
    }
```

Add `"extract_cutaway"` to `__all__`.

### Executor

```python
# recoil/api/executors/extract_cutaway.py
"""Executor for ExtractCutawayProposal.

Creates a new cutaway shot file derived from an existing beat.
"""
from __future__ import annotations

import logging
from typing import Optional

from fastapi import HTTPException

from recoil.api.adapters import beats as beats_adapter
from recoil.api.eventbus import BUS

logger = logging.getLogger(__name__)

_SCOPE = "api/executors/extract_cutaway"


def execute(
    from_beat_id: str,
    description: str,
    project_id: Optional[str] = None,
) -> dict:
    """Extract a cutaway shot from an existing beat.

    Args:
        from_beat_id: The source beat to extract from.
        description: Visual description for the cutaway (becomes prompt_override).
        project_id: Optional project slug.

    Returns:
        {"shot_id": str, "cutaway_source": str, "episode_id": str,
         "extract_cutaway_applied": True}

    Raises:
        HTTPException(404): If from_beat_id not found on disk.
    """
    try:
        result = beats_adapter.extract_cutaway(
            from_beat_id=from_beat_id,
            description=description,
            project_id=project_id,
        )
    except KeyError as exc:
        BUS.emit_sync(
            severity="failure",
            scope=_SCOPE,
            summary=f"extract_cutaway_target_not_found: {from_beat_id}",
            payload={"from_beat_id": from_beat_id, "error": str(exc)},
        )
        raise HTTPException(
            status_code=404,
            detail={
                "error": "beat_not_found",
                "from_beat_id": from_beat_id,
                "message": str(exc),
            },
        )
    BUS.emit_sync(
        severity="success",
        scope=_SCOPE,
        summary=f"extract_cutaway_applied: {result['shot_id']}",
        payload={
            **result,
            "from_beat_id": from_beat_id,
            "description_length": len(description),
        },
    )
    return result
```

### Dispatch branch

```python
    # ── ExtractCutawayProposal dispatch ────────────────────────────────
    if kind == "ExtractCutawayProposal":
        if not target.startswith("beat:"):
            return JSONResponse(
                status_code=422,
                content={
                    "error": "invalid_target",
                    "detail": f"ExtractCutawayProposal target must start with 'beat:'; got {target!r}",
                    "proposal_id": proposal_id,
                },
            )
        from_beat_id = target[len("beat:"):]
        description = ""
        for entry in diff:
            description = entry.get("text") or entry.get("after") or ""
            if description:
                break
        if not description:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "empty_description",
                    "detail": "ExtractCutawayProposal diff contains no description text",
                    "proposal_id": proposal_id,
                },
            )
        result = _executor_extract_cutaway.execute(
            from_beat_id=from_beat_id,
            description=description,
            project_id=proposal_project if proposal_project != "default" else None,
        )
        BUS.emit_sync(
            severity="success",
            scope=_BUS_SCOPE,
            summary=f"proposal approved + executed: {captured.get('title')}",
            payload={"id": proposal_id, "kind": kind, "execution_result": result},
        )
        return {"ok": True, "status": "executed", "result": result, "proposal_id": proposal_id}
```

### Smoke test

```python
def test_extract_cutaway_creates_new_shot(
    client: TestClient, tmp_path: Path, monkeypatch
) -> None:
    """Approve an ExtractCutawayProposal → new cutaway shot + source marked."""
    import json as _json
    from recoil.api.adapters import beats as _beats

    fake_project = "smoke_ec_project"
    shots_dir = tmp_path / fake_project / "state" / "visual" / "shots"
    shots_dir.mkdir(parents=True)
    source_beat = "EP001_SH05"
    source_data = {
        "shot_id": source_beat,
        "episode_id": "EP001",
        "takes": [],
        "status": "pending",
    }
    (shots_dir / f"{source_beat}.json").write_text(
        _json.dumps(source_data), encoding="utf-8"
    )

    monkeypatch.setattr(_beats, "_shots_dir", lambda pid: tmp_path / pid / "state" / "visual" / "shots")

    create_body = {
        "target": f"beat:{source_beat}",
        "title": "Smoke: extract cutaway",
        "est_cost_usd": 0.0,
        "est_time": "instant",
        "kind": "ExtractCutawayProposal",
        "project": fake_project,
        "diff": [{"kind": "cutaway", "text": "Close-up of the sealed envelope"}],
    }
    cr = client.post("/api/chat/proposals", json=create_body)
    assert cr.status_code == 200, cr.text
    proposal_id = cr.json()["id"]

    ar = client.post(
        f"/api/chat/proposals/{proposal_id}/approve",
        json={"project": fake_project},
    )
    assert ar.status_code == 200, ar.text
    result = ar.json()
    assert result.get("status") == "executed"

    # New cutaway file exists.
    cutaway_path = shots_dir / "EP001_SH05_CUT01.json"
    assert cutaway_path.exists()
    cutaway = _json.loads(cutaway_path.read_text())
    assert cutaway["shot_id"] == "EP001_SH05_CUT01"
    assert cutaway["cutaway_source"] == source_beat
    assert cutaway["prompt_override"] == "Close-up of the sealed envelope"
    assert cutaway["is_coverage"] is True

    # Source beat updated with cutaways list.
    source_updated = _json.loads((shots_dir / f"{source_beat}.json").read_text())
    assert "EP001_SH05_CUT01" in source_updated.get("cutaways", [])

    history = BUS.history()
    cut_events = [e for e in history if "extract_cutaway_applied" in e.summary]
    assert len(cut_events) >= 1
```

### Validation

```bash
grep -r "extract_cutaway" recoil/api/executors/ recoil/api/adapters/beats.py recoil/api/proposals_routes.py
grep -r "ExtractCutawayProposal" recoil/api/proposals_routes.py recoil/api/tests/

python -m pytest recoil/api/tests/test_smoke_integration.py::test_extract_cutaway_creates_new_shot -xvs
```

### Risks / open questions
- **Cutaway file sorts between source and next beat.** `EP001_SH05_CUT01.json` sorts after `EP001_SH05.json` and before `EP001_SH06.json` lexicographically, so cutaways appear adjacent to their source in the beat list. This is intentional and works without explicit ordering.
- **Non-atomic two-file mutation.** The cutaway file is written first, then the source is updated. If the process crashes between the two writes, a cutaway file exists but the source doesn't reference it. This is safe — the cutaway is valid on its own via `cutaway_source` and `coverage_of` fields. A future reconciliation pass could heal orphaned `cutaways` lists.

---

## Phase D: RefSwapProposal

depends_on: A (uses _find_beat)


### Resolution
- **Target format:** `"beat:<beat_id>"` (e.g. `"beat:EP001_SH02"`)
- **Resolution:** `beat_id = target[len("beat:"):]`
- **Wire format:**
  ```
  [{kind: "swap", before: "sadie_hero.png", after: "sadie_front.png"},
   {kind: "promptAdd", text: "Camera now shows profile angle"}]
  ```
  The `swap` entry uses the natural `before`/`after` fields of `_DiffEntry` directly. Optional `promptAdd` entries carry additional prompt text.
- **404 condition:** beat_id does not resolve to a shot file on disk
- **422 conditions:** missing `before` or `after` in the swap entry

### Diff schema
Dispatch extracts:
- First entry with `kind == "swap"` → `entry["before"]` (old ref identifier), `entry["after"]` (new ref identifier). Both required.
- All entries with `kind == "promptAdd"` → `entry["text"]` or `entry["after"]` (optional prompt additions)

### Write target
**Mutates existing file:** `projects/<project>/state/visual/shots/<beat_id>.json`

**Architecture decision: pattern (c) — `shot["ref_overrides"]`.**

Rationale: Refs live on the filesystem under `projects/<p>/output/refs/_canonical/`, resolved at generation time by `ref_resolver.py`. The shot/take dicts do NOT carry ref paths today. Rather than physically moving/renaming ref files (destructive, affects all shots sharing that ref) or modifying a non-existent manifest, we write a per-shot override field — exactly mirroring the `prompt_override` pattern. The ref resolver would need to be taught to read this field (scope boundary, same as prompt_override for the video pipeline).

Fields written:
- `shot["ref_overrides"]` — list of `{"before": str, "after": str}` dicts (append if field exists)
- `shot["prompt_additions"]` — list of strings (append if field exists; only if promptAdd entries present)

### Adapter

```python
# In recoil/api/adapters/beats.py

def set_ref_overrides(
    beat_id: str,
    swap_before: str,
    swap_after: str,
    prompt_additions: Optional[list[str]] = None,
    project_id: Optional[str] = None,
) -> dict:
    """Write a ref override entry to the shot dict for beat_id.

    Appends to shot["ref_overrides"] and optionally to shot["prompt_additions"].
    Raises KeyError if beat_id is not found.
    """
    path, shot = _find_beat(beat_id, project_id)
    overrides = shot.get("ref_overrides", [])
    overrides.append({"before": swap_before, "after": swap_after})
    shot["ref_overrides"] = overrides
    if prompt_additions:
        existing_adds = shot.get("prompt_additions", [])
        existing_adds.extend(prompt_additions)
        shot["prompt_additions"] = existing_adds
    atomic_write_json(path, shot)
    return {
        "shot_id": shot.get("shot_id") or path.stem,
        "ref_swap_applied": True,
        "swap": {"before": swap_before, "after": swap_after},
        "prompt_additions_count": len(prompt_additions) if prompt_additions else 0,
    }
```

Add `"set_ref_overrides"` to `__all__`.

### Executor

```python
# recoil/api/executors/ref_swap.py
"""Executor for RefSwapProposal.

Writes shot["ref_overrides"] to record a reference image swap.
Scope boundary: ref_resolver.py does not read this field yet —
pipeline wiring is a separate follow-on (mirrors prompt_override pattern).
"""
from __future__ import annotations

import logging
from typing import Optional

from fastapi import HTTPException

from recoil.api.adapters import beats as beats_adapter
from recoil.api.eventbus import BUS

logger = logging.getLogger(__name__)

_SCOPE = "api/executors/ref_swap"


def execute(
    beat_id: str,
    swap_before: str,
    swap_after: str,
    prompt_additions: Optional[list[str]] = None,
    project_id: Optional[str] = None,
) -> dict:
    """Write a ref swap override onto the shot dict for beat_id.

    Args:
        beat_id: Shot file stem (e.g. "EP001_SH02").
        swap_before: The ref identifier being replaced.
        swap_after: The replacement ref identifier.
        prompt_additions: Optional list of prompt text to append.
        project_id: Optional project slug.

    Returns:
        {"shot_id": str, "ref_swap_applied": True, "swap": {...}, ...}

    Raises:
        HTTPException(404): If beat_id not found on disk.
    """
    try:
        result = beats_adapter.set_ref_overrides(
            beat_id=beat_id,
            swap_before=swap_before,
            swap_after=swap_after,
            prompt_additions=prompt_additions,
            project_id=project_id,
        )
    except KeyError:
        BUS.emit_sync(
            severity="failure",
            scope=_SCOPE,
            summary=f"ref_swap_target_not_found: {beat_id}",
            payload={"beat_id": beat_id},
        )
        raise HTTPException(
            status_code=404,
            detail={
                "error": "beat_not_found",
                "beat_id": beat_id,
                "message": f"Cannot swap ref: beat {beat_id!r} not found on disk.",
            },
        )
    BUS.emit_sync(
        severity="success",
        scope=_SCOPE,
        summary=f"ref_swap_applied: {beat_id}",
        payload={
            **result,
            "beat_id": beat_id,
        },
    )
    return result
```

### Dispatch branch

```python
    # ── RefSwapProposal dispatch ───────────────────────────────────────
    if kind == "RefSwapProposal":
        if not target.startswith("beat:"):
            return JSONResponse(
                status_code=422,
                content={
                    "error": "invalid_target",
                    "detail": f"RefSwapProposal target must start with 'beat:'; got {target!r}",
                    "proposal_id": proposal_id,
                },
            )
        beat_id = target[len("beat:"):]
        swap_before = None
        swap_after = None
        prompt_additions = []
        for entry in diff:
            if entry.get("kind") == "swap":
                swap_before = entry.get("before")
                swap_after = entry.get("after")
            elif entry.get("kind") == "promptAdd":
                pa = entry.get("text") or entry.get("after")
                if pa:
                    prompt_additions.append(pa)
        if not swap_before or not swap_after:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "incomplete_swap",
                    "detail": "RefSwapProposal diff must contain a 'swap' entry with before and after",
                    "proposal_id": proposal_id,
                },
            )
        result = _executor_ref_swap.execute(
            beat_id=beat_id,
            swap_before=swap_before,
            swap_after=swap_after,
            prompt_additions=prompt_additions or None,
            project_id=proposal_project if proposal_project != "default" else None,
        )
        BUS.emit_sync(
            severity="success",
            scope=_BUS_SCOPE,
            summary=f"proposal approved + executed: {captured.get('title')}",
            payload={"id": proposal_id, "kind": kind, "execution_result": result},
        )
        return {"ok": True, "status": "executed", "result": result, "proposal_id": proposal_id}
```

### Smoke test

```python
def test_ref_swap_writes_override(
    client: TestClient, tmp_path: Path, monkeypatch
) -> None:
    """Approve a RefSwapProposal → shot JSON gets ref_overrides field."""
    import json as _json
    from recoil.api.adapters import beats as _beats

    fake_project = "smoke_rs_project"
    shots_dir = tmp_path / fake_project / "state" / "visual" / "shots"
    shots_dir.mkdir(parents=True)
    beat_id = "EP001_SH02"
    shot_data = {"shot_id": beat_id, "takes": [], "status": "pending"}
    (shots_dir / f"{beat_id}.json").write_text(
        _json.dumps(shot_data), encoding="utf-8"
    )

    monkeypatch.setattr(_beats, "_shots_dir", lambda pid: tmp_path / pid / "state" / "visual" / "shots")

    create_body = {
        "target": f"beat:{beat_id}",
        "title": "Smoke: ref swap",
        "est_cost_usd": 0.0,
        "est_time": "instant",
        "kind": "RefSwapProposal",
        "project": fake_project,
        "diff": [
            {"kind": "swap", "before": "sadie_hero.png", "after": "sadie_front.png"},
            {"kind": "promptAdd", "text": "Profile angle, dramatic side lighting"},
        ],
    }
    cr = client.post("/api/chat/proposals", json=create_body)
    assert cr.status_code == 200, cr.text
    proposal_id = cr.json()["id"]

    ar = client.post(
        f"/api/chat/proposals/{proposal_id}/approve",
        json={"project": fake_project},
    )
    assert ar.status_code == 200, ar.text
    result = ar.json()
    assert result.get("status") == "executed"

    updated = _json.loads((shots_dir / f"{beat_id}.json").read_text())
    assert len(updated.get("ref_overrides", [])) == 1
    assert updated["ref_overrides"][0]["before"] == "sadie_hero.png"
    assert updated["ref_overrides"][0]["after"] == "sadie_front.png"
    assert "Profile angle, dramatic side lighting" in updated.get("prompt_additions", [])

    history = BUS.history()
    swap_events = [e for e in history if "ref_swap_applied" in e.summary]
    assert len(swap_events) >= 1
```

### Validation

```bash
grep -r "ref_swap" recoil/api/executors/ recoil/api/adapters/beats.py recoil/api/proposals_routes.py
grep -r "set_ref_overrides" recoil/api/adapters/beats.py
grep -r "RefSwapProposal" recoil/api/proposals_routes.py recoil/api/tests/

python -m pytest recoil/api/tests/test_smoke_integration.py::test_ref_swap_writes_override -xvs
```

### Risks / open questions
- **`ref_resolver.py` does not read `shot["ref_overrides"]` today.** The write path is complete; the read path is a follow-on. This mirrors the `prompt_override` / video pipeline gap — approved and understood as scope boundary. When the resolver is taught to read this field, the lookup cascade becomes: `shot["ref_overrides"]` → canonical filesystem refs.
- **Ref identifiers are opaque strings.** The spec does not validate that `swap_before` matches an actual ref on disk or that `swap_after` exists. Validation is the responsibility of the MCP tool creating the proposal (it should inspect the refs directory before proposing a swap). The executor is a pure write-path.
- **Multiple swaps accumulate.** Approving two RefSwapProposals on the same beat appends two entries to `ref_overrides`. The resolver (when wired) should apply them in order. No deduplication — if the same `before` is swapped twice, the last entry wins.

---

## Phase E: RetryStrategyEditProposal

depends_on: A (uses _find_beat)


### Resolution
- **Target format:** `"beat:<beat_id>"` (e.g. `"beat:EP001_SH02"`)
- **Resolution:** `beat_id = target[len("beat:"):]`
- **Wire format:**
  ```
  [{kind: "strategy", key: "name", after: "add_turnaround_angles"},
   {kind: "strategy", key: "rationale", text: "Proven winner for identity drift per pipeline-learnings §10g"}]
  ```
- **404 condition:** beat_id does not resolve to a shot file on disk
- **422 conditions:** missing strategy name; invalid strategy name (not in `RetryStrategyName` enum)

### Diff schema
Dispatch extracts:
- `key == "name"` → `entry["after"]` (strategy name string; must be a valid `RetryStrategyName.value`)
- `key == "rationale"` → `entry["after"]` or `entry["text"]` (required)

### Write target
**Mutates existing file:** `projects/<project>/state/visual/shots/<beat_id>.json`

Writes `shot["pinned_strategy"] = {"name": "<strategy_name>", "rationale": "<text>"}`. This is a replace, not append — only one strategy can be pinned at a time.

**Scope boundary:** `StrategyEngine.select_and_apply()` does NOT read this field today. `production_loop.py` does not exist yet. When the production loop is built, it should check `shot.get("pinned_strategy")` and bypass automatic strategy selection if set.

### Adapter

```python
# In recoil/api/adapters/beats.py

def pin_strategy(
    beat_id: str,
    strategy_name: str,
    rationale: str,
    project_id: Optional[str] = None,
) -> dict:
    """Pin a retry strategy onto the shot dict for beat_id.

    Writes shot["pinned_strategy"] = {"name": ..., "rationale": ...}.
    Raises KeyError if beat_id is not found.
    """
    path, shot = _find_beat(beat_id, project_id)
    shot["pinned_strategy"] = {"name": strategy_name, "rationale": rationale}
    atomic_write_json(path, shot)
    return {
        "shot_id": shot.get("shot_id") or path.stem,
        "pinned_strategy_set": True,
        "strategy_name": strategy_name,
    }
```

Add `"pin_strategy"` to `__all__`.

### Executor

```python
# recoil/api/executors/retry_strategy_edit.py
"""Executor for RetryStrategyEditProposal.

Pins a named retry strategy onto a shot's dict. Scope boundary:
StrategyEngine does not read shot["pinned_strategy"] yet — production
loop wiring is a separate follow-on.
"""
from __future__ import annotations

import logging
from typing import Optional

from fastapi import HTTPException

from recoil.api.adapters import beats as beats_adapter
from recoil.api.eventbus import BUS

logger = logging.getLogger(__name__)

_SCOPE = "api/executors/retry_strategy_edit"

try:
    from recoil.pipeline.orchestrator.strategy_registry import RetryStrategyName
    _VALID_STRATEGY_NAMES: frozenset[str] = frozenset(
        s.value for s in RetryStrategyName
    )
except ImportError:
    _VALID_STRATEGY_NAMES = frozenset()


def execute(
    beat_id: str,
    strategy_name: str,
    rationale: str,
    project_id: Optional[str] = None,
) -> dict:
    """Pin a retry strategy onto the shot dict for beat_id.

    Args:
        beat_id: Shot file stem (e.g. "EP001_SH02").
        strategy_name: A valid RetryStrategyName string value
            (e.g. "add_turnaround_angles").
        rationale: Human-readable justification for pinning this strategy.
        project_id: Optional project slug.

    Returns:
        {"shot_id": str, "pinned_strategy_set": True, "strategy_name": str}

    Raises:
        HTTPException(404): If beat_id not found on disk.
        HTTPException(422): If strategy_name is not a valid enum value.
    """
    if _VALID_STRATEGY_NAMES and strategy_name not in _VALID_STRATEGY_NAMES:
        raise HTTPException(
            status_code=422,
            detail={
                "error": "invalid_strategy_name",
                "strategy_name": strategy_name,
                "valid_names": sorted(_VALID_STRATEGY_NAMES),
                "message": f"Unknown retry strategy {strategy_name!r}",
            },
        )
    try:
        result = beats_adapter.pin_strategy(
            beat_id=beat_id,
            strategy_name=strategy_name,
            rationale=rationale,
            project_id=project_id,
        )
    except KeyError:
        BUS.emit_sync(
            severity="failure",
            scope=_SCOPE,
            summary=f"retry_strategy_edit_target_not_found: {beat_id}",
            payload={"beat_id": beat_id},
        )
        raise HTTPException(
            status_code=404,
            detail={
                "error": "beat_not_found",
                "beat_id": beat_id,
                "message": f"Cannot pin strategy: beat {beat_id!r} not found on disk.",
            },
        )
    BUS.emit_sync(
        severity="success",
        scope=_SCOPE,
        summary=f"retry_strategy_edit_applied: {beat_id} → {strategy_name}",
        payload={
            **result,
            "beat_id": beat_id,
            "rationale_length": len(rationale),
        },
    )
    return result
```

### Dispatch branch

```python
    # ── RetryStrategyEditProposal dispatch ─────────────────────────────
    if kind == "RetryStrategyEditProposal":
        if not target.startswith("beat:"):
            return JSONResponse(
                status_code=422,
                content={
                    "error": "invalid_target",
                    "detail": f"RetryStrategyEditProposal target must start with 'beat:'; got {target!r}",
                    "proposal_id": proposal_id,
                },
            )
        beat_id = target[len("beat:"):]
        strategy_name = None
        rationale = None
        for entry in diff:
            k = entry.get("key")
            if k == "name":
                strategy_name = entry.get("after")
            elif k == "rationale":
                rationale = entry.get("after") or entry.get("text")
        if not strategy_name:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "missing_strategy_name",
                    "detail": "RetryStrategyEditProposal diff must include a 'name' key",
                    "proposal_id": proposal_id,
                },
            )
        if not rationale:
            return JSONResponse(
                status_code=422,
                content={
                    "error": "missing_rationale",
                    "detail": "RetryStrategyEditProposal diff must include a 'rationale' key",
                    "proposal_id": proposal_id,
                },
            )
        result = _executor_retry_strategy_edit.execute(
            beat_id=beat_id,
            strategy_name=strategy_name,
            rationale=rationale,
            project_id=proposal_project if proposal_project != "default" else None,
        )
        BUS.emit_sync(
            severity="success",
            scope=_BUS_SCOPE,
            summary=f"proposal approved + executed: {captured.get('title')}",
            payload={"id": proposal_id, "kind": kind, "execution_result": result},
        )
        return {"ok": True, "status": "executed", "result": result, "proposal_id": proposal_id}
```

### Smoke test

```python
def test_retry_strategy_edit_pins_strategy(
    client: TestClient, tmp_path: Path, monkeypatch
) -> None:
    """Approve a RetryStrategyEditProposal → shot JSON gets pinned_strategy."""
    import json as _json
    from recoil.api.adapters import beats as _beats

    fake_project = "smoke_rse_project"
    shots_dir = tmp_path / fake_project / "state" / "visual" / "shots"
    shots_dir.mkdir(parents=True)
    beat_id = "EP001_SH02"
    shot_data = {"shot_id": beat_id, "takes": [], "status": "pending"}
    (shots_dir / f"{beat_id}.json").write_text(
        _json.dumps(shot_data), encoding="utf-8"
    )

    monkeypatch.setattr(_beats, "_shots_dir", lambda pid: tmp_path / pid / "state" / "visual" / "shots")

    create_body = {
        "target": f"beat:{beat_id}",
        "title": "Smoke: pin retry strategy",
        "est_cost_usd": 0.0,
        "est_time": "instant",
        "kind": "RetryStrategyEditProposal",
        "project": fake_project,
        "diff": [
            {"kind": "strategy", "key": "name", "after": "add_turnaround_angles"},
            {"kind": "strategy", "key": "rationale", "text": "Proven winner for identity drift"},
        ],
    }
    cr = client.post("/api/chat/proposals", json=create_body)
    assert cr.status_code == 200, cr.text
    proposal_id = cr.json()["id"]

    ar = client.post(
        f"/api/chat/proposals/{proposal_id}/approve",
        json={"project": fake_project},
    )
    assert ar.status_code == 200, ar.text
    result = ar.json()
    assert result.get("status") == "executed"

    updated = _json.loads((shots_dir / f"{beat_id}.json").read_text())
    pinned = updated.get("pinned_strategy")
    assert pinned is not None
    assert pinned["name"] == "add_turnaround_angles"
    assert pinned["rationale"] == "Proven winner for identity drift"

    history = BUS.history()
    strat_events = [e for e in history if "retry_strategy_edit_applied" in e.summary]
    assert len(strat_events) >= 1
```

### Validation

```bash
grep -r "retry_strategy_edit" recoil/api/executors/ recoil/api/adapters/beats.py recoil/api/proposals_routes.py
grep -r "pin_strategy" recoil/api/adapters/beats.py
grep -r "RetryStrategyEditProposal" recoil/api/proposals_routes.py recoil/api/tests/

python -m pytest recoil/api/tests/test_smoke_integration.py::test_retry_strategy_edit_pins_strategy -xvs
```

### Risks / open questions
- **StrategyEngine does not read `shot["pinned_strategy"]` today.** Write path is complete; read path integration into production_loop.py is a follow-on. When wired, the loop should check this field BEFORE calling `select_and_apply()` and skip automatic selection if a strategy is pinned.
- **Strategy validation uses try/except import.** The executor attempts to import `RetryStrategyName` from `strategy_registry.py` for validation. If the import fails (strategy_registry has heavy pipeline deps), validation is skipped and any string is accepted. This is a pragmatic tradeoff — the executor shouldn't fail to load just because the pipeline orchestrator has import issues. The BUS event still records the exact strategy name for auditability.
- **`pinned_strategy` is replace-not-append.** Approving a second RetryStrategyEditProposal on the same beat overwrites the previous pin. This is intentional — only one strategy should be pinned at a time. To un-pin, a separate "remove pinned strategy" operation would be needed (out of scope — the user can approve a different strategy to overwrite, or manually edit the JSON).
- **This kind does NOT need a deeper consultation.** The write path is straightforward, the strategy registry has a clean enum to validate against, and the scope boundary (no production_loop.py read path) is documented. The `from_score_card` bridge in strategy_registry.py is substrate-only and unrelated to the pin mechanism.

---

---

## Summary of all changes (cross-phase)


| File | Changes |
|------|---------|
| `recoil/api/adapters/beats.py` | Add `_find_beat`, `_next_beat_id`, `_next_cutaway_id`, `insert_beat`, `add_directive_to_beats`, `extract_cutaway`, `set_ref_overrides`, `pin_strategy`. Update `__all__`. |
| `recoil/api/executors/beat_insertion.py` | New file |
| `recoil/api/executors/multi_beat_directive.py` | New file |
| `recoil/api/executors/extract_cutaway.py` | New file |
| `recoil/api/executors/ref_swap.py` | New file |
| `recoil/api/executors/retry_strategy_edit.py` | New file |
| `recoil/api/proposals_routes.py` | Add 5 import lines + 5 dispatch branches before the 501 fallback |
| `recoil/api/tests/test_smoke_integration.py` | Add 5 test functions |

---

## Verification Checklist (run after all 5 phases)

```bash
# 1. All tests pass
\
PYTHONPATH=/Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS \
  pytest recoil/api/tests/ -v --tb=short

# 2. All 5 new executor files exist and pass syntax
python3 -c "
import ast, pathlib
expected = {'beat_insertion.py', 'multi_beat_directive.py', 'extract_cutaway.py', 'ref_swap.py', 'retry_strategy_edit.py'}
present = {f.name for f in pathlib.Path('recoil/api/executors').glob('*.py')}
missing = expected - present
assert not missing, f'missing executors: {missing}'
for name in expected:
    ast.parse(pathlib.Path('recoil/api/executors') / name).read_text() if False else ast.parse(open(f'recoil/api/executors/{name}').read())
print('all 5 executors OK')
"

# 3. All 5 dispatch branches present in proposals_routes.py
for kind in BeatInsertionProposal MultiBeatDirectiveProposal ExtractCutawayProposal RefSwapProposal RetryStrategyEditProposal; do
  grep -q "if kind == \"$kind\"" recoil/api/proposals_routes.py && echo "$kind: dispatch wired" || echo "$kind: MISSING"
done

# 4. _find_beat helper present in beats.py
grep -q "def _find_beat" recoil/api/adapters/beats.py && echo "_find_beat: present"

# 5. New adapter methods exposed in __all__
python3 -c "
from recoil.api.adapters import beats
need = {'insert_beat', 'add_directive_to_beats', 'extract_cutaway', 'set_ref_overrides', 'pin_strategy'}
present = set(beats.__all__) & need
missing = need - present
assert not missing, f'__all__ missing: {missing}'
print('adapter __all__: complete')
"
```

---

## Known Integration Gaps (carried from Phase 4 consult)

1. **`pinned_strategy` not read by production_loop.py.** RetryStrategyEditProposal writes `shot["pinned_strategy"]` but `production_loop.py` does not check it before calling `select_and_apply()`. Pin path complete; read path is a follow-on. Same pattern as the original prompt_override gap.

2. **`ref_overrides` not read by ref_resolver.** RefSwapProposal writes `shot["ref_overrides"][before] = after` but `recoil/core/ref_resolver.py:resolve_entity_refs()` does not consult this dict at generation time. Swap is recorded but not applied. Follow-on.

3. **MultiBeatDirective notes are advisory only.** `add_directive_to_beats` appends to `shot["directives"][]` but no generation path reads this field today. Operators see directives in the JSON; the engine doesn't yet act on them.

4. **ExtractCutaway creates new shot JSON only.** Source beat is unmodified. Hierarchy / episode-arc files are not updated. Downstream tools that index beats from those files will not see the new cutaway until they re-index.

These gaps mirror the original `prompt_override` write-only gap surfaced in console-v2-prod Phase 1. They are non-blocking for Phase 4 — the proposal lifecycle works (create → approve → executor fires → BUS event), but the engine read-path for each kind requires a separate wire-up phase. Flag each in the BUS event payload (`integration_gap` key) so operators see the gap in the events drawer.
