"""Tests for retry strategy wiring in EpisodeRunner._dispatch_one_beat.

Validates that the StrategyEngine integration (Phase 2-4) correctly:
- Preserves blind retry behavior when no strategy engine is injected
- Classifies failures and selects strategies on failed Takes
- Applies StrategyDiff mutations to Workflow step payloads
- Stops retries on strategy exhaustion or ESCALATE_TO_HUMAN
- Degrades gracefully when the strategy engine crashes
- Logs provenance events for strategy selection
- Passes RetryCostPolicy through to select_and_apply
"""

import asyncio
import dataclasses
import json
import pytest
from unittest.mock import MagicMock, patch

from recoil.pipeline.core.receipts import GenerationReceipt
from recoil.pipeline.core.registry import RunResult
from recoil.pipeline.core.take import Scene, Beat
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.pipeline.orchestrator.episode_runner import EpisodeRunner
from recoil.pipeline.orchestrator.strategy_registry import (
    StrategyDiff,
    RetryStrategyName,
    StrategyEngine,
)
from recoil.pipeline.orchestrator.coverage_planner import (
    CoveragePass,
)
from recoil.pipeline.orchestrator.production_types import RetryCostPolicy
from recoil.pipeline.orchestrator.learning_engine import LearningEngine


# ── Autouse fixture — isolate from disk I/O and heavy internals ────


@pytest.fixture(autouse=True)
def _isolate(monkeypatch):
    """Stub disk I/O and heavyweight EpisodeRunner internals for every test.

    ``init_scenes_from_plan`` (called from ``EpisodeRunner.__init__``, does
    disk I/O) and ``ops_log.write`` become no-ops. ``_estimate_take_cost``
    is pinned to zero and ``_build_workflow_for_beat`` is swapped for a
    one-step video workflow, because the real implementations construct a
    CanonicalShot from beat metadata and need fields the test fixtures do
    not carry. This keeps the tests focused on retry strategy wiring.
    """

    def _noop(*args, **kwargs):
        return None

    def _free_take(self, beat):
        return 0.0

    def _one_step_workflow(self, beat, take_index, **kwargs):
        video_step = WorkflowStep(
            step_id="video",
            modality="video_i2v",
            payload={"shot_id": beat.beat_id},
        )
        return Workflow(
            workflow_id=f"{beat.beat_id}_take_{take_index}",
            steps=[video_step],
            global_provenance={"shot_id": beat.beat_id, "episode": self.episode},
        )

    # Module-level collaborators are replaced by dotted path ...
    monkeypatch.setattr(
        "recoil.pipeline.orchestrator.episode_runner.init_scenes_from_plan",
        _noop,
    )
    monkeypatch.setattr(
        "recoil.pipeline.orchestrator.episode_runner.ops_log.write",
        _noop,
    )
    # ... while the runner's own methods are replaced on the class itself.
    monkeypatch.setattr(EpisodeRunner, "_estimate_take_cost", _free_take)
    monkeypatch.setattr(
        EpisodeRunner,
        "_build_workflow_for_beat",
        _one_step_workflow,
    )


# ── Helpers ────────────────────────────────────────────────────────


def _make_runner(strategy_engine=None, max_takes=3, budget=50.0):
    """Return an EpisodeRunner for project ``test_proj`` with a mocked step_runner.

    The plan holds a single sequence (``seq_01``) with one shot (``SH01``),
    and the retry cost policy is constructed with
    ``max_retry_spend_usd=6.0``.
    """
    one_shot_plan = {"sequences": {"seq_01": {"shots": [{"shot_id": "SH01"}]}}}
    runner = EpisodeRunner(
        project="test_proj",
        plan=one_shot_plan,
        max_takes=max_takes,
        budget_usd=budget,
        step_runner=MagicMock(),
        strategy_engine=strategy_engine,
        retry_cost_policy=RetryCostPolicy(max_retry_spend_usd=6.0),
    )
    return runner


def _make_beat():
    """Return Beat ``SH01`` (max_takes=3) carrying minimal video_i2v metadata."""
    shot_info = {"shot_id": "SH01", "duration_s": 5, "shot_type": "MS"}
    metadata = {
        "scene_id": "seq_01",
        "shot": shot_info,
        "modality": "video_i2v",
    }
    return Beat(beat_id="SH01", max_takes=3, beat_metadata=metadata)


def _make_scene(beat):
    """Wrap *beat* in a single-beat Scene ``seq_01`` for episode ``ep_001``."""
    scene_info = {"episode": "ep_001", "project": "test_proj"}
    return Scene(scene_id="seq_01", beats=[beat], scene_metadata=scene_info)


