# BUILD_SPEC — Production Orchestrator

**Generated:** 2026-04-04
**Input:** `~/Dropbox/CLAUDE_PROJECTS/docs/superpowers/specs/2026-04-04-production-orchestrator-design.md`
**Detail level:** max
**Visual design:** no
**Phases:** 7
**Estimated build time:** 4-5 hours (harness execution)

## Validation command

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && python3 -c "
import ast, sys, pathlib
files = [
    'pipeline/orchestrator/production_types.py',
    'pipeline/orchestrator/autonomy_controller.py',
    'pipeline/orchestrator/take_provenance.py',
    'pipeline/orchestrator/retry_dispatcher.py',
    'pipeline/orchestrator/batch_manager.py',
    'pipeline/orchestrator/learning_engine.py',
    'pipeline/orchestrator/production_loop.py',
]
ok = True
for f in files:
    try:
        ast.parse(pathlib.Path(f).read_text())
    except SyntaxError as e:
        print(f'SYNTAX ERROR: {f}: {e}', file=sys.stderr)
        ok = False
    except FileNotFoundError:
        print(f'MISSING: {f}', file=sys.stderr)
        ok = False
if ok:
    print('All files parse OK')
else:
    sys.exit(1)
"
```

---

## Phase 1: Production Types & Configuration

### Files to create
- `pipeline/orchestrator/production_types.py`

### Scope boundary
- Do NOT modify any existing files
- Do NOT import from any new files (this phase is self-contained)
- Standard-library imports only (`typing` included)

### Exact implementation

**Create `pipeline/orchestrator/production_types.py`:**

```python
"""
production_types.py — Type definitions for the Production Orchestrator.

Frozen config dataclasses plus mutable runtime-state dataclasses. These are the contracts
between the production loop and its subsystems (batch manager, retry dispatcher,
autonomy controller, learning engine, provenance writer).
"""

import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional, Any


# ── Enums ──────────────────────────────────────────────────────────

class BatchStatus(str, Enum):
    """Lifecycle of a production batch."""
    CREATED = "created"
    RUNNING = "running"
    PAUSED = "paused"
    BUDGET_EXHAUSTED = "budget_exhausted"
    COMPLETE = "complete"
    FAILED = "failed"


class FailureCategory(str, Enum):
    """Classification of generation failures for retry routing."""
    TRANSIENT = "transient"          # 429, 500, 503, timeout — auto-retry with backoff
    GATE_MECHANICAL = "gate_mechanical"  # Gate 1 fail — retry with different seed
    GATE_IDENTITY = "gate_identity"      # Gate 2A fail — retry with stronger refs
    GATE_WARDROBE = "gate_wardrobe"      # Gate 2A wardrobe fail — check phase, swap ref
    GATE_VIDEO_DRIFT = "gate_video_drift"  # Gate 3 — flag for review (not auto-reject)
    CONTENT_FILTER = "content_filter"    # Model refused — needs prompt rewrite
    PERMANENT = "permanent"              # Exhausted retries or unfixable
    BUDGET = "budget"                    # Budget exceeded — pause batch


class ShotPhase(str, Enum):
    """Which generation phase a shot is in."""
    PREVIS = "previs"
    KEYFRAME = "keyframe"
    VIDEO = "video"


# ── Configuration ──────────────────────────────────────────────────

@dataclass(frozen=True)
class RetryPolicy:
    """Per-failure-category retry configuration."""
    max_retries: int = 3
    base_backoff_seconds: float = 2.0
    max_backoff_seconds: float = 120.0
    backoff_multiplier: float = 2.0


DEFAULT_RETRY_POLICIES: dict[FailureCategory, RetryPolicy] = {
    FailureCategory.TRANSIENT: RetryPolicy(max_retries=5, base_backoff_seconds=5.0, max_backoff_seconds=120.0),
    FailureCategory.GATE_MECHANICAL: RetryPolicy(max_retries=3, base_backoff_seconds=0.0),
    FailureCategory.GATE_IDENTITY: RetryPolicy(max_retries=2, base_backoff_seconds=0.0),
    FailureCategory.GATE_WARDROBE: RetryPolicy(max_retries=2, base_backoff_seconds=0.0),
    FailureCategory.GATE_VIDEO_DRIFT: RetryPolicy(max_retries=0),  # Don't retry — flag for review
    FailureCategory.CONTENT_FILTER: RetryPolicy(max_retries=0),    # Needs human intervention
    FailureCategory.PERMANENT: RetryPolicy(max_retries=0),
    FailureCategory.BUDGET: RetryPolicy(max_retries=0),
}


@dataclass(frozen=True)
class AutonomyConfig:
    """Thresholds for auto-approve vs flag-for-human."""
    enabled: bool = False
    require_all_gates_pass: bool = True
    require_gate_3: bool = False
    exclude_shot_types: tuple[str, ...] = ("two_shot", "complex")
    max_auto_approve_per_batch: int = 0  # 0 = unlimited when enabled


@dataclass(frozen=True)
class BatchConfig:
    """Configuration for a production batch."""
    project: str
    episode_id: str
    budget_usd: float = 25.0
    max_concurrent: int = 1           # Phase 1: sequential
    poll_interval_seconds: float = 5.0
    autonomy: AutonomyConfig = field(default_factory=AutonomyConfig)
    retry_policies: dict[FailureCategory, RetryPolicy] = field(
        default_factory=lambda: dict(DEFAULT_RETRY_POLICIES)
    )
    shot_filter: Optional[list[str]] = None   # If set, only process these shot_ids
    max_attempts_per_shot: int = 5


# ── Runtime State ──────────────────────────────────────────────────

@dataclass
class BatchState:
    """Mutable runtime state for a production batch."""
    batch_id: str = ""
    config: Optional[BatchConfig] = None
    status: BatchStatus = BatchStatus.CREATED
    started_at: float = 0.0
    completed_at: float = 0.0
    total_cost: float = 0.0
    shots_completed: int = 0
    shots_failed: int = 0
    shots_pending: int = 0
    shots_in_review: int = 0
    auto_approved: int = 0
    error_message: Optional[str] = None

    def to_dict(self) -> dict:
        return {
            "batch_id": self.batch_id,
            "project": self.config.project if self.config else "",
            "episode_id": self.config.episode_id if self.config else "",
            "status": self.status.value,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "total_cost": round(self.total_cost, 4),
            "shots_completed": self.shots_completed,
            "shots_failed": self.shots_failed,
            "shots_pending": self.shots_pending,
            "shots_in_review": self.shots_in_review,
            "auto_approved": self.auto_approved,
            "error_message": self.error_message,
        }


@dataclass
class RetryRequest:
    """A queued retry with backoff and fix instructions."""
    shot_id: str
    failure_category: FailureCategory
    attempt_number: int
    retry_at: float                    # timestamp — don't retry before this
    fix_suggestion: Optional[dict] = None  # From HealerAgent
    error_message: Optional[str] = None
    original_model: Optional[str] = None

    @property
    def ready(self) -> bool:
        return time.time() >= self.retry_at


@dataclass
class ProvenanceRecord:
    """Full reproduction recipe for a single take."""
    take_id: str           # e.g. "EP001_SH012_T3"
    shot_id: str           # e.g. "EP001_SH012"
    episode_id: str
    project: str
    attempt: int
    timestamp: float = field(default_factory=time.time)

    # Generation parameters
    phase: str = ""         # "previs", "keyframe", "video"
    model: str = ""
    endpoint: str = ""
    prompt: str = ""
    negative_prompt: str = ""
    seed: Optional[int] = None
    params: dict = field(default_factory=dict)
    refs_used: list[dict] = field(default_factory=list)

    # Video-specific
    video_model: Optional[str] = None
    video_prompt: Optional[str] = None
    video_duration: Optional[int] = None
    start_frame_path: Optional[str] = None

    # Gate results
    gates: dict[str, dict] = field(default_factory=dict)

    # Cost
    cost: dict[str, float] = field(default_factory=dict)

    # Human review
    human_review: Optional[dict] = None

    # Lineage
    parent_take: Optional[str] = None
    change_reason: Optional[str] = None
    prompt_diff: Optional[str] = None

    def to_dict(self) -> dict:
        d = {
            "take_id": self.take_id,
            "shot_id": self.shot_id,
            "episode_id": self.episode_id,
            "project": self.project,
            "attempt": self.attempt,
            "timestamp": self.timestamp,
            "generation": {
                "phase": self.phase,
                "model": self.model,
                "endpoint": self.endpoint,
                "prompt": self.prompt,
                "negative_prompt": self.negative_prompt,
                "seed": self.seed,
                "params": self.params,
                "refs_used": self.refs_used,
            },
            "gates": self.gates,
            "cost": self.cost,
        }
        if self.video_model:
            d["video"] = {
                "model": self.video_model,
                "prompt": self.video_prompt,
                "duration": self.video_duration,
                "start_frame": self.start_frame_path,
            }
        if self.human_review:
            d["human_review"] = self.human_review
        if self.parent_take:
            d["lineage"] = {
                "parent_take": self.parent_take,
                "change_reason": self.change_reason,
                "prompt_diff": self.prompt_diff,
            }
        return d
```
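
The config classes above are frozen, so a batch that needs different settings derives a new instance rather than mutating one in place. The following is a minimal sketch of that behaviour, assuming trimmed-down stand-ins (`AutonomyConfigSketch` and `BatchStatusSketch` are hypothetical names, not part of the module) and `dataclasses.replace` from the standard library:

```python
import json
from dataclasses import FrozenInstanceError, dataclass, replace
from enum import Enum


class BatchStatusSketch(str, Enum):
    """Hypothetical stand-in for BatchStatus; the str mixin keeps members JSON-friendly."""
    CREATED = "created"
    RUNNING = "running"


@dataclass(frozen=True)
class AutonomyConfigSketch:
    """Hypothetical stand-in for AutonomyConfig with two of its fields."""
    enabled: bool = False
    require_gate_3: bool = False


base = AutonomyConfigSketch()

# Frozen instances reject attribute assignment.
try:
    base.enabled = True
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() returns a modified copy and leaves the original untouched.
widened = replace(base, enabled=True)

# Members of a str-mixin Enum serialize as their plain string value.
status_json = json.dumps({"status": BatchStatusSketch.RUNNING})
```

Freezing only blocks attribute assignment. A dict field such as `retry_policies` can still be mutated in place, so callers that need full immutability would have to copy it.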

### Validation

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && \
python3 -c "import ast; ast.parse(open('pipeline/orchestrator/production_types.py').read())" && \
grep -q 'class BatchConfig' pipeline/orchestrator/production_types.py && \
grep -q 'class BatchState' pipeline/orchestrator/production_types.py && \
grep -q 'class RetryRequest' pipeline/orchestrator/production_types.py && \
grep -q 'class ProvenanceRecord' pipeline/orchestrator/production_types.py && \
grep -q 'class AutonomyConfig' pipeline/orchestrator/production_types.py && \
grep -q 'class FailureCategory' pipeline/orchestrator/production_types.py && \
grep -q 'DEFAULT_RETRY_POLICIES' pipeline/orchestrator/production_types.py && \
echo "Phase 1 OK"
```

---

## Phase 2: Autonomy Controller

### Files to create
- `pipeline/orchestrator/autonomy_controller.py`

### What already exists (from prior phases)
- Phase 1 created `production_types.py` with `AutonomyConfig`, `BatchConfig`

### Scope boundary
- Do NOT modify any existing files
- Do NOT add new status values to ExecutionStore's state machine
- Only reads from ExecutionStore, never writes

### Exact implementation

**Create `pipeline/orchestrator/autonomy_controller.py`:**

```python
"""
autonomy_controller.py — Decides auto-approve vs flag-for-human.

Phase 1: disabled (all shots go to human review).
Phase 2: conservative thresholds.
Phase 3: widen based on learned patterns from LearningEngine.

The controller never writes to the store — it returns a decision that the
production loop acts on. This keeps the decision surface testable.
"""

import logging
from typing import Optional

from orchestrator.production_types import AutonomyConfig

logger = logging.getLogger(__name__)


class AutonomyController:
    """Evaluates whether a passed shot can be auto-approved.

    Not thread-safe: the config is frozen, but the per-batch approval counter
    is mutable, so use it only from the single-threaded production loop.
    """

    def __init__(self, config: AutonomyConfig):
        self._config = config
        self._auto_approved_count = 0

    @property
    def config(self) -> AutonomyConfig:
        return self._config

    def can_auto_approve(
        self,
        shot_data: dict,
        gate_results: dict,
        deferred: bool = False,
    ) -> bool:
        """Decide if a shot can skip human review.

        Args:
            shot_data: Full shot record from ExecutionStore.
            gate_results: Merged gate results dict from the shot.
            deferred: Whether any gate returned DEFERRED (e.g. Gate 3).

        Returns:
            True if the shot can be auto-approved, False if human review needed.
        """
        if not self._config.enabled:
            return False

        # Budget cap on auto-approvals per batch
        if (self._config.max_auto_approve_per_batch > 0
                and self._auto_approved_count >= self._config.max_auto_approve_per_batch):
            logger.info(
                "Auto-approve cap reached (%d/%d)",
                self._auto_approved_count,
                self._config.max_auto_approve_per_batch,
            )
            return False

        # Deferred shots always need human review
        if deferred:
            logger.info("Shot %s: deferred → needs review", shot_data.get("shot_id", "?"))
            return False

        # Gate 3 requirement
        if self._config.require_gate_3:
            g3 = gate_results.get("gate_3", {})
            if not g3 or g3.get("deferred", False):
                return False

        # All gates must pass
        if self._config.require_all_gates_pass:
            for gate_name, result in gate_results.items():
                if gate_name.startswith("gate_") and isinstance(result, dict):
                    if not result.get("passed", False):
                        return False

        # Exclude shot types listed in the config (matched against the shot's "pipeline" field)
        pipeline = shot_data.get("pipeline", "")
        if pipeline in self._config.exclude_shot_types:
            return False

        return True

    def record_approval(self) -> None:
        """Called by the production loop after auto-approving a shot."""
        self._auto_approved_count += 1

    def reset_counts(self) -> None:
        """Reset per-batch counters. Called at batch start."""
        self._auto_approved_count = 0
```
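
The all-gates-pass check in `can_auto_approve` only inspects keys that start with `gate_` and hold a dict, and it treats a missing `passed` field as a failure. A minimal standalone sketch of that rule follows; the helper name `all_gates_passed` and the sample gate keys are hypothetical and not part of the controller:

```python
def all_gates_passed(gate_results: dict) -> bool:
    """Mirror of the loop in can_auto_approve: every gate_* dict must pass."""
    for gate_name, result in gate_results.items():
        if gate_name.startswith("gate_") and isinstance(result, dict):
            if not result.get("passed", False):
                return False
    return True


# Non-gate keys such as "notes" are ignored by the check.
clean = {"gate_1": {"passed": True}, "gate_2a": {"passed": True}, "notes": "ok"}
# A gate dict without a "passed" field counts as a failure.
missing_flag = {"gate_1": {"passed": True}, "gate_2a": {"reason": "no verdict"}}

clean_ok = all_gates_passed(clean)
missing_ok = all_gates_passed(missing_flag)
empty_ok = all_gates_passed({})
```

An empty result dict passes this check, so a caller that needs at least one gate to have run would have to enforce that separately.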

### Validation

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && \
python3 -c "import ast; ast.parse(open('pipeline/orchestrator/autonomy_controller.py').read())" && \
grep -q 'class AutonomyController' pipeline/orchestrator/autonomy_controller.py && \
grep -q 'def can_auto_approve' pipeline/orchestrator/autonomy_controller.py && \
grep -q 'def record_approval' pipeline/orchestrator/autonomy_controller.py && \
echo "Phase 2 OK"
```

---

## Phase 3: Take Provenance Writer

### Files to create
- `pipeline/orchestrator/take_provenance.py`

### What already exists (from prior phases)
- Phase 1 created `production_types.py` with `ProvenanceRecord`

### Scope boundary
- Do NOT modify existing take records in ExecutionStore — provenance is APPENDED alongside
- Do NOT change the file naming convention (existing `SH012_T3_keyframe.png` stays)
- Provenance is read-only from the orchestrator's perspective after writing
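
The "appended alongside" rule can be sketched as adding a single `provenance` key to the matching take while leaving its other keys alone. This is an illustrative sketch only; the helper name `attach_provenance` and the sample take dicts are assumptions, not the ExecutionStore API:

```python
def attach_provenance(takes: list[dict], take_number: int, provenance: dict) -> list[dict]:
    """Add a 'provenance' key to the matching take without touching other keys."""
    for take in takes:
        if take.get("take_number") == take_number:
            take["provenance"] = provenance
            break
    else:
        # No matching take: record the provenance as a new, otherwise empty take.
        takes.append({"take_number": take_number, "provenance": provenance})
    return takes


takes = [{"take_number": 3, "output_path": "SH012_T3_keyframe.png"}]
attach_provenance(takes, 3, {"take_id": "EP001_SH012_T3"})
attach_provenance(takes, 4, {"take_id": "EP001_SH012_T4"})
```

The existing `output_path` on take 3 is untouched, and take 4 is created because no record with that number existed.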

### Exact implementation

**Create `pipeline/orchestrator/take_provenance.py`:**

```python
"""
take_provenance.py — Full reproduction recipe per take.

Provenance records are stored in two places:
1. In-line on the take record in ExecutionStore (as a 'provenance' key)
2. As a batch manifest JSON at projects/{project}/output/manifests/

The in-line record enables per-shot reproduction from the Console.
The manifest enables batch-level analysis and editorial handoff.
"""

import hashlib
import json
import logging
import os
import tempfile
import time
from pathlib import Path
from typing import Optional

from orchestrator.production_types import ProvenanceRecord

logger = logging.getLogger(__name__)


def _file_hash(path: Path) -> str:
    """Truncated SHA-256 of a file (first 16 hex chars); empty string if the file is missing."""
    if not path.exists():
        return ""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return f"sha256:{h.hexdigest()[:16]}"


def _ref_entry(role: str, path: Path) -> dict:
    """Build a reference image provenance entry."""
    return {
        "role": role,
        "path": str(path),
        "hash": _file_hash(path),
    }


class ProvenanceWriter:
    """Writes take provenance records to ExecutionStore and manifest files.

    Thread-safe: all writes go through ExecutionStore's lock or atomic file ops.
    """

    def __init__(self, project: str, output_root: Path):
        """
        Args:
            project: Project name (e.g. "tartarus").
            output_root: Project output root (projects/{project}/output/).
        """
        self.project = project
        self._output_root = output_root
        self._manifests_dir = output_root / "manifests"
        self._manifests_dir.mkdir(parents=True, exist_ok=True)

    def build_record(
        self,
        shot_id: str,
        episode_id: str,
        take_number: int,
        step_result,
        prompt: str = "",
        negative_prompt: str = "",
        model: str = "",
        phase: str = "keyframe",
        refs_used: Optional[list[dict]] = None,
        params: Optional[dict] = None,
        gate_results: Optional[dict] = None,
        parent_take: Optional[str] = None,
        change_reason: Optional[str] = None,
        inputs_snapshot: Optional[dict] = None,
    ) -> ProvenanceRecord:
        """Build a ProvenanceRecord from a StepResult and generation context.

        Args:
            shot_id: Shot identifier.
            episode_id: Episode identifier.
            take_number: Take number for this shot.
            step_result: StepResult from StepRunner.
            prompt: Full generation prompt.
            negative_prompt: Negative prompt if used.
            model: Model ID used.
            phase: Generation phase (previs/keyframe/video).
            refs_used: List of ref dicts [{role, path, hash}].
            params: Model-specific generation params.
            gate_results: Gate verdict results dict.
            parent_take: Parent take ID for retries/re-runs.
            change_reason: Why this take was generated (retry reason).
            inputs_snapshot: Full inputs snapshot from StepRunner.
        """
        take_id = f"{shot_id}_T{take_number}"

        # Extract cost from step_result
        cost_dict = {"total": step_result.cost_usd}
        if phase == "video":
            cost_dict["video"] = step_result.cost_usd
        else:
            cost_dict[phase] = step_result.cost_usd
        if step_result.gate_verdict:
            cost_dict["gates"] = step_result.gate_verdict.cost

        # Build gate results from step_result
        gates = {}
        if gate_results:
            gates = gate_results
        elif step_result.gate_verdict:
            gv = step_result.gate_verdict
            gates[gv.gate_name] = {
                "passed": gv.passed,
                "reason": gv.reason,
                "cost": gv.cost,
                "deferred": gv.deferred,
            }

        record = ProvenanceRecord(
            take_id=take_id,
            shot_id=shot_id,
            episode_id=episode_id,
            project=self.project,
            attempt=take_number,
            phase=phase,
            model=model or step_result.model,
            prompt=prompt,
            negative_prompt=negative_prompt,
            params=params or {},
            refs_used=refs_used or [],
            gates=gates,
            cost=cost_dict,
            parent_take=parent_take,
            change_reason=change_reason,
        )

        # Enrich from inputs_snapshot if available
        if inputs_snapshot:
            if not record.refs_used and "refs_sent" in inputs_snapshot:
                record.refs_used = [
                    {"role": r.get("type", ""), "path": r.get("url", ""), "hash": ""}
                    for r in inputs_snapshot.get("refs_sent", [])
                    if r.get("sent_to_model")
                ]
            if not record.prompt and "prompt" in inputs_snapshot:
                record.prompt = inputs_snapshot["prompt"]

        return record

    def write_to_store(self, store, record: ProvenanceRecord) -> None:
        """Append provenance to the shot's take record in ExecutionStore.

        Finds the matching take by take_number and adds a 'provenance' key.
        """
        shot = store.get_shot(record.shot_id)
        if not shot:
            logger.warning("Cannot write provenance: shot %s not found", record.shot_id)
            return

        takes = shot.get("takes", [])
        take_num = record.attempt

        # Find the matching take and enrich it
        for take in takes:
            if take.get("take_number") == take_num:
                take["provenance"] = record.to_dict()
                break
        else:
            logger.warning(
                "Take %d not found for shot %s — appending provenance as new take",
                take_num, record.shot_id,
            )
            takes.append({"take_number": take_num, "provenance": record.to_dict()})

        store.update_shot(record.shot_id, takes=takes)

    def write_manifest(self, episode_id: str, records: list[ProvenanceRecord]) -> Path:
        """Write a batch manifest JSON with all provenance records for an episode.

        Returns path to the written manifest file.
        """
        manifest_path = self._manifests_dir / f"{episode_id}_manifest.json"

        manifest = {
            "episode_id": episode_id,
            "project": self.project,
            "generated_at": time.time(),
            "total_takes": len(records),
            "total_cost": round(sum(r.cost.get("total", 0) for r in records), 4),
            "takes": [r.to_dict() for r in records],
        }

        # Atomic write
        fd, tmp = tempfile.mkstemp(
            dir=str(self._manifests_dir), prefix=".manifest_", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(manifest, f, indent=2, default=str)
            os.replace(tmp, str(manifest_path))
        except Exception:
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise

        logger.info("Manifest written: %s (%d takes)", manifest_path, len(records))
        return manifest_path

    def create_selects_symlinks(self, episode_id: str, store) -> int:
        """Create symlinks in selects/ for approved takes.

        Returns number of symlinks created.
        """
        selects_dir = self._output_root / "selects" / episode_id
        selects_dir.mkdir(parents=True, exist_ok=True)

        count = 0
        shots = store.get_shots_by_episode(episode_id)
        for shot in shots:
            if shot.get("status") not in ("approved", "video_complete"):
                continue
            output_path = shot.get("output_path")
            if not output_path:
                continue

            # Resolve to absolute path
            from core.paths import PROJECTS_ROOT
            abs_path = PROJECTS_ROOT / self.project / output_path
            if not abs_path.exists():
                continue

            link_path = selects_dir / abs_path.name
            if link_path.exists() or link_path.is_symlink():
                link_path.unlink()
            link_path.symlink_to(abs_path)
            count += 1

        return count
```
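
`write_manifest` relies on a write-to-temp-then-rename pattern, so readers see either the previous manifest or the complete new one. A self-contained sketch of the same pattern is shown here, with a hypothetical helper name `atomic_write_json` and a throwaway directory standing in for the manifests folder:

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(target: Path, payload: dict) -> Path:
    """Write payload to target via a temp file created in the same directory."""
    target.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=str(target.parent), prefix=".tmp_", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        # Swap the finished file into place in a single step.
        os.replace(tmp, str(target))
    except Exception:
        # Clean up the temp file on failure, then re-raise the original error.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    return target


with tempfile.TemporaryDirectory() as d:
    out = atomic_write_json(Path(d) / "manifests" / "EP001_manifest.json", {"total_takes": 2})
    loaded = json.loads(out.read_text(encoding="utf-8"))
    leftovers = [p.name for p in out.parent.iterdir() if p.name.startswith(".tmp_")]
```

Creating the temp file in the target's own directory keeps both paths on the same filesystem, which is what lets `os.replace` perform the swap as a single rename.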

### Validation

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && \
python3 -c "import ast; ast.parse(open('pipeline/orchestrator/take_provenance.py').read())" && \
grep -q 'class ProvenanceWriter' pipeline/orchestrator/take_provenance.py && \
grep -q 'def build_record' pipeline/orchestrator/take_provenance.py && \
grep -q 'def write_to_store' pipeline/orchestrator/take_provenance.py && \
grep -q 'def write_manifest' pipeline/orchestrator/take_provenance.py && \
grep -q 'def create_selects_symlinks' pipeline/orchestrator/take_provenance.py && \
echo "Phase 3 OK"
```

---

## Phase 4: Retry Dispatcher

### Files to create
- `pipeline/orchestrator/retry_dispatcher.py`

### What already exists (from prior phases)
- Phase 1 created `production_types.py` with `FailureCategory`, `RetryPolicy`, `RetryRequest`, `DEFAULT_RETRY_POLICIES`

### Scope boundary
- Do NOT modify StepRunner's internal retry logic (gate retries stay in StepRunner)
- Do NOT call HealerAgent directly — accept fix suggestions as dicts
- The dispatcher queues retries; the production loop executes them
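
One way the production loop might drain due retries is to partition the queue against a single timestamp, so every request lands in exactly one of the two lists. This is a sketch under stated assumptions: `QueuedRetry` is a hypothetical stand-in for `RetryRequest`, and `split_ready` is not part of the dispatcher:

```python
from dataclasses import dataclass


@dataclass
class QueuedRetry:
    """Hypothetical stand-in for RetryRequest with only the fields used here."""
    shot_id: str
    retry_at: float


def split_ready(
    queue: list[QueuedRetry], now: float
) -> tuple[list[QueuedRetry], list[QueuedRetry]]:
    """Partition the queue into (ready, still_waiting) against one timestamp."""
    ready = [r for r in queue if now >= r.retry_at]
    waiting = [r for r in queue if now < r.retry_at]
    return ready, waiting


queue = [QueuedRetry("EP001_SH012", 100.0), QueuedRetry("EP001_SH013", 105.0)]
ready, waiting = split_ready(queue, now=102.0)
```

Passing `now` in explicitly also makes the split easy to test without patching the clock.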

### Exact implementation

**Create `pipeline/orchestrator/retry_dispatcher.py`:**

```python
"""
retry_dispatcher.py — Categorize failures and queue retries with backoff.

The dispatcher does NOT execute retries. It:
1. Classifies a StepResult failure into a FailureCategory
2. Checks the retry budget (per-shot attempt count vs policy max)
3. Computes the next retry time (with backoff)
4. Returns a RetryRequest or None (permanent failure)

The production loop calls dispatch() after a failed step, then processes
the retry queue on each loop iteration.
"""

import logging
import time
from collections import defaultdict
from typing import Optional

from orchestrator.production_types import (
    FailureCategory,
    RetryPolicy,
    RetryRequest,
    DEFAULT_RETRY_POLICIES,
)

logger = logging.getLogger(__name__)

# Keywords for failure classification (matched against the lower-cased error string,
# so every pattern must itself be lower-case)
_TRANSIENT_PATTERNS = ("429", "rate limit", "503", "502", "500", "timeout", "connection", "econnreset")
_CONTENT_FILTER_PATTERNS = ("content filter", "safety", "blocked", "policy", "refused")
_IDENTITY_PATTERNS = ("identity", "drift", "face", "different person", "wrong character")
_WARDROBE_PATTERNS = ("wardrobe", "costume", "clothing", "outfit", "phase")
_MECHANICAL_PATTERNS = ("artifact", "finger", "limb", "hand", "anatomy", "merge", "distort")


def classify_failure(
    step_result,
    shot_data: Optional[dict] = None,
) -> FailureCategory:
    """Classify a failed StepResult into a FailureCategory.

    Args:
        step_result: Failed StepResult from StepRunner.
        shot_data: Optional shot record from ExecutionStore for context.

    Returns:
        The appropriate FailureCategory.
    """
    error = (step_result.error or "").lower()
    final_state = step_result.final_state or ""

    # Check for budget exhaustion
    if "budget" in error:
        return FailureCategory.BUDGET

    # Transient API errors
    for pattern in _TRANSIENT_PATTERNS:
        if pattern in error:
            return FailureCategory.TRANSIENT

    # Content filter
    for pattern in _CONTENT_FILTER_PATTERNS:
        if pattern in error:
            return FailureCategory.CONTENT_FILTER

    # Gate-based failures (use final_state from ExecutionStore)
    if "mechanical" in final_state:
        return FailureCategory.GATE_MECHANICAL

    if "semantic" in final_state:
        # Distinguish identity vs wardrobe from gate details
        gv = step_result.gate_verdict
        if gv and gv.details:
            mismatches = gv.details.get("mismatches", [])
            for m in mismatches:
                cat = m.get("category", "").lower()
                evidence = m.get("visual_evidence", "").lower()
                if any(p in cat or p in evidence for p in _WARDROBE_PATTERNS):
                    return FailureCategory.GATE_WARDROBE
                if any(p in cat or p in evidence for p in _IDENTITY_PATTERNS):
                    return FailureCategory.GATE_IDENTITY

        # Check error string for identity/wardrobe keywords
        for pattern in _WARDROBE_PATTERNS:
            if pattern in error:
                return FailureCategory.GATE_WARDROBE
        for pattern in _IDENTITY_PATTERNS:
            if pattern in error:
                return FailureCategory.GATE_IDENTITY

        return FailureCategory.GATE_IDENTITY  # Default semantic → identity

    # Video drift
    if step_result.gate_verdict and step_result.gate_verdict.deferred:
        return FailureCategory.GATE_VIDEO_DRIFT

    # Default: permanent
    return FailureCategory.PERMANENT


class RetryDispatcher:
    """Queues and manages retries with per-category backoff.

    Not thread-safe — designed for single-threaded production loop.
    """

    def __init__(
        self,
        policies: Optional[dict[FailureCategory, RetryPolicy]] = None,
    ):
        self._policies = policies or dict(DEFAULT_RETRY_POLICIES)
        self._queue: list[RetryRequest] = []
        self._attempt_counts: dict[str, dict[FailureCategory, int]] = defaultdict(
            lambda: defaultdict(int)
        )

    @property
    def queue_size(self) -> int:
        return len(self._queue)

    @property
    def queue(self) -> list[RetryRequest]:
        return list(self._queue)

    def dispatch(
        self,
        shot_id: str,
        step_result,
        shot_data: Optional[dict] = None,
        fix_suggestion: Optional[dict] = None,
    ) -> Optional[RetryRequest]:
        """Classify failure and queue a retry, or return None if permanent.

        Args:
            shot_id: Shot identifier.
            step_result: Failed StepResult.
            shot_data: Full shot record for context.
            fix_suggestion: Optional fix from HealerAgent.

        Returns:
            RetryRequest if retry queued, None if failure is permanent.
        """
        category = classify_failure(step_result, shot_data)
        policy = self._policies.get(category, RetryPolicy(max_retries=0))

        # Check retry budget
        attempts = self._attempt_counts[shot_id][category]
        if attempts >= policy.max_retries:
            logger.info(
                "Shot %s: %s retry budget exhausted (%d/%d) → permanent",
                shot_id, category.value, attempts, policy.max_retries,
            )
            return None

        # Compute backoff
        backoff = min(
            policy.base_backoff_seconds * (policy.backoff_multiplier ** attempts),
            policy.max_backoff_seconds,
        )
        retry_at = time.time() + backoff

        # Track attempt
        self._attempt_counts[shot_id][category] += 1
        total_attempts = sum(self._attempt_counts[shot_id].values())

        request = RetryRequest(
            shot_id=shot_id,
            failure_category=category,
            attempt_number=total_attempts,
            retry_at=retry_at,
            fix_suggestion=fix_suggestion,
            error_message=step_result.error,
            original_model=step_result.model,
        )
        self._queue.append(request)

        logger.info(
            "Shot %s: queued %s retry #%d (backoff %.1fs, total attempts %d)",
            shot_id, category.value, attempts + 1, backoff, total_attempts,
        )
        return request

    def get_ready(self) -> list[RetryRequest]:
        """Return all retry requests that are past their backoff time.

        Removes returned requests from the queue.
        """
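        # Partition against one timestamp so a request that becomes ready
        # between two clock reads cannot be dropped without being returned.
        now = time.time()
        ready = [r for r in self._queue if now >= r.retry_at]
        self._queue = [r for r in self._queue if now < r.retry_at]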
        return ready

    def cancel(self, shot_id: str) -> int:
        """Cancel all pending retries for a shot. Returns count removed."""
        before = len(self._queue)
        self._queue = [r for r in self._queue if r.shot_id != shot_id]
        removed = before - len(self._queue)
        if removed:
            logger.info("Cancelled %d retries for shot %s", removed, shot_id)
        return removed

    def total_attempts(self, shot_id: str) -> int:
        """Total retry attempts across all categories for a shot."""
        return sum(self._attempt_counts.get(shot_id, {}).values())

    def reset(self) -> None:
        """Clear all queued retries and attempt counts."""
        self._queue.clear()
        self._attempt_counts.clear()
```
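
The backoff in `dispatch()` grows geometrically from `base_backoff_seconds` and is capped at `max_backoff_seconds`. The sketch below tabulates the resulting delays for two of the default policies; the `backoff_schedule` helper is hypothetical, and the numbers are copied from `DEFAULT_RETRY_POLICIES` and the `RetryPolicy` defaults:

```python
def backoff_schedule(
    max_retries: int,
    base_backoff_seconds: float,
    backoff_multiplier: float,
    max_backoff_seconds: float,
) -> list[float]:
    """Delay before each retry, using the same formula as dispatch()."""
    return [
        min(base_backoff_seconds * (backoff_multiplier ** attempt), max_backoff_seconds)
        for attempt in range(max_retries)
    ]


# Default TRANSIENT policy: 5 retries, base 5 s, multiplier 2, cap 120 s.
transient = backoff_schedule(5, 5.0, 2.0, 120.0)

# Default GATE_MECHANICAL policy: 3 retries, base 0 s, so each retry is due immediately.
gate = backoff_schedule(3, 0.0, 2.0, 120.0)
```

With these defaults the transient delays are 5, 10, 20, 40, and 80 seconds, so the 120-second cap is never reached; it would first apply on a sixth retry, which would be 160 seconds uncapped.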

### Validation

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && \
python3 -c "import ast; ast.parse(open('pipeline/orchestrator/retry_dispatcher.py').read())" && \
grep -q 'class RetryDispatcher' pipeline/orchestrator/retry_dispatcher.py && \
grep -q 'def classify_failure' pipeline/orchestrator/retry_dispatcher.py && \
grep -q 'def dispatch' pipeline/orchestrator/retry_dispatcher.py && \
grep -q 'def get_ready' pipeline/orchestrator/retry_dispatcher.py && \
echo "Phase 4 OK"
```

---

## Phase 5: Batch Manager

### Files to create
- `pipeline/orchestrator/batch_manager.py`

### What already exists (from prior phases)
- Phase 1: `production_types.py` with `BatchConfig`, `BatchState`, `BatchStatus`

### Scope boundary
- Do NOT modify ExecutionStore — only read from it
- Do NOT implement shot execution — only track batch progress
- Batch state is written to JSON for crash recovery
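
The crash-recovery requirement can be sketched as a write-to-temp-then-rename pattern. This is a minimal standalone illustration, not the file to create; `write_state_atomically`, `read_state`, and the sample state dict are hypothetical names and data chosen for the example.

```python
import json
import os
import tempfile
from pathlib import Path


def write_state_atomically(path: Path, state: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename over path."""
    fd, tmp = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp, path)  # atomic when tmp and path share a filesystem
    except Exception:
        # Best-effort cleanup of the temp file, then re-raise
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


def read_state(path: Path):
    """Return the saved dict, or None if the file is missing or corrupt."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None


with tempfile.TemporaryDirectory() as d:
    state_path = Path(d) / "batch_demo.json"
    write_state_atomically(state_path, {"batch_id": "batch_demo", "status": "running"})
    recovered = read_state(state_path)
    missing = read_state(Path(d) / "absent.json")
print(recovered, missing)
```

Because the rename replaces the file in one step, a reader sees either the old state or the new one, never a half-written file.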

### Exact implementation

**Create `pipeline/orchestrator/batch_manager.py`:**

```python
"""
batch_manager.py — Queue episodes, track batch progress, termination criteria.

A batch represents "generate all shots for episode N" or "re-run these specific
takes." The BatchManager:
- Initializes batch state from ExecutionStore
- Tracks what succeeded, failed, is pending, is in-flight
- Monitors total cost vs budget
- Determines termination (all complete, budget exhausted, manual pause)
- Persists state to JSON for crash recovery
"""

import json
import logging
import os
import tempfile
import time
import uuid
from pathlib import Path
from typing import Optional

from orchestrator.production_types import BatchConfig, BatchState, BatchStatus

logger = logging.getLogger(__name__)


# ── Actionable statuses (shots the loop should process) ────────────
ACTIONABLE_STATUSES = frozenset({
    # Previs layer
    "previs_pending",
    "previs_failed",
    # Keyframe layer
    "keyframe_pending",
    "keyframe_mechanical_failed",
    # Video layer
    "video_pending",
    "video_failed",
    "video_mechanical_failed",
    # General
    "failed",
})

# Terminal statuses (no more work needed)
TERMINAL_STATUSES = frozenset({
    "approved",
    "rejected",
    "abandoned",
    "skipped",
    "video_complete",
})

# Statuses awaiting human review
REVIEW_STATUSES = frozenset({
    "previs_generated",
    "keyframe_generated",
    "keyframe_semantic_failed",
    "video_semantic_failed",
    "video_ready",
})


class BatchManager:
    """Tracks batch-level progress and termination.

    Not thread-safe — designed for single-threaded production loop.
    """

    def __init__(
        self,
        config: BatchConfig,
        store,
        state_dir: Optional[Path] = None,
    ):
        """
        Args:
            config: Batch configuration.
            store: ExecutionStore instance.
            state_dir: Directory for batch state JSON. Defaults to
                projects/{project}/state/visual/batches/
        """
        self._config = config
        self._store = store

        if state_dir:
            self._state_dir = state_dir
        else:
            from core.paths import PROJECTS_ROOT
            self._state_dir = (
                PROJECTS_ROOT / config.project / "state" / "visual" / "batches"
            )
        self._state_dir.mkdir(parents=True, exist_ok=True)

        # Initialize or recover state
        self._state = self._recover_or_create()

    @property
    def state(self) -> BatchState:
        return self._state

    @property
    def config(self) -> BatchConfig:
        return self._config

    def _recover_or_create(self) -> BatchState:
        """Look for an existing active batch state file, or create new."""
        # Check for existing active batch
        for path in self._state_dir.glob("batch_*.json"):
            try:
                data = json.loads(path.read_text(encoding="utf-8"))
                if data.get("status") in (BatchStatus.RUNNING.value, BatchStatus.PAUSED.value):
                    if (data.get("project") == self._config.project
                            and data.get("episode_id") == self._config.episode_id):
                        logger.info("Recovering batch %s from %s", data["batch_id"], path)
                        state = BatchState(
                            batch_id=data["batch_id"],
                            config=self._config,
                            status=BatchStatus(data["status"]),
                            started_at=data.get("started_at", 0),
                            total_cost=data.get("total_cost", 0),
                            shots_completed=data.get("shots_completed", 0),
                            shots_failed=data.get("shots_failed", 0),
                            auto_approved=data.get("auto_approved", 0),
                        )
                        return state
            except (json.JSONDecodeError, KeyError, OSError) as e:
                logger.warning("Skipping corrupt batch state %s: %s", path, e)

        # Create new batch
        batch_id = f"batch_{self._config.episode_id}_{uuid.uuid4().hex[:8]}"
        return BatchState(
            batch_id=batch_id,
            config=self._config,
            status=BatchStatus.CREATED,
        )

    def start(self) -> None:
        """Mark batch as running."""
        self._state.status = BatchStatus.RUNNING
        # Keep the original start time when resuming a recovered batch
        if not self._state.started_at:
            self._state.started_at = time.time()
        self._save_state()

    def refresh_counts(self) -> None:
        """Refresh shot counts from ExecutionStore."""
        shots = self._store.get_shots_by_episode(self._config.episode_id)

        # Apply shot filter if configured
        if self._config.shot_filter:
            filter_set = set(self._config.shot_filter)
            shots = [s for s in shots if s.get("shot_id") in filter_set]

        pending = 0
        completed = 0
        failed = 0
        in_review = 0
        total_cost = 0.0
        max_attempts = self._config.max_attempts_per_shot

        for shot in shots:
            status = shot.get("status", "previs_pending")
            total_cost += shot.get("cost_incurred", 0) or 0

            if status == "abandoned":
                # Permanent failure: terminal, but reported separately
                # from successful completions
                failed += 1
            elif status in TERMINAL_STATUSES:
                completed += 1
            elif status in REVIEW_STATUSES:
                in_review += 1
            elif (status in ACTIONABLE_STATUSES
                    and (shot.get("attempts", 0) or 0) >= max_attempts):
                # Out of attempts: get_actionable_shots() never returns this
                # shot, so counting it as pending would keep is_complete()
                # False forever.
                failed += 1
            else:
                # Actionable, in-flight, or unknown/transition state
                pending += 1

        self._state.shots_pending = pending
        self._state.shots_completed = completed
        self._state.shots_failed = failed
        self._state.shots_in_review = in_review
        self._state.total_cost = total_cost

    def get_actionable_shots(self, max_batch: int = 10) -> list[dict]:
        """Get shots ready for processing, ordered by priority.

        Priority: retries first, then pending previs, then keyframe, then video.
        """
        shots = self._store.get_shots_by_episode(self._config.episode_id)

        # Apply shot filter
        if self._config.shot_filter:
            filter_set = set(self._config.shot_filter)
            shots = [s for s in shots if s.get("shot_id") in filter_set]

        # Filter to actionable
        actionable = [s for s in shots if s.get("status") in ACTIONABLE_STATUSES]

        # Filter by max attempts
        actionable = [
            s for s in actionable
            if (s.get("attempts", 0) or 0) < self._config.max_attempts_per_shot
        ]

        # Sort by priority: failed retries first, then by pipeline phase
        def _priority(shot):
            status = shot.get("status", "")
            if "failed" in status:
                return (0, shot.get("shot_id", ""))
            if "previs" in status:
                return (1, shot.get("shot_id", ""))
            if "keyframe" in status:
                return (2, shot.get("shot_id", ""))
            if "video" in status:
                return (3, shot.get("shot_id", ""))
            return (4, shot.get("shot_id", ""))

        actionable.sort(key=_priority)
        return actionable[:max_batch]

    def is_budget_exhausted(self) -> bool:
        """Check if batch cost exceeds budget."""
        return self._state.total_cost >= self._config.budget_usd

    def is_complete(self) -> bool:
        """Check if all shots are in terminal or review states (no more actionable)."""
        self.refresh_counts()
        return self._state.shots_pending == 0

    def check_termination(self) -> Optional[str]:
        """Check all termination criteria. Returns reason string or None.

        Call this each loop iteration. Returns:
        - "budget_exhausted" if cost >= budget
        - "complete" if no actionable shots remain
        - "paused" if status was set to PAUSED externally
        - None if batch should continue
        """
        if self._state.status == BatchStatus.PAUSED:
            return "paused"

        self.refresh_counts()

        if self.is_budget_exhausted():
            self._state.status = BatchStatus.BUDGET_EXHAUSTED
            self._save_state()
            return "budget_exhausted"

        if self.is_complete():
            self._state.status = BatchStatus.COMPLETE
            self._state.completed_at = time.time()
            self._save_state()
            return "complete"

        return None

    def pause(self, reason: str = "") -> None:
        """Pause the batch. Production loop should stop processing."""
        self._state.status = BatchStatus.PAUSED
        self._state.error_message = reason
        self._save_state()
        logger.info("Batch %s paused: %s", self._state.batch_id, reason)

    def record_completion(self, shot_id: str, auto_approved: bool = False) -> None:
        """Record a shot completion."""
        self._state.shots_completed += 1
        if auto_approved:
            self._state.auto_approved += 1
        self._save_state()

    def record_failure(self, shot_id: str) -> None:
        """Record a permanent shot failure."""
        self._state.shots_failed += 1
        self._save_state()

    def summary(self) -> dict:
        """Current batch summary."""
        self.refresh_counts()
        started = self._state.started_at
        elapsed = (time.time() - started) if started else 0.0
        return {
            **self._state.to_dict(),
            "elapsed_seconds": round(elapsed, 1),
            "budget_remaining": round(self._config.budget_usd - self._state.total_cost, 4),
        }

    def _save_state(self) -> None:
        """Persist batch state to JSON for crash recovery."""
        path = self._state_dir / f"{self._state.batch_id}.json"
        content = json.dumps(self._state.to_dict(), indent=2, default=str)

        fd, tmp = tempfile.mkstemp(
            dir=str(self._state_dir), prefix=".batch_", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                f.write(content)
            os.replace(tmp, str(path))
        except Exception:
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise
```

### Validation

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && \
python3 -c "import ast; ast.parse(open('pipeline/orchestrator/batch_manager.py').read())" && \
grep -q 'class BatchManager' pipeline/orchestrator/batch_manager.py && \
grep -q 'ACTIONABLE_STATUSES' pipeline/orchestrator/batch_manager.py && \
grep -q 'def get_actionable_shots' pipeline/orchestrator/batch_manager.py && \
grep -q 'def check_termination' pipeline/orchestrator/batch_manager.py && \
grep -q 'def _recover_or_create' pipeline/orchestrator/batch_manager.py && \
echo "Phase 5 OK"
```
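
The ordering rule in `get_actionable_shots()` (failed shots first, then previs, keyframe, video) can be checked in isolation. The sketch below mirrors that sort key as a standalone function; `shot_priority` and the sample shot dicts are made up for illustration.

```python
def shot_priority(shot: dict) -> tuple:
    """Sort key: failed shots first, then previs, keyframe, video."""
    status = shot.get("status", "")
    shot_id = shot.get("shot_id", "")
    if "failed" in status:
        return (0, shot_id)
    for rank, phase in enumerate(("previs", "keyframe", "video"), start=1):
        if phase in status:
            return (rank, shot_id)
    return (4, shot_id)


shots = [
    {"shot_id": "S03", "status": "video_pending"},
    {"shot_id": "S01", "status": "keyframe_pending"},
    {"shot_id": "S04", "status": "previs_pending"},
    {"shot_id": "S02", "status": "video_failed"},
]
ordered = [s["shot_id"] for s in sorted(shots, key=shot_priority)]
print(ordered)  # ['S02', 'S04', 'S01', 'S03']
```

Including `shot_id` as the second tuple element makes the order deterministic within each priority band.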

---

## Phase 6: Learning Engine

### Files to create
- `pipeline/orchestrator/learning_engine.py`

### What already exists (from prior phases)
- Phase 1: `production_types.py` with `ProvenanceRecord`

### Scope boundary
- Do NOT modify gate thresholds automatically — only produce reports/suggestions
- Do NOT modify any existing files
- Learning data is append-only JSONL (one JSON object per line); only `latest_report.json` is overwritten
- Cross-project insights go to Dropbox shared path (read by all machines)
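
A minimal sketch of the append-only storage pattern, assuming one JSON object per line. The helper names and sample entries are hypothetical; the point is that appends never rewrite earlier lines and that loading tolerates a partial line left by a crash.

```python
import json
import tempfile
from pathlib import Path


def append_jsonl(path: Path, entries: list) -> None:
    """Append one JSON object per line; existing lines are never rewritten."""
    with open(path, "a", encoding="utf-8") as f:
        for entry in entries:
            f.write(json.dumps(entry) + "\n")


def load_jsonl(path: Path) -> list:
    """Load all parseable lines, skipping blanks and corrupt lines."""
    if not path.exists():
        return []
    entries = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # e.g. a partial line left by a crash mid-write
    return entries


with tempfile.TemporaryDirectory() as d:
    log = Path(d) / "verdicts.jsonl"
    append_jsonl(log, [{"shot_id": "S01", "success": True}])
    with open(log, "a", encoding="utf-8") as f:
        f.write('{"shot_id": "S02", "succ\n')  # simulate a truncated write
    append_jsonl(log, [{"shot_id": "S03", "success": False}])
    loaded = load_jsonl(log)
print([e["shot_id"] for e in loaded])  # ['S01', 'S03']
```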

### Exact implementation

**Create `pipeline/orchestrator/learning_engine.py`:**

```python
"""
learning_engine.py — Pattern detection from gate verdicts and human overrides.

Learns from:
1. Gate verdicts — which models/prompts/shot types produce passing results
2. Human overrides — when JT approves a gate-failed shot (false negative) or
   rejects a gate-passed shot (false positive)
3. Retry patterns — which failure types are transient vs persistent
4. Happy accidents — unscripted takes that JT selected

Produces:
1. Pattern reports (JSON)
2. Gate calibration suggestions
3. pipeline-learnings-auto.md updates (append to shared Dropbox file)

Storage: projects/{project}/state/learning/ as JSONL logs plus latest_report.json
Cross-project: ~/Dropbox/Claude_Config/memory/pipeline-learnings-auto.md
"""

import json
import logging
import os
import tempfile
import time
from collections import defaultdict
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Shared insights path (synced via Dropbox across machines)
_SHARED_LEARNINGS_PATH = Path.home() / "Dropbox" / "Claude_Config" / "memory" / "pipeline-learnings-auto.md"


class LearningEngine:
    """Ingests generation results and produces insights.

    Not thread-safe — designed for single-threaded production loop.
    """

    def __init__(self, project: str, state_dir: Optional[Path] = None):
        """
        Args:
            project: Project name.
            state_dir: Learning data directory. Defaults to
                projects/{project}/state/learning/
        """
        self.project = project

        if state_dir:
            self._state_dir = state_dir
        else:
            from core.paths import PROJECTS_ROOT
            self._state_dir = PROJECTS_ROOT / project / "state" / "learning"
        self._state_dir.mkdir(parents=True, exist_ok=True)

        # In-memory accumulators (flushed periodically)
        self._verdicts: list[dict] = []
        self._overrides: list[dict] = []
        self._retries: list[dict] = []

    def ingest_result(
        self,
        shot_id: str,
        step_result,
        shot_data: Optional[dict] = None,
        model: str = "",
        phase: str = "",
        shot_type: str = "",
    ) -> None:
        """Ingest a single generation result for learning.

        Call after every StepRunner execution (success or failure).
        """
        gv = step_result.gate_verdict
        entry = {
            "timestamp": time.time(),
            "project": self.project,
            "shot_id": shot_id,
            "model": model or step_result.model,
            "phase": phase or step_result.pipeline,
            "shot_type": shot_type,
            "success": step_result.success,
            "cost": step_result.cost_usd,
            "gate_name": gv.gate_name if gv else None,
            "gate_passed": gv.passed if gv else None,
            "gate_reason": gv.reason if gv else None,
            "gate_deferred": gv.deferred if gv else None,
            "final_state": step_result.final_state,
        }

        # Extract gate details for pattern analysis
        if gv and gv.details:
            entry["gate_details"] = {
                k: v for k, v in gv.details.items()
                if k in ("total_score", "mismatches", "failure_category",
                         "drift_count", "skipped")
            }

        self._verdicts.append(entry)

        # Auto-flush every 50 entries
        if len(self._verdicts) >= 50:
            self.flush()

    def ingest_override(
        self,
        shot_id: str,
        override_type: str,
        gate_name: str,
        gate_passed: bool,
        human_decision: str,
        notes: str = "",
    ) -> None:
        """Record when a human overrides a gate decision.

        Args:
            override_type: "false_negative" (approved despite gate fail) or
                          "false_positive" (rejected despite gate pass).
            gate_name: Which gate was overridden.
            gate_passed: Whether the gate originally passed.
            human_decision: "approved" or "rejected".
            notes: Human notes on why.
        """
        self._overrides.append({
            "timestamp": time.time(),
            "project": self.project,
            "shot_id": shot_id,
            "override_type": override_type,
            "gate_name": gate_name,
            "gate_passed": gate_passed,
            "human_decision": human_decision,
            "notes": notes,
        })

    def ingest_retry(
        self,
        shot_id: str,
        failure_category: str,
        retry_number: int,
        succeeded: bool,
        fix_applied: Optional[str] = None,
    ) -> None:
        """Record retry outcome for transient vs persistent pattern analysis."""
        self._retries.append({
            "timestamp": time.time(),
            "project": self.project,
            "shot_id": shot_id,
            "failure_category": failure_category,
            "retry_number": retry_number,
            "succeeded": succeeded,
            "fix_applied": fix_applied,
        })

    def flush(self) -> None:
        """Write accumulated data to disk."""
        if self._verdicts:
            self._append_jsonl("verdicts.jsonl", self._verdicts)
            self._verdicts.clear()
        if self._overrides:
            self._append_jsonl("overrides.jsonl", self._overrides)
            self._overrides.clear()
        if self._retries:
            self._append_jsonl("retries.jsonl", self._retries)
            self._retries.clear()

    def generate_report(self) -> dict:
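        """Analyze accumulated data and produce a pattern report.

        Returns a dict with pass rates by model and by shot type, plus
        per-gate false-positive/false-negative override counts. Retry
        outcomes are stored in retries.jsonl but are not analyzed here.
        """
        # Persist in-memory entries first so the report covers them
        self.flush()

        verdicts = self._load_jsonl("verdicts.jsonl")
        overrides = self._load_jsonl("overrides.jsonl")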

        if not verdicts:
            return {"status": "no_data", "message": "No verdicts to analyze yet."}

        # Pass rate by model
        model_stats = defaultdict(lambda: {"pass": 0, "fail": 0, "total": 0, "cost": 0.0})
        for v in verdicts:
            # `or` also covers empty strings and null costs in stored entries
            model = v.get("model") or "unknown"
            model_stats[model]["total"] += 1
            model_stats[model]["cost"] += v.get("cost") or 0
            if v.get("success"):
                model_stats[model]["pass"] += 1
            else:
                model_stats[model]["fail"] += 1

        model_report = {}
        for model, stats in model_stats.items():
            total = stats["total"]
            model_report[model] = {
                "total": total,
                "pass_rate": round(stats["pass"] / total, 3) if total else 0,
                "total_cost": round(stats["cost"], 4),
            }

        # Pass rate by shot type
        type_stats = defaultdict(lambda: {"pass": 0, "fail": 0, "total": 0})
        for v in verdicts:
            # ingest_result stores "" when shot_type is unset
            st = v.get("shot_type") or "unknown"
            type_stats[st]["total"] += 1
            if v.get("success"):
                type_stats[st]["pass"] += 1
            else:
                type_stats[st]["fail"] += 1

        type_report = {}
        for st, stats in type_stats.items():
            total = stats["total"]
            type_report[st] = {
                "total": total,
                "pass_rate": round(stats["pass"] / total, 3) if total else 0,
            }

        # Gate calibration (false positive/negative rates from overrides)
        gate_cal = defaultdict(lambda: {"false_positive": 0, "false_negative": 0, "total_overrides": 0})
        for o in overrides:
            gate = o.get("gate_name", "unknown")
            gate_cal[gate]["total_overrides"] += 1
            if o.get("override_type") == "false_positive":
                gate_cal[gate]["false_positive"] += 1
            elif o.get("override_type") == "false_negative":
                gate_cal[gate]["false_negative"] += 1

        report = {
            "generated_at": time.time(),
            "project": self.project,
            "total_verdicts": len(verdicts),
            "total_overrides": len(overrides),
            "by_model": model_report,
            "by_shot_type": type_report,
            "gate_calibration": dict(gate_cal),
        }

        # Write report to disk
        report_path = self._state_dir / "latest_report.json"
        self._write_json(report_path, report)

        return report

    def update_shared_learnings(self, findings: list[str]) -> None:
        """Append findings to the shared pipeline-learnings-auto.md.

        This file is synced via Dropbox and visible across all machines.
        Only append truly novel findings (not duplicates of existing entries).
        """
        if not findings:
            return

        _SHARED_LEARNINGS_PATH.parent.mkdir(parents=True, exist_ok=True)

        existing = ""
        if _SHARED_LEARNINGS_PATH.exists():
            existing = _SHARED_LEARNINGS_PATH.read_text(encoding="utf-8")

        # Filter out duplicate findings
        new_findings = [f for f in findings if f not in existing]
        if not new_findings:
            return

        timestamp = time.strftime("%Y-%m-%d %H:%M")
        block = f"\n\n## Auto-learned: {self.project} ({timestamp})\n"
        for f in new_findings:
            block += f"- {f}\n"

        with open(_SHARED_LEARNINGS_PATH, "a", encoding="utf-8") as fh:
            fh.write(block)

        logger.info("Appended %d findings to shared learnings", len(new_findings))

    def _append_jsonl(self, filename: str, entries: list[dict]) -> None:
        """Append entries to a JSONL file."""
        path = self._state_dir / filename
        with open(path, "a", encoding="utf-8") as f:
            for entry in entries:
                f.write(json.dumps(entry, default=str) + "\n")

    def _load_jsonl(self, filename: str) -> list[dict]:
        """Load all entries from a JSONL file."""
        path = self._state_dir / filename
        if not path.exists():
            return []
        entries = []
        for line in path.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if line:
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
        return entries

    def _write_json(self, path: Path, data: dict) -> None:
        """Atomic JSON write."""
        fd, tmp = tempfile.mkstemp(
            dir=str(path.parent), prefix=".learn_", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, default=str)
            os.replace(tmp, str(path))
        except Exception:
            try:
                os.unlink(tmp)
            except OSError:
                pass
            raise
```

### Validation

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && \
python3 -c "import ast; ast.parse(open('pipeline/orchestrator/learning_engine.py').read())" && \
grep -q 'class LearningEngine' pipeline/orchestrator/learning_engine.py && \
grep -q 'def ingest_result' pipeline/orchestrator/learning_engine.py && \
grep -q 'def ingest_override' pipeline/orchestrator/learning_engine.py && \
grep -q 'def generate_report' pipeline/orchestrator/learning_engine.py && \
grep -q 'def update_shared_learnings' pipeline/orchestrator/learning_engine.py && \
echo "Phase 6 OK"
```
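
The pass-rate aggregation in `generate_report()` reduces to a group-and-count. The sketch below shows that step on its own; `pass_rates`, the model names, and the sample verdicts are hypothetical, and the report produced by the real class has more fields.

```python
from collections import defaultdict


def pass_rates(verdicts: list, key: str) -> dict:
    """Group verdicts by `key` and compute the fraction with success=True."""
    stats = defaultdict(lambda: {"pass": 0, "total": 0})
    for v in verdicts:
        group = v.get(key) or "unknown"  # treat missing and empty alike
        stats[group]["total"] += 1
        if v.get("success"):
            stats[group]["pass"] += 1
    return {
        group: {"total": s["total"], "pass_rate": round(s["pass"] / s["total"], 3)}
        for group, s in stats.items()
    }


sample = [
    {"model": "model_a", "success": True},
    {"model": "model_a", "success": False},
    {"model": "model_b", "success": True},
    {"model": "", "success": False},
]
rates = pass_rates(sample, "model")
print(rates)
```

The same function works for shot types by passing `"shot_type"` as the key.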

---

## Phase 7: Production Loop

### Files to create
- `pipeline/orchestrator/production_loop.py`

### What already exists (from prior phases)
- Phase 1: `production_types.py` — all type definitions
- Phase 2: `autonomy_controller.py` — `AutonomyController.can_auto_approve()`
- Phase 3: `take_provenance.py` — `ProvenanceWriter.build_record()`, `.write_to_store()`, `.write_manifest()`
- Phase 4: `retry_dispatcher.py` — `RetryDispatcher.dispatch()`, `.get_ready()`, `classify_failure()`
- Phase 5: `batch_manager.py` — `BatchManager.get_actionable_shots()`, `.check_termination()`, `ACTIONABLE_STATUSES`
- Phase 6: `learning_engine.py` — `LearningEngine.ingest_result()`, `.flush()`

### Existing codebase contracts
- `ExecutionStore` at `lib/execution_store` (proxy) — `get_shot()`, `update_shot()`, `get_shots_by_episode()`
- `StepRunner` at `orchestrator/step_runner` (proxy) — `execute_keyframe()`, `execute_video()`, `execute_multi_shot()`
- `StepResult` at `orchestrator/step_types` (proxy) — `.success`, `.cost_usd`, `.gate_verdict`, `.final_state`
- `GateVerdict` at `orchestrator/step_types` — `.passed`, `.deferred`, `.gate_name`, `.reason`
- `ProjectPaths` at `orchestrator/step_types` — `.for_episode(project, episode_num)`
- `scene_planner.route_shot()` — returns pipeline type string
- `scene_planner.classify_shot_tier()` — returns tier string
- `HealerAgent` at `execution.healer` — `.diagnose()`
- `make_identity_gate`, `make_video_drift_gate` at `orchestrator.step_runner`
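
For exercising the loop without the real store, an in-memory fake can stand in for `ExecutionStore`. This is a sketch under stated assumptions: only the three method names come from the contract list above, while the signatures, the keyword-argument form of `update_shot`, and the `episode_id` field on each shot are guesses made for illustration.

```python
class FakeExecutionStore:
    """Hypothetical in-memory stand-in for ExecutionStore (tests only).

    Method names come from the contract list; signatures and the
    "episode_id" shot field are assumptions made for this sketch.
    """

    def __init__(self, shots):
        self._shots = {s["shot_id"]: dict(s) for s in shots}

    def get_shot(self, shot_id):
        shot = self._shots.get(shot_id)
        return dict(shot) if shot else None

    def update_shot(self, shot_id, **fields):
        self._shots[shot_id].update(fields)

    def get_shots_by_episode(self, episode_id):
        return [dict(s) for s in self._shots.values()
                if s.get("episode_id") == episode_id]


store = FakeExecutionStore([
    {"shot_id": "S01", "episode_id": "EP001", "status": "keyframe_pending"},
    {"shot_id": "S02", "episode_id": "EP002", "status": "video_pending"},
])
store.update_shot("S01", status="keyframe_generated")
ep1 = store.get_shots_by_episode("EP001")
print(ep1)
```

Returning copies from the getters keeps test code from mutating store state by accident; whether the real store does the same is not specified here.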

### Scope boundary
- Do NOT modify any existing files
- Do NOT implement PromptPackage compilation — use existing pipeline strategies
- Do NOT add async/concurrent execution (the first rollout phase is sequential)
- Do NOT auto-approve shots in the first rollout phase (`AutonomyConfig.enabled` defaults to `False`)

### Exact implementation

**Create `pipeline/orchestrator/production_loop.py`:**

```python
"""
production_loop.py — Autonomous generation loop for the Starsend visual pipeline.

Sits between the Human Layer (Console, Claude Code) and the Execution Layer
(StepRunner, Gates, HealerAgent). Drives end-to-end: shot plans → generation →
validation → retry → editorial handoff.

Design: synchronous, single-threaded, single-project. Phase 1 runs one shot at
a time. Crash-safe: all state is in ExecutionStore and BatchManager's JSON.

Usage:
    from orchestrator.production_loop import ProductionLoop
    from orchestrator.production_types import BatchConfig

    loop = ProductionLoop(
        config=BatchConfig(project="tartarus", episode_id="EP001", budget_usd=25.0),
    )
    loop.run()
"""

import json
import logging
import re
import time
from pathlib import Path
from typing import Optional

import sys
_RECOIL_ROOT = str(Path(__file__).resolve().parent.parent.parent)
if _RECOIL_ROOT not in sys.path:
    sys.path.insert(0, _RECOIL_ROOT)

from lib.execution_store import ExecutionStore
from orchestrator.step_runner import StepRunner, make_identity_gate, make_video_drift_gate
from orchestrator.step_types import StepResult, ProjectPaths, GateVerdict

from orchestrator.production_types import (
    BatchConfig, BatchState, BatchStatus, FailureCategory,
    AutonomyConfig, ShotPhase,
)
from orchestrator.autonomy_controller import AutonomyController
from orchestrator.take_provenance import ProvenanceWriter
from orchestrator.retry_dispatcher import RetryDispatcher, classify_failure
from orchestrator.batch_manager import BatchManager, ACTIONABLE_STATUSES
from orchestrator.learning_engine import LearningEngine

logger = logging.getLogger(__name__)


def _determine_phase(shot_data: dict) -> ShotPhase:
    """Determine which generation phase a shot needs based on its status."""
    status = shot_data.get("status", "previs_pending")
    if "previs" in status:
        return ShotPhase.PREVIS
    if "keyframe" in status:
        return ShotPhase.KEYFRAME
    if "video" in status:
        return ShotPhase.VIDEO
    # Failed shots — look at what they were attempting
    if status == "failed":
        pipeline = shot_data.get("pipeline")
        if pipeline == "video":
            return ShotPhase.VIDEO
        if pipeline == "keyframe" or pipeline == "still":
            return ShotPhase.KEYFRAME
    return ShotPhase.KEYFRAME  # Default: start with keyframe


def _extract_episode_number(episode_id: str) -> int:
    """Extract episode number from ID like 'EP001' or 'ep_003'."""
    m = re.search(r"(\d+)", episode_id)
    return int(m.group(1)) if m else 1


class ProductionLoop:
    """Main orchestration loop — drives the visual pipeline autonomously.

    Lifecycle:
    1. Load project data (plans, bible, refs)
    2. Initialize subsystems (batch manager, retry dispatcher, etc.)
    3. Enter main loop:
       a. Check termination criteria
       b. Process retry queue
       c. Get actionable shots
       d. Execute each shot via StepRunner
       e. Handle result (advance, retry, or flag)
       f. Record provenance
       g. Feed learning engine
    4. On completion: write manifest, create selects, generate report

    Crash recovery: restart with same BatchConfig — BatchManager recovers state,
    ExecutionStore has all shot state, production loop picks up where it stopped.
    """

    def __init__(
        self,
        config: BatchConfig,
        store: Optional[ExecutionStore] = None,
        step_runner: Optional[StepRunner] = None,
    ):
        self._config = config
        self._project = config.project
        self._episode_id = config.episode_id
        self._episode_num = _extract_episode_number(config.episode_id)

        # Core dependencies
        self._store = store or ExecutionStore(project=self._project)
        self._paths = ProjectPaths.for_episode(self._project, self._episode_num)

        self._step_runner = step_runner or StepRunner(
            store=self._store,
            paths=self._paths,
        )

        # Subsystems
        self._batch = BatchManager(config=config, store=self._store)
        self._retry = RetryDispatcher(policies=config.retry_policies)
        self._autonomy = AutonomyController(config=config.autonomy)
        self._learning = LearningEngine(project=self._project)

        from core.paths import project_output_dir
        output_root = project_output_dir(self._project)
        self._provenance = ProvenanceWriter(
            project=self._project,
            output_root=output_root,
        )

        # Accumulated provenance records for manifest
        self._session_records = []

        # Load project data
        self._plan_data = self._load_plan()
        self._shot_plan_map = {}
        if self._plan_data:
            for shot in self._plan_data.get("shots", []):
                self._shot_plan_map[shot.get("shot_id", "")] = shot

    def _load_plan(self) -> Optional[dict]:
        """Load the episode shot plan."""
        candidates = [
            self._paths.plans_dir / f"ep_{self._episode_num:03d}_plan.json",
            # Alternate naming based on the raw episode ID
            self._paths.plans_dir / f"{self._episode_id.lower()}_plan.json",
        ]
        plan_path = next((p for p in candidates if p.exists()), None)
        if plan_path is None:
            logger.warning(
                "No shot plan found; tried: %s",
                ", ".join(str(p) for p in candidates),
            )
            return None
        try:
            return json.loads(plan_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as e:
            logger.error("Failed to load plan: %s", e)
            return None

    def run(self) -> dict:
        """Execute the production loop. Returns batch summary on completion.

        This is the main entry point. Blocks until batch is complete,
        budget is exhausted, or the batch is paused externally.
        """
        logger.info(
            "Production loop starting: project=%s episode=%s budget=$%.2f",
            self._project, self._episode_id, self._config.budget_usd,
        )

        self._batch.start()
        self._autonomy.reset_counts()

        try:
            while True:
                # ── 1. Check termination ──
                reason = self._batch.check_termination()
                if reason:
                    logger.info("Batch terminated: %s", reason)
                    break

                # ── 2. Process retry queue ──
                ready_retries = self._retry.get_ready()
                for retry_req in ready_retries:
                    shot = self._store.get_shot(retry_req.shot_id)
                    if shot:
                        logger.info(
                            "Retrying shot %s (attempt %d, category %s)",
                            retry_req.shot_id, retry_req.attempt_number,
                            retry_req.failure_category.value,
                        )
                        self._execute_shot(shot, retry_request=retry_req)

                # ── 3. Get actionable shots ──
                actionable = self._batch.get_actionable_shots(
                    max_batch=self._config.max_concurrent,
                )
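                if not actionable:
                    # Nothing runnable right now. Exit once no retries are pending
                    # and the batch is complete; otherwise sleep so that queued
                    # retries still in backoff do not cause a busy-wait.
                    if self._retry.queue_size == 0 and self._batch.is_complete():
                        break
                    time.sleep(self._config.poll_interval_seconds)
                    continue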

                # ── 4. Execute each shot ──
                for shot_data in actionable:
                    shot_id = shot_data.get("shot_id", "")

                    # Skip if already in retry queue
                    if any(r.shot_id == shot_id for r in self._retry.queue):
                        continue

                    self._execute_shot(shot_data)

                    # Budget check after each shot
                    self._batch.refresh_counts()
                    if self._batch.is_budget_exhausted():
                        logger.warning("Budget exhausted mid-batch")
                        break

        except KeyboardInterrupt:
            logger.info("Production loop interrupted by user")
            self._batch.pause("keyboard_interrupt")
        except Exception as e:
            logger.error("Production loop error: %s", e, exc_info=True)
            self._batch.pause(f"error: {e}")
        finally:
            # Flush learning data
            self._learning.flush()

            # Write manifest
            if self._session_records:
                self._provenance.write_manifest(
                    self._episode_id, self._session_records,
                )

        summary = self._batch.summary()
        logger.info(
            "Production loop complete: %d completed, %d failed, %d in review, $%.2f spent",
            summary["shots_completed"], summary["shots_failed"],
            summary["shots_in_review"], summary["total_cost"],
        )
        return summary

    def _execute_shot(
        self,
        shot_data: dict,
        retry_request: Optional["RetryRequest"] = None,
    ) -> None:
        """Execute a single shot through the appropriate pipeline phase.

        Handles the full lifecycle: execute → result handling → provenance → learning.
        """
        shot_id = shot_data.get("shot_id", "")
        phase = _determine_phase(shot_data)
        plan_shot = self._shot_plan_map.get(shot_id, {})

        logger.info("Executing shot %s phase=%s", shot_id, phase.value)

        # Increment attempts
        attempts = (shot_data.get("attempts", 0) or 0) + 1
        self._store.update_shot(shot_id, attempts=attempts)

        try:
            if phase == ShotPhase.KEYFRAME:
                result = self._execute_keyframe(shot_id, plan_shot, shot_data, retry_request)
            elif phase == ShotPhase.VIDEO:
                result = self._execute_video(shot_id, plan_shot, shot_data, retry_request)
            elif phase == ShotPhase.PREVIS:
                # Previs uses same path as keyframe but with Flash model
                result = self._execute_keyframe(
                    shot_id, plan_shot, shot_data, retry_request, previs=True,
                )
            else:
                logger.warning("Unknown phase %s for shot %s", phase, shot_id)
                return
        except Exception as e:
            logger.error("Shot %s execution error: %s", shot_id, e, exc_info=True)
            self._store.update_shot(
                shot_id,
                error_message=str(e),
            )
            return

        if result is None:
            return

        # ── Handle result ──
        self._handle_result(shot_id, result, shot_data, phase, plan_shot, retry_request)

    def _execute_keyframe(
        self,
        shot_id: str,
        plan_shot: dict,
        shot_data: dict,
        retry_request: Optional["RetryRequest"] = None,
        previs: bool = False,
    ) -> Optional[StepResult]:
        """Build keyframe generation request and execute via StepRunner."""
        from lib.model_profiles import get_model

        # Determine model
        if previs:
            model = get_model("primary", "previs")
        else:
            model = get_model("primary", "keyframe")

        # Build prompt from plan data
        prompt = self._build_keyframe_prompt(plan_shot)
        if not prompt:
            logger.warning("Shot %s: no prompt available, skipping", shot_id)
            return None

        # Gather refs
        identity_refs = self._gather_identity_refs(plan_shot)
        expression_refs = self._gather_expression_refs(plan_shot)
        scene_ref = self._get_scene_ref(plan_shot)

        # Build gates
        gates = []
        if identity_refs and not previs:
            gates.append(make_identity_gate(
                ref_paths=identity_refs,
                prompt_skeleton=plan_shot.get("prompt_data", {}).get("prompt_skeleton"),
                wardrobe_phase_id=self._get_wardrobe_phase(plan_shot),
            ))

        return self._step_runner.execute_keyframe(
            shot_id=shot_id,
            prompt=prompt,
            model=model,
            scene_ref_path=scene_ref,
            identity_refs=identity_refs,
            expression_refs=expression_refs,
            aspect_ratio="9:16",
            gates=gates if gates else None,
            max_gate_retries=3,
        )

    def _execute_video(
        self,
        shot_id: str,
        plan_shot: dict,
        shot_data: dict,
        retry_request: Optional["RetryRequest"] = None,
    ) -> Optional[StepResult]:
        """Build video generation request and execute via StepRunner."""
        from lib.model_profiles import get_model

        # Get the keyframe path for I2V
        keyframe_path = self._get_latest_keyframe(shot_id)

        # Determine model from routing data
        routing = plan_shot.get("routing_data", {})
        pipeline_type = routing.get("pipeline", "i2v")
        if pipeline_type == "t2v":
            model = get_model("primary", "t2v")
        else:
            model = get_model("primary", "i2v")

        # Build video prompt
        prompt = self._build_video_prompt(plan_shot)
        if not prompt:
            logger.warning("Shot %s: no video prompt, skipping", shot_id)
            return None

        # Build gates
        gates = []
        identity_refs = self._gather_identity_refs(plan_shot)
        if identity_refs:
            gates.append(make_video_drift_gate(
                ref_paths=identity_refs,
                shot_metadata=plan_shot,
            ))

        # Duration from plan
        duration = routing.get("duration", 5)

        return self._step_runner.execute_video(
            shot_id=shot_id,
            prompt=prompt,
            model=model,
            start_frame=keyframe_path,
            duration=duration,
            aspect_ratio="9:16",
            gates=gates if gates else None,
        )

    def _handle_result(
        self,
        shot_id: str,
        result: StepResult,
        shot_data: dict,
        phase: ShotPhase,
        plan_shot: dict,
        retry_request: Optional["RetryRequest"] = None,
    ) -> None:
        """Process a StepResult: advance, retry, or flag for review."""
        # Feed learning engine
        shot_type = plan_shot.get("prompt_data", {}).get("shot_type", "")
        self._learning.ingest_result(
            shot_id=shot_id,
            step_result=result,
            shot_data=shot_data,
            phase=phase.value,
            shot_type=shot_type,
        )

        if result.success:
            self._handle_success(shot_id, result, shot_data, phase, plan_shot, retry_request)
        else:
            self._handle_failure(shot_id, result, shot_data, phase, retry_request)

    def _handle_success(
        self,
        shot_id: str,
        result: StepResult,
        shot_data: dict,
        phase: ShotPhase,
        plan_shot: dict,
        retry_request: Optional["RetryRequest"] = None,
    ) -> None:
        """Handle a successful generation step."""
        # Record provenance
        record = self._provenance.build_record(
            shot_id=shot_id,
            episode_id=self._episode_id,
            take_number=result.take_index,
            step_result=result,
            model=result.model,
            phase=phase.value,
            parent_take=retry_request.shot_id if retry_request else None,
        )
        self._provenance.write_to_store(self._store, record)
        self._session_records.append(record)

        # Cancel any pending retries for this shot
        self._retry.cancel(shot_id)

        # Check autonomy (auto-approve vs needs_review)
        refreshed = self._store.get_shot(shot_id) or {}
        gate_results = refreshed.get("gate_results", {})
        deferred = refreshed.get("deferred", False)

        if self._autonomy.can_auto_approve(refreshed, gate_results, deferred):
            self._store.update_shot(shot_id, status="approved")
            self._autonomy.record_approval()
            self._batch.record_completion(shot_id, auto_approved=True)
            logger.info("Shot %s: auto-approved", shot_id)
        else:
            # Shot stays in its current state (keyframe_generated, video_complete)
            # waiting for human review via Console
            self._batch.record_completion(shot_id, auto_approved=False)
            logger.info("Shot %s: success → awaiting review", shot_id)

        if retry_request:
            self._learning.ingest_retry(
                shot_id=shot_id,
                failure_category=retry_request.failure_category.value,
                retry_number=retry_request.attempt_number,
                succeeded=True,
            )

    def _handle_failure(
        self,
        shot_id: str,
        result: StepResult,
        shot_data: dict,
        phase: ShotPhase,
        retry_request: Optional["RetryRequest"] = None,
    ) -> None:
        """Handle a failed generation step."""
        # Record the outcome of a failed retry up front, so the learning engine
        # also sees retries that end in permanent failure.
        if retry_request:
            self._learning.ingest_retry(
                shot_id=shot_id,
                failure_category=retry_request.failure_category.value,
                retry_number=retry_request.attempt_number,
                succeeded=False,
            )

        # Check if we should retry
        total_attempts = self._retry.total_attempts(shot_id)
        if total_attempts >= self._config.max_attempts_per_shot:
            logger.warning(
                "Shot %s: max attempts (%d) reached → permanent failure",
                shot_id, self._config.max_attempts_per_shot,
            )
            self._store.update_shot(shot_id, status="abandoned")
            self._batch.record_failure(shot_id)
            return

        # Ask retry dispatcher
        retry_req = self._retry.dispatch(
            shot_id=shot_id,
            step_result=result,
            shot_data=shot_data,
        )

        if retry_req:
            logger.info(
                "Shot %s: %s → queued retry #%d (retry in %.0fs)",
                shot_id, retry_req.failure_category.value,
                retry_req.attempt_number, retry_req.retry_at - time.time(),
            )
        else:
            # Permanent failure
            self._store.update_shot(shot_id, status="abandoned")
            self._batch.record_failure(shot_id)
            logger.warning("Shot %s: permanent failure (%s)", shot_id, result.error)

    # ── Helpers: prompt building, ref gathering ──

    def _build_keyframe_prompt(self, plan_shot: dict) -> str:
        """Extract or build keyframe prompt from plan shot data."""
        # Try compiled prompt first
        prompt_data = plan_shot.get("prompt_data", {})
        skeleton = prompt_data.get("prompt_skeleton", {})
        if skeleton.get("compiled_prompt"):
            return skeleton["compiled_prompt"]

        # Build from sections
        sections = []
        for key in ("scene_line", "subject_line", "action_line", "camera_line", "lighting_line"):
            if skeleton.get(key):
                sections.append(skeleton[key])
        if sections:
            return " ".join(sections)

        # Fallback: raw description
        return prompt_data.get("description", "")

    def _build_video_prompt(self, plan_shot: dict) -> str:
        """Build video generation prompt from plan shot data."""
        prompt_data = plan_shot.get("prompt_data", {})
        # Video prompt is typically the action + camera movement
        skeleton = prompt_data.get("prompt_skeleton", {})
        parts = []
        if skeleton.get("action_line"):
            parts.append(skeleton["action_line"])
        if skeleton.get("camera_line"):
            parts.append(skeleton["camera_line"])
        if parts:
            return " ".join(parts)
        return prompt_data.get("description", "")

    def _gather_identity_refs(self, plan_shot: dict) -> list[Path]:
        """Gather character identity reference paths from plan shot data."""
        refs = []
        asset_data = plan_shot.get("asset_data", {})
        for char in asset_data.get("characters", []):
            if isinstance(char, dict):
                ref_path = char.get("hero_ref") or char.get("ref_path")
                if ref_path:
                    p = Path(ref_path)
                    if not p.is_absolute():
                        from core.paths import PROJECTS_ROOT
                        p = PROJECTS_ROOT / self._project / ref_path
                    if p.exists():
                        refs.append(p)
        return refs

    def _gather_expression_refs(self, plan_shot: dict) -> list[Path]:
        """Gather expression reference paths."""
        refs = []
        asset_data = plan_shot.get("asset_data", {})
        for char in asset_data.get("characters", []):
            if isinstance(char, dict):
                expr_refs = char.get("expression_refs", [])
                for ref_path in expr_refs:
                    p = Path(ref_path)
                    if not p.is_absolute():
                        from core.paths import PROJECTS_ROOT
                        p = PROJECTS_ROOT / self._project / ref_path
                    if p.exists():
                        refs.append(p)
        return refs

    def _get_scene_ref(self, plan_shot: dict) -> Optional[Path]:
        """Get scene/location reference path."""
        asset_data = plan_shot.get("asset_data", {})
        location = asset_data.get("location_ref") or asset_data.get("scene_ref")
        if location:
            p = Path(location)
            if not p.is_absolute():
                from core.paths import PROJECTS_ROOT
                p = PROJECTS_ROOT / self._project / location
            if p.exists():
                return p
        return None

    def _get_wardrobe_phase(self, plan_shot: dict) -> Optional[str]:
        """Get wardrobe phase ID for Gate 2 wardrobe checking."""
        asset_data = plan_shot.get("asset_data", {})
        for char in asset_data.get("characters", []):
            if isinstance(char, dict) and char.get("wardrobe_phase"):
                # Use the first character that actually defines a phase
                return char["wardrobe_phase"]
        return None

    def _get_latest_keyframe(self, shot_id: str) -> Optional[Path]:
        """Get the most recent keyframe path for I2V video generation.

        Approval status is not checked here; the newest still-image take wins.
        """
        shot = self._store.get_shot(shot_id)
        if not shot:
            return None

        # Walk takes newest-first and pick the first still-image take on disk
        takes = shot.get("takes", [])
        for take in reversed(takes):
            if take.get("pipeline") in ("keyframe", "still", "previs"):
                file_path = take.get("file_path")
                if file_path:
                    from core.paths import PROJECTS_ROOT
                    p = PROJECTS_ROOT / self._project / file_path
                    if p.exists():
                        return p

        # Fallback: check output_path on the shot
        output_path = shot.get("output_path")
        if output_path:
            from core.paths import PROJECTS_ROOT
            p = PROJECTS_ROOT / self._project / output_path
            if p.exists() and p.suffix.lower() in (".png", ".jpg", ".jpeg", ".webp"):
                return p

        return None
```
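
The three-tier fallback in `_build_keyframe_prompt` (compiled prompt, then joined skeleton sections, then raw description) can be exercised in isolation. The sketch below re-implements that logic as a free function so the ordering is easy to sanity-check. `resolve_keyframe_prompt` and the sample dictionaries are hypothetical names used only for illustration; the keys mirror the `prompt_data` / `prompt_skeleton` shape used in the listing above.

```python
# Illustrative stand-in for ProductionLoop._build_keyframe_prompt.
# `resolve_keyframe_prompt` is a hypothetical helper, not part of the spec.

SECTION_KEYS = ("scene_line", "subject_line", "action_line", "camera_line", "lighting_line")


def resolve_keyframe_prompt(plan_shot: dict) -> str:
    """Return the keyframe prompt using the same fallback order as the loop."""
    prompt_data = plan_shot.get("prompt_data", {})
    skeleton = prompt_data.get("prompt_skeleton", {})

    # Tier 1: a pre-compiled prompt wins outright.
    if skeleton.get("compiled_prompt"):
        return skeleton["compiled_prompt"]

    # Tier 2: join whichever skeleton sections are present, in a fixed order.
    sections = [skeleton[key] for key in SECTION_KEYS if skeleton.get(key)]
    if sections:
        return " ".join(sections)

    # Tier 3: fall back to the raw description (may be empty).
    return prompt_data.get("description", "")


# Sample inputs (made up for this sketch).
compiled = {"prompt_data": {"prompt_skeleton": {"compiled_prompt": "A", "scene_line": "B"}}}
sectioned = {"prompt_data": {"prompt_skeleton": {"camera_line": "low angle", "scene_line": "alley"}}}
described = {"prompt_data": {"description": "fallback text"}}

print(resolve_keyframe_prompt(compiled))    # compiled prompt takes priority
print(resolve_keyframe_prompt(sectioned))   # sections joined in SECTION_KEYS order
print(resolve_keyframe_prompt(described))   # raw description
print(repr(resolve_keyframe_prompt({})))    # empty string, so the caller skips the shot
```

Note that section order comes from `SECTION_KEYS`, not from the order of keys in the plan data, so prompts stay stable across plan files.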

### Validation

```bash
cd /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil && \
python3 -c "import ast; ast.parse(open('pipeline/orchestrator/production_loop.py').read())" && \
grep -q 'class ProductionLoop' pipeline/orchestrator/production_loop.py && \
grep -q 'def run' pipeline/orchestrator/production_loop.py && \
grep -q 'def _execute_shot' pipeline/orchestrator/production_loop.py && \
grep -q 'def _handle_result' pipeline/orchestrator/production_loop.py && \
grep -q 'def _handle_success' pipeline/orchestrator/production_loop.py && \
grep -q 'def _handle_failure' pipeline/orchestrator/production_loop.py && \
grep -q 'def _execute_keyframe' pipeline/orchestrator/production_loop.py && \
grep -q 'def _execute_video' pipeline/orchestrator/production_loop.py && \
grep -q 'RetryDispatcher' pipeline/orchestrator/production_loop.py && \
grep -q 'BatchManager' pipeline/orchestrator/production_loop.py && \
grep -q 'AutonomyController' pipeline/orchestrator/production_loop.py && \
grep -q 'ProvenanceWriter' pipeline/orchestrator/production_loop.py && \
grep -q 'LearningEngine' pipeline/orchestrator/production_loop.py && \
echo "Phase 7 OK"
```
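
The `grep` checks above match text anywhere in the file, including comments and string literals. As a stricter alternative, the sketch below walks the AST and reports which required methods are missing from a class body. `missing_methods` and the sample source are hypothetical and shown for illustration only; the method names are the ones the grep chain looks for.

```python
import ast

REQUIRED_METHODS = {
    "run", "_execute_shot", "_handle_result", "_handle_success",
    "_handle_failure", "_execute_keyframe", "_execute_video",
}


def missing_methods(source: str, class_name: str, required: set[str]) -> set[str]:
    """Return the required method names not defined directly on `class_name`."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef) and node.name == class_name:
            defined = {
                item.name
                for item in node.body
                if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef))
            }
            return required - defined
    # Class not found: every required method counts as missing.
    return set(required)


# Tiny stand-in source; in practice, read pipeline/orchestrator/production_loop.py.
sample = '''
class ProductionLoop:
    def run(self): ...
    def _execute_shot(self, shot_data): ...
    # def _handle_result is only mentioned in a comment here
'''

print(sorted(missing_methods(sample, "ProductionLoop", REQUIRED_METHODS)))
```

In the sample, `_handle_result` appears only in a comment, so a plain `grep -q 'def _handle_result'` would pass while the AST check reports it as missing.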

---

## Verification Checklist

### Import/export consistency
- Phase 2 imports `AutonomyConfig` from Phase 1 ✓
- Phase 3 imports `ProvenanceRecord` from Phase 1 ✓
- Phase 4 imports `FailureCategory`, `RetryPolicy`, `RetryRequest`, `DEFAULT_RETRY_POLICIES` from Phase 1 ✓
- Phase 5 imports `BatchConfig`, `BatchState`, `BatchStatus` from Phase 1 ✓
- Phase 6 has no cross-phase imports (only stdlib + core.paths) ✓
- Phase 7 imports from ALL prior phases ✓
- Phase 7 imports from existing codebase: `ExecutionStore`, `StepRunner`, `StepResult`, `ProjectPaths`, `make_identity_gate`, `make_video_drift_gate` ✓
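
The import claims above can be checked mechanically. This is a minimal sketch, assuming each phase pulls Phase 1 types with a `from ... production_types import ...` statement; `imported_names` and the sample import block are illustrative assumptions, not the spec's actual import lines.

```python
import ast


def imported_names(source: str, module_name: str) -> set[str]:
    """Collect names imported via `from <module> import ...` where the last
    dotted component of the module equals `module_name` (absolute or relative)."""
    names: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            if node.module.split(".")[-1] == module_name:
                names.update(alias.name for alias in node.names)
    return names


# Expected Phase 4 imports from Phase 1, as listed in the checklist.
EXPECTED_PHASE_4 = {"FailureCategory", "RetryPolicy", "RetryRequest", "DEFAULT_RETRY_POLICIES"}

# Hypothetical import block; the real one lives in retry_dispatcher.py.
sample = """
from .production_types import (
    FailureCategory, RetryPolicy, RetryRequest,
)
"""

found = imported_names(sample, "production_types")
print("missing:", sorted(EXPECTED_PHASE_4 - found))
```

Running the same function over each phase file with its expected set turns the checkmarks in this list into a repeatable check.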

### File conflict check
- No two phases create the same file ✓
- No phases modify existing files ✓
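
The "no two phases create the same file" claim reduces to a duplicate check over the phase-to-file mapping. In the sketch below the mapping is transcribed from this spec, while `conflicting_files` is a hypothetical helper name.

```python
from collections import Counter

# Phase -> files created, transcribed from the deliverable list in this spec.
PHASE_FILES = {
    1: ["pipeline/orchestrator/production_types.py"],
    2: ["pipeline/orchestrator/autonomy_controller.py"],
    3: ["pipeline/orchestrator/take_provenance.py"],
    4: ["pipeline/orchestrator/retry_dispatcher.py"],
    5: ["pipeline/orchestrator/batch_manager.py"],
    6: ["pipeline/orchestrator/learning_engine.py"],
    7: ["pipeline/orchestrator/production_loop.py"],
}


def conflicting_files(phase_files: dict[int, list[str]]) -> list[str]:
    """Return files that are claimed more than once across all phases."""
    counts = Counter(path for files in phase_files.values() for path in files)
    return sorted(path for path, n in counts.items() if n > 1)


print(conflicting_files(PHASE_FILES))  # an empty list means no conflicts
```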

### Deliverable coverage
1. ✅ Orchestration loop → Phase 7 (`production_loop.py`)
2. ✅ Batch manager → Phase 5 (`batch_manager.py`)
3. ✅ Retry dispatcher → Phase 4 (`retry_dispatcher.py`)
4. ✅ Take provenance → Phase 3 (`take_provenance.py`)
5. ✅ Learning engine → Phase 6 (`learning_engine.py`)
6. ✅ Autonomy controller → Phase 2 (`autonomy_controller.py`)
7. ✅ Gate 3 completion → Already implemented in `validation.py` (progressive sampling with drift detection). Wired into production loop via `make_video_drift_gate` in Phase 7.

### Scope boundaries
- Each phase touches exactly one file, and that file is newly created ✓
- All new files are in `pipeline/orchestrator/` ✓
- All phases are additive (no existing file modifications) ✓
- Phase 7 depends on all prior phases (correct ordering) ✓
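
Two of the scope claims above, phase ordering and file location, are easy to encode as data checks. In this sketch the dependency map is transcribed from the import/export checklist; `ordering_violations` and `outside_dir` are hypothetical helper names, and `lib/x.py` is a made-up path used only to show a failing case.

```python
from pathlib import PurePosixPath

# Cross-phase dependencies as stated in the import/export checklist.
PHASE_DEPS = {1: set(), 2: {1}, 3: {1}, 4: {1}, 5: {1}, 6: set(), 7: {1, 2, 3, 4, 5, 6}}


def ordering_violations(deps: dict[int, set[int]]) -> list[tuple[int, int]]:
    """Return (phase, dependency) pairs where a phase depends on itself or a later phase."""
    return sorted((p, d) for p, ds in deps.items() for d in ds if d >= p)


def outside_dir(paths: list[str], directory: str) -> list[str]:
    """Return paths whose parent directory is not exactly `directory`."""
    target = PurePosixPath(directory)
    return [p for p in paths if PurePosixPath(p).parent != target]


print(ordering_violations(PHASE_DEPS))
print(outside_dir(
    ["pipeline/orchestrator/production_loop.py", "lib/x.py"],
    "pipeline/orchestrator",
))
```

An empty result from `ordering_violations` means the phases can be built strictly in numeric order.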

---

## Suggested harness command

```bash
/harness BUILD_SPEC_PRODUCTION_ORCHESTRATOR.md --dir /Users/joeturnerlin/Dropbox/CLAUDE_PROJECTS/recoil/pipeline --debug --max-retries 4
```
