"""Regression tests for the dispatch reaper zombie detection cycle."""

from __future__ import annotations

import json
import os
import subprocess
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path


# Parent of the directory containing this test file; the reaper script and
# dispatch_status.py are addressed relative to it.
TOOLS_DIR = Path(__file__).resolve().parents[1]
# Four levels above the directory containing this test file; used as the
# working directory when the reaper is invoked.
REPO_ROOT = Path(__file__).resolve().parents[4]
# The shell script under test.
REAPER = TOOLS_DIR / "dispatch_reaper.sh"


def iso_utc(value: datetime) -> str:
    """Format *value* as a second-precision ISO-8601 UTC timestamp ending in ``Z``.

    Aware datetimes are converted to UTC first, so an input carrying a
    non-UTC offset still produces a ``Z``-suffixed string for the same
    instant.  Naive datetimes are interpreted as already being in UTC.
    Microseconds are discarded.
    """
    if value.tzinfo is None or value.utcoffset() is None:
        # Naive input: assume it already represents UTC rather than emitting
        # a suffix-less string that readers could mistake for local time.
        value = value.replace(tzinfo=timezone.utc)
    else:
        value = value.astimezone(timezone.utc)
    return value.replace(microsecond=0).isoformat().replace("+00:00", "Z")


def now_iso() -> str:
    """Return the current UTC time as formatted by ``iso_utc``."""
    current = datetime.now(tz=timezone.utc)
    return iso_utc(current)


def past_iso(seconds: int = 300) -> str:
    """Return the UTC time *seconds* ago (default 300) as formatted by ``iso_utc``."""
    offset = timedelta(seconds=seconds)
    moment = datetime.now(timezone.utc) - offset
    return iso_utc(moment)


def read_json(path: Path) -> dict:
    """Parse the UTF-8 JSON document stored at *path* and return it."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def write_json(path: Path, payload: dict) -> None:
    """Write *payload* to *path* as indented, key-sorted JSON with a trailing newline.

    Missing parent directories are created first.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, indent=2, sort_keys=True)
    path.write_text(f"{rendered}\n", encoding="utf-8")


def events(run_dir: Path) -> list[dict]:
    """Return every JSON record in ``run_dir/events.jsonl``, in file order.

    A missing file yields an empty list; blank or whitespace-only lines are
    skipped.
    """
    log_path = run_dir / "events.jsonl"
    if not log_path.exists():
        return []

    parsed = []
    for raw in log_path.read_text(encoding="utf-8").splitlines():
        if not raw.strip():
            continue
        parsed.append(json.loads(raw))
    return parsed


def transition_states(run_dir: Path) -> list[str]:
    """Return the ``state`` of each ``transition`` event logged for *run_dir*, in order.

    Events whose ``state`` is missing or falsy are ignored.
    """
    states = []
    for record in events(run_dir):
        if record.get("event") != "transition":
            continue
        state = record.get("state")
        if state:
            states.append(state)
    return states


