# BUILD_SPEC — Coverage Pass Generation Architecture

**Generated:** 2026-04-13
**Input:** consultations/recoil/coverage-pass-generation-architecture/SYNTHESIS.md
**Detail level:** high
**Visual design:** no
**Phases:** 10
**Estimated build time:** 3-5 hours
**Working directory:** ~/Dropbox/CLAUDE_PROJECTS/recoil

## Validation command

```bash
cd ~/Dropbox/CLAUDE_PROJECTS/recoil && python3 -c "
import ast, sys
for f in [
    'execution/step_types.py',
    'execution/pass_store.py',
    'pipeline/lib/prompt_engine.py',
    'execution/assembler.py',
    'execution/step_runner.py',
    'pipeline/orchestrator/coverage_planner.py',
    'pipeline/orchestrator/production_loop.py',
    'pipeline/tools/prep_character_angles.py',
]:
    try:
        ast.parse(open(f).read())
        print(f'  OK: {f}')
    except Exception as e:
        print(f'FAIL: {f}: {e}')
        sys.exit(1)
print('All files parse clean')
" && cd pipeline && PYTHONPATH=.. python3 -m pytest tests/ -q --tb=short -x 2>&1 | tail -5
```

---

## Phase 1: Data Types + PassStore

### Files to create
- `execution/pass_store.py` — New parallel store for pass-level state
- `pipeline/lib/pass_store.py` — Proxy module (same pattern as pipeline/lib/execution_store.py)

### Files to modify
- `execution/step_types.py` — Add PassResult, SegmentResult dataclasses

### Requirements

**In `step_types.py`, add after StepResult (line ~86):**

```python
@dataclass(frozen=True)
class SegmentResult:
    """Result of a single segment within a coverage pass."""
    source_shot_id: str
    segment_index: int
    timestamp_start_s: float
    timestamp_end_s: float
    boundary_frame_path: Optional[str] = None
    identity_score: Optional[float] = None
    usable: bool = True

@dataclass(frozen=True)
class PassResult:
    """Result of a coverage pass generation."""
    pass_id: str
    success: bool
    video_path: Optional[str]
    cost_usd: float
    segment_results: tuple = ()  # tuple[SegmentResult, ...] — immutable like StepResult
    model: str
    pipeline: str = "coverage_pass"
    error: Optional[str] = None
    take_index: int = -1
    expected_cuts: int = 0
    detected_cuts: int = 0
```

**`pass_store.py` — New file:**

Pattern identical to ExecutionStore but keyed by `pass_id`. One JSON file per episode: `ep_001_pass_state.json` in the project's `state/visual/passes/` directory. Use `core.paths.PROJECTS_ROOT` for path resolution. Create the `passes/` subdirectory on first write.

- `PassStore.__init__(self, project: str)` — sets state dir
- `create_pass(self, pass_id: str, segment_shot_ids: list[str])` — creates pass record
- `update_pass(self, pass_id: str, **fields)` — merges fields into pass record
- `get_pass(self, pass_id: str) -> dict | None` — reads pass record
- `append_pass_take(self, pass_id: str, take: dict)` — appends to takes list
- `link_pass_to_shots(self, pass_id: str, shot_id_map: dict)` — updates constituent shots with `coverage_pass_id`
- `list_passes(self, episode_id: str) -> list[dict]` — returns all passes for an episode
- Thread-safe via internal lock (same pattern as ExecutionStore)
- Atomic writes: write to temp file, os.replace

Pass record schema:
```json
{
    "pass_id": "EP001_PASS_003_L_SADIE_B",
    "status": "pending",
    "segment_shot_ids": ["EP001_SH03", "EP001_SH04", "EP001_SH05"],
    "video_path": null,
    "cost_usd": 0.0,
    "segment_timestamps": {},
    "expected_cuts": 2,
    "detected_cuts": 0,
    "takes": [],
    "retry_strategy": null,
    "created_at": 0.0,
    "updated_at": 0.0
}
```

### What already exists
- `execution/execution_store.py` — Use as reference for file I/O patterns, locking, atomic writes
- `execution/step_types.py` — StepResult, GateVerdict patterns to follow

### Scope boundary
- Do NOT modify ExecutionStore — PassStore is parallel, not integrated
- Do NOT add episode-level aggregation (that's production_loop's job)

