from __future__ import annotations

import asyncio
import dataclasses
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from recoil.core.naming import build_filename
from recoil.execution.step_types import PassResult, ProjectPaths
from recoil.pipeline._lib.plan_loader import CanonicalPlan, CanonicalShot
from recoil.pipeline.cli import generate
from recoil.pipeline.core.persistence import save_scene, scene_path
from recoil.pipeline.core.receipts import GenerationReceipt, utc_now_iso8601
from recoil.pipeline.core.registry import MODALITY_R2V_MULTI, RunResult
from recoil.pipeline.core.take import Beat, Scene
from recoil.pipeline.core.workflow import Workflow, WorkflowStep
from recoil.pipeline.orchestrator.episode_runner import (
    EpisodeRunner,
    RerollPreflightError,
)


@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    """Give every test its own projects root and freshly reset global state.

    Creates ``<tmp_path>/projects`` with a ``.recoil-data-root`` marker file,
    exports it as ``RECOIL_PROJECTS_ROOT``, replaces the episode runner's
    ``ops_log.write`` with a no-op, and calls the registry and dispatch
    test-reset hooks both before and after the test body.
    """
    projects_root = tmp_path / "projects"
    projects_root.mkdir()
    marker = projects_root / ".recoil-data-root"
    marker.touch()
    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(projects_root))

    def _discard(*a, **kw):
        return None

    monkeypatch.setattr(
        "recoil.pipeline.orchestrator.episode_runner.ops_log.write",
        _discard,
    )
    from recoil.pipeline.core.dispatch import _reset_bootstrap_for_tests
    from recoil.pipeline.core.registry import _reset_for_tests

    def _reset_all():
        # Same order on setup and teardown: registry first, then bootstrap.
        _reset_for_tests()
        _reset_bootstrap_for_tests()

    _reset_all()
    yield
    _reset_all()


def _shot(shot_id: str = "EP001_SH23", scene_index: int = 1) -> CanonicalShot:
    """Build a minimal env-only video ``CanonicalShot`` for tests.

    Only ``shot_id`` and ``scene_index`` vary; every other field is a fixed
    fixture value (location ``L1``, model ``seeddance-2.0``, 2.0 s, 9:16).
    """
    raw_record = {"shot_id": shot_id, "description": f"shot {shot_id}"}
    fixed_fields = dict(
        sequence_id=None,
        pipeline="video",
        previs_model=None,
        video_model="seeddance-2.0",
        location_id="L1",
        characters=[],
        shot_type="MS",
        duration_s=2.0,
        is_env_only=True,
        has_dialogue=False,
        aspect_ratio="9:16",
    )
    return CanonicalShot(
        shot_id=shot_id,
        scene_index=scene_index,
        raw=raw_record,
        **fixed_fields,
    )


def _plan(shots: list[CanonicalShot]) -> CanonicalPlan:
    """Wrap ``shots`` in a ``CanonicalPlan`` for ``ep_001`` of project ``fixture``.

    The plan stores a shallow copy of ``shots`` so the caller's list is not
    shared with the plan.
    """
    shot_copy = [*shots]
    source = Path("fixture.json")
    return CanonicalPlan(
        episode_id="ep_001",
        project="fixture",
        shots=shot_copy,
        source_path=source,
        raw={"shots": []},
    )


def _take_path(video_dir: Path, take: int, shot_ids: list[str] | None = None) -> Path:
    """Return the path under ``video_dir`` for take number ``take``.

    The file name comes from ``build_filename`` with episode 1 and pass
    counter 1. When ``shot_ids`` is ``None`` or empty, the three fixture shot
    ids (``EP001_SH23`` .. ``EP001_SH25``) are used.
    """
    if not shot_ids:
        shot_ids = ["EP001_SH23", "EP001_SH24", "EP001_SH25"]
    filename = build_filename(
        episode=1,
        pass_counter=1,
        shot_ids=shot_ids,
        take=take,
    )
    return video_dir / filename