def _fake_strategy_diff(name=RetryStrategyName.CHANGE_SEED, pass_obj=None):
    """Return a minimal StrategyDiff named *name* for use as a mock result.

    When *pass_obj* is omitted, a stock CoveragePass whose generation_config
    is ``{"seed": 42}`` is used as the modified pass.
    """
    coverage_pass = pass_obj
    if coverage_pass is None:
        coverage_pass = CoveragePass(
            pass_id="test",
            episode_id="ep_001",
            shot_range=("SH01", "SH01"),
            camera_side="A",
            label="test",
            focus_character="",
            pass_type="env",
            generation_config={"seed": 42},
        )
    diff = StrategyDiff(
        strategy_name=name,
        modified_pass=coverage_pass,
        changes={"generation_config.seed": "42"},
        cost_tier="free",
        estimated_cost_usd=1.06,
    )
    return diff


def _attach_failed_take(
    beat,
    *,
    error="content policy violation: rejected by safety filter",
    prompt=None,
    model="fal/test-video",
):
    """Add a failed Take to *beat* and return it.

    The Take's workflow has one failed ``video`` step whose receipt wraps an
    unsuccessful RunResult carrying *error* and *model*. The step payload
    only gets a ``prompt`` key when *prompt* is not None.
    """
    step_payload = {"shot_id": beat.beat_id}
    if prompt is not None:
        step_payload["prompt"] = prompt

    failed_run = RunResult(
        id="run_test",
        modality="video_i2v",
        metadata={"model": model},
        success=False,
        error=error,
    )
    receipt = GenerationReceipt(
        receipt_id="rcpt_test",
        modality="video_i2v",
        caller_id="test",
        project="test_proj",
        episode=1,
        shot_id=beat.beat_id,
        timestamp_utc="2026-06-12T00:00:00Z",
        run_result=failed_run,
    )
    failed_step = WorkflowStep(
        step_id="video",
        modality="video_i2v",
        payload=step_payload,
        status="failed",
        receipt=receipt,
    )
    provenance = {
        "shot_id": beat.beat_id,
        "episode": "ep_001",
        "project": "test_proj",
        "model": model,
    }
    failed_workflow = Workflow(
        workflow_id=f"{beat.beat_id}_take_0",
        steps=[failed_step],
        global_provenance=provenance,
    )

    failed_take = beat.new_take(workflow=failed_workflow)
    failed_take.status = "failed"
    return failed_take