### Validation
```bash
python3 -c "
import ast; ast.parse(open('execution/step_types.py').read())
ast.parse(open('execution/pass_store.py').read())
from execution.step_types import PassResult, SegmentResult
from execution.pass_store import PassStore
print('Phase 1 OK')
"
```

---

## Phase 2: Prompt Engine — build_seeddance_r2v_prompt()

### Files to modify
- `pipeline/lib/prompt_engine.py` — Add new r2v prompt builder + wire into compile_all_prompts

### Requirements

**New function: `build_seeddance_r2v_prompt()`**

Builds a 3-zone prompt for SeedDance r2v from plan shots + bible data:

**Zone A — Reference declarations:**
```
@Image1 is [character] — [2-3 visual traits from bible]
@Image2 shows [character] from [angle].
@Image3 is [location] — [core spatial description from bible]
```

Each @ImageN corresponds to the ref at that upload index. The prompt builder emits `@Image{ref_type_N}` placeholder tokens (e.g., `@Image{identity_1}`, `@Image{scene_1}`). The assembler resolves these to concrete indices.

**Zone B — Shot list:**
```
Shot 1: Static [shot_type], [focal_length]. [Character] from @Image1 in [location] from @Image3. [Action]. [Duration]s.
Shot 2: [Camera]. Cut to [subject]. [Action]. [Duration]s.
```

- One action, one camera movement, one emotion per shot
- Duration in seconds at end of each shot line
- Include "Cut to" between shots for explicit transitions
- Anchor shot (Shot 1) should be 1 second if refs are scene-contextual, normal duration if refs are clean bg

**Zone C — Global constraints:**
```
[Style description]. Ambient audio only: [scene sounds]. No music.
```

**Function signature:**
```python
def build_seeddance_r2v_prompt(
    shots: list[dict],
    bible: dict,
    project_config: dict,
    episode: int = 1,
    ref_manifest: dict | None = None,
    anchor_duration_s: int = 1,
) -> str:
```

- `ref_manifest` maps ref_type to @ImageN index: `{"identity_1": 1, "identity_2": 2, "scene_1": 3}`
- If `ref_manifest` is None, emit placeholder tokens for assembler resolution
- Pull character visual descriptions from `bible["characters"]`
- Pull location descriptions from `bible["locations"]`
- JIT hydrate skeletons (same pattern as `build_wan_r2v_prompt`)
- Respect prompt length: 4-8 sentences for multi-shot (from PROMPT_BIBLE)
- Include `"No music"` in Zone C unless project_config specifies otherwise

**Wire into `compile_all_prompts()` at line ~3333:**
Replace `compiled["seeddance_t2v"] = build_kling_t2v_prompt(shot)` with:
```python
compiled["seeddance_r2v"] = build_seeddance_r2v_prompt(
    shots=[shot],
    bible=bible,
    project_config=project_config,
    episode=episode,
)
compiled["seeddance_t2v"] = compiled["seeddance_r2v"]  # alias for backward compat
```

Also add a multi-shot variant:
```python
def build_seeddance_r2v_prompt_multi(
    shots: list[dict],
    bible: dict,
    project_config: dict,
    episode: int = 1,
    coverage_pass_dict: dict | None = None,
    ref_manifest: dict | None = None,
    anchor_duration_s: int = 1,
) -> str:
```
This is the multi-shot version that builds from coverage pass segments. Takes a plain dict (NOT the CoveragePass class — prompt_engine must not import from orchestrator). The dict has keys: `segments`, `arc_preamble`, `focus_character`, `location_id`. The `anchor_duration_s` parameter overrides Shot 1's duration (default 1s to lock identity without copying refs). If `anchor_duration_s` is 0 or None, use the segment's natural duration.

### What already exists
- `build_wan_r2v_prompt()` at line 3049 — similar structure, use as reference
- `_build_character_lines()` — helper for bible character descriptions
- `_flatten_lighting_to_prose()` — lighting descriptor helper
- `hydrate_skeleton()` from `lib.jit_prompt` — JIT hydration

### Scope boundary
- Do NOT modify existing prompt builders (kling, veo, wan)
- Do NOT change the PromptPackage structure
- Placeholder tokens (`@Image{identity_1}`) are resolved by the assembler, not here