def _receipt(output_path: Path, success: bool = True) -> GenerationReceipt:
    """Build an r2v-multi ``GenerationReceipt`` pointing at ``output_path``.

    Receipt and run ids are derived from the file stem. The run metadata
    records ``take_index`` 1 for a successful run and -1 for a failed one.
    """
    stem = output_path.stem
    if success:
        recorded_take_index = 1
    else:
        recorded_take_index = -1
    stamp = utc_now_iso8601()
    run = RunResult(
        id=f"run_{stem}",
        modality=MODALITY_R2V_MULTI,
        output_path=str(output_path),
        success=success,
        metadata={"take_index": recorded_take_index},
    )
    return GenerationReceipt(
        receipt_id=f"rcpt_test_{stem}",
        modality=MODALITY_R2V_MULTI,
        caller_id="test",
        project="fixture",
        episode=1,
        shot_id="PASS_011",
        timestamp_utc=stamp,
        run_result=run,
    )


def _succeeded_beat(video_dir: Path) -> Beat:
    """Create a three-shot ``PASS_011__cov`` beat whose only take has succeeded.

    Writes the take-1 artifact (``b"take-one-original"``) under ``video_dir``,
    attaches a single succeeded ``video`` workflow step carrying a receipt for
    that file, and records the resulting take as the beat's primary.
    """
    ids = ["EP001_SH23", "EP001_SH24", "EP001_SH25"]

    # Materialise the original take on disk so later rerolls can be checked
    # against it byte-for-byte.
    first_artifact = _take_path(video_dir, 1, ids)
    first_artifact.parent.mkdir(parents=True, exist_ok=True)
    first_artifact.write_bytes(b"take-one-original")

    canonical = [_shot(sid, position) for position, sid in enumerate(ids, start=1)]
    metadata = {
        "scene_id": "PASS_011",
        "modality": "r2v_multi",
        # "shot" and "batch_shots" intentionally get independent dict copies.
        "shot": dataclasses.asdict(canonical[0]),
        "batch_shots": [dataclasses.asdict(entry) for entry in canonical],
        "inputs_fingerprint": "",
    }
    beat = Beat(beat_id="PASS_011__cov", max_takes=5, beat_metadata=metadata)

    video_step = WorkflowStep(
        step_id="video",
        modality=MODALITY_R2V_MULTI,
        payload={"shot_id": "PASS_011", "segment_shot_ids": ids},
        status="succeeded",
        receipt=_receipt(first_artifact),
    )
    workflow = Workflow(
        workflow_id="PASS_011__cov_take_0",
        steps=[video_step],
        global_provenance={"shot_id": beat.beat_id},
    )

    primary = beat.new_take(workflow=workflow)
    primary.status = "succeeded"
    beat.primary_take_id = primary.take_id
    return beat


class _FakeStepRunner:
    def __init__(
        self,
        video_dir: Path,
        submit: MagicMock,
        *,
        fail: bool = False,
        artifact: bytes = b"rerolled-video",
    ):
        self.video_dir = video_dir
        self.submit = submit
        self.fail = fail
        self.artifact = artifact
        self._dispatch_path = ""

    def execute_pass(self, **kwargs):
        body = {"params": {}}
        if kwargs.get("seed") is not None:
            body["params"]["seed"] = kwargs["seed"]
        self.submit(body)
        if self.fail:
            return PassResult(
                pass_id=kwargs["pass_id"],
                success=False,
                video_path=None,
                cost_usd=0.0,
                model=kwargs.get("model", ""),
                error="provider failed",
                take_index=kwargs.get("forced_take_number") or -1,
            )
        take_num = kwargs.get("forced_take_number")
        if take_num is None:
            existing = sorted(self.video_dir.glob(f"*{kwargs['pass_id']}_take*.mp4"))
            take_num = len(existing) + 1
        path = _take_path(self.video_dir, int(take_num), list(kwargs["segment_shot_ids"]))
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(self.artifact)
        return PassResult(
            pass_id=kwargs["pass_id"],
            success=True,
            video_path=str(path),
            cost_usd=0.0,
            model=kwargs.get("model", ""),
            take_index=int(take_num),
            expected_cuts=max(0, len(kwargs["segment_shot_ids"]) - 1),
        )