def _read_jsonl(path):
    """Return the JSON records stored one per line in *path*.

    Args:
        path: A ``pathlib.Path``-like object providing ``exists()`` and
            ``read_text()``.

    Returns:
        A list with one decoded object per non-blank line, in file order.
        An empty list when the file does not exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if not path.exists():
        return []
    text = path.read_text(encoding="utf-8")
    # Blank or whitespace-only lines are padding, not records; feeding them
    # to json.loads would raise instead of yielding the real records.
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# ── Test 1: No strategy engine preserves blind retry behavior ──────


def test_no_strategy_engine_preserves_blind_retry():
    """Blind retry is unchanged when no strategy engine is injected.

    Every dispatched Take is forced to fail, so the runner is expected to
    use up ``max_takes`` attempts and never select a primary Take.
    """
    attempt_limit = 2
    runner = _make_runner(strategy_engine=None, max_takes=attempt_limit)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    dispatched = []  # one entry per intercepted asyncio.to_thread call

    def _always_fail(fn, **kw):
        # Index by call order so each call marks the Take created for it.
        current = beat.takes[len(dispatched)]
        current.status = "failed"
        for step in current.workflow.steps:
            step.status = "failed"
        dispatched.append(fn)

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_always_fail)
    with save_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    assert len(beat.takes) == 2  # exhausted max_takes
    assert beat.primary_take is None


# ── Test 2: Strategy-driven retry produces a modified workflow ──────


def test_strategy_driven_retry_applies_diff():
    """A strategy-selected retry carries the diff's mutation into Take 1.

    Take 0 fails, the mocked engine returns a CHANGE_SEED diff, and the
    retried Take's step payload must contain the mutated seed (42).
    """
    engine = MagicMock(spec=StrategyEngine)
    engine.select_and_apply.return_value = _fake_strategy_diff()
    attempt_limit = 2
    runner = _make_runner(strategy_engine=engine, max_takes=attempt_limit)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    dispatched = []  # one entry per intercepted asyncio.to_thread call

    def _fail_then_succeed(fn, **kw):
        position = len(dispatched)
        take = beat.takes[position]
        # Only the very first dispatch fails; every later one succeeds.
        outcome = "failed" if position == 0 else "succeeded"
        take.status = outcome
        for step in take.workflow.steps:
            step.status = outcome
        dispatched.append(fn)

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_then_succeed)
    with save_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    assert engine.select_and_apply.call_count == 1
    assert beat.primary_take is not None
    # The second Take's payload must carry the seed from the diff.
    retried_step = beat.takes[1].workflow.steps[0]
    assert retried_step.payload.get("seed") == 42


# ── Test 3: Strategy exhaustion stops retries before max_takes ──────


def test_strategy_exhausted_stops_loop():
    """A ``None`` from select_and_apply ends the retry loop early.

    The mocked engine reports no strategy, so only the first Take should
    exist even though ``max_takes`` allows five.
    """
    engine = MagicMock(spec=StrategyEngine)
    engine.select_and_apply.return_value = None
    attempt_limit = 5
    runner = _make_runner(strategy_engine=engine, max_takes=attempt_limit)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    def _fail_latest(fn, **kw):
        beat.takes[-1].status = "failed"

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_latest)
    with save_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    assert len(beat.takes) == 1  # stopped after first failure
    assert beat.primary_take is None


# ── Test 4: ESCALATE_TO_HUMAN stops retries ─────────────────────────


def test_escalate_to_human_stops_loop():
    """An ESCALATE_TO_HUMAN diff halts retries after the first failed Take."""
    escalation = _fake_strategy_diff(name=RetryStrategyName.ESCALATE_TO_HUMAN)
    engine = MagicMock(spec=StrategyEngine)
    engine.select_and_apply.return_value = escalation
    attempt_limit = 5
    runner = _make_runner(strategy_engine=engine, max_takes=attempt_limit)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    def _fail_latest(fn, **kw):
        beat.takes[-1].status = "failed"

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_latest)
    with save_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    assert len(beat.takes) == 1
    assert beat.primary_take is None


# ── Test 5: Graceful degradation on strategy engine crash ───────────


def test_strategy_engine_crash_falls_back_to_blind_retry():
    """A raising select_and_apply must not abort the retry loop.

    The engine raises RuntimeError on every call; the first two Takes fail
    and the third succeeds, so all three Takes must exist and a primary must
    be selected.
    """
    engine = MagicMock(spec=StrategyEngine)
    engine.select_and_apply.side_effect = RuntimeError("kaboom")
    runner = _make_runner(strategy_engine=engine, max_takes=3)
    beat = _make_beat()
    scene = _make_scene(beat)

    dispatched = []  # one entry per intercepted asyncio.to_thread call

    def _fail_then_succeed(fn, **kw):
        position = len(dispatched)
        take = beat.takes[position]
        # Third dispatch (index 2) is the only one that succeeds.
        if position == 2:
            take.status = "succeeded"
        else:
            take.status = "failed"
        for step in take.workflow.steps:
            step.status = take.status
        dispatched.append(fn)

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_then_succeed)
    with save_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    assert len(beat.takes) == 3
    assert beat.primary_take is not None


# ── Test 6: Provenance logged for strategy events ───────────────────


def test_ops_log_provenance_for_strategy_selection():
    """Selecting a strategy writes a ``retry_strategy_selected`` ops-log event.

    The first such event must name the ``change_seed`` strategy, carry a
    non-None failure mode and include the cumulative retry cost.
    """
    engine = MagicMock(spec=StrategyEngine)
    engine.select_and_apply.return_value = _fake_strategy_diff()
    attempt_limit = 2
    runner = _make_runner(strategy_engine=engine, max_takes=attempt_limit)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    written = []

    def _record(entry, **kw):
        written.append(entry)

    def _fail_latest(fn, **kw):
        beat.takes[-1].status = "failed"

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    log_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.ops_log.write",
        side_effect=_record,
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_latest)
    with save_patch, log_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    selections = []
    for entry in written:
        if entry.get("event") == "retry_strategy_selected":
            selections.append(entry)

    assert len(selections) >= 1
    first = selections[0]
    assert first["strategy"] == "change_seed"
    assert first["failure_mode"] is not None
    assert "cumulative_retry_cost" in first


# ── Test 7: Cost policy passed to strategy engine ─────────────────


def test_cost_policy_passed_to_strategy_engine():
    """Verify RetryCostPolicy is forwarded to select_and_apply.

    The runner's policy is replaced with a distinct instance, a single Take
    is forced to fail, and the engine must then have been called with that
    exact object as the ``cost_policy`` keyword argument.
    """
    engine = MagicMock(spec=StrategyEngine)
    engine.select_and_apply.return_value = None
    policy = RetryCostPolicy(max_retry_spend_usd=2.00)
    runner = _make_runner(strategy_engine=engine, max_takes=3)
    runner._retry_cost_policy = policy
    beat = _make_beat()
    scene = _make_scene(beat)

    with (
        patch(
            "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
        ),
        patch(
            "asyncio.to_thread",
            side_effect=lambda fn, **kw: setattr(beat.takes[-1], "status", "failed"),
        ),
    ):
        asyncio.run(
            runner._dispatch_one_beat(
                beat,
                scene,
                1,  # expected_version
                dry_run=False,
            )
        )

    # Guard first: if the engine was never consulted, call_args is None and
    # the kwargs lookup would die with an AttributeError rather than a
    # readable assertion failure.
    assert engine.select_and_apply.called, "select_and_apply was never called"
    # call_args.kwargs and call_args[1] are the same mapping, so one lookup
    # is sufficient.
    forwarded = engine.select_and_apply.call_args.kwargs.get("cost_policy")
    assert forwarded is policy


# ── Test 8: _apply_strategy_diff overlays gen config ───────────────


def test_apply_strategy_diff_overlays_generation_config():
    """_apply_strategy_diff copies the modified pass's generation_config
    into the step payload and stamps the strategy name on both the payload
    and the workflow's global provenance."""
    upgraded_pass = CoveragePass(
        pass_id="test",
        episode_id="ep_001",
        shot_range=("SH01", "SH01"),
        camera_side="A",
        label="test",
        focus_character="",
        pass_type="env",
        generation_config={"tier": "pro", "seed": 999},
    )
    diff = StrategyDiff(
        strategy_name=RetryStrategyName.UPGRADE_FAST_TO_PRO,
        modified_pass=upgraded_pass,
        changes={"generation_config.tier": "fast -> pro"},
        cost_tier="cheap",
        estimated_cost_usd=1.10,
    )
    video_step = WorkflowStep(
        step_id="video",
        modality="video_i2v",
        payload={"prompt": "test prompt", "tier": "fast"},
    )
    wf = Workflow(workflow_id="test_wf", steps=[video_step])

    EpisodeRunner._apply_strategy_diff(diff, wf, original_pass=None)

    payload = wf.steps[0].payload
    assert payload["tier"] == "pro"
    assert payload["seed"] == 999
    assert payload["_retry_strategy"] == "upgrade_fast_to_pro"
    assert wf.global_provenance["retry_strategy"] == "upgrade_fast_to_pro"


