# BUILD_SPEC — ClientSequenceRunner

**Generated:** 2026-03-27
**Input:** consultations/client-side-steprunner/SYNTHESIS.md
**Detail level:** max
**Visual design:** no
**Phases:** 5
**Estimated build time:** 2-3 hours

## Validation command
```bash
cd ~/Dropbox/CLAUDE_PROJECTS/starsend && python3 -m pytest tests/ -x -q --tb=short 2>&1 | tail -20
```

## DO NOT MODIFY
- `orchestrator/pipeline.py` — series orchestrator, irrelevant
- `lib/execution_store.py` — no state machine changes
- `lib/elements.py` — works as-is for client projects
- `lib/api_client.py` — works as-is
- `lib/recoil_bridge.py` — irrelevant to client work
- `config/starsend_config.json` — no config changes

---

## Phase 1: Image Utilities — Grid Cell Extraction

### Files to create
- `tools/image_utils.py` — Pure image manipulation: extract cells from grid images, resize for start frames

### Exact implementation

**Create `tools/image_utils.py`:**

```python
"""
image_utils.py — Image manipulation utilities for client video workflow.

Grid cell extraction, image resizing, and start frame preparation.
Pure functions with no pipeline dependencies — only PIL.
"""

import logging
from io import BytesIO
from pathlib import Path
from typing import Optional

from PIL import Image

logger = logging.getLogger(__name__)


def extract_grid_cell(
    grid_path: Path,
    row: int,
    col: int,
    grid_size: str = "2x2",
    output_path: Optional[Path] = None,
) -> Path:
    """Extract a single cell from a grid image.

    Args:
        grid_path: Path to the grid image (PNG or JPEG).
        row: Row index (0-based, top to bottom).
        col: Column index (0-based, left to right).
        grid_size: Grid layout string ("2x2", "3x3", "2x3").
        output_path: Where to save the extracted cell. If None,
            saves next to the grid as {stem}_r{row}c{col}.png.

    Returns:
        Path to the saved cell image.

    Raises:
        ValueError: If row/col are out of bounds for the grid size.
        FileNotFoundError: If grid_path doesn't exist.
    """
    if not grid_path.exists():
        raise FileNotFoundError(f"Grid image not found: {grid_path}")

    rows_count, cols_count = _parse_grid_size(grid_size)

    if row < 0 or row >= rows_count:
        raise ValueError(f"Row {row} out of bounds for {grid_size} grid (0-{rows_count - 1})")
    if col < 0 or col >= cols_count:
        raise ValueError(f"Col {col} out of bounds for {grid_size} grid (0-{cols_count - 1})")

    img = Image.open(grid_path)
    width, height = img.size

    cell_w = width // cols_count
    cell_h = height // rows_count

    left = col * cell_w
    upper = row * cell_h
    right = left + cell_w
    lower = upper + cell_h

    cell = img.crop((left, upper, right, lower))

    # Convert to RGB if needed (drop alpha)
    if cell.mode in ("RGBA", "P"):
        cell = cell.convert("RGB")

    if output_path is None:
        output_path = grid_path.parent / f"{grid_path.stem}_r{row}c{col}.png"

    output_path.parent.mkdir(parents=True, exist_ok=True)
    cell.save(str(output_path), format="PNG")
    logger.info("Extracted cell r%dc%d from %s → %s (%dx%d)",
                row, col, grid_path.name, output_path.name, cell.size[0], cell.size[1])
    return output_path


def quadrant_to_rowcol(quadrant: str, grid_size: str = "2x2") -> tuple[int, int]:
    """Convert a human-friendly quadrant name to (row, col).

    Supports:
        "top_left" / "tl" / "1" → (0, 0)
        "top_right" / "tr" / "2" → (0, 1)
        "bottom_left" / "bl" / "3" → (1, 0)
        "bottom_right" / "br" / "4" → (1, 1)
        For 3x3: "1"-"9" maps left-to-right, top-to-bottom.

    Raises:
        ValueError: If quadrant string is unrecognized.
    """
    rows_count, cols_count = _parse_grid_size(grid_size)

    # Named quadrants (2x2 only)
    _NAMED = {
        "top_left": (0, 0), "tl": (0, 0),
        "top_right": (0, 1), "tr": (0, 1),
        "bottom_left": (1, 0), "bl": (1, 0),
        "bottom_right": (1, 1), "br": (1, 1),
    }
    key = quadrant.lower().strip()
    if key in _NAMED:
        return _NAMED[key]

    # Numeric: 1-indexed, left-to-right, top-to-bottom
    try:
        idx = int(key) - 1  # Convert to 0-indexed
        if idx < 0 or idx >= rows_count * cols_count:
            raise ValueError(f"Cell index {key} out of bounds for {grid_size}")
        row = idx // cols_count
        col = idx % cols_count
        return row, col
    except ValueError:
        pass

    raise ValueError(
        f"Unrecognized quadrant: {quadrant!r}. "
        f"Use 'top_left'/'tl'/'1', 'top_right'/'tr'/'2', etc."
    )


def _parse_grid_size(grid_size: str) -> tuple[int, int]:
    """Parse "2x2", "3x3", "2x3" into (rows, cols)."""
    parts = grid_size.lower().split("x")
    if len(parts) != 2:
        raise ValueError(f"Invalid grid_size: {grid_size!r}. Expected 'RxC' (e.g., '2x2').")
    return int(parts[0]), int(parts[1])
```