def _runner(step_runner, *, strategy_engine=None) -> EpisodeRunner:
    """Construct an ``EpisodeRunner`` for ``fixture``/``ep_001`` around ``step_runner``.

    Uses an empty plan, up to five takes, a 50.0 USD budget and a concurrency
    of one; ``strategy_engine`` is passed through unchanged.
    """
    settings = {
        "project": "fixture",
        "plan": {"shots": []},
        "max_takes": 5,
        "budget_usd": 50.0,
        "concurrency": 1,
        "episode": "ep_001",
    }
    return EpisodeRunner(
        step_runner=step_runner,
        strategy_engine=strategy_engine,
        **settings,
    )


def _persist_pass_scene(beat: Beat) -> None:
    """Save a ``PASS_011`` scene containing only ``beat`` for ``fixture``/``ep_001``."""
    scene = Scene(scene_id="PASS_011", beats=[beat])
    destination = scene_path("fixture", "ep_001", "PASS_011")
    save_scene(scene, destination)


def _write_location_ref(project: str = "fixture", location_id: str = "L1") -> Path:
    """Write a placeholder location hero image for ``location_id`` and return its path.

    The file is named ``<slug>_loc_hero_v01.png`` inside the project's ``loc``
    asset directory for the slugified location id, and contains the bytes
    ``b"stable-location-ref"``.
    """
    from recoil.core.paths import ProjectPaths as CoreProjectPaths
    from recoil.core.ref_resolver import slugify_asset_id

    slug = slugify_asset_id(location_id)
    subject_dir = CoreProjectPaths.for_project(project).asset_subject_dir("loc", slug)
    hero_image = subject_dir / f"{slug}_loc_hero_v01.png"
    hero_image.parent.mkdir(parents=True, exist_ok=True)
    hero_image.write_bytes(b"stable-location-ref")
    return hero_image


def _run_reroll(
    tmp_path,
    *,
    submit: MagicMock | None = None,
    make_primary: bool = False,
    fail: bool = False,
    seed: int | None = None,
):
    """Persist a succeeded ``PASS_011`` beat and run one forced reroll against it.

    Args:
        tmp_path: Per-test directory; takes are written to ``tmp_path / "video"``.
        submit: Spy that receives each fake request body. A fresh ``MagicMock``
            is created only when this is ``None``.
        make_primary: Forwarded to ``run_episode_batches``.
        fail: When true, the fake step runner returns a failed pass result.
        seed: Forwarded to ``run_episode_batches``.

    Returns:
        ``(beat, video_dir, submit)`` where ``beat`` is the first beat of the
        first scene returned by the runner.
    """
    # Choose the default on ``None`` rather than on truthiness so that any
    # caller-supplied spy is always the one that gets recorded against.
    if submit is None:
        submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    _persist_pass_scene(beat)
    runner = _runner(_FakeStepRunner(video_dir, submit, fail=fail))
    scenes = asyncio.run(
        runner.run_episode_batches(
            _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)]),
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
            seed=seed,
            make_primary=make_primary,
        )
    )
    return scenes[0].beats[0], video_dir, submit


def _run_main(argv: list[str], capsys):
    """Invoke ``generate.main()`` with ``argv`` and return ``(code, result)``.

    ``sys.argv`` is replaced for the duration of the call and always restored.

    Returns:
        On a normal return, ``code`` is ``main()``'s return value and
        ``result`` is the JSON decoded from captured stdout (``{}`` when
        stdout is empty). When ``main()`` raises ``SystemExit``, ``code`` is
        the exit code and ``result`` is the sentinel
        ``{"success": False, "error": "system_exit"}``.
    """
    saved_argv = sys.argv
    sys.argv = ["generate.py", *argv]
    try:
        code = generate.main()
    except SystemExit as exc:
        code = exc.code
        # Drain whatever was captured before the exit; otherwise it would be
        # prepended to the stdout of the next _run_main call in the same test
        # and break that call's json.loads.
        capsys.readouterr()
        result = {"success": False, "error": "system_exit"}
    else:
        out = capsys.readouterr().out
        result = json.loads(out) if out else {}
    finally:
        sys.argv = saved_argv
    return code, result