# ── Test 9: _apply_strategy_diff handles prompt prefix ──────────────


def test_apply_strategy_diff_prepends_arc_preamble():
    """A lengthened arc_preamble on the modified pass ends up at the front of
    the step's prompt after _apply_strategy_diff."""
    original = CoveragePass(
        pass_id="test",
        episode_id="ep_001",
        shot_range=("SH01", "SH01"),
        camera_side="A",
        label="test",
        focus_character="",
        pass_type="env",
        arc_preamble="Original preamble. ",
    )
    # Same pass, with the identity-lock text prefixed to the preamble.
    locked = dataclasses.replace(
        original,
        arc_preamble="IDENTITY LOCK. Original preamble. ",
    )
    diff = StrategyDiff(
        strategy_name=RetryStrategyName.STRENGTHEN_IDENTITY_LOCK,
        modified_pass=locked,
        changes={"arc_preamble": "prepended identity lock"},
        cost_tier="free",
        estimated_cost_usd=1.06,
    )
    video_step = WorkflowStep(
        step_id="video",
        modality="video_i2v",
        payload={"prompt": "A man walks through the door."},
    )
    wf = Workflow(workflow_id="test_wf", steps=[video_step])

    EpisodeRunner._apply_strategy_diff(diff, wf, original_pass=original)

    mutated_prompt = wf.steps[0].payload["prompt"]
    assert mutated_prompt.startswith("IDENTITY LOCK. ")


# ── REC-14: phantom-success invalidation must release primary selection ──