### Validation
```bash
python3 -c "import ast; ast.parse(open('tools/image_utils.py').read())" && \
python3 -c "
import sys; sys.path.insert(0, '.')
from tools.image_utils import extract_grid_cell, quadrant_to_rowcol, _parse_grid_size
assert _parse_grid_size('2x2') == (2, 2)
assert _parse_grid_size('3x3') == (3, 3)
assert quadrant_to_rowcol('top_left') == (0, 0)
assert quadrant_to_rowcol('br') == (1, 1)
assert quadrant_to_rowcol('3', '3x3') == (0, 2)
print('Phase 1 OK')
"
```

### Scope boundary
- Do NOT import any pipeline modules (step_runner, execution_store, etc.)
- Do NOT add grid generation logic — this is extraction only
- Do NOT modify any existing files

---

## Phase 2: StepRunner Modification — shot_prompts on Multi-Shot Takes

### Files to modify
- `orchestrator/step_runner.py` — Add optional `shot_prompts` parameter to `execute_multi_shot()`

### What already exists
- `execute_multi_shot()` at line 423 accepts `batch`, `multi_prompt_sequence`, `model`, `start_frame`, `aspect_ratio`, `elements_payload`, `cfg_scale`
- Take records are built at line 549-565 with fields: `take_number`, `file_path`, `prompt_used`, `cost_usd`, `timestamp`, `model`, `pipeline`, `multi_prompt_shots`, `total_duration`, `disposition`
- The `prompt_used` field stores a concatenated summary: `" | ".join(f"[{s.get('prompt', '')[:50]}]" ...)`

### Exact implementation

**In `orchestrator/step_runner.py`, modify the `execute_multi_shot` signature (line 423):**

Replace:
```python
    def execute_multi_shot(
        self,
        batch: list[dict],
        multi_prompt_sequence: list[dict],
        model: str = "kling-v3",
        start_frame: Optional[Path] = None,
        aspect_ratio: str = "9:16",
        elements_payload: Optional[dict] = None,
        cfg_scale: Optional[float] = None,
    ) -> list[StepResult]:
```

With:
```python
    def execute_multi_shot(
        self,
        batch: list[dict],
        multi_prompt_sequence: list[dict],
        model: str = "kling-v3",
        start_frame: Optional[Path] = None,
        aspect_ratio: str = "9:16",
        elements_payload: Optional[dict] = None,
        cfg_scale: Optional[float] = None,
        shot_prompts: Optional[list[dict]] = None,
    ) -> list[StepResult]:
```

**Then, in the take record building block (around line 554), add `shot_prompts` to the take record.** Find this block:

```python
            take_record = {
                "take_number": take_number,
                "file_path": rel_path,
                "prompt_used": prompt_summary,
                "cost_usd": total_cost,
                "timestamp": time.time(),
                "model": model,
                "pipeline": "multi_shot",
                "multi_prompt_shots": len(multi_prompt_sequence),
                "total_duration": total_duration,
                "disposition": None,
            }
```

Replace with:
```python
            take_record = {
                "take_number": take_number,
                "file_path": rel_path,
                "prompt_used": prompt_summary,
                "cost_usd": total_cost,
                "timestamp": time.time(),
                "model": model,
                "pipeline": "multi_shot",
                "multi_prompt_shots": len(multi_prompt_sequence),
                "total_duration": total_duration,
                "disposition": None,
            }
            if shot_prompts is not None:
                take_record["shot_prompts"] = shot_prompts
```

**Note:** `_execute_sequential_shots` is NOT modified — sequential mode records individual per-shot prompts natively via `prompt[:200]` on each take. The `shot_prompts` parameter is only relevant for the multi-shot path where prompts are concatenated into a summary.

### Validation
```bash
python3 -c "import ast; ast.parse(open('orchestrator/step_runner.py').read())" && \
python3 -c "
import inspect, sys; sys.path.insert(0, '.')
from orchestrator.step_runner import StepRunner
sig = inspect.signature(StepRunner.execute_multi_shot)
assert 'shot_prompts' in sig.parameters, 'shot_prompts param missing'
print('Phase 2 OK')
"
```

### Scope boundary
- Do NOT add any other parameters or methods to StepRunner
- Do NOT modify execute_video, execute_keyframe, or execute_previz
- Do NOT change the ExecutionStore state machine
- The `shot_prompts` parameter is Optional — existing callers are unaffected

---

## Phase 3: Client Bridge — Sequence State Management

### Files to modify
- `lib/client_bridge.py` — Add sequence state read/write functions

### What already exists (from Phase 1-2)
- Phase 1 created `tools/image_utils.py` with grid cell extraction
- Phase 2 added `shot_prompts` to StepRunner's execute_multi_shot
- `client_bridge.py` already has: `load_client_storyboard`, `load_client_bible`, `load_client_project_config`, `save_client_bible`, `get_client_refs_dir`
- Constants: `PROJECTS_ROOT` from `lib.constants`

### Exact implementation