def _write_cli_project(project: str = "fixture", episode: int = 1) -> None:
    """Lay out the on-disk project files the ``generate`` CLI tests read.

    Creates the project, video, plans and coverage-passes directories, then
    writes a three-shot plan (``ep_001_plan.json``) and a single ``PASS_011``
    coverage pass (``ep_001_passes.json``) spanning those shots.

    NOTE(review): ``episode`` only selects the ``ProjectPaths``; the file
    names and ids written here are always the ``ep_001`` / ``EP001_*`` ones.
    """
    paths = ProjectPaths.for_episode(project, episode)
    for attr in ("project_root", "video_dir", "plans_dir", "coverage_passes_dir"):
        getattr(paths, attr).mkdir(parents=True, exist_ok=True)

    shot_ids = ("EP001_SH23", "EP001_SH24", "EP001_SH25")

    plan_shots = [
        {
            "shot_id": sid,
            "scene_index": position,
            "pipeline": "video",
            "video_model": "seeddance-2.0",
            "asset_data": {"location_id": "L1", "characters": []},
            "prompt_data": {"shot_type": "MS"},
            "routing_data": {"target_editorial_duration_s": 2},
        }
        for position, sid in enumerate(shot_ids, start=1)
    ]
    plan_doc = {
        "episode_id": "ep_001",
        "project": project,
        "shots": plan_shots,
    }
    plan_file = paths.plans_dir / "ep_001_plan.json"
    plan_file.write_text(json.dumps(plan_doc))

    # One segment per shot, with prompts "a", "b", "c" in shot order.
    segments = [
        {
            "segment_index": index,
            "source_shot_id": sid,
            "shot_type": "MS",
            "duration_s": 2,
            "prompt": prompt,
        }
        for index, (sid, prompt) in enumerate(zip(shot_ids, "abc"))
    ]
    pass_doc = {
        "pass_id": "PASS_011",
        "episode_id": "ep_001",
        "shot_range": [shot_ids[0], shot_ids[-1]],
        "camera_side": "A",
        "label": "fixture",
        "focus_character": "",
        "pass_type": "env",
        "location_id": "L1",
        "generation_config": {"mode": "t2v"},
        "segments": segments,
    }
    passes_file = paths.coverage_passes_dir / "ep_001_passes.json"
    passes_file.write_text(json.dumps([pass_doc]))


def test_reroll_appends_take2_without_overwriting_take1(tmp_path):
    """A single reroll writes a take-2 file and leaves the take-1 bytes intact."""
    beat, video_dir, submit = _run_reroll(tmp_path)
    original_file, appended_file = (_take_path(video_dir, n) for n in (1, 2))

    # Take 1 must be untouched; take 2 must hold the rerolled artifact.
    assert original_file.read_bytes() == b"take-one-original"
    assert appended_file.exists()
    assert appended_file.read_bytes() == b"rerolled-video"

    assert len(beat.takes) == 2
    newest = beat.takes[-1]
    assert newest.take_index == 1
    assert submit.call_count == 1


def test_two_consecutive_rerolls_append_take2_then_take3(tmp_path):
    """Two rerolls in a row yield takes 2 and 3, each with its own artifact."""
    submit = MagicMock()
    first_beat, video_dir, _ = _run_reroll(tmp_path, submit=submit)

    # Persist the post-first-reroll state, then reroll again with new bytes.
    _persist_pass_scene(first_beat)
    second_runner = _runner(_FakeStepRunner(video_dir, submit, artifact=b"third-video"))
    plan = _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)])
    scenes = asyncio.run(
        second_runner.run_episode_batches(
            plan,
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
        )
    )

    final_beat = scenes[0].beats[0]
    assert len(final_beat.takes) == 3
    expected_bytes = {
        1: b"take-one-original",
        2: b"rerolled-video",
        3: b"third-video",
    }
    for take_number, content in expected_bytes.items():
        assert _take_path(video_dir, take_number).read_bytes() == content
    assert submit.call_count == 2