def test_invalidate_phantom_clears_primary_take_id():
    """REC-14 regression: invalidating a phantom success releases the primary.

    A succeeded Take whose video artifact is missing must be demoted to
    failed AND stop being the beat's primary, so run_scene re-dispatches the
    beat. Before the fix, invalidate_phantom_succeeded_takes flipped the
    status but left primary_take_id pointing at the dead Take, so
    Beat.primary_take stayed non-None and run_scene filtered the beat out
    forever.
    """
    runner = _make_runner()
    beat = _make_beat()
    scene = _make_scene(beat)

    # Build a "succeeded" Take with no receipt output_path, so
    # _take_video_artifact returns None (a phantom), and make it primary.
    video_step = WorkflowStep(
        step_id="video", modality="video_i2v", payload={"shot_id": "SH01"}
    )
    phantom = beat.new_take(
        workflow=Workflow(workflow_id="SH01_take_0", steps=[video_step])
    )
    phantom.status = "succeeded"
    beat.primary_take_id = phantom.take_id
    assert beat.primary_take is phantom  # precondition: phantom is the primary

    invalidated = runner.invalidate_phantom_succeeded_takes(scene)

    assert invalidated == 1
    assert phantom.status == "failed"
    # Releasing the primary is what lets run_scene's
    # `b.primary_take is None` filter pick the beat up again.
    assert beat.primary_take is None


def test_invalidate_phantom_preserves_take_with_real_artifact(tmp_path, monkeypatch):
    """Inverse guard: a succeeded Take whose artifact exists is left alone.

    It must keep its ``succeeded`` status and remain the beat's primary;
    only phantoms are released.
    """
    runner = _make_runner()
    beat = _make_beat()
    scene = _make_scene(beat)

    video_step = WorkflowStep(
        step_id="video", modality="video_i2v", payload={"shot_id": "SH01"}
    )
    take = beat.new_take(
        workflow=Workflow(workflow_id="SH01_take_0", steps=[video_step])
    )
    take.status = "succeeded"
    beat.primary_take_id = take.take_id

    # Put a real 16-byte file on disk and make the runner report it as the
    # Take's video artifact.
    artifact = tmp_path / "out.mp4"
    artifact.write_bytes(b"x" * 16)

    def _artifact_path(t):
        return str(artifact)

    monkeypatch.setattr(runner, "_take_video_artifact", _artifact_path)

    invalidated = runner.invalidate_phantom_succeeded_takes(scene)

    assert invalidated == 0
    assert take.status == "succeeded"
    assert beat.primary_take is take


# ── REC-20: REAL StrategyEngine end-to-end (no mock) ────────────────


def test_real_strategy_engine_fires_on_failure_e2e(tmp_path):
    """REC-20 e2e: a REAL StrategyEngine drives the retry, not a MagicMock.

    With a cold-start LearningEngine, the engine must classify the failure,
    pick a strategy from the static chain and mutate the retried Take's
    workflow payload. This proves intelligent retry fires end-to-end rather
    than blind round-robin.
    """
    engine = StrategyEngine(
        learning=LearningEngine(project="test_proj", state_dir=tmp_path),
        model="seeddance-2.0",
    )
    attempt_limit = 2
    runner = _make_runner(strategy_engine=engine, max_takes=attempt_limit)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    dispatched = []  # one entry per intercepted asyncio.to_thread call

    def _fail_then_succeed(fn, **kw):
        position = len(dispatched)
        take = beat.takes[position]
        # Second dispatch (index 1) is the only one that succeeds.
        if position == 1:
            take.status = "succeeded"
        else:
            take.status = "failed"
        for step in take.workflow.steps:
            step.status = take.status
        dispatched.append(fn)

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_then_succeed)
    with save_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    # Retry happened and succeeded on take 1.
    assert len(beat.takes) == 2
    assert beat.primary_take is not None

    # The retried take's workflow was mutated by a REAL registered strategy.
    retry_wf = beat.takes[1].workflow
    applied = retry_wf.steps[0].payload.get("_retry_strategy")
    registered = {member.value for member in RetryStrategyName}
    assert applied in registered
    assert applied != RetryStrategyName.ESCALATE_TO_HUMAN.value
    # Cold start: learning had no data, so the pick came from the static
    # escalation chain, which confirms the genuine engine ran, not a stub.
    selection_basis = retry_wf.global_provenance["retry_changes"]["_selection_basis"]
    assert selection_basis == "static_chain"


def test_real_strategy_engine_cost_cap_halts_runaway(tmp_path):
    """REC-20 cost-cap guard: a zero retry-spend ceiling stops the loop.

    With ``max_retry_spend_usd=0.0`` the REAL StrategyEngine finds every
    strategy over budget and escalates, so the loop halts after the first
    failure instead of blindly burning all ``max_takes``. This is the
    unattended-overnight safety gate.
    """
    engine = StrategyEngine(
        learning=LearningEngine(project="test_proj", state_dir=tmp_path),
        model="seeddance-2.0",
    )
    attempt_limit = 5
    runner = _make_runner(strategy_engine=engine, max_takes=attempt_limit)
    runner._retry_cost_policy = RetryCostPolicy(max_retry_spend_usd=0.0)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    def _fail_latest(fn, **kw):
        beat.takes[-1].status = "failed"

    save_patch = patch(
        "recoil.pipeline.orchestrator.episode_runner.save_active_scene",
    )
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_latest)
    with save_patch, thread_patch:
        # Third positional argument is expected_version.
        asyncio.run(runner._dispatch_one_beat(beat, scene, 1, dry_run=False))

    # Cost cap forced escalation after take 0, NOT 5 blind retries.
    assert len(beat.takes) == 1
    assert beat.primary_take is None