### Validation
```bash
python3 -c "
import ast; ast.parse(open('pipeline/lib/prompt_engine.py').read())
" && grep -q 'def build_seeddance_r2v_prompt' pipeline/lib/prompt_engine.py && \
grep -q 'seeddance_r2v' pipeline/lib/prompt_engine.py && \
grep -q 'Zone A' pipeline/lib/prompt_engine.py && \
echo "Phase 2 OK"
```

---

## Phase 3: Assembler — SeedDance R2V Ref Resolution

### Files to modify
- `execution/assembler.py` — Add r2v ref resolution, rename weight map

### Requirements

**Rename `_SEEDDANCE_WEIGHT_MAP` to `_SEEDDANCE_UPLOAD_PRIORITY` at ALL 3 sites:**
1. Constant definition (~line 806)
2. Usage in `_int_weight_to_seeddance()` (~line 818) — also rename function to `_ref_to_upload_priority()`
3. Call site (~line 687)
```python
_SEEDDANCE_UPLOAD_PRIORITY = {
    "identity": 0.85,    # @Image1-2 (character hero + angles)
    "keyframe": 0.70,    # @Image3 (if used as visual anchor)
    "prop": 0.50,        # @Image4-5
    "expression": 0.40,  # @Image6
    "scene": 0.25,       # @Image7-8 (location/style)
}
```

**New function: `resolve_seedance_r2v_refs()`**
```python
def resolve_seedance_r2v_refs(
    prompt_text: str,
    ref_list: list,  # list of ref objects with .path, .ref_type
) -> tuple[str, list[str]]:
    """Resolve @Image{ref_type_N} placeholders to @ImageN based on upload order.
    
    1. Sort refs by _SEEDDANCE_UPLOAD_PRIORITY (highest first)
    2. Assign @Image1 through @ImageN based on sorted position
    3. Replace placeholder tokens in prompt_text
    4. Return (resolved_prompt, ordered_file_paths)
    """
```

- Sort refs by `_SEEDDANCE_UPLOAD_PRIORITY[ref.ref_type]` descending
- Identity refs first, then keyframe, prop, expression, scene
- Within same type, preserve original order
- Replace `@Image{identity_1}` → `@Image1`, `@Image{identity_2}` → `@Image2`, etc.
- Return the ordered file paths (for upload in that order)

**Update `allocate_references()` for SeedDance r2v pipeline:**
Add a new branch for `pipeline == "r2v"`:
```python
if "seeddance" in model and pipeline == "r2v":
    slots_left = 9
    allocation["identity"] = min(num_chars * 2, 4)  # max 4 identity refs
    slots_left -= allocation["identity"]
    allocation["scene"] = 1  # always 1 location ref
    slots_left -= 1
    if has_props and slots_left > 0:
        allocation["prop"] = min(1, slots_left)
    return allocation
```

### What already exists
- `allocate_references()` at line 45 — add r2v branch to existing seeddance block
- `_SEEDDANCE_WEIGHT_MAP` — rename (grep for all references and update)

### Scope boundary
- Do NOT change allocation logic for other models
- Do NOT modify the ref object structure — just sort and resolve

### Validation
```bash
python3 -c "
import ast; ast.parse(open('execution/assembler.py').read())
" && grep -q '_SEEDDANCE_UPLOAD_PRIORITY' execution/assembler.py && \
grep -q 'resolve_seedance_r2v_refs' execution/assembler.py && \
grep -q "pipeline == .r2v." execution/assembler.py && \
echo "Phase 3 OK"
```

---

## Phase 4: StepRunner — execute_pass()

### Files to modify
- `execution/step_runner.py` — Add execute_pass() method

### Requirements

**New method after `execute_wan_r2v()` (~line 1905):**

```python
def execute_pass(
    self,
    pass_id: str,
    prompt: str,
    reference_image_paths: list[Path],
    segment_shot_ids: list[str],
    expected_segment_timestamps: list[tuple[float, float]],
    model: str = "seeddance-2.0",
    duration: int = 10,
    aspect_ratio: str = "9:16",
    gates: list[GateFunction] | None = None,
    on_status=None,
) -> PassResult:
```

**CRITICAL DESIGN:** `execute_pass()` takes primitive types (str, Path, list), NOT CoveragePass objects. This is because `step_runner.py` lives in `execution/` which cannot import from `pipeline/orchestrator/`. The caller (production_loop.py, which CAN import both) is responsible for building the prompt (via prompt_engine) and resolving refs (via assembler) BEFORE calling execute_pass(). This matches the execute_wan_r2v() pattern: it takes `prompt: str` + `reference_image_paths: list[Path]`.