def test_ordinal_is_beat_authoritative_and_collision_fails_loud_before_submit(tmp_path):
    """A pre-existing take-2 file makes the reroll fail with ``reroll_collision``.

    The beat only knows about take 1, but a take-2 file already sits on disk;
    the run must raise before anything is submitted.
    """
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)

    # Plant a stray file where the next take would be written.
    stray = _take_path(video_dir, 2)
    stray.write_bytes(b"colliding-existing-file")

    _persist_pass_scene(beat)
    runner = _runner(_FakeStepRunner(video_dir, submit))
    plan = _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)])

    with pytest.raises(RerollPreflightError) as raised:
        asyncio.run(
            runner.run_episode_batches(
                plan,
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert raised.value.error_code == "reroll_collision"
    submit.assert_not_called()


def test_make_primary_default_off_and_promote_on_request(tmp_path):
    """Primary stays on take 1 by default and moves only with ``make_primary=True``."""
    # First reroll: no promotion requested, so the original take stays primary.
    beat, video_dir, _ = _run_reroll(tmp_path)
    first_take = beat.takes[0]
    assert beat.primary_take_id == first_take.take_id
    assert beat.primary_take is first_take

    # Second reroll: explicitly ask for the new take to become primary.
    _persist_pass_scene(beat)
    promoting_runner = _runner(
        _FakeStepRunner(video_dir, MagicMock(), artifact=b"promoted")
    )
    plan = _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)])
    scenes = asyncio.run(
        promoting_runner.run_episode_batches(
            plan,
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
            make_primary=True,
        )
    )

    promoted_beat = scenes[0].beats[0]
    assert promoted_beat.primary_take_id == promoted_beat.takes[-1].take_id
    assert promoted_beat.primary_take.status == "succeeded"


def test_new_take_requires_single_pass(capsys, monkeypatch):
    """``--new-take`` is rejected unless exactly one pass is selected via ``--pass``.

    Also checks that a forced new take on a pass with an unknown model is
    blocked by validation, and that nothing is ever submitted.
    """
    submit = MagicMock()
    monkeypatch.setattr("recoil.execution.video_model_client.VideoModelClient.submit", submit)

    # No selector, a --passes selector, and --all must all be refused.
    base_args = ["--project", "fixture", "--episode", "1"]
    for selector in ([], ["--passes", "PASS_011"], ["--all"]):
        _, result = _run_main([*base_args, *selector, "--new-take"], capsys)
        assert result["error"] == "new_take_requires_single_pass"

    # Corrupt the pass's model so validation has something to block on.
    _write_cli_project()
    passes_file = (
        ProjectPaths.for_episode("fixture", 1).coverage_passes_dir / "ep_001_passes.json"
    )
    pass_docs = json.loads(passes_file.read_text())
    pass_docs[0]["generation_config"]["model"] = "not-a-real-model"
    passes_file.write_text(json.dumps(pass_docs))

    outcome = generate.run_generation(
        project="fixture",
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )
    assert outcome["success"] is False
    assert outcome["error"] == "validation_blocked"
    assert outcome["blocks"][0]["check"] == "unknown_model"
    submit.assert_not_called()