# ── REC-20: wire regression — run_overnight must inject the engine ──


def test_run_overnight_injects_real_strategy_engine(tmp_path, monkeypatch):
    """REC-20 wire regression: run_overnight._amain must construct
    EpisodeRunner WITH a real StrategyEngine (backed by a real
    LearningEngine), never leaving it None (= blind round-robin). Guards
    against silently dropping the injection in a future refactor.

    The test swaps EpisodeRunner for a fake that records its constructor
    kwargs, stubs the other collaborators referenced on the run_overnight
    module, runs ``_amain`` with a dry-run argument namespace, and inspects
    the captured ``strategy_engine`` kwarg.
    """
    import argparse
    from recoil.pipeline.cli import run_overnight as ro
    from recoil.pipeline.orchestrator.strategy_registry import (
        StrategyEngine as RealStrategyEngine,
    )

    # Filled with the keyword arguments _amain passes to EpisodeRunner.
    captured: dict = {}

    # Stand-in for EpisodeRunner: records constructor kwargs and reports
    # that no batches were run.
    class _FakeRunner:
        def __init__(self, **kwargs):
            captured.update(kwargs)

        async def run_episode_batches(self, canonical_plan, dry_run=False):
            return []

    # Minimal plan object exposing only a ``raw`` mapping with no sequences.
    class _FakePlan:
        raw = {"sequences": {}}

    # Replace every collaborator looked up on the run_overnight module so
    # _amain runs without touching real plans, stores, runners or logs.
    monkeypatch.setattr(ro, "EpisodeRunner", _FakeRunner)
    monkeypatch.setattr(ro, "load_plan", lambda p: _FakePlan())
    monkeypatch.setattr(ro, "_load_casting_state", lambda project: {})
    monkeypatch.setattr(ro, "ExecutionStore", lambda project: MagicMock())
    monkeypatch.setattr(ro, "StepRunner", lambda **kw: MagicMock())
    monkeypatch.setattr(ro, "_write_dailies_queue", lambda *a, **k: None)
    monkeypatch.setattr(ro, "ops_log", MagicMock())
    monkeypatch.setattr(ro, "CoreProjectPaths", MagicMock())

    # ProjectPaths.for_episode(...) always hands back the same mock paths.
    fake_paths = MagicMock()
    fake_pp = MagicMock()
    fake_pp.for_episode = lambda *a, **k: fake_paths
    monkeypatch.setattr(ro, "ProjectPaths", fake_pp)

    # Inject a cold LearningEngine rooted in tmp so the test never touches
    # the real ~/Dropbox/CLAUDE_DATA learning store.
    monkeypatch.setattr(
        ro,
        "LearningEngine",
        lambda project, **kw: LearningEngine(project=project, state_dir=tmp_path),
    )

    # Argument namespace handed to _amain in place of parsed CLI arguments;
    # dry_run=True is the only non-default-looking switch set here.
    args = argparse.Namespace(
        project="test_proj",
        episode="ep_001",
        sequence=None,
        max_takes=3,
        budget_usd=50.0,
        concurrency=2,
        halt_after=0,
        resume=False,
        dry_run=True,
        verbose=False,
        model_override=None,
        tier_override=None,
        generate_audio=None,
    )
    rc = asyncio.run(ro._amain(args))

    assert rc == 0
    # The engine must be the real class, and its learning backend must be a
    # genuine LearningEngine (the tmp-rooted one injected above).
    se = captured.get("strategy_engine")
    assert isinstance(se, RealStrategyEngine), (
        "overnight loop must inject a StrategyEngine, not run blind"
    )
    assert isinstance(se._learning, LearningEngine)


# ── REC-38: retry mutations must reach the wire (provider_hints channel) ──