**In `lib/client_bridge.py`, add these functions after the existing ones:**

```python
def _client_state_dir(project):
    """Client-specific state directory (separate from starsend state)."""
    return PROJECTS_ROOT / project / "state" / "client"


def load_sequence_state(project):
    """Load sequence workflow state. Returns empty dict if no state file."""
    state_path = _client_state_dir(project) / "sequences.json"
    if not state_path.exists():
        return {}
    return json.loads(state_path.read_text(encoding="utf-8"))


def save_sequence_state(project, state_data):
    """Save sequence workflow state. Creates directory if needed."""
    state_dir = _client_state_dir(project)
    state_dir.mkdir(parents=True, exist_ok=True)
    state_path = state_dir / "sequences.json"
    state_path.write_text(
        json.dumps(state_data, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    logger.info("Saved sequence state: %s (%d sequences)", state_path, len(state_data))


def init_sequence_state(project, episode=1):
    """Initialize sequence state from the client plan.

    Reads ep_NNN_plan.json, creates a sequences.json with every
    sequence set to 'not_started'. Preserves existing state for
    sequences that already have entries (idempotent).

    Returns the full state dict.
    """
    plan = load_client_storyboard(project, episode)
    existing = load_sequence_state(project)

    for seq in plan.get("sequences", []):
        seq_id = seq["id"]
        if seq_id not in existing:
            existing[seq_id] = {
                "status": "not_started",
                "grid_path": None,
                "start_frame": None,
                "start_frame_approved": False,
                "elements": seq.get("elements", []),
                "model": seq.get("model", "kling-o3"),
                "video_path": None,
                "takes": 0,
                "current_take": None,
                "notes": "",
            }

    save_sequence_state(project, existing)
    logger.info("Initialized sequence state for %s: %d sequences", project, len(existing))
    return existing


# Valid sequence state transitions (orchestrator-level, NOT ExecutionStore)
SEQUENCE_TRANSITIONS = {
    "not_started":          {"grid_exploring", "generating", "start_frame_approved"},
    "grid_exploring":       {"grid_review", "not_started"},
    "grid_review":          {"start_frame_approved", "grid_exploring"},
    "start_frame_approved": {"generating"},
    "generating":           {"review", "not_started"},
    "review":               {"approved", "generating", "not_started"},
    "approved":             {"final", "generating"},
    "final":                set(),
}


def transition_sequence(project, seq_id, to_state):
    """Transition a sequence to a new state with validation.

    Raises ValueError if the transition is not allowed.
    """
    state = load_sequence_state(project)
    if seq_id not in state:
        raise KeyError(f"Sequence {seq_id} not found in state for project {project}")

    current = state[seq_id]["status"]
    allowed = SEQUENCE_TRANSITIONS.get(current, set())
    if to_state not in allowed:
        raise ValueError(
            f"Sequence {seq_id}: transition {current} → {to_state} not allowed. "
            f"Valid: {sorted(allowed)}"
        )

    state[seq_id]["status"] = to_state
    save_sequence_state(project, state)
    logger.info("Sequence %s: %s → %s", seq_id, current, to_state)


def update_sequence_field(project, seq_id, **kwargs):
    """Update arbitrary fields on a sequence state entry.

    Does NOT validate transitions — use transition_sequence() for status changes.
    """
    state = load_sequence_state(project)
    if seq_id not in state:
        raise KeyError(f"Sequence {seq_id} not found in state for project {project}")

    for key, value in kwargs.items():
        if key == "status":
            raise ValueError("Use transition_sequence() for status changes")
        state[seq_id][key] = value

    save_sequence_state(project, state)
```

### Validation
```bash
python3 -c "import ast; ast.parse(open('lib/client_bridge.py').read())" && \
python3 -c "
import sys; sys.path.insert(0, '.')
from lib.client_bridge import (
    load_sequence_state, save_sequence_state, init_sequence_state,
    transition_sequence, update_sequence_field, SEQUENCE_TRANSITIONS,
    load_client_storyboard, load_client_bible, load_client_project_config,
)
assert 'not_started' in SEQUENCE_TRANSITIONS
assert 'generating' in SEQUENCE_TRANSITIONS['start_frame_approved']
print('Phase 3 OK')
"
```

### Scope boundary
- Do NOT modify any existing functions in client_bridge.py
- Do NOT import from step_runner or execution_store
- Do NOT add validation logic for the client bible schema (ElementManager handles this)
- Sequence state is a flat JSON dict keyed by sequence ID — no nesting by episode

---

## Phase 4: ClientSequenceRunner — Core Orchestrator

### Files to create
- `tools/client_sequence_runner.py` — The main orchestrator class

### What already exists (from prior phases)
- Phase 1: `tools/image_utils.py` with `extract_grid_cell()`, `quadrant_to_rowcol()`
- Phase 2: `StepRunner.execute_multi_shot()` accepts `shot_prompts` parameter
- Phase 3: `client_bridge.py` has `load_sequence_state`, `save_sequence_state`, `init_sequence_state`, `transition_sequence`, `update_sequence_field`
- Existing: `StepRunner` in `orchestrator/step_runner.py` with `execute_multi_shot()`, `execute_video()`, `execute_keyframe()`
- Existing: `ExecutionStore` in `lib/execution_store.py`
- Existing: `ProjectPaths` in `orchestrator/step_types.py` — `ProjectPaths.for_episode(project, episode)`
- Existing: `ElementManager` in `lib/elements.py` — `ElementManager.build_elements_for_fal(char_ids, project)` and `ElementManager.build_elements_with_info(char_ids, project, location_id)`
- Existing: `lib/model_profiles.py` — `get_cost(model)` returns $/second

