#!/usr/bin/env python3
"""Overnight loop — single CLI entry for autonomous + manual fire (CP-10).

In-process, NOT subprocess. Maintains a live Scene tree and BudgetGuard
across sequences. Subprocess overhead isn't just startup cost — it's
budget IPC + plan/casting re-parse per sequence (SYNTHESIS D4).

Usage (identical for autonomous and manual runs):

  python3 -m pipeline.cli.run_overnight \\
    --project driver-beware --episode ep_001 \\
    [--budget-usd 50.0] [--max-takes 3] \\
    [--concurrency 2] [--halt-after 0] [--resume] [--dry-run] \\
    [--model MODEL_ID] [--tier TIER] [--generate-audio | --no-audio] \\
    [--verbose]

--halt-after defaults to 0 (disabled) per JT override (SYNTHESIS A4):
a failed sequence is already flagged in dailies_queue_<episode>.json, so
halting the run would waste the rest of the night. The budget cap is the
only default hard halt.
"""

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
import sys
from pathlib import Path

# Path bootstrap: prepend Path(__file__).resolve().parents[2] to sys.path
# (only if it is not already present), presumably so the ``recoil.*``
# imports below resolve when this module is launched outside an installed
# environment — confirm against the repo layout.
_RECOIL_ROOT = Path(__file__).resolve().parents[2]
if str(_RECOIL_ROOT) not in sys.path:
    sys.path.insert(0, str(_RECOIL_ROOT))
from recoil.core.paths import ensure_pipeline_importable  # noqa: E402

# Must run before the ``recoil.pipeline`` / ``recoil.execution`` imports that
# follow. NOTE(review): presumably adjusts sys.path for the pipeline package;
# see recoil.core.paths to confirm.
ensure_pipeline_importable()
from recoil.core.paths import ProjectPaths as CoreProjectPaths  # noqa: E402

from recoil.pipeline.orchestrator.episode_runner import (  # noqa: E402
    EpisodeRunner,
    BudgetExhaustedError,
)
from recoil.pipeline.orchestrator.learning_engine import LearningEngine  # noqa: E402
from recoil.pipeline.orchestrator.strategy_registry import StrategyEngine  # noqa: E402
from recoil.pipeline.core.receipts import utc_now_iso8601  # noqa: E402
from recoil.pipeline._lib import ops_log  # noqa: E402
from recoil.pipeline._lib.plan_loader import load_plan  # noqa: E402
from recoil.execution.execution_store import ExecutionStore  # noqa: E402
from recoil.execution.step_runner import StepRunner  # noqa: E402
from recoil.execution.step_types import ProjectPaths  # noqa: E402

logger = logging.getLogger(__name__)
# Environment switch for the board-approval gate. _amain sets it to "0" for
# --dry-run when the caller has not set it, and removes it again afterwards.
# NOTE(review): the gate itself is presumably read inside the orchestrator /
# execution layer — confirm where.
_BOARD_GATE_ENV_VAR = "RECOIL_BOARD_GATE"


def _load_plan(project: str, episode: str) -> dict:
    """Read and decode ``ep_NNN_plan.json`` for *episode* from the plans dir.

    An episode id shaped ``ep_<n>`` selects plan number ``n`` (zero-padded to
    three digits in the filename); any other id falls back to plan 1.
    """
    if episode.startswith("ep_"):
        number = int(episode.split("_")[-1])
    else:
        number = 1
    plans_dir = CoreProjectPaths.for_project(project).plans_dir
    plan_file = plans_dir / f"ep_{number:03d}_plan.json"
    contents = plan_file.read_text(encoding="utf-8")
    return json.loads(contents)


def _load_casting_state(project: str) -> dict[str, str]:
    """Map every cast slug to its element type using casting_state.json.

    Element types are scanned in the order characters, locations, props; a
    slug listed under several types keeps the last one seen. A missing
    casting-state file yields an empty mapping.
    """
    state_file = CoreProjectPaths.for_project(project).casting_state_path
    if state_file.exists():
        state = json.loads(state_file.read_text(encoding="utf-8"))
        return {
            slug: element_type
            for element_type in ("characters", "locations", "props")
            for slug in (state.get(element_type) or {}).keys()
        }
    return {}