def test_apply_strategy_diff_routes_seed_to_provider_hints():
    """REC-38: CHANGE_SEED's seed must land in provider_hints as an int.

    provider_hints is the channel the runner actually forwards to the
    Flora/fal wire; the top-level payload key alone is dead because the
    VideoRunner whitelist drops it.
    """
    seeded_pass = CoveragePass(
        pass_id="t",
        episode_id="ep_001",
        shot_range=("SH01", "SH01"),
        camera_side="A",
        label="t",
        focus_character="",
        pass_type="env",
        generation_config={"seed": 1234},
    )
    diff = StrategyDiff(
        strategy_name=RetryStrategyName.CHANGE_SEED,
        modified_pass=seeded_pass,
        changes={"generation_config.seed": "1234"},
        cost_tier="free",
        estimated_cost_usd=1.06,
    )
    video_step = WorkflowStep(
        step_id="video", modality="video_i2v", payload={"prompt": "x"}
    )
    wf = Workflow(workflow_id="t", steps=[video_step])

    EpisodeRunner._apply_strategy_diff(diff, wf, original_pass=None)

    forwarded_seed = wf.steps[0].payload["provider_hints"]["seed"]
    assert forwarded_seed == 1234
    assert isinstance(forwarded_seed, int)


def test_apply_strategy_diff_routes_tier_to_provider_hints():
    """REC-38: a tier switch (UPGRADE_FAST_TO_PRO) must land in provider_hints.

    That is where the (REC-38-fixed) tier read site picks it up to forward
    to VideoModelClient(tier=...).
    """
    pro_pass = CoveragePass(
        pass_id="t",
        episode_id="ep_001",
        shot_range=("SH01", "SH01"),
        camera_side="A",
        label="t",
        focus_character="",
        pass_type="env",
        generation_config={"tier": "pro"},
    )
    diff = StrategyDiff(
        strategy_name=RetryStrategyName.UPGRADE_FAST_TO_PRO,
        modified_pass=pro_pass,
        changes={"generation_config.tier": "fast -> pro"},
        cost_tier="cheap",
        estimated_cost_usd=1.10,
    )
    video_step = WorkflowStep(
        step_id="video",
        modality="video_i2v",
        payload={"prompt": "x", "tier": "fast"},
    )
    wf = Workflow(workflow_id="t", steps=[video_step])

    EpisodeRunner._apply_strategy_diff(diff, wf, original_pass=None)

    hints = wf.steps[0].payload["provider_hints"]
    assert hints["tier"] == "pro"


# ── REC-44 / REC-38 join: one retry reaches the wire end-to-end ──────


def test_real_strategy_engine_routes_seed_to_provider_hints_e2e(tmp_path):
    """REC-44 deterministic loop-closure signal (the $0 precursor to the live
    one-beat smoke).

    The existing e2e test proves a retry FIRES, and the REC-38 unit tests
    prove _apply_strategy_diff routes seed→provider_hints in ISOLATION.
    Neither proves both happen together through _dispatch_one_beat. This
    test does: a real failure → the REAL StrategyEngine classifies TRANSIENT
    (errorless take, no video) → CHANGE_SEED is selected off the static
    chain → the loop applies the diff → the RETRIED take's payload carries
    the mutated seed in provider_hints, the only channel the runner forwards
    to the Flora/fal wire.
    """
    engine = StrategyEngine(
        learning=LearningEngine(project="test_proj", state_dir=tmp_path),
        model="seeddance-2.0",
    )
    attempt_limit = 2
    runner = _make_runner(strategy_engine=engine, max_takes=attempt_limit)
    beat = _make_beat()
    beat.max_takes = attempt_limit
    scene = _make_scene(beat)

    dispatched = []  # one entry per intercepted asyncio.to_thread call

    def _fail_then_succeed(fn, **kw):
        position = len(dispatched)
        take = beat.takes[position]
        # take 0 fails with NO receipt/error → PassResult.error is None →
        # detect_failure_mode returns TRANSIENT → chain is [CHANGE_SEED].
        if position == 1:
            take.status = "succeeded"
        else:
            take.status = "failed"
        for step in take.workflow.steps:
            step.status = take.status
        dispatched.append(fn)

    save_patch = patch("recoil.pipeline.orchestrator.episode_runner.save_active_scene")
    thread_patch = patch("asyncio.to_thread", side_effect=_fail_then_succeed)
    with save_patch, thread_patch:
        asyncio.run(
            runner._dispatch_one_beat(
                beat,
                scene,
                1,  # expected_version
                dry_run=False,
            )
        )

    # Exactly one retry fired and the second take succeeded.
    assert len(beat.takes) == 2
    assert beat.primary_take is not None

    retry_payload = beat.takes[1].workflow.steps[0].payload
    # Real engine picked the TRANSIENT static-chain strategy (mechanical re-roll).
    assert retry_payload["_retry_strategy"] == RetryStrategyName.CHANGE_SEED.value
    # REC-38: the mutated seed reaches the wire via provider_hints, not only
    # the dead top-level key the VideoRunner whitelist drops.
    hints = retry_payload.get("provider_hints") or {}
    wire_seed = hints.get("seed")
    assert isinstance(wire_seed, int)
    # _apply_strategy_diff writes both channels with the same value.
    assert retry_payload.get("seed") == hints["seed"]