def run(cmd: list[str], *, cwd: Path | None = None, env: dict[str, str] | None = None) -> str:
    """Run *cmd* to completion and return its stdout with surrounding whitespace stripped.

    Output is captured as text.  A non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    completed = subprocess.run(
        cmd,
        check=True,
        capture_output=True,
        text=True,
        cwd=cwd,
        env=env,
    )
    output = completed.stdout
    return output.strip()


@dataclass(frozen=True)
class WorktreeFixture:
    """Immutable description of the throwaway git worktree built by ``make_worktree``."""

    # Branch checked out in the worktree (``make_worktree`` uses ``codex/<run_id>``).
    branch: str
    # Hash of the repository's initial commit, as printed by ``git rev-parse HEAD``.
    commit: str
    # Path to the placeholder spec markdown file.
    spec: Path
    # Path to the linked worktree directory created by ``git worktree add``.
    worktree: Path


def make_worktree(tmp: Path, run_id: str) -> WorktreeFixture:
    """Create a one-commit git repository plus a linked worktree under *tmp*.

    The repository lives at ``<run_id>-repo``, the worktree at
    ``<run_id>-worktree`` on a new ``codex/<run_id>`` branch pointing at the
    initial commit, and a placeholder spec is written to ``<run_id>-spec.md``.
    """
    repo = tmp / f"{run_id}-repo"
    worktree = tmp / f"{run_id}-worktree"
    branch = f"codex/{run_id}"
    spec = tmp / f"{run_id}-spec.md"

    def git(*args: str) -> str:
        # Every git command runs inside the freshly created repository.
        return run(["git", *args], cwd=repo)

    repo.mkdir()
    git("init")
    # Repo-local identity so the commit works without any global git config.
    git("config", "user.email", "dispatch-reaper-test@example.invalid")
    git("config", "user.name", "Dispatch Reaper Test")

    readme = repo / "README.md"
    readme.write_text(f"# {run_id}\n", encoding="utf-8")
    git("add", "README.md")
    git("commit", "-m", "initial")
    commit = git("rev-parse", "HEAD")

    git("worktree", "add", "-b", branch, str(worktree), commit)
    spec.write_text("# test spec\n", encoding="utf-8")

    return WorktreeFixture(branch=branch, commit=commit, spec=spec, worktree=worktree)


def make_tmux_stub(tmp: Path) -> tuple[Path, Path]:
    """Install executable ``tmux`` and ``ps`` stand-ins under ``tmp/bin``.

    Creates ``tmp/bin``, ``tmp/tmux-state`` and ``tmp/ps-state``.

    * The ``tmux`` stub handles only ``has-session -t <session>``: it succeeds
      when ``$TMUX_STUB_STATE/<session>.pid`` exists and ``kill -0`` on the
      pid stored there succeeds.  Any other subcommand exits with status 2.
    * The ``ps`` stub handles ``-p <pid> -o <field>``: for ``lstart=`` and
      ``command=`` it prints ``$PS_STUB_STATE/<pid>.lstart`` or
      ``$PS_STUB_STATE/<pid>.command``; anything else exits with status 1.

    Returns ``(bin_dir, tmux_state_dir)``; the ps state directory is not
    returned.
    """
    tmux_script = """#!/usr/bin/env bash
set -u
state="${TMUX_STUB_STATE:?}"
case "${1:-}" in
  has-session)
    shift
    session=""
    while [ "$#" -gt 0 ]; do
      case "$1" in
        -t) session="$2"; shift 2 ;;
        *) shift ;;
      esac
    done
    [ -n "$session" ] || exit 1
    [ -f "$state/$session.pid" ] || exit 1
    pid="$(cat "$state/$session.pid")"
    kill -0 "$pid" >/dev/null 2>&1
    ;;
  *)
    exit 2
    ;;
esac
"""
    ps_script = """#!/usr/bin/env bash
set -u
state="${PS_STUB_STATE:?}"
pid=""
field=""
while [ "$#" -gt 0 ]; do
  case "$1" in
    -p) pid="$2"; shift 2 ;;
    -o) field="$2"; shift 2 ;;
    *) shift ;;
  esac
done
[ -n "$pid" ] && [ -n "$field" ] || exit 1
case "$field" in
  lstart=) cat "$state/$pid.lstart" ;;
  command=) cat "$state/$pid.command" ;;
  *) exit 1 ;;