**Lifecycle:**
1. Upload refs to fal storage via `client._upload_to_fal(str(path))` where `client = get_client("seeddance-2.0")`
2. Build SeedDance payload as a plain dict with `reference_images` key (NOT `image_urls` — the client converts internally). The `reference_images` key triggers r2v endpoint routing in `SeedDanceClient.submit()`
3. Call `job = client.submit(payload)` — this is ASYNC (returns immediately with request_id). Unlike Wan R2V which blocks, SeedDance uses queue polling.
4. Call `result = client.wait_for_job(job, timeout_s=900)` — MUST call this for SeedDance. 900s timeout for multi-shot r2v.
5. On success: save video, extract segment boundary frames via ffmpeg
6. Run gates on extracted frames (if gates provided)
7. Build PassResult with SegmentResults
8. Return PassResult

**Key implementation details:**
- Prompt is PRE-BUILT by the caller — execute_pass() does not call prompt_engine
- Refs are PRE-ORDERED by the caller — execute_pass() uploads them in the given order
- Do NOT use SeedDancePayload dataclass — use a plain dict: `{"prompt": ..., "reference_images": [...], "duration": ..., "aspect_ratio": ..., "generate_audio": True}`
- Video saved as `{pass_id}_take{N}.mp4` in the episode's video dir
- Cost = duration_s * model cost_per_second (from model_profiles)
- If API fails, return PassResult with success=False
- If gates fail on any segment, mark that segment as `usable=False` in SegmentResult

**Boundary frame extraction helper (private method):**
```python
def _extract_boundary_frames(
    self, video_path: Path, segment_timestamps: list[tuple[float, float]]
) -> list[Path]:
    """Extract one frame per segment at 0.5s into each segment."""
```
Uses `ffmpeg -ss {ts} -i {video} -frames:v 1` per segment.

### What already exists (from prior phases)
- Phase 1: PassResult, SegmentResult in step_types.py
- Phase 1: PassStore in pass_store.py (+ proxy at pipeline/lib/pass_store.py)
- Phase 2: build_seeddance_r2v_prompt_multi() in prompt_engine.py (called by production_loop, NOT step_runner)
- Phase 3: resolve_seedance_r2v_refs() in assembler.py (called by production_loop, NOT step_runner)
- SeedDanceClient with `image_urls` fix (already applied this session)
- `execute_wan_r2v()` — similar SIGNATURE pattern (prompt: str, ref_paths: list[Path]) but DIFFERENT async behavior: Wan blocks on submit(), SeedDance requires wait_for_job()

### Scope boundary
- Do NOT modify execute_video() or execute_wan_r2v()
- Do NOT add retry logic here — that's production_loop's job
- Do NOT auto-split the video file — only extract boundary frames

### Validation
```bash
python3 -c "
import ast; ast.parse(open('execution/step_runner.py').read())
" && grep -q 'def execute_pass' execution/step_runner.py && \
grep -q '_extract_boundary_frames' execution/step_runner.py && \
grep -q 'return PassResult' execution/step_runner.py && \
grep -q 'from execution.step_types import.*PassResult' execution/step_runner.py && \
grep -q 'reference_images' execution/step_runner.py && \
grep -q 'wait_for_job' execution/step_runner.py && \
echo "Phase 4 OK"
```

---

## Phase 5: Coverage Planner Updates

### Files to modify
- `pipeline/orchestrator/coverage_planner.py` — Update model routing, remove Format A generation
- `config/pipeline_config.json` — Update coverage_strategy model_routing and format_mapping

### Requirements

**Model routing update in `coverage_planner.py` (line ~554):**
Change default model routing to use SeedDance for most passes:
```python
if is_env:
    model = routing.get("env_any_tier", "veo-3.1")
elif max_tier >= 3:
    model = routing.get("climax", "kling-v3")
else:
    model = routing.get("character_default", "seeddance-2.0")
```

**Format A removal:**
Modify `_assign_format()` to never return Format A:
```python
def _assign_format(tier: int, pass_type: str, segment_count: int) -> list[str]:
    if pass_type == "env":
        return ["B"]
    # Format A removed — on-demand via Claude Code instead of pre-generating
    # Multi-angle coverage is requested explicitly, not pre-generated
    if tier >= 3:
        return ["C"]  # Coverage reactions at climax only
    return ["B"]
```