def _write_dailies_queue(
    project: str,
    episode: str,
    sequences: list[str],
    *,
    dry_run: bool = False,
) -> None:
    """Persist ``dailies_queue_<episode>.json`` under the orchestration dir.

    The JSON object holds the episode id, the queued ids under the
    ``"sequences"`` key (callers in this module pass canonical shot ids) and
    a UTC timestamp.

    The payload is first written to a sibling ``.tmp`` file and then moved
    over the target with ``os.replace``. A killed run or a concurrent reader
    therefore never observes a half-written queue. Any leftover temp file is
    removed.

    Raises:
        PermissionError: if the queue cannot be written and ``dry_run`` is
            False. In dry-run mode the failure is logged and swallowed,
            because audit environments may have a read-only orchestration
            dir. Other OSErrors propagate unchanged.
    """
    out_path = (
        CoreProjectPaths.for_project(project).orchestration_dir
        / f"dailies_queue_{episode}.json"
    )
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    try:
        out_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path.write_text(
            json.dumps(
                {
                    "episode": episode,
                    "sequences": sequences,
                    "timestamp": utc_now_iso8601(),
                },
                indent=2,
            ),
            encoding="utf-8",
        )
        # Atomic swap when source and destination share a filesystem, which
        # sibling paths always do.
        os.replace(tmp_path, out_path)
    except PermissionError as exc:
        if not dry_run:
            raise
        logger.warning("Dry-run dailies queue persistence skipped for %s: %s", out_path, exc)
    finally:
        # On success os.replace already consumed tmp_path; on failure drop any
        # partial temp file. Best-effort: cleanup must not mask the real error.
        try:
            tmp_path.unlink()
        except OSError:
            pass


def _parse_args(argv: list[str]) -> argparse.Namespace:
    p = argparse.ArgumentParser(prog="run_overnight")
    p.add_argument("--project", required=True)
    p.add_argument("--episode", required=True)
    p.add_argument("--sequence", help=argparse.SUPPRESS)
    p.add_argument("--max-takes", type=int, default=3)
    p.add_argument("--budget-usd", type=float, default=50.0)
    p.add_argument("--concurrency", type=int, default=2)
    p.add_argument("--halt-after", type=int, default=0)  # disabled default
    p.add_argument("--resume", action="store_true")
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--verbose", action="store_true")
    p.add_argument(
        "--model",
        dest="model_override",
        default=None,
        help="Override video model_id for the run (e.g. seeddance-2.0, "
        "kling-v3). Defaults to CanonicalShot.video_model or "
        "dispatch_payload.DEFAULT_VIDEO_MODEL (seeddance-2.0).",
    )
    p.add_argument(
        "--tier",
        dest="tier_override",
        default=None,
        help="Override provider tier (e.g. standard_720p, fast_720p, "
        "fast_480p). Defaults to dispatch_payload.DEFAULT_TIER.",
    )
    audio_group = p.add_mutually_exclusive_group()
    audio_group.add_argument(
        "--generate-audio",
        dest="generate_audio",
        action="store_const",
        const=True,
        default=None,
        help="Force generate_audio=True for all shots.",
    )
    audio_group.add_argument(
        "--no-audio",
        dest="generate_audio",
        action="store_const",
        const=False,
        help="Force generate_audio=False for all shots.",
    )
    return p.parse_args(argv)


def _episode_plan_num(episode: str) -> int:
    """Map an ``ep_<n>`` episode id to plan number ``n``; other ids map to 1.

    Raises:
        SystemExit: if the id starts with ``ep_`` but its last ``_``-separated
            part is not an integer. This replaces an uncaught ValueError
            traceback with a readable CLI error.
    """
    if not episode.startswith("ep_"):
        return 1
    try:
        return int(episode.split("_")[-1])
    except ValueError:
        raise SystemExit(
            f"--episode {episode!r} is not of the form ep_<number>"
        ) from None


def _finish_run(
    project: str,
    episode: str,
    shot_ids: list[str],
    *,
    dry_run: bool,
) -> int:
    """Write the dailies queue, log ``overnight_complete``, return exit code 0."""
    _write_dailies_queue(project, episode, shot_ids, dry_run=dry_run)
    ops_log.write({"event": "overnight_complete", "sequences_run": len(shot_ids)})
    return 0