def test_new_take_requires_existing_succeeded_primary(tmp_path):
    """Rerolling a beat that has no takes raises ``reroll_requires_succeeded_primary``."""
    submit = MagicMock()

    # A beat with metadata but no takes and therefore no primary.
    metadata = {
        "scene_id": "PASS_011",
        "modality": "r2v_multi",
        "shot": dataclasses.asdict(_shot("EP001_SH23", 1)),
        "batch_shots": [dataclasses.asdict(_shot("EP001_SH23", 1))],
    }
    empty_beat = Beat(beat_id="PASS_011__cov", max_takes=5, beat_metadata=metadata)
    _persist_pass_scene(empty_beat)

    runner = _runner(_FakeStepRunner(tmp_path / "video", submit))
    single_shot_plan = _plan([_shot("EP001_SH23", 1)])

    with pytest.raises(RerollPreflightError) as raised:
        asyncio.run(
            runner.run_episode_batches(
                single_shot_plan,
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert raised.value.error_code == "reroll_requires_succeeded_primary"
    submit.assert_not_called()


def test_seed_reaches_flora_request_body(tmp_path, monkeypatch):
    """A ``seed`` given to ``StepRunner.execute_pass`` shows up in the HTTP body.

    The HTTP layer and ``wait_for_job`` are replaced with fakes; the first
    captured request body must carry ``params["seed"] == 424242``.
    """
    from recoil.execution.execution_store import ExecutionStore
    from recoil.execution.step_runner import StepRunner
    from recoil.execution.types import GenerationResult

    captured_bodies: list[dict] = []

    def fake_http(method, url, headers, body, timeout):
        # Record the outgoing body and pretend the provider accepted the job.
        captured_bodies.append(body)
        return {"run_id": "run_1"}

    def fake_wait_for_job(self, job, timeout_s=900, on_status=None):
        return GenerationResult(
            success=True,
            video_data=b"seeded-video",
            model="seeddance-2.0",
            cost=0.0,
            metadata={"provider": "flora"},
        )

    provider_env = {
        "RECOIL_PROVIDER_OVERRIDE": "flora",
        "FLORA_API_KEY": "test-key",
        "RECOIL_FLORA_WORKSPACE": "workspace",
        "RECOIL_FLORA_PROJECT": "project",
    }
    for name, value in provider_env.items():
        monkeypatch.setenv(name, value)
    monkeypatch.setattr("recoil.execution.video_model_client._http", fake_http)
    monkeypatch.setattr(
        "recoil.execution.video_model_client.VideoModelClient.wait_for_job",
        fake_wait_for_job,
    )

    paths = ProjectPaths.for_episode("fixture", 1)
    paths.video_dir.mkdir(parents=True, exist_ok=True)
    step_runner = StepRunner(store=ExecutionStore("fixture"), paths=paths)
    outcome = step_runner.execute_pass(
        pass_id="PASS_011",
        prompt="test prompt",
        reference_image_paths=["https://example.test/ref.png"],
        segment_shot_ids=["EP001_SH23"],
        expected_segment_timestamps=[(0.0, 2.0)],
        duration=2,
        pass_counter=1,
        tag="COV_ENV",
        seed=424242,
        generate_audio=False,
    )

    assert outcome.success is True
    assert captured_bodies
    first_body = captured_bodies[0]
    assert first_body["params"].get("seed") == 424242


def test_stable_refs_reroll_preserves_prior_primary(tmp_path):
    """With a matching stored fingerprint, a reroll leaves take 1 and the primary alone.

    The stored ``inputs_fingerprint`` is computed from the same reference
    images the preflight payload reports, so the refs count as unchanged.
    """
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    location_ref = _write_location_ref()
    runner = _runner(_FakeStepRunner(video_dir, submit))

    # Derive the fingerprint from what the runner itself would resolve now.
    preflight = runner._build_reroll_preflight_payload(beat)
    current_refs = preflight.get("reference_images") or []
    assert str(location_ref) in current_refs
    fingerprint = EpisodeRunner._compute_inputs_fingerprint(current_refs)
    beat.beat_metadata["inputs_fingerprint"] = fingerprint

    original_file = _take_path(video_dir, 1)
    bytes_before = original_file.read_bytes()
    _persist_pass_scene(beat)

    plan = _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)])
    scenes = asyncio.run(
        runner.run_episode_batches(
            plan,
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
        )
    )

    rerolled = scenes[0].beats[0]
    earliest = rerolled.takes[0]
    assert rerolled.primary_take_id == earliest.take_id
    assert earliest.status == "succeeded"
    assert rerolled.beat_metadata["inputs_fingerprint"] == fingerprint
    assert original_file.read_bytes() == bytes_before
    assert _take_path(video_dir, 2).exists()