Remove the Format A angle loop in `build_passes()` (lines ~566-621). Simplify to only generate Format B and C passes.

**Config update in `pipeline_config.json`:**
```json
"model_routing": {
    "env_any_tier": "veo-3.1",
    "character_default": "seeddance-2.0",
    "climax": "kling-v3"
},
"format_mapping": {
    "0": ["B"],
    "1": ["B"],
    "2": ["B"],
    "3": ["C"]
}
```

### What already exists
- `build_passes()` at line 459 — the core function to modify
- `_assign_format()` at line 250 — simplify
- Format A angle loop at lines 566-621 — remove entirely
- `pipeline_config.json` has `coverage_strategy` section

### Scope boundary
- Do NOT modify CoveragePass dataclass (keep format_a_angle_index for backward compat)
- Do NOT change the contiguous run grouping logic
- Do NOT change duration chunking or element budget logic

### Validation
```bash
python3 -c "
import ast; ast.parse(open('pipeline/orchestrator/coverage_planner.py').read())
import json; cfg = json.load(open('config/pipeline_config.json'))
routing = cfg['coverage_strategy']['model_routing']
assert 'character_default' in routing, 'Missing character_default routing'
assert routing['character_default'] == 'seeddance-2.0', 'Wrong default model'
fmt = cfg['coverage_strategy']['format_mapping']
assert 'A' not in str(fmt.values()), 'Format A should be removed'
print('Phase 5 OK')
"
```

---

## Phase 6: Scene Detection + Segment QC

### Files to create
- `pipeline/lib/scene_detect.py` — Lightweight scene detection wrapper

### Requirements

**`scene_detect.py` — New module:**

Wraps ffprobe/ffmpeg for scene change detection. No PySceneDetect dependency (keep it simple).

```python
def detect_scene_changes(
    video_path: Path,
    threshold: float = 0.3,
) -> list[float]:
    """Detect scene change timestamps using ffprobe scene detection.
    
    Returns list of timestamps (seconds) where cuts were detected.
    Uses ffprobe's 'select=gt(scene,threshold)' filter.
    """

def extract_frames_at(
    video_path: Path,
    timestamps: list[float],
    output_dir: Path | None = None,
) -> list[Path]:
    """Extract still frames at specific timestamps.
    
    Uses ffmpeg -ss {ts} -frames:v 1 for each timestamp.
    Output: {video_stem}_frame_{ts:.1f}s.png
    """

def validate_cut_count(
    video_path: Path,
    expected_cuts: int,
    threshold: float = 0.3,
) -> dict:
    """Compare detected cuts to expected.
    
    Returns: {
        "expected": int, "detected": int, "match": bool,
        "timestamps": list[float],
        "status": "exact_match" | "over_cut" | "under_cut"
    }
    """
```

- Uses subprocess to call ffmpeg (no heavy dependencies)
- Add `shutil.which("ffmpeg")` check at module import — raise RuntimeError("ffmpeg not found — install via 'brew install ffmpeg'") if missing
- Scene detection via ffmpeg select filter (more robust than ffprobe lavfi): `ffmpeg -i {path} -vf "select='gt(scene,{threshold})',showinfo" -f null - 2>&1` — parse timestamps from showinfo output
- Frame extraction: `ffmpeg -y -ss {ts} -i {path} -frames:v 1 -q:v 2 {output}`

### What already exists
- ffmpeg is available on both MacBook and Mac Studio
- `_extract_boundary_frames` in StepRunner (Phase 4) uses ffmpeg directly

### Scope boundary
- No PySceneDetect dependency — use ffprobe only
- No video splitting — only detection and frame extraction
- This is a utility module, not integrated into any pipeline step (Phase 7 does integration)

### Validation
```bash
python3 -c "
import ast; ast.parse(open('pipeline/lib/scene_detect.py').read())
from pipeline.lib.scene_detect import detect_scene_changes, extract_frames_at, validate_cut_count
print('Phase 6 OK')
"
```

---

## Phase 7: Pole Position Retry in Production Loop

### Files to modify
- `pipeline/orchestrator/production_loop.py` — Add pass iteration + pole position retry

### Requirements

**New method: `_execute_pass_batch()`**

Parallel to the existing shot-by-shot execution path. Reads `ep_NNN_passes.json` (from coverage planner --lock) and iterates passes:

```python
def _execute_pass_batch(self, passes: list[dict]) -> list[PassResult]:
    """Execute coverage passes sequentially. Phase 1 = single-threaded."""
```

**Import PassStore via:** `from execution.pass_store import PassStore` (sys.path insert at line 29-31 makes recoil root available). Import PassResult/SegmentResult via `from execution.step_types import PassResult, SegmentResult`. Do NOT try orchestrator proxy imports — these types have no proxy.

For each pass:
1. Convert CoveragePass to primitive args (production_loop CAN import CoveragePass — it lives in pipeline/)
2. Build prompt via `build_seeddance_r2v_prompt_multi()` (prompt_engine is accessible from pipeline/)
3. Resolve refs via `resolve_seedance_r2v_refs()` — get ordered paths + resolved prompt
4. Call `self._runner.execute_pass(pass_id, prompt, ref_paths, shot_ids, timestamps, ...)`
5. On success: validate cut count, extract boundary frames, run identity gate
6. On segment failure: queue pole position retry
7. **On content filter failure**: if error matches content_filter patterns, call `soften_prompt()` on the pass prompt and retry once (anime close-ups trigger SeedDance filters — this is a known issue)
8. Record provenance, update PassStore

**New method: `_build_retry_pass()`**

Implements pole position retry:
```python
def _build_retry_pass(
    self,
    failed_pass: CoveragePass,
    failed_segment_index: int,
    all_scene_shots: list[dict],
) -> CoveragePass:
    """Build a retry pass with the failed shot in pole position.
    
    Strategy A (default): Same scene, reordered.
    [failed_shot, neighbor_before, neighbor_after]
    
    Strategy B (fallback): Forward continuation.
    Used when neighbors would cross a scene boundary.
    """
```

- Default: Option A (same-scene reorder) — failed shot first, then neighbors
- If no same-scene neighbors: Option B (forward continuation)
- Swap @Image1 to the failed shot's specific turnaround angle
- Three strikes on same shot → escalate to single-shot Kling i2v (set `retry_strategy: "single_shot_fallback"`)
- Track retry count per shot_id in PassStore

**Wire into main loop:**
Add a `run_passes()` method that:
1. Checks if `ep_NNN_passes.json` exists for the episode
2. If yes, runs pass-level generation via `_execute_pass_batch()`
3. If no, falls back to existing shot-by-shot path
4. Can be triggered by BatchConfig flag: `use_coverage_passes: bool = False`

### What already exists (from prior phases)
- Phase 1: PassStore, PassResult, SegmentResult
- Phase 4: StepRunner.execute_pass()
- Phase 5: Updated coverage_planner with SeedDance routing
- Phase 6: scene_detect.validate_cut_count()
- RetryDispatcher — existing retry infrastructure (reuse failure classification)
- FeedbackAgent — existing fix suggestion system
- AutonomyController — existing auto-approve

### Scope boundary
- Do NOT modify the existing shot-by-shot execution path
- Do NOT change RetryDispatcher's failure categories (add pass-level handling alongside)
- Pass execution is a parallel path selected by config flag
- Do NOT implement cross-pass continuity checks (future work)

### Validation
```bash
python3 -c "
import ast; ast.parse(open('pipeline/orchestrator/production_loop.py').read())
" && grep -q '_execute_pass_batch' pipeline/orchestrator/production_loop.py && \
grep -q '_build_retry_pass' pipeline/orchestrator/production_loop.py && \
grep -q 'pole.position\|pole_position' pipeline/orchestrator/production_loop.py && \
echo "Phase 7 OK"
```

---

## Phase 8: QwenMA Casting Integration

### Files to modify
- `pipeline/tools/prep_character_angles.py` — Add QwenMA as alternative angle generation path alongside existing Gemini NBP path

### Requirements

**IMPORTANT:** The existing file uses Gemini NBP for angle generation, NOT QwenMA. Do NOT replace the Gemini path — add QwenMA as a new generation backend that can be selected via a `--backend qwen-ma` flag (default remains `gemini`).

**New function: `_generate_angles_qwen_ma()`**

Uses fal.ai endpoint `fal-ai/qwen-image-edit-2511-multiple-angles`:

```python
def _generate_angles_qwen_ma(
    hero_url: str,
    angles: list[dict],
    output_dir: Path,
) -> list[Path]:
    """Generate multi-angle views via QwenMA LoRA on fal.ai.
    
    Each angle dict has: name, horizontal_angle (0-360), vertical_angle (-30 to 30), zoom (0-10).
    API field is 'image_urls' (list), NOT 'image_url' (singular).
    Result is in result['image']['url'] or result['images'][0]['url'].
    Cost: ~$0.035/image.
    """
```

**Default angle config for QwenMA:**
```python
QWEN_MA_ANGLES = [
    {"name": "front",           "horizontal_angle": 0,   "vertical_angle": 0,  "zoom": 5},
    {"name": "three_quarter",   "horizontal_angle": 45,  "vertical_angle": 0,  "zoom": 5},
    {"name": "profile",         "horizontal_angle": 90,  "vertical_angle": 0,  "zoom": 5},
    {"name": "back",            "horizontal_angle": 180, "vertical_angle": 0,  "zoom": 5},
]
```

**Wire into existing CLI:**
Add `--backend` argument to the argparse: `choices=["gemini", "qwen-ma"], default="gemini"`. When `qwen-ma` is selected, upload hero via `fal_client.upload_file()`, call `_generate_angles_qwen_ma()`, then continue into existing rembg + Gate 0 pipeline.

**Sidecar provenance:**
When saving turnarounds, write sidecar JSON with `"model": "qwen-ma"`, `"source_hero": hero_path`, `"angle_params": {...}`.

### What already exists
- `prep_character_angles.py` has full Path A (hero → angles via Gemini NBP) and Path B (text → grid → hero → angles)
- The tool already handles rembg, white background, Gate 0 validation
- DEFAULT_ANGLES = ["front", "profile", "three_quarter", "back"]
- Codename mapping for IP-safe generation
- `tools/_archive/test_qwen_multiangle.py` — old QwenMA test code with the correct fal.ai endpoint and numeric angle parameters (use as reference)

### Scope boundary
- Do NOT replace the Gemini NBP path — add QwenMA alongside it
- Do NOT change the rembg/Gate 0 pipeline — QwenMA outputs feed into the same downstream
- Do NOT add new angles beyond the 4 defaults

### Validation
```bash
python3 -c "
import ast; ast.parse(open('pipeline/tools/prep_character_angles.py').read())
" && grep -q "qwen.ma\|qwen_ma" pipeline/tools/prep_character_angles.py && \
grep -q "image_urls" pipeline/tools/prep_character_angles.py && \
grep -q "fal-ai/qwen-image-edit" pipeline/tools/prep_character_angles.py && \
echo "Phase 8 OK"
```

---

## Phase 9: Config Updates

### Files to modify
- `config/model_profiles.json` — Update SeedDance cost, add QwenMA
- `config/PROMPT_BIBLE.yaml` — Already updated this session, verify consistency

### Requirements

**model_profiles.json — SeedDance 2.0 cost correction:**
Update `cost_per_second` from `0.0133` to `0.30` (fal.ai standard). IMPORTANT: Keep the key name as `cost_per_second` — do NOT rename to `cost_per_second_usd` because `core/model_profiles.py:get_cost()` reads this exact key. Add a separate `cost_per_second_fast` field and a `cost_notes` field.

```json
"seeddance-2.0": {
    "cost_per_second": 0.30,
    "cost_per_second_fast": 0.24,
    "cost_notes": "fal.ai pricing. Atlas Cloud: $0.10 standard, $0.081 fast",
    ...
}
```

Also update any test that asserts SeedDance cost values (e.g., `test_seeddance_cost_tracking` — expected cost will change from ~$0.0665 to ~$1.50 for 5s). Search with: `grep -r "0.0133\|0.0665" pipeline/tests/`

**model_profiles.json — Add QwenMA:**
```json
"qwen-ma": {
    "provider": "fal.ai",
    "modality": "image",
    "endpoint": "fal-ai/qwen-image-edit-2511-multiple-angles",
    "cost_per_image_usd": 0.035,
    "capabilities": ["multi-angle", "camera-control"],
    "params": {
        "horizontal_angle": "0-360 degrees",
        "vertical_angle": "-30 to 30",
        "zoom": "0-10"
    }
}
```

**model_profiles.json — Wan 2.7 r2v note:**
Add note that Wan r2v uses `fal_client.subscribe()` (blocking), not queue polling.