async def _amain(args: argparse.Namespace) -> int:
    """Run one overnight render pass for ``args.episode`` in-process.

    Loads the canonical plan and the casting state, and wires a StepRunner
    and a StrategyEngine into an EpisodeRunner. It then awaits
    ``run_episode_batches`` and finally writes the dailies queue and an
    ``overnight_complete`` ops-log event.

    Returns 0 both on a normal finish and on a budget halt
    (BudgetExhaustedError). Hitting the budget cap is an expected end of the
    night, not a failure.

    Raises:
        SystemExit: if ``--sequence`` is given (it would not scope spend), or
            if the episode id has a non-numeric ``ep_`` suffix.
    """
    if args.sequence is not None:
        raise SystemExit(
            "--sequence is not supported: it does NOT scope the run, so a "
            "scoped invocation would dispatch and pay for the WHOLE episode "
            "against --budget-usd. Re-run without --sequence to render the "
            "full episode, or scope via the plan."
        )
    # --halt-after and --resume are parsed (and listed in the module usage)
    # but nothing here forwards them to EpisodeRunner. Say so rather than
    # silently ignoring an operator's explicit request.
    halt_after = getattr(args, "halt_after", 0)
    if halt_after:
        logger.warning(
            "--halt-after=%s is not forwarded to EpisodeRunner and has no "
            "effect; the budget cap is the only hard halt.",
            halt_after,
        )
    if getattr(args, "resume", False):
        logger.warning(
            "--resume is not forwarded to EpisodeRunner by this entry point "
            "and has no effect."
        )
    plan_num = _episode_plan_num(args.episode)
    plan_path = (
        CoreProjectPaths.for_project(args.project).plans_dir
        / f"ep_{plan_num:03d}_plan.json"
    )
    canonical_plan = load_plan(plan_path)
    plan = canonical_plan.raw
    casting = _load_casting_state(args.project)
    store = ExecutionStore(args.project)
    paths = ProjectPaths.for_episode(args.project, plan_num)
    paths.video_dir.mkdir(parents=True, exist_ok=True)
    step_runner = StepRunner(store=store, paths=paths, episode=plan_num)
    # REC-20: activate intelligent retry. A cold-start LearningEngine (no
    # prior data) makes StrategyEngine fall through to its static
    # ESCALATION_CHAINS, so injecting this is strictly additive vs the old
    # blind max_takes round-robin. EpisodeRunner's default RetryCostPolicy
    # (max $6/pass, warn $4) caps runaway retry on a single stubborn shot.
    strategy_model = args.model_override or "seeddance-2.0"
    strategy_engine = StrategyEngine(
        learning=LearningEngine(project=args.project),
        model=strategy_model,
    )
    ops_log.write(
        {
            "event": "strategy_engine_active",
            "project": args.project,
            "episode": args.episode,
            "model": strategy_model,
        }
    )
    runner = EpisodeRunner(
        project=args.project,
        plan=plan,
        casting=casting,
        max_takes=args.max_takes,
        budget_usd=args.budget_usd,
        concurrency=args.concurrency,
        episode=args.episode,
        model_override=args.model_override,
        tier_override=args.tier_override,
        generate_audio=args.generate_audio,
        step_runner=step_runner,
        strategy_engine=strategy_engine,
    )

    board_gate_override = os.environ.get(_BOARD_GATE_ENV_VAR)
    if args.dry_run and board_gate_override is None:
        # Dry-run is a no-spend dispatch smoke used by audit gates. Do not let
        # project-config board approval state make it depend on external prep
        # artifacts; RECOIL_BOARD_GATE=1 still forces the live gate explicitly.
        os.environ[_BOARD_GATE_ENV_VAR] = "0"
    try:
        scenes = await runner.run_episode_batches(
            canonical_plan,
            dry_run=args.dry_run,
        )
    except BudgetExhaustedError as e:
        ops_log.write({"event": "budget_halt_run", "remaining": e.remaining})
        # B2: the runner stamped the canonical shot ids of beats that actually
        # rendered THIS run (beat-scoped, current-run, build-error-excluding).
        # Write them verbatim — do NOT re-derive from disk (a loaded scene's
        # sibling-beat take is indistinguishable current vs prior run on disk).
        rendered_shot_ids = list(getattr(e, "rendered_shot_ids", []))
        return _finish_run(
            args.project, args.episode, rendered_shot_ids, dry_run=args.dry_run
        )
    finally:
        # Restore the environment only if we were the ones who set the gate.
        if args.dry_run and board_gate_override is None:
            os.environ.pop(_BOARD_GATE_ENV_VAR, None)
    # Beats carrying a batch summary contribute its shot ids; any other beat
    # contributes its own beat_id.
    sequences_with_work: list[str] = [
        shot_id
        for s in scenes
        for b in s.beats
        for shot_id in b.beat_metadata.get("batch_summary", {}).get(
            "shot_ids", [b.beat_id]
        )
    ]
    return _finish_run(
        args.project, args.episode, sequences_with_work, dry_run=args.dry_run
    )


def main(argv: list[str] | None = None) -> int:
    """CLI entry point for the overnight loop.

    Parses *argv* (``sys.argv[1:]`` when omitted) and configures root logging
    (DEBUG with ``--verbose``, INFO otherwise). It then drives ``_amain`` on a
    fresh asyncio event loop and returns its exit code.
    """
    cli_args = sys.argv[1:] if argv is None else argv
    args = _parse_args(cli_args)
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level)
    return asyncio.run(_amain(args))


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