### Exact implementation

**Create `tools/client_sequence_runner.py`:**

```python
#!/usr/bin/env python3
"""
client_sequence_runner.py — Orchestrator for client video projects.

The client-video equivalent of pipeline.py for series work. Reads client
plans (sequences[].shots[]), manages sequence workflow state, and drives
StepRunner's execute_multi_shot() and execute_video() primitives.

Grid exploration reuses execute_keyframe() with grid-specific prompts.
Sequence state lives in projects/{project}/state/client/sequences.json,
separate from ExecutionStore's shot-level execution state.

Usage:
    runner = ClientSequenceRunner("driver-beware")
    runner.explore_grid("SEQ01")          # Generate 2x2 grid for start frame
    runner.pick_cell("SEQ01", "top_left") # Crop and save start frame
    runner.generate("SEQ01")              # Run multi-shot sequence
    runner.status()                       # Show all sequence states
"""

import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import Optional

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from lib.constants import PROJECTS_ROOT
from lib.client_bridge import (
    load_client_storyboard,
    load_client_bible,
    load_client_project_config,
    load_sequence_state,
    save_sequence_state,
    init_sequence_state,
    transition_sequence,
    update_sequence_field,
)
from lib.execution_store import ExecutionStore
from lib.elements import ElementManager
from orchestrator.step_runner import StepRunner
from orchestrator.step_types import ProjectPaths, StepResult

logger = logging.getLogger(__name__)

# Max elements when a start frame is present (fal.ai Kling O3 limit)
MAX_ELEMENTS_WITH_START_FRAME = 3
MAX_ELEMENTS_WITHOUT_START_FRAME = 4


class ClientSequenceRunner:
    """Orchestrates client video workflow at the sequence level.

    Reads client plans natively (sequences[].shots[]), manages
    sequence workflow state, and composes StepRunner primitives.
    """

    def __init__(self, project: str, episode: int = 1):
        self.project = project
        self.episode = episode

        # Load project data
        self.config = load_client_project_config(project)
        self.plan = load_client_storyboard(project, episode)
        self.bible = load_client_bible(project)

        # Initialize pipeline components
        self.paths = ProjectPaths.for_episode(project, episode)
        self.store = ExecutionStore(project)
        self.step_runner = StepRunner(
            store=self.store,
            paths=self.paths,
            validate_frames=True,
        )

        # Initialize sequence state (idempotent — preserves existing)
        self.seq_state = init_sequence_state(project, episode)

    # ── Status ────────────────────────────────────────────────────

    def status(self) -> dict:
        """Return status of all sequences.

        Returns dict keyed by sequence ID with status, takes, model, etc.
        """
        self.seq_state = load_sequence_state(self.project)
        return self.seq_state

    def print_status(self):
        """Print a human-readable status table."""
        state = self.status()
        sequences = self.plan.get("sequences", [])

        print(f"\n{'ID':<8} {'Status':<24} {'Shots':>5} {'Takes':>5} {'Model':<12} {'Notes'}")
        print("-" * 80)

        for seq in sequences:
            seq_id = seq["id"]
            s = state.get(seq_id, {})
            status = s.get("status", "unknown")
            shots = len(seq.get("shots", []))
            takes = s.get("takes", 0)
            model = s.get("model", seq.get("model", "?"))
            notes = s.get("notes", "")[:30]
            print(f"{seq_id:<8} {status:<24} {shots:>5} {takes:>5} {model:<12} {notes}")

        print()

    # ── Grid Exploration ──────────────────────────────────────────

    def explore_grid(
        self,
        seq_id: str,
        prompt: Optional[str] = None,
        grid_size: str = "2x2",
        model: str = "gemini-3.1-flash-image-preview",
        aspect_ratio: Optional[str] = None,
    ) -> StepResult:
        """Generate a grid of start frame options for a sequence.

        Uses execute_keyframe() with a grid-specific prompt and
        "{seq_id}_grid" as the shot_id. The grid image is saved to
        the frames directory and tracked as a keyframe take.

        Args:
            seq_id: Sequence ID (e.g., "SEQ01").
            prompt: Custom grid prompt. If None, builds from first shot's prompt.
            grid_size: Grid layout ("2x2" or "3x3").
            model: Image generation model.
            aspect_ratio: Override aspect ratio (defaults to project config).

        Returns:
            StepResult with the grid image path.
        """
        seq = self._get_sequence(seq_id)
        ar = aspect_ratio or self.config.get("aspect_ratio", "16:9")

        # Build grid prompt from sequence context
        if prompt is None:
            first_shot = seq["shots"][0] if seq.get("shots") else {}
            base_prompt = first_shot.get("prompt", seq.get("narrative", ""))
            style = self.config.get("film_style_suffix", "")
            prompt = (
                f"Generate a {grid_size} grid of style variations. "
                f"Each cell shows the same scene from a different angle or framing. "
                f"Scene: {base_prompt}. Style: {style}. "
                f"Format as a photographic contact sheet with clear dividing lines "
                f"between cells. Each cell is a complete, independent frame."
            )

        # Transition sequence state
        transition_sequence(self.project, seq_id, "grid_exploring")

        # Use execute_keyframe with grid-specific shot_id
        grid_shot_id = f"{seq_id}_grid"

        # Ensure shot exists in ExecutionStore
        if not self.store.get_shot(grid_shot_id):
            self.store.create_shot(grid_shot_id, {"status": "keyframe_pending"})
        else:
            # Reset if re-exploring
            try:
                self.store.force_reset_status(
                    grid_shot_id, "keyframe_pending",
                    reason="Grid re-exploration",
                )
            except Exception:
                pass

        result = self.step_runner.execute_keyframe(
            shot_id=grid_shot_id,
            prompt=prompt,
            model=model,
            aspect_ratio=ar,
        )

        if result.success and result.output_path:
            # Resolve to absolute path
            grid_abs = Path(result.output_path)
            if not grid_abs.is_absolute():
                grid_abs = self.paths.project_root / result.output_path

            update_sequence_field(
                self.project, seq_id,
                grid_path=str(grid_abs),
            )
            transition_sequence(self.project, seq_id, "grid_review")
            logger.info("Grid exploration complete for %s: %s", seq_id, grid_abs)
        else:
            transition_sequence(self.project, seq_id, "not_started")
            logger.error("Grid exploration failed for %s: %s", seq_id, result.error)

        return result

    def pick_cell(
        self,
        seq_id: str,
        quadrant: str,
        grid_size: str = "2x2",
    ) -> Path:
        """Extract a cell from the grid and save as start frame.

        Args:
            seq_id: Sequence ID.
            quadrant: Cell identifier ("top_left", "tl", "1", "br", etc.)
            grid_size: Grid layout (must match what was generated).

        Returns:
            Path to the saved start frame image.
        """
        from tools.image_utils import extract_grid_cell, quadrant_to_rowcol

        state = load_sequence_state(self.project)
        seq_state = state.get(seq_id, {})
        grid_path = seq_state.get("grid_path")

        if not grid_path:
            raise ValueError(f"No grid image found for {seq_id}. Run explore_grid() first.")

        grid_path = Path(grid_path)
        if not grid_path.exists():
            raise FileNotFoundError(f"Grid image not found: {grid_path}")

        # Extract the selected cell
        row, col = quadrant_to_rowcol(quadrant, grid_size)

        clean_dir = self.paths.frames_dir / "clean"
        clean_dir.mkdir(parents=True, exist_ok=True)
        output_path = clean_dir / f"{seq_id.lower()}_start.png"

        start_frame = extract_grid_cell(
            grid_path=grid_path,
            row=row,
            col=col,
            grid_size=grid_size,
            output_path=output_path,
        )

        update_sequence_field(
            self.project, seq_id,
            start_frame=str(start_frame),
            start_frame_approved=True,
        )
        transition_sequence(self.project, seq_id, "start_frame_approved")

        logger.info("Start frame for %s: %s (cell r%dc%d)", seq_id, start_frame, row, col)
        return start_frame

    def set_start_frame(self, seq_id: str, path: Path):
        """Set an external start frame (not from grid exploration).

        Use when a start frame comes from the draft video cut frames
        or is manually provided. Transitions not_started → start_frame_approved.
        """
        if not path.exists():
            raise FileNotFoundError(f"Start frame not found: {path}")
        update_sequence_field(
            self.project, seq_id,
            start_frame=str(path),
            start_frame_approved=True,
        )
        transition_sequence(self.project, seq_id, "start_frame_approved")
        logger.info("External start frame set for %s: %s", seq_id, path)

    # ── Video Generation ──────────────────────────────────────────

    def generate(
        self,
        seq_id: str,
        mode: str = "multi_shot",
        start_frame: Optional[Path] = None,
        prompt_overrides: Optional[dict] = None,
        model_override: Optional[str] = None,
    ) -> list[StepResult]:
        """Generate video for a sequence.

        Args:
            seq_id: Sequence ID (e.g., "SEQ08").
            mode: "multi_shot" (default) or "individual".
            start_frame: Override start frame path. If None, uses
                the approved start frame from sequence state.
            prompt_overrides: Dict mapping shot index (0-based) to
                replacement prompt text. E.g., {2: "new prompt for shot 3"}.
            model_override: Override model for this generation.

        Returns:
            List of StepResults (one for multi_shot, N for individual).
        """
        seq = self._get_sequence(seq_id)
        config_ar = self.config.get("aspect_ratio", "16:9")

        # Resolve start frame
        if start_frame is None:
            sf = self._get_start_frame(seq_id)
            if sf:
                start_frame = Path(sf)

        # Resolve model
        model = model_override or seq.get("model", "kling-o3")

        # Build prompt array with optional overrides
        shot_prompts = []
        for i, shot in enumerate(seq.get("shots", [])):
            prompt = shot.get("prompt", "")
            duration = shot.get("duration", 3)
            if prompt_overrides and i in prompt_overrides:
                prompt = prompt_overrides[i]
            shot_prompts.append({
                "index": i,
                "prompt": prompt,
                "duration": duration,  # Keep as int — API clients convert to string
            })

        # Resolve elements with cap enforcement
        element_ids = seq.get("elements", [])
        elements_payload = self._resolve_elements(element_ids, start_frame)

        # Transition state — handle re-runs from stuck "generating" state
        current_status = self.seq_state.get(seq_id, {}).get("status", "not_started")
        if current_status == "generating":
            # Crashed during previous run — reset to allow re-entry
            state = load_sequence_state(self.project)
            state[seq_id]["status"] = "review"
            save_sequence_state(self.project, state)
            self.seq_state = state
        transition_sequence(self.project, seq_id, "generating")

        # Auto-detect: single-shot sequences must use individual mode
        if len(shot_prompts) < 2 and mode == "multi_shot":
            logger.info("%s: single shot — falling back to individual mode", seq_id)
            mode = "individual"

        if mode == "multi_shot":
            results = self._run_multi_shot(
                seq_id, shot_prompts, model, start_frame,
                config_ar, elements_payload,
            )
        elif mode == "individual":
            results = self._run_individual(
                seq_id, shot_prompts, model, start_frame,
                config_ar, elements_payload,
            )
        else:
            raise ValueError(f"Unknown mode: {mode!r}. Use 'multi_shot' or 'individual'.")

        # Update sequence state based on results
        any_success = any(r.success for r in results)
        total_cost = sum(r.cost_usd for r in results)

        if any_success:
            # Find the video path from successful results
            video_path = next(
                (r.output_path for r in results if r.success and r.output_path),
                None,
            )
            seq_state = load_sequence_state(self.project)
            takes = seq_state.get(seq_id, {}).get("takes", 0) + 1

            update_sequence_field(
                self.project, seq_id,
                video_path=video_path,
                takes=takes,
                current_take=takes,
            )
            transition_sequence(self.project, seq_id, "review")
            logger.info(
                "%s: generation complete — take %d, $%.3f, mode=%s",
                seq_id, takes, total_cost, mode,
            )
        else:
            transition_sequence(self.project, seq_id, "not_started")
            errors = [r.error for r in results if r.error]
            logger.error("%s: generation failed — %s", seq_id, "; ".join(errors))

        return results

    def approve(self, seq_id: str):
        """Approve a sequence's current output."""
        transition_sequence(self.project, seq_id, "approved")
        logger.info("Sequence %s approved", seq_id)

    # ── Internal Methods ──────────────────────────────────────────

    def _run_multi_shot(
        self, seq_id, shot_prompts, model, start_frame,
        aspect_ratio, elements_payload,
    ) -> list[StepResult]:
        """Run a sequence as a single multi-shot API call."""
        # Build batch dicts matching execute_multi_shot's expected format
        batch = []
        for sp in shot_prompts:
            batch.append({
                "shot_id": seq_id,
                "_api_duration": int(sp["duration"]),
            })

        # Ensure the sequence shot exists in ExecutionStore
        if not self.store.get_shot(seq_id):
            self.store.create_shot(seq_id, {"status": "video_pending"})
        else:
            current = (self.store.get_shot(seq_id) or {}).get("status", "")
            if current not in ("video_pending", "video_failed", "video_complete",
                               "video_rejected", "rejected", "failed"):
                try:
                    self.store.force_reset_status(
                        seq_id, "video_pending",
                        reason="Client sequence regeneration",
                    )
                except Exception:
                    pass

        return self.step_runner.execute_multi_shot(
            batch=batch,
            multi_prompt_sequence=shot_prompts,
            model=model,
            start_frame=start_frame,
            aspect_ratio=aspect_ratio,
            elements_payload=elements_payload,
            shot_prompts=shot_prompts,
        )

    def _run_individual(
        self, seq_id, shot_prompts, model, start_frame,
        aspect_ratio, elements_payload,
    ) -> list[StepResult]:
        """Run each shot in a sequence as individual I2V calls."""
        results = []
        for i, sp in enumerate(shot_prompts):
            shot_id = f"{seq_id}_shot{i:02d}"
            duration = int(sp["duration"])

            # First shot uses the sequence start frame; others get None
            sf = start_frame if i == 0 else None

            # Ensure shot exists in ExecutionStore
            if not self.store.get_shot(shot_id):
                self.store.create_shot(shot_id, {"status": "video_pending"})
            else:
                try:
                    self.store.force_reset_status(
                        shot_id, "video_pending",
                        reason="Individual shot regeneration",
                    )
                except Exception:
                    pass

            result = self.step_runner.execute_video(
                shot_id=shot_id,
                prompt=sp["prompt"],
                model=model,
                start_frame=sf,
                duration=duration,
                aspect_ratio=aspect_ratio,
                elements_payload=elements_payload,
            )
            results.append(result)

        return results

    def _resolve_elements(
        self,
        element_ids: list[str],
        start_frame: Optional[Path],
    ) -> dict:
        """Resolve elements with cap enforcement.

        When a start frame is present, Kling O3 supports max 3 elements.
        Without start frame, max 4. Elements are prioritized by plan order
        (first in the array = highest priority).
        """
        if not element_ids:
            return {}

        max_elements = (
            MAX_ELEMENTS_WITH_START_FRAME if start_frame
            else MAX_ELEMENTS_WITHOUT_START_FRAME
        )

        if len(element_ids) > max_elements:
            dropped = element_ids[max_elements:]
            element_ids = element_ids[:max_elements]
            logger.warning(
                "Element cap: keeping %d of %d elements (start_frame=%s). "
                "Dropped: %s",
                max_elements, max_elements + len(dropped),
                "yes" if start_frame else "no",
                dropped,
            )

        return ElementManager.build_elements_for_fal(
            char_ids=element_ids,
            project=self.project,
        )

    def _get_sequence(self, seq_id: str) -> dict:
        """Get sequence data from the plan. Raises KeyError if not found."""
        for seq in self.plan.get("sequences", []):
            if seq["id"] == seq_id:
                return seq
        raise KeyError(f"Sequence {seq_id} not found in plan for {self.project}")

    def _get_start_frame(self, seq_id: str) -> Optional[str]:
        """Get the approved start frame path for a sequence, if any.

        Warns loudly if the state says there should be a start frame
        but the file is missing from disk.
        """
        state = load_sequence_state(self.project)
        seq_state = state.get(seq_id, {})
        sf = seq_state.get("start_frame")
        if not sf:
            return None
        if not Path(sf).exists():
            logger.warning(
                "START FRAME MISSING for %s: %s — generating without it. "
                "Re-run grid exploration or set_start_frame() to fix.",
                seq_id, sf,
            )
            return None
        return sf
```