esac
"""

    bin_dir = tmp / "bin"
    state_dir = tmp / "tmux-state"
    ps_state_dir = tmp / "ps-state"
    for directory in (bin_dir, state_dir, ps_state_dir):
        directory.mkdir()

    def install(name: str, script: str) -> None:
        # Write the script into bin_dir and mark it executable.
        target = bin_dir / name
        target.write_text(script, encoding="utf-8")
        target.chmod(0o755)

    install("tmux", tmux_script)
    install("ps", ps_script)
    return bin_dir, state_dir


def reaper_env(tmp: Path, runs_root: Path, tmux_bin: Path, tmux_state: Path) -> dict[str, str]:
    """Build the environment mapping used to run the reaper against *tmp*.

    Starts from a copy of the current process environment, creates
    ``tmp/home/.recoil``, and overrides the run root, ``HOME``, token/topic
    file paths, lock path and stub state directories.  *tmux_bin* is
    prepended to ``PATH`` and ``REAPER_DRY_RUN`` is set to ``"1"``.
    """
    home = tmp / "home"
    recoil_home = home / ".recoil"
    recoil_home.mkdir(parents=True, exist_ok=True)

    env = dict(os.environ)
    inherited_path = env.get("PATH", "")
    env["DISPATCH_RUNS_ROOT"] = str(runs_root)
    env["HOME"] = str(home)
    env["LINEAR_TOKEN_FILE"] = str(recoil_home / "linear_token")
    env["NTFY_TOPIC_FILE"] = str(recoil_home / "ntfy_topic")
    # Stub directory goes first so its tmux/ps shadow any host binaries.
    env["PATH"] = f"{tmux_bin}{os.pathsep}{inherited_path}"
    env["PS_STUB_STATE"] = str(tmp / "ps-state")
    env["REAPER_DRY_RUN"] = "1"
    env["REAPER_LOCK"] = str(tmp / "dispatch-reaper.lock")
    env["TMUX_STUB_STATE"] = str(tmux_state)
    return env


def invoke_reaper(env: dict[str, str]) -> subprocess.CompletedProcess[str]:
    """Run one reaper tick under *env* and return the completed process.

    The script is executed with ``bash`` from ``REPO_ROOT`` with a 10 second
    timeout.  A non-zero exit status fails the calling test, using the
    captured stderr as the assertion message.
    """
    argv = ["bash", str(REAPER)]
    completed = subprocess.run(
        argv,
        check=False,
        timeout=10,
        capture_output=True,
        text=True,
        env=env,
        cwd=REPO_ROOT,
    )
    assert completed.returncode == 0, completed.stderr
    return completed


def write_status(
    runs_root: Path,
    run_id: str,
    fixture: WorktreeFixture,
    *,
    state: str = "ATTEMPT_RUNNING",
    attempt: int = 1,
    max_attempts: int = 3,
) -> Path:
    """Seed ``runs_root/<run_id>`` with a ``status.json`` and an empty ``events.jsonl``.

    Branch, commit, spec and worktree come from *fixture*.  ``created_at``
    and ``updated_at`` share one "now" timestamp, while
    ``started_grace_until`` is set in the past (``past_iso()`` default).
    Returns the run directory.
    """
    run_dir = runs_root / run_id
    timestamp = now_iso()

    budget = {
        "codex_rounds_max": 18,
        "codex_rounds_used": 0,
        "concurrent_runs_max": 4,
        "wall_clock_s_max": 21600,
        "wall_clock_s_used": 0,
    }
    payload = {
        "run_id": run_id,
        "issue": "REC-90",
        "state": state,
        "attempt": attempt,
        "max_attempts": max_attempts,
        "budget": budget,
        "branch": fixture.branch,
        "last_validated_commit": fixture.commit,
        "spec": str(fixture.spec),
        "worktree": str(fixture.worktree),
        "created_at": timestamp,
        "updated_at": timestamp,
        "started_grace_until": past_iso(),
        "last_failure_signature": None,
        "last_retry_cause": None,
        "prior_failure_signatures": [],
        "linear_projected_at": None,
        "linear_projection_dirty": True,
        "pr_url": None,
    }
    write_json(run_dir / "status.json", payload)

    event_log = run_dir / "events.jsonl"
    event_log.touch()
    return run_dir


def write_heartbeat(
    run_dir: Path,
    run_id: str,
    *,
    attempt: int,
    pid: int,
    start_time: str = "Mon Jan  1 00:00:00 2024",
    tmux_session: str,
    updated_at: str | None = None,
    ps_match: str = "harness_orchestrator.sh",
) -> None:
    """Write ``attempt-NNN/heartbeat.json`` for *attempt* under *run_dir*.

    *pid*, *start_time* and *ps_match* are stored as ``orchestrator_pid``,
    ``orchestrator_start_time`` and ``ps_match``.  When *updated_at* is
    omitted (or empty) the current time is used.
    """
    attempt_dir = run_dir / f"attempt-{attempt:03d}"
    stamp = updated_at or now_iso()
    heartbeat = {
        "run_id": run_id,
        "attempt": attempt,
        "orchestrator_pid": pid,
        "orchestrator_start_time": start_time,
        "ps_match": ps_match,
        "tmux_session": tmux_session,
        "log_path": str(attempt_dir / "build-log.md"),
        "phase_hint": None,
        "round_hint": None,
        "updated_at": stamp,
    }
    write_json(attempt_dir / "heartbeat.json", heartbeat)


def write_terminal_status(
    run_dir: Path,
    run_id: str,
    *,
    attempt: int,
    exit_code: int = 1,
    converge_status: str = "FAILED",
    cause_hint: str = "validation_failure",
    failure_signature: str | None = "sha256:failed-terminal",
    pr_url: str | None = None,
) -> None:
    """Write ``attempt-NNN/terminal_status.json`` for *attempt* under *run_dir*.

    Only ``exit_code``, ``converge_status``, ``cause_hint``,
    ``failure_signature`` and ``pr_url`` vary with the arguments.  The
    descriptive fields (verdict summary, failure reason, failing test ids,
    excerpt) are fixed failure-flavoured placeholders regardless of them.
    """
    attempt_dir = run_dir / f"attempt-{attempt:03d}"
    terminal = {
        "run_id": run_id,
        "attempt": attempt,
        "branch": f"codex/{run_id}",
        "commit": "def456",
        "phase": "phase-1",
        "gate": "validation",
        "exit_code": exit_code,
        "converge_status": converge_status,
        "cause_hint": cause_hint,
        "failure_signature": failure_signature,
        "pr_url": pr_url,
        "convergence_verdict_summary": "VERDICT: FAILED",
        "failure_reason": "Gate/step: validation\nExit code: 1",
        "failing_test_ids": ["test_failed_terminal_caps"],
        "normalized_failure_excerpt": "FAILED test_failed_terminal_caps",
        "validation_command": "pytest recoil/pipeline/tools/tests/test_dispatch_reaper.py",
        "log_path": str(attempt_dir / "build-log.md"),
        "written_at": now_iso(),
    }
    write_json(attempt_dir / "terminal_status.json", terminal)


def make_notify_stub(tmp: Path) -> tuple[Path, Path]:
    """Create an executable ``tmp/notify.sh`` that logs its arguments.

    Each invocation appends its argument string as one line to the file
    named by ``$NOTIFY_LOG`` and exits 0.  Returns ``(script_path,
    log_path)``, where the log path is ``tmp/notify.log``; the log file
    itself is not created here.
    """
    script = """#!/usr/bin/env bash