def test_ref_drift_preflight_rejects_empty_stored_nonempty_fresh_refs(tmp_path):
    """An empty stored fingerprint plus a location ref on disk is ``reroll_refs_drifted``.

    After the rejection the persisted scene must be unchanged: same primary,
    still an empty fingerprint, still one take, and nothing submitted.
    """
    submit = MagicMock()
    video_dir = tmp_path / "video"
    beat = _succeeded_beat(video_dir)
    _write_location_ref()
    beat.beat_metadata["inputs_fingerprint"] = ""
    _persist_pass_scene(beat)
    runner = _runner(_FakeStepRunner(video_dir, submit))
    plan = _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)])

    with pytest.raises(RerollPreflightError) as raised:
        asyncio.run(
            runner.run_episode_batches(
                plan,
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert raised.value.error_code == "reroll_refs_drifted"

    # Reload from disk to prove the failed preflight persisted nothing.
    stored_text = scene_path("fixture", "ep_001", "PASS_011").read_text()
    reloaded = Scene.from_dict(json.loads(stored_text))
    stored_beat = reloaded.beats[0]
    assert stored_beat.primary_take_id == beat.primary_take_id
    assert stored_beat.beat_metadata["inputs_fingerprint"] == ""
    assert len(stored_beat.takes) == 1
    submit.assert_not_called()


def test_integrity_still_runs_for_non_target_beats(tmp_path):
    """Fingerprint revalidation is still invoked once when a scene holds an extra beat.

    The scene is saved with the rerolled target beat plus a second,
    ``video_i2v`` beat; the target then gets its third take.
    """
    submit = MagicMock()
    target, video_dir, _ = _run_reroll(tmp_path, submit=submit)

    # Second beat in the same scene, reshaped so it is not an r2v-multi batch.
    bystander = _succeeded_beat(video_dir)
    bystander.beat_id = "OTHER__cov"
    bystander.beat_metadata["modality"] = "video_i2v"
    bystander.beat_metadata.pop("batch_shots", None)

    two_beat_scene = Scene(scene_id="PASS_011", beats=[target, bystander])
    save_scene(two_beat_scene, scene_path("fixture", "ep_001", "PASS_011"))

    runner = _runner(_FakeStepRunner(video_dir, submit))
    # Wrap rather than replace, so the real check runs and calls are counted.
    runner.revalidate_succeeded_fingerprints = MagicMock(
        wraps=runner.revalidate_succeeded_fingerprints
    )
    plan = _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)])
    scenes = asyncio.run(
        runner.run_episode_batches(
            plan,
            force_single_batch=True,
            pass_id="PASS_011",
            force_new_take=True,
        )
    )

    assert runner.revalidate_succeeded_fingerprints.call_count == 1
    assert len(scenes[0].beats[0].takes) == 3
    assert submit.call_count == 2