### Validation
```bash
python3 -c "import ast; ast.parse(open('tools/client_sequence_runner.py').read())" && \
python3 -c "
import sys; sys.path.insert(0, '.')
from tools.client_sequence_runner import ClientSequenceRunner, MAX_ELEMENTS_WITH_START_FRAME
assert MAX_ELEMENTS_WITH_START_FRAME == 3
import inspect
cls = ClientSequenceRunner
assert hasattr(cls, 'explore_grid')
assert hasattr(cls, 'pick_cell')
assert hasattr(cls, 'generate')
assert hasattr(cls, 'status')
assert hasattr(cls, 'approve')
assert hasattr(cls, 'set_start_frame')
assert hasattr(cls, '_resolve_elements')
print('Phase 4 OK')
"
```

### Scope boundary
- Do NOT modify StepRunner, ExecutionStore, or ElementManager
- Do NOT add Console/UI integration (that's Week 2)
- Do NOT add grid retry logic yet (that's an enhancement after Day 1)
- Do NOT handle audio generation (Kling O3 generates audio by default)
- The orchestrator calls existing StepRunner methods — it does NOT call API clients directly

---

## Phase 5: CLI Entry Point — client_generate.py

### Files to create
- `tools/client_generate.py` — CLI tool wrapping ClientSequenceRunner

### What already exists (from prior phases)
- Phase 4 created `tools/client_sequence_runner.py` with `ClientSequenceRunner`
- `ClientSequenceRunner` has methods: `explore_grid()`, `pick_cell()`, `generate()`, `status()`, `print_status()`, `approve()`
- Phase 1 created `tools/image_utils.py` with `quadrant_to_rowcol()`

### Exact implementation

**Create `tools/client_generate.py`:**

```python
#!/usr/bin/env python3
"""
client_generate.py — CLI for client video generation workflow.

Commands:
    status    Show sequence status for a project
    grid      Generate a start frame exploration grid
    pick      Extract a cell from a grid as start frame
    generate  Generate video for a sequence
    approve   Approve a sequence's output
    init      Initialize sequence state from plan

Usage:
    python tools/client_generate.py status driver-beware
    python tools/client_generate.py grid driver-beware SEQ01
    python tools/client_generate.py pick driver-beware SEQ01 --quadrant top_left
    python tools/client_generate.py generate driver-beware SEQ08
    python tools/client_generate.py generate driver-beware SEQ08 --mode individual
    python tools/client_generate.py approve driver-beware SEQ08
"""

import argparse
import logging
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


def cmd_status(args):
    from tools.client_sequence_runner import ClientSequenceRunner
    runner = ClientSequenceRunner(args.project, args.episode)
    runner.print_status()


def cmd_init(args):
    from lib.client_bridge import init_sequence_state
    state = init_sequence_state(args.project, args.episode)
    print(f"Initialized {len(state)} sequences for {args.project}")


def cmd_grid(args):
    from tools.client_sequence_runner import ClientSequenceRunner
    runner = ClientSequenceRunner(args.project, args.episode)
    result = runner.explore_grid(
        seq_id=args.seq_id,
        prompt=args.prompt,
        grid_size=args.grid_size,
        model=args.model or "gemini-3.1-flash-image-preview",
    )
    if result.success:
        print(f"Grid saved: {result.output_path}")
        print(f"Cost: ${result.cost_usd:.3f}")
        print(f"Next: python tools/client_generate.py pick {args.project} {args.seq_id} --quadrant <tl|tr|bl|br>")
    else:
        print(f"Grid generation failed: {result.error}")
        sys.exit(1)


def cmd_pick(args):
    from tools.client_sequence_runner import ClientSequenceRunner
    runner = ClientSequenceRunner(args.project, args.episode)
    start_frame = runner.pick_cell(
        seq_id=args.seq_id,
        quadrant=args.quadrant,
        grid_size=args.grid_size,
    )
    print(f"Start frame saved: {start_frame}")
    print(f"Next: python tools/client_generate.py generate {args.project} {args.seq_id}")


def cmd_generate(args):
    from tools.client_sequence_runner import ClientSequenceRunner
    runner = ClientSequenceRunner(args.project, args.episode)

    # Parse prompt overrides: "2:new prompt,4:another prompt"
    prompt_overrides = None
    if args.shot_override:
        prompt_overrides = {}
        for override in args.shot_override:
            idx_str, prompt = override.split(":", 1)
            prompt_overrides[int(idx_str)] = prompt

    results = runner.generate(
        seq_id=args.seq_id,
        mode=args.mode,
        prompt_overrides=prompt_overrides,
        model_override=args.model,
    )

    total_cost = sum(r.cost_usd for r in results)
    success_count = sum(1 for r in results if r.success)

    print(f"\n{args.seq_id}: {success_count}/{len(results)} shots succeeded, ${total_cost:.3f}")
    for r in results:
        status = "OK" if r.success else "FAIL"
        print(f"  {r.shot_id}: {status} — {r.output_path or r.error}")

    if success_count > 0:
        print(f"\nNext: review output, then:")
        print(f"  python tools/client_generate.py approve {args.project} {args.seq_id}")


def cmd_approve(args):
    from tools.client_sequence_runner import ClientSequenceRunner
    runner = ClientSequenceRunner(args.project, args.episode)
    runner.approve(args.seq_id)
    print(f"Sequence {args.seq_id} approved")


def main():
    parser = argparse.ArgumentParser(
        description="Client video generation workflow",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--episode", type=int, default=1,
        help="Episode number (default: 1)",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true",
        help="Enable debug logging",
    )

    subparsers = parser.add_subparsers(dest="command", required=True)

    # status
    p_status = subparsers.add_parser("status", help="Show sequence status")
    p_status.add_argument("project", help="Project name (e.g., driver-beware)")

    # init
    p_init = subparsers.add_parser("init", help="Initialize sequence state from plan")
    p_init.add_argument("project", help="Project name")

    # grid
    p_grid = subparsers.add_parser("grid", help="Generate start frame grid")
    p_grid.add_argument("project", help="Project name")
    p_grid.add_argument("seq_id", help="Sequence ID (e.g., SEQ01)")
    p_grid.add_argument("--prompt", help="Custom grid prompt (default: auto from plan)")
    p_grid.add_argument("--grid-size", default="2x2", help="Grid layout (default: 2x2)")
    p_grid.add_argument("--model", help="Image model override")

    # pick
    p_pick = subparsers.add_parser("pick", help="Pick a cell from grid as start frame")
    p_pick.add_argument("project", help="Project name")
    p_pick.add_argument("seq_id", help="Sequence ID")
    p_pick.add_argument("--quadrant", required=True,
                        help="Cell to extract: top_left/tl/1, top_right/tr/2, etc.")
    p_pick.add_argument("--grid-size", default="2x2", help="Grid layout (default: 2x2)")

    # generate
    p_gen = subparsers.add_parser("generate", help="Generate video for sequence")
    p_gen.add_argument("project", help="Project name")
    p_gen.add_argument("seq_id", help="Sequence ID")
    p_gen.add_argument("--mode", choices=["multi_shot", "individual"],
                       default="multi_shot", help="Generation mode (default: multi_shot)")
    p_gen.add_argument("--shot-override", action="append",
                       help="Override shot prompt: INDEX:PROMPT (0-based)")
    p_gen.add_argument("--model", help="Model override")

    # approve
    p_approve = subparsers.add_parser("approve", help="Approve sequence output")
    p_approve.add_argument("project", help="Project name")
    p_approve.add_argument("seq_id", help="Sequence ID")

    args = parser.parse_args()

    # Configure logging
    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )

    # Dispatch
    commands = {
        "status": cmd_status,
        "init": cmd_init,
        "grid": cmd_grid,
        "pick": cmd_pick,
        "generate": cmd_generate,
        "approve": cmd_approve,
    }
    commands[args.command](args)


if __name__ == "__main__":
    main()
```

### Validation
```bash
python3 -c "import ast; ast.parse(open('tools/client_generate.py').read())" && \
python3 tools/client_generate.py --help 2>&1 | grep -q "Client video generation" && \
python3 tools/client_generate.py status --help 2>&1 | grep -q "project" && \
python3 tools/client_generate.py grid --help 2>&1 | grep -q "seq_id" && \
python3 tools/client_generate.py generate --help 2>&1 | grep -q "mode" && \
echo "Phase 5 OK"
```

### Scope boundary
- Do NOT add Console API endpoints
- Do NOT import or depend on pipeline.py or recoil_bridge.py
- The CLI is a thin dispatcher — all logic lives in ClientSequenceRunner
- Do NOT add interactive prompts (everything via CLI flags)