set -u
printf '%s\\n' "$*" >> "${NOTIFY_LOG:?}"
exit 0
"""
    notify = tmp / "notify.sh"
    notify.write_text(script, encoding="utf-8")
    notify.chmod(0o755)
    return notify, tmp / "notify.log"


def unused_pid() -> int:
    """Return a pid in 900_001..999_999 that currently has no process.

    Candidates are probed from the top down with signal 0.  A pid that
    exists, or that belongs to a process we may not signal, is skipped.
    Raises ``AssertionError`` if every candidate is taken.
    """
    candidate = 999_999
    while candidate > 900_000:
        try:
            os.kill(candidate, 0)
        except ProcessLookupError:
            return candidate
        except PermissionError:
            # The pid exists but is owned by someone else; keep looking.
            pass
        candidate -= 1
    raise AssertionError("could not find an unused pid")


def assert_relaunch_cycle_within(run_dir: Path, env: dict[str, str], max_ticks: int) -> int:
    """Tick the reaper until the zombie relaunch cycle completes; return the tick count.

    After each reaper invocation the run is considered relaunched once the
    event log contains both ``ZOMBIE_SUSPECT`` and ``ZOMBIE_REAPED``
    transitions and ``status.json`` reports ``ATTEMPT_RUNNING`` at attempt 2.
    At that point the transitions must appear in the order
    ``ZOMBIE_SUSPECT`` -> ``ZOMBIE_REAPED`` -> ``ATTEMPT_RUNNING``.

    Raises:
        AssertionError: if the cycle does not complete within *max_ticks*
            invocations, if no ``ATTEMPT_RUNNING`` transition follows
            ``ZOMBIE_REAPED``, or if the transitions are out of order.
    """
    for tick in range(1, max_ticks + 1):
        invoke_reaper(env)
        status = read_json(run_dir / "status.json")
        states = transition_states(run_dir)
        if (
            "ZOMBIE_SUSPECT" in states
            and "ZOMBIE_REAPED" in states
            and status["state"] == "ATTEMPT_RUNNING"
            and status["attempt"] == 2
        ):
            suspect_at = states.index("ZOMBIE_SUSPECT")
            reaped_at = states.index("ZOMBIE_REAPED")
            try:
                running_after = states.index("ATTEMPT_RUNNING", reaped_at + 1)
            except ValueError:
                # list.index raises a bare ValueError; surface a readable
                # test failure that shows the observed transitions instead.
                raise AssertionError(
                    f"no ATTEMPT_RUNNING transition recorded after ZOMBIE_REAPED: {states}"
                ) from None
            assert suspect_at < reaped_at < running_after, states
            # tick cannot exceed max_ticks here; the loop bound guarantees it.
            return tick
    raise AssertionError(f"reaper did not complete zombie relaunch in {max_ticks} ticks")


def test_confirmed_dead_run_is_suspected_reaped_and_dry_run_relaunched(tmp: Path) -> None:
    """A run whose pid and tmux session are both gone is expected to be
    suspected, reaped and dry-run relaunched as attempt 2 within four ticks."""
    run_id = "REC90-confirmed-dead"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture)
    # Neither the pid nor the tmux session exists, so the orchestrator is dead.
    dead_pid = unused_pid()
    write_heartbeat(
        run_dir,
        run_id,
        attempt=1,
        pid=dead_pid,
        tmux_session="missing-confirmed-dead-session",
    )

    ticks = assert_relaunch_cycle_within(run_dir, env, max_ticks=4)
    final = read_json(run_dir / "status.json")

    assert ticks <= 4
    assert final["attempt"] == 2
    assert final["state"] == "ATTEMPT_RUNNING"
    zombie_retries = [
        record
        for record in events(run_dir)
        if record.get("event") == "retry_start" and record.get("zombie")
    ]
    assert zombie_retries


def test_alive_identity_matching_run_is_left_attempt_running(tmp: Path) -> None:
    """A live process whose stubbed ps/tmux identity matches the heartbeat is
    expected to stay ATTEMPT_RUNNING at attempt 1 with no zombie transitions."""
    # CI may not have tmux, and this sandbox blocks /bin/ps. The tmp PATH stubs
    # implement only the tmux/ps probes used by pid_identity_alive, so this still
    # drives the real reaper classifier while avoiding host-level dependencies.
    run_id = "REC90-alive"
    session = f"{run_id}-attempt-001"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture)

    proc = subprocess.Popen(
        ["python3", "-c", "import time; time.sleep(300)", run_id],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        text=True,
    )
    try:
        start_time = "Sun Jun  7 10:00:00 2026"
        cmdline = f"python3 -c import time; time.sleep(300) {run_id}"
        ps_state = tmp / "ps-state"

        # Feed the ps stub the same identity that the heartbeat records.
        lstart_file = ps_state / f"{proc.pid}.lstart"
        lstart_file.write_text(f"{start_time}\n", encoding="utf-8")
        command_file = ps_state / f"{proc.pid}.command"
        command_file.write_text(f"{cmdline}\n", encoding="utf-8")
        # Register the session with the tmux stub, backed by the live pid.
        session_file = stub_state / f"{session}.pid"
        session_file.write_text(f"{proc.pid}\n", encoding="utf-8")

        write_heartbeat(
            run_dir,
            run_id,
            attempt=1,
            pid=proc.pid,
            start_time=start_time,
            tmux_session=session,
            ps_match="time.sleep",
        )

        invoke_reaper(env)
        after = read_json(run_dir / "status.json")

        assert after["state"] == "ATTEMPT_RUNNING"
        assert after["attempt"] == 1
        seen = transition_states(run_dir)
        assert "ZOMBIE_SUSPECT" not in seen
        assert "ZOMBIE_REAPED" not in seen
    finally:
        # Always reap the helper process, escalating to kill if it lingers.
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=2)


def test_confirmed_dead_at_attempt_cap_caps_without_retry(tmp: Path) -> None:
    """A dead run already at ``max_attempts`` is expected to end up
    CAPPED_NEEDS_HUMAN within three ticks without any retry being started."""
    run_id = "REC90-attempt-cap"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture, attempt=3, max_attempts=3)
    dead_pid = unused_pid()
    write_heartbeat(
        run_dir,
        run_id,
        attempt=3,
        pid=dead_pid,
        tmux_session="missing-attempt-cap-session",
    )

    status_path = run_dir / "status.json"
    # Allow up to three ticks for the reaper to reach the capped state.
    for _ in range(3):
        invoke_reaper(env)
        if read_json(status_path)["state"] == "CAPPED_NEEDS_HUMAN":
            break

    final = read_json(status_path)
    assert final["state"] == "CAPPED_NEEDS_HUMAN"
    assert final["attempt"] == 3
    retry_starts = [record for record in events(run_dir) if record.get("event") == "retry_start"]
    assert not retry_starts


def test_failed_terminal_status_on_dead_stale_attempt_caps_and_notifies(tmp: Path) -> None:
    """A dead attempt with a stale heartbeat and a FAILED terminal status is
    expected to be capped in a single tick, skipping ZOMBIE_SUSPECT and any
    retry, and to trigger a ``capped`` notification."""
    run_id = "REC110-failed-terminal"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    notify, notify_log = make_notify_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)
    env.update({"NOTIFY_TOOL": str(notify), "NOTIFY_LOG": str(notify_log)})

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture)
    dead_pid = unused_pid()
    # Heartbeat last updated an hour ago, from an orchestrator that no longer exists.
    stale_stamp = past_iso(seconds=3_600)
    write_heartbeat(
        run_dir,
        run_id,
        attempt=1,
        pid=dead_pid,
        tmux_session="missing-failed-terminal-session",
        updated_at=stale_stamp,
    )
    write_terminal_status(run_dir, run_id, attempt=1)

    invoke_reaper(env)

    final = read_json(run_dir / "status.json")
    seen = transition_states(run_dir)
    notifications = notify_log.read_text(encoding="utf-8")

    assert final["state"] == "CAPPED_NEEDS_HUMAN"
    assert final["attempt"] == 1
    assert "CAPPED_NEEDS_HUMAN" in seen
    assert "ZOMBIE_SUSPECT" not in seen
    retry_starts = [record for record in events(run_dir) if record.get("event") == "retry_start"]
    assert not retry_starts
    assert "--event capped" in notifications


def test_ambiguous_fresh_dead_waits_for_short_grace_before_suspect(tmp: Path) -> None:
    """With ``FRESH_DEAD_GRACE_S=5``, a live pid whose start time disagrees
    with the heartbeat (and whose tmux session is missing) is expected to be
    left alone while the heartbeat is fresh, then marked ZOMBIE_SUSPECT exactly
    once after the heartbeat is 10 seconds old."""
    run_id = "REC90-ambiguous-fresh-dead"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)
    env["FRESH_DEAD_GRACE_S"] = "5"

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture)
    status_path = run_dir / "status.json"

    proc = subprocess.Popen(
        ["python3", "-c", "import time; time.sleep(300)", run_id],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        text=True,
    )
    try:
        session = f"{run_id}-missing-session"
        actual_start = "Sun Jun  7 10:00:00 2026"
        heartbeat_start = "Sun Jun  7 10:01:00 2026"
        ps_state = tmp / "ps-state"

        # The ps stub reports a start time one minute earlier than the heartbeat claims.
        lstart_file = ps_state / f"{proc.pid}.lstart"
        lstart_file.write_text(f"{actual_start}\n", encoding="utf-8")
        command_file = ps_state / f"{proc.pid}.command"
        command_file.write_text(
            f"python3 -c import time; time.sleep(300) {run_id}\n", encoding="utf-8"
        )

        def beat(stamp: str) -> None:
            # Rewrite the heartbeat, varying only its updated_at timestamp.
            write_heartbeat(
                run_dir,
                run_id,
                attempt=1,
                pid=proc.pid,
                start_time=heartbeat_start,
                tmux_session=session,
                updated_at=stamp,
                ps_match="time.sleep",
            )

        # Fresh heartbeat: still inside the grace window.
        beat(now_iso())
        invoke_reaper(env)
        first = read_json(status_path)

        assert first["state"] == "ATTEMPT_RUNNING"
        assert first["attempt"] == 1
        assert "ZOMBIE_SUSPECT" not in transition_states(run_dir)

        # Heartbeat aged past the 5 second grace window.
        beat(past_iso(seconds=10))
        invoke_reaper(env)
        second = read_json(status_path)

        assert second["state"] == "ZOMBIE_SUSPECT"
        assert second["attempt"] == 1
        assert transition_states(run_dir).count("ZOMBIE_SUSPECT") == 1
    finally:
        # Always reap the helper process, escalating to kill if it lingers.
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=2)

# ── REC-229 Phase 3 regressions: pr_url stamp must NOT steal terminal ownership,
#    and the reaper consumes terminal_status.json.pr_url on BOTH handlers. ──
# A converged terminal_status reuses write_terminal_status with exit_code=0,
# converge_status="CONVERGED" (the classify CONVERGED contract) + a non-null pr_url.

def write_converged_terminal(run_dir: Path, run_id: str, *, attempt: int, pr_url: str) -> None:
    """Write a converged ``terminal_status.json`` for *attempt*.

    Delegates to ``write_terminal_status`` with exit code 0, converge status
    ``CONVERGED``, cause hint ``converged``, no failure signature, and the
    given *pr_url*.
    """
    converged_fields = {
        "exit_code": 0,
        "converge_status": "CONVERGED",
        "cause_hint": "converged",
        "failure_signature": None,
    }
    write_terminal_status(run_dir, run_id, attempt=attempt, pr_url=pr_url, **converged_fields)


def test_pr_url_stamped_run_keeps_reaper_terminal_ownership(tmp: Path) -> None:
    """R14: stamping pr_url must leave the run non-terminal, so that the reaper
    itself still performs the CONVERGED_PR_CREATED transition and sends the
    ``converged`` notification when it sees the converged terminal status."""
    run_id = "REC229-stamped-converged"
    pr_url = "https://github.com/x/y/pull/55"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    notify, notify_log = make_notify_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)
    env.update({"NOTIFY_TOOL": str(notify), "NOTIFY_LOG": str(notify_log)})

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture, state="ATTEMPT_RUNNING")
    status_path = run_dir / "status.json"

    # Stamp pr_url through the pr_url-only subcommand; state must stay ATTEMPT_RUNNING.
    stamp_cmd = [
        "python3",
        str(TOOLS_DIR / "dispatch_status.py"),
        "set-pr-url",
        "--run-dir",
        str(run_dir),
        "--pr-url",
        pr_url,
    ]
    run(stamp_cmd)
    stamped = read_json(status_path)
    assert stamped["state"] == "ATTEMPT_RUNNING"
    assert stamped["pr_url"] == "https://github.com/x/y/pull/55"

    write_converged_terminal(run_dir, run_id, attempt=1, pr_url=pr_url)

    invoke_reaper(env)

    final = read_json(status_path)
    seen = transition_states(run_dir)
    notifications = notify_log.read_text(encoding="utf-8")
    # The reaper, not the stamp, performed the terminal transition.
    assert final["state"] == "CONVERGED_PR_CREATED"
    assert "CONVERGED_PR_CREATED" in seen
    assert "--event converged" in notifications


def test_reaper_writes_terminal_status_pr_url_on_alive_handler(tmp: Path) -> None:
    """R10b: with no heartbeat written, a converged terminal status is expected
    to move the run to CONVERGED_PR_CREATED and to replace a stale
    ``status.json`` pr_url with the one from ``terminal_status.json``.

    The original note cites this as the reason the ":902 emission fix" is the
    right single-point upstream fix; that reference is not visible from this
    file.
    """
    run_id = "REC229-alive-pr-url"
    stale_url = "https://github.com/STALE/pull/1"
    correct_url = "https://github.com/correct/pull/99"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    notify, notify_log = make_notify_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)
    env.update({"NOTIFY_TOOL": str(notify), "NOTIFY_LOG": str(notify_log)})

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture, state="ATTEMPT_RUNNING")

    # Seed status.json with a stale pr_url before the reaper runs.
    stamp_cmd = [
        "python3",
        str(TOOLS_DIR / "dispatch_status.py"),
        "set-pr-url",
        "--run-dir",
        str(run_dir),
        "--pr-url",
        stale_url,
    ]
    run(stamp_cmd)
    write_converged_terminal(run_dir, run_id, attempt=1, pr_url=correct_url)

    invoke_reaper(env)

    final = read_json(run_dir / "status.json")
    assert final["state"] == "CONVERGED_PR_CREATED"
    assert final["pr_url"] == "https://github.com/correct/pull/99"


def test_reaper_writes_terminal_status_pr_url_on_dead_handler(tmp: Path) -> None:
    """R10b: on the dead-session converged path (stale heartbeat, missing pid and
    tmux session) the reaper is likewise expected to reach CONVERGED_PR_CREATED
    and copy pr_url from ``terminal_status.json`` into ``status.json``."""
    run_id = "REC229-dead-pr-url"
    stale_url = "https://github.com/STALE/pull/2"
    correct_url = "https://github.com/correct/pull/100"

    runs_root = tmp / "runs"
    runs_root.mkdir()
    stub_bin, stub_state = make_tmux_stub(tmp)
    notify, notify_log = make_notify_stub(tmp)
    env = reaper_env(tmp, runs_root, stub_bin, stub_state)
    env.update({"NOTIFY_TOOL": str(notify), "NOTIFY_LOG": str(notify_log)})

    fixture = make_worktree(tmp, run_id)
    run_dir = write_status(runs_root, run_id, fixture, state="ATTEMPT_RUNNING")

    stamp_cmd = [
        "python3",
        str(TOOLS_DIR / "dispatch_status.py"),
        "set-pr-url",
        "--run-dir",
        str(run_dir),
        "--pr-url",
        stale_url,
    ]
    run(stamp_cmd)

    # Dead-session converged path: ATTEMPT_RUNNING + terminal + heartbeat + dead orchestrator.
    dead_pid = unused_pid()
    stale_stamp = past_iso(seconds=3_600)
    write_heartbeat(
        run_dir,
        run_id,
        attempt=1,
        pid=dead_pid,
        tmux_session="missing-dead-pr-url-session",
        updated_at=stale_stamp,
    )
    write_converged_terminal(run_dir, run_id, attempt=1, pr_url=correct_url)

    invoke_reaper(env)

    final = read_json(run_dir / "status.json")
    assert final["state"] == "CONVERGED_PR_CREATED"
    assert final["pr_url"] == "https://github.com/correct/pull/100"