**PROMPT_BIBLE.yaml — Verify SeedDance section is correct:**
The section was updated earlier this session. Verify:
- `role_labeling: true` with `label_syntax: "@ImageN"`
- `duration.range: [4, 15]` with `auto_supported: true`
- `generate_audio.default_on: true`
- `gotchas` includes `image_urls` not `reference_images`
- Cost notes reflect actual pricing

### Scope boundary
- Do NOT change model capabilities or routing logic (that's Phase 5)
- Only update cost data and add missing model entries

### Validation
```bash
python3 -c "
import json
d = json.load(open('config/model_profiles.json'))
sd = d.get('seeddance-2.0', {})
cost = sd.get('cost_per_second_usd', 0)
assert cost >= 0.20, f'SeedDance cost not updated: {cost}'
assert 'qwen-ma' in d, 'QwenMA not added'
print('Phase 9 OK')
"
```

---

## Phase 10: /pass Skill

### Files to create
- `~/.claude/skills/pass/SKILL.md` — New Claude Code skill for pass-level generation

### Requirements

**Skill definition:**

```
/pass plan --episode N          # Dry-run build_coverage_passes, show pass summary
/pass validate --episode N      # Run coverage_validator, report BLOCK/WARN/INFO
/pass generate --episode N --pass PASS_ID  # Generate a single pass
/pass generate --episode N --all           # Generate all passes (overnight mode)
/pass review --episode N        # Show pass results in workspace
/pass estimate --episode N      # Cost estimate for all passes
/pass retry --pass PASS_ID      # Pole position retry for a failed pass
```

**Validation hooks (before generation):**
1. Run `validate_all_passes()` — abort on any BLOCK
2. Verify all reference image paths resolve
3. Check budget: `sum(pass.duration_s * cost_per_sec) <= budget_remaining`
4. Show cost estimate and ask for confirmation

**The skill should:**
- Read ep_NNN_passes.json for pass data
- Use workspace MCP tools (prime_project, get_shot_detail) for review
- Use StepRunner.execute_pass() for generation
- Use PassStore for state tracking
- Show results per-segment after generation
- Cost estimation uses model_profiles.json rates

### What already exists (from prior phases)
- Phase 1: PassStore
- Phase 2: build_seeddance_r2v_prompt_multi()
- Phase 4: StepRunner.execute_pass()
- Phase 5: Updated coverage_planner
- Phase 7: Production loop pass iteration
- Existing skills in `~/.claude/skills/` — follow naming patterns
- `/workspace` skill — use as reference for MCP tool integration

### Scope boundary
- Skill is a thin wrapper — all logic lives in the engine code
- Do NOT duplicate engine logic in the skill
- Do NOT add visual UI components (workspace handles display)

### Validation
```bash
test -f ~/.claude/skills/pass/SKILL.md && \
grep -q '/pass plan' ~/.claude/skills/pass/SKILL.md && \
grep -q '/pass generate' ~/.claude/skills/pass/SKILL.md && \
grep -q '/pass estimate' ~/.claude/skills/pass/SKILL.md && \
echo "Phase 10 OK"
```

---

## Cross-Phase Summary

| Phase | Creates | Modifies | Depends On |
|-------|---------|----------|------------|
| 1 | pass_store.py, pipeline/lib/pass_store.py (proxy) | step_types.py | — |
| 2 | — | prompt_engine.py | — |
| 3 | — | assembler.py | — |
| 4 | — | step_runner.py | 1 |
| 5 | — | coverage_planner.py, pipeline_config.json | — |
| 6 | scene_detect.py | — | — |
| 7 | — | production_loop.py | 1, 2, 3, 4, 5, 6 |
| 8 | — | prep_character_angles.py | — |
| 9 | — | model_profiles.json | — |
| 10 | skills/pass/SKILL.md | — | 1, 2, 3, 4, 5, 7 |

**Key architecture note:** Phases 2 and 3 (prompt building + ref resolution) are called by Phase 7 (production_loop), NOT by Phase 4 (step_runner). Phase 4 is a dumb executor that takes pre-built prompt + pre-ordered refs. This avoids cross-layer imports between execution/ and pipeline/.

Phases 1-3 and 5-6 and 8-9 have no interdependencies and can be built in parallel.
Phase 4 depends on 1 only. Phase 7 depends on 1, 2, 3, 4, 5, 6 (it wires everything together). Phase 10 depends on everything.