def test_content_filter_failure_appends_filter_safety_label(tmp_path, monkeypatch):
    """Classifying a content-filter failure appends one filter-safety label.

    The label file is redirected into ``tmp_path``; the single record written
    must identify the project, episode, shot and model, echo the provider
    error and the offending prompt, and carry a lint result with one warning.
    """
    from recoil.core.critic import FailureMode
    from recoil.pipeline.orchestrator import episode_runner
    from recoil.pipeline._lib import filter_safety

    label_path = tmp_path / "filter_safety_labels.jsonl"
    # Redirect both module-level references to the label file.
    for module in (filter_safety, episode_runner):
        monkeypatch.setattr(module, "FILTER_SAFETY_LABELS_PATH", label_path)

    runner = _make_runner(strategy_engine=MagicMock(spec=StrategyEngine), max_takes=1)
    beat = _make_beat()
    prompt = (
        "She draws her bloodied katana mid-strike, the lethal warrior in her "
        "torn harness confronting the guards."
    )
    take = _attach_failed_take(beat, prompt=prompt)

    classification, _result, _pass = runner._classify_take_failure(take, beat, 0)
    failure_mode, _confidence = classification

    assert failure_mode is FailureMode.CONTENT_FILTER_HARD_BLOCK
    records = _read_jsonl(label_path)
    assert len(records) == 1
    record = records[0]
    expected_fields = {
        "project": "test_proj",
        "episode": runner.episode,
        "shot_id": beat.beat_id,
        "model": "fal/test-video",
        "provider_error": "content policy violation: rejected by safety filter",
        "prompt": prompt,
    }
    for field, expected in expected_fields.items():
        assert record[field] == expected
    assert record["prompt"] is not None
    assert "prompt_error" not in record
    assert record["lint"]["warn"] == 1


def test_content_filter_failure_with_malformed_take_appends_prompt_error(
    tmp_path, monkeypatch
):
    """A content-filter failure on a Take with no prompt still gets labelled.

    The record written must omit ``prompt`` and instead carry a
    ``prompt_error`` explaining that no step payload contained one.
    """
    from recoil.core.critic import FailureMode
    from recoil.pipeline.orchestrator import episode_runner
    from recoil.pipeline._lib import filter_safety

    label_path = tmp_path / "filter_safety_labels.jsonl"
    # Redirect both module-level references to the label file.
    for module in (filter_safety, episode_runner):
        monkeypatch.setattr(module, "FILTER_SAFETY_LABELS_PATH", label_path)

    runner = _make_runner(strategy_engine=MagicMock(spec=StrategyEngine), max_takes=1)
    beat = _make_beat()
    take = _attach_failed_take(beat, prompt=None)

    classification, _result, _pass = runner._classify_take_failure(take, beat, 0)
    failure_mode, _confidence = classification

    assert failure_mode is FailureMode.CONTENT_FILTER_HARD_BLOCK
    records = _read_jsonl(label_path)
    assert len(records) == 1
    only_record = records[0]
    assert "prompt" not in only_record
    assert "prompt_error" in only_record
    assert "no workflow step payload contains prompt" in only_record["prompt_error"]


def test_non_content_filter_failure_does_not_append_filter_safety_label(
    tmp_path, monkeypatch
):
    """A failure classified as TRANSIENT writes no filter-safety label.

    The Take fails with a ``500 service unavailable`` error, and the
    redirected label file must stay empty (or absent) afterwards.
    """
    from recoil.core.critic import FailureMode
    from recoil.pipeline.orchestrator import episode_runner
    from recoil.pipeline._lib import filter_safety

    label_path = tmp_path / "filter_safety_labels.jsonl"
    # Redirect both module-level references to the label file.
    for module in (filter_safety, episode_runner):
        monkeypatch.setattr(module, "FILTER_SAFETY_LABELS_PATH", label_path)

    runner = _make_runner(strategy_engine=MagicMock(spec=StrategyEngine), max_takes=1)
    beat = _make_beat()
    take = _attach_failed_take(
        beat,
        error="500 service unavailable",
        prompt="She holds the ceremonial sword and waits.",
    )

    classification, _result, _pass = runner._classify_take_failure(take, beat, 0)
    failure_mode, _confidence = classification

    assert failure_mode is FailureMode.TRANSIENT
    written = _read_jsonl(label_path)
    assert written == []