def test_cli_success_reflects_appended_take_not_history(tmp_path, monkeypatch):
    """``run_generation`` reports success for the new take despite an older failed one.

    The beat starts with a succeeded take and a failed take; the forced new
    take lands as file number 3 and the result counts one success, no failures.
    """
    _write_cli_project()
    submit = MagicMock()
    paths = ProjectPaths.for_episode("fixture", 1)

    beat = _succeeded_beat(paths.video_dir)
    stale_failure = beat.new_take(workflow=beat.takes[0].workflow)
    stale_failure.status = "failed"
    _persist_pass_scene(beat)

    fake_runner = _FakeStepRunner(paths.video_dir, submit)

    def _step_runner_factory(store, paths, episode=None):
        return fake_runner

    monkeypatch.setattr(generate, "StepRunner", _step_runner_factory)

    outcome = generate.run_generation(
        project="fixture",
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    assert outcome["success"] is True
    assert outcome["shots_succeeded"] == 1
    assert outcome["shots_failed"] == 0
    assert _take_path(paths.video_dir, 3).exists()


def test_new_take_fires_exactly_one_take_no_retry_ladder(tmp_path, monkeypatch):
    """A failing forced new take is submitted once and does not move the primary."""
    _write_cli_project()
    submit = MagicMock()
    paths = ProjectPaths.for_episode("fixture", 1)

    beat = _succeeded_beat(paths.video_dir)
    primary_before = beat.primary_take_id
    _persist_pass_scene(beat)

    failing_runner = _FakeStepRunner(paths.video_dir, submit, fail=True)

    def _step_runner_factory(store, paths, episode=None):
        return failing_runner

    monkeypatch.setattr(generate, "StepRunner", _step_runner_factory)

    outcome = generate.run_generation(
        project="fixture",
        episode=1,
        pass_ids=["PASS_011"],
        force_new_take=True,
    )

    # Inspect what was actually persisted, not the in-memory beat.
    stored_text = scene_path("fixture", "ep_001", "PASS_011").read_text()
    reloaded = Scene.from_dict(json.loads(stored_text))

    assert outcome["success"] is False
    assert submit.call_count == 1
    assert reloaded.beats[0].primary_take_id == primary_before


def test_make_primary_true_with_failed_reroll_keeps_existing_primary(tmp_path):
    """A failed reroll is recorded as a take but never promoted, even on request."""
    spy = MagicMock()
    beat, _video_dir, _submit = _run_reroll(
        tmp_path,
        submit=spy,
        fail=True,
        make_primary=True,
    )
    original_take, failed_take = beat.takes[0], beat.takes[-1]

    assert len(beat.takes) == 2
    assert failed_take.status == "failed"
    assert beat.primary_take_id == original_take.take_id


def test_new_take_requires_single_r2v_multi_beat(tmp_path):
    """Two r2v-multi beats in one scene make the reroll preflight fail.

    Expects ``new_take_requires_single_r2v_multi_beat`` and no submission.
    """
    submit = MagicMock()
    video_dir = tmp_path / "video"

    primary_beat = _succeeded_beat(video_dir)
    duplicate_beat = _succeeded_beat(video_dir)
    duplicate_beat.beat_id = "PASS_011__extra_cov"

    ambiguous_scene = Scene(scene_id="PASS_011", beats=[primary_beat, duplicate_beat])
    save_scene(ambiguous_scene, scene_path("fixture", "ep_001", "PASS_011"))

    runner = _runner(_FakeStepRunner(video_dir, submit))
    plan = _plan([_shot("EP001_SH23", 1), _shot("EP001_SH24", 2), _shot("EP001_SH25", 3)])

    with pytest.raises(RerollPreflightError) as raised:
        asyncio.run(
            runner.run_episode_batches(
                plan,
                force_single_batch=True,
                pass_id="PASS_011",
                force_new_take=True,
            )
        )

    assert raised.value.error_code == "new_take_requires_single_r2v_multi_beat"
    submit.assert_not_called()


def test_seed_or_make_primary_requires_new_take(capsys, monkeypatch):
    """``--seed`` and ``--make-primary`` without ``--new-take`` are rejected by the CLI."""
    submit = MagicMock()
    monkeypatch.setattr("recoil.execution.video_model_client.VideoModelClient.submit", submit)

    single_pass_args = ["--project", "fixture", "--episode", "1", "--pass", "PASS_011"]
    # Each flag is tried on its own, in the order seed then make-primary.
    for dependent_flag in (["--seed", "123"], ["--make-primary"]):
        _, result = _run_main([*single_pass_args, *dependent_flag], capsys)
        assert result["error"] == "flag_requires_new_take"

    submit.assert_not_called()
