"""Behavioral acceptance tests for recoil_dashboard.py (REC-229 Phase 2).

Build a tmp_path status-root with hand-written status.json fixtures and
dependency-inject the gh seam — no live ~/.recoil read, no live gh call.
"""

from __future__ import annotations

import json
import re
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pytest

_TOOLS = Path(__file__).resolve().parent.parent
if str(_TOOLS) not in sys.path:
    sys.path.insert(0, str(_TOOLS))

import dispatch_status  # noqa: E402
import recoil_dashboard as rd  # noqa: E402


def _iso(dt: datetime) -> str:
    return dt.replace(microsecond=0).isoformat().replace("+00:00", "Z")


def _now() -> datetime:
    return datetime.now(timezone.utc)


def write_run(root: Path, run_id: str, **fields):
    """Write ``<root>/<run_id>/status.json`` and return the run directory.

    The keyword fields read are ``issue``, ``branch``, ``state``, ``pr_url``,
    ``updated_at`` and ``created_at``; any other keyword is ignored.
    ``state`` falls back to ``"ATTEMPT_RUNNING"``, both timestamps fall back
    to the current time, and the remaining fields fall back to ``None``.
    """
    run_dir = root / run_id
    run_dir.mkdir(parents=True, exist_ok=True)

    # Fallback timestamps are always computed, updated_at first, even when
    # the caller supplies explicit values.
    fallback_updated = _iso(_now())
    fallback_created = _iso(_now())

    payload = {"run_id": run_id}
    for key in ("issue", "branch"):
        payload[key] = fields.get(key)
    payload["state"] = fields.get("state", "ATTEMPT_RUNNING")
    payload["pr_url"] = fields.get("pr_url")
    payload["updated_at"] = fields.get("updated_at", fallback_updated)
    payload["created_at"] = fields.get("created_at", fallback_created)

    status_file = run_dir / "status.json"
    status_file.write_text(json.dumps(payload), encoding="utf-8")
    return run_dir


def gh_returning(prs):
    """Return a fake gh seam that answers every call with *prs* as JSON.

    Each invocation's argument is appended to the list exposed as the
    ``calls`` attribute of the returned callable.
    """
    recorded = []

    def _gh(args):
        recorded.append(args)
        serialized = json.dumps(prs)
        return serialized

    setattr(_gh, "calls", recorded)
    return _gh


def gh_failing():
    """Return a fake gh seam that yields ``None`` for every call."""
    return lambda args: None


def report(root, gh):
    """Build the dashboard report for *root* using the injected *gh* seam."""
    built = rd.build_report(root, gh)
    return built


# 1. Renders the axes — table over every non-terminal BUILDING state + capped + terminal excluded
@pytest.mark.parametrize(
    "state",
    [
        "STARTED",
        "ATTEMPT_RUNNING",
        "RETRY_PENDING",
        "ZOMBIE_SUSPECT",
        "ZOMBIE_REAPED",
    ],
)
def test_building_states_table(tmp_path, state):
    """Each parametrized state is listed under ``building``.

    A capped run must appear under ``capped`` only, and a converged run must
    not appear under ``building``.
    """
    target_id = f"run-{state}"
    write_run(tmp_path, target_id, state=state, branch=f"b-{state}")
    write_run(tmp_path, "run-capped", state="CAPPED_NEEDS_HUMAN")
    write_run(tmp_path, "run-conv", state="CONVERGED_PR_CREATED", pr_url="https://x/pull/9")

    result = report(tmp_path, gh_returning([]))

    assert result["source_unreachable"] is False
    in_building = {row["run_id"] for row in result["building"]}
    assert target_id in in_building
    in_capped = {row["run_id"] for row in result["capped"]}
    assert "run-capped" in in_capped
    for excluded in ("run-conv", "run-capped"):
        assert excluded not in in_building


# 2. Anomaly: terminal + pr_url null
def test_anomaly_terminal_null_pr_url(tmp_path):
    """A converged run with a null ``pr_url`` is reported as an anomaly, not as building."""
    write_run(tmp_path, "run-conv-null", state="CONVERGED_PR_CREATED", pr_url=None)

    result = report(tmp_path, gh_returning([]))

    in_building = {row["run_id"] for row in result["building"]}
    assert "run-conv-null" not in in_building
    flagged = [entry for entry in result["anomalies"] if "pr_url null" in entry["anomaly"]]
    assert flagged


# 2a. Malformed/unreadable individual status.json does not crash
def test_malformed_status_is_anomaly_not_crash(tmp_path):
    """An unparsable ``status.json`` becomes an anomaly and leaves the other runs reported."""
    write_run(tmp_path, "run-good", state="ATTEMPT_RUNNING", branch="bg")
    broken_dir = tmp_path / "run-bad"
    broken_dir.mkdir()
    broken_file = broken_dir / "status.json"
    broken_file.write_text("{not json", encoding="utf-8")
    write_run(tmp_path, "run-cap", state="CAPPED_NEEDS_HUMAN")

    result = report(tmp_path, gh_returning([]))

    assert result["source_unreachable"] is False
    in_building = {row["run_id"] for row in result["building"]}
    assert "run-good" in in_building
    in_capped = {row["run_id"] for row in result["capped"]}
    assert "run-cap" in in_capped
    flagged = [entry for entry in result["anomalies"] if "run-bad" in entry["run_dir"]]
    assert flagged


def test_missing_state_key_is_anomaly(tmp_path):
    """A ``status.json`` without a ``state`` key is reported as an anomaly."""
    write_run(tmp_path, "run-good", state="ATTEMPT_RUNNING")
    stateless_dir = tmp_path / "run-nostate"
    stateless_dir.mkdir()
    stateless_payload = json.dumps({"run_id": "run-nostate"})
    (stateless_dir / "status.json").write_text(stateless_payload, encoding="utf-8")

    result = report(tmp_path, gh_returning([]))

    assert result["source_unreachable"] is False
    in_building = {row["run_id"] for row in result["building"]}
    assert "run-good" in in_building
    flagged = [entry for entry in result["anomalies"] if "run-nostate" in entry["run_dir"]]
    assert flagged


# 2c. STALE/FROZEN axis + exact both-list membership
def test_stale_frozen_both_list_membership(tmp_path):
    """Stale runs are flagged in ``stale`` while still being listed in ``building``.

    Covers a fresh run (building only), a run whose ``updated_at`` is older
    than ``rd.STALE_AGE_SECONDS`` by two hours, and a fresh ``ZOMBIE_SUSPECT``
    run; the latter two must be in both lists.
    """
    write_run(tmp_path, "run-fresh", state="ATTEMPT_RUNNING", updated_at=_iso(_now()))
    aged_stamp = _iso(_now() - timedelta(seconds=rd.STALE_AGE_SECONDS + 7200))
    write_run(tmp_path, "run-old", state="ATTEMPT_RUNNING", updated_at=aged_stamp)
    write_run(tmp_path, "run-zombie", state="ZOMBIE_SUSPECT", updated_at=_iso(_now()))

    result = report(tmp_path, gh_returning([]))

    in_building = {row["run_id"] for row in result["building"]}
    in_stale = {row["run_id"] for row in result["stale"]}

    assert "run-fresh" in in_building
    assert "run-fresh" not in in_stale
    for flagged_id in ("run-old", "run-zombie"):
        assert flagged_id in in_stale
        assert flagged_id in in_building
    # Staleness is a cross-cutting flag: it never removes a run from building.
    assert in_stale.issubset(in_building)


# 2b. Terminal-state SSOT + reaper-drift guard
def test_terminal_ssot_identity():
    """The dashboard reuses the very same ``TERMINAL_STATES`` object as ``dispatch_status``."""
    dashboard_states = rd.TERMINAL_STATES
    canonical_states = dispatch_status.TERMINAL_STATES
    assert dashboard_states is canonical_states


def test_reaper_bash_terminal_set_matches():
    """Guard against drift between the bash reaper and ``dispatch_status``.

    Reads the whitespace-separated ``TERMINAL_STATES="..."`` assignment from
    ``dispatch_reaper.sh`` and checks it names exactly the same states as
    ``dispatch_status.TERMINAL_STATES``.
    """
    reaper = _TOOLS / "dispatch_reaper.sh"
    text = reaper.read_text(encoding="utf-8")
    m = re.search(r'^TERMINAL_STATES="([^"]*)"', text, re.MULTILINE)
    assert m, "could not find TERMINAL_STATES in dispatch_reaper.sh"
    bash_set = set(m.group(1).split())
    # Coerce to a set so the comparison does not depend on the Python constant's
    # container type (a set never compares equal to a tuple or list).
    python_set = set(dispatch_status.TERMINAL_STATES)
    only_bash = sorted(map(str, bash_set - python_set))
    only_python = sorted(map(str, python_set - bash_set))
    assert bash_set == python_set, (
        f"terminal-state drift: only in dispatch_reaper.sh={only_bash}, "
        f"only in dispatch_status={only_python}"
    )


def test_future_state_absorbed_into_building(tmp_path):
    """A run in a state name not otherwise used by these fixtures is listed under ``building``."""
    write_run(tmp_path, "run-future", state="SOME_NEW_FUTURE_STATE")

    result = report(tmp_path, gh_returning([]))

    in_building = {row["run_id"] for row in result["building"]}
    assert "run-future" in in_building


# 3. Unreachable != empty
def test_unreachable_vs_empty(tmp_path):
    """A missing status root is "unreachable"; an existing empty one is merely empty."""
    absent_root = tmp_path / "does-not-exist"
    absent_report = report(absent_root, gh_returning([]))
    assert absent_report["source_unreachable"] is True

    empty_root = tmp_path / "empty"
    empty_root.mkdir()
    empty_report = report(empty_root, gh_returning([]))
    assert empty_report["source_unreachable"] is False
    for axis in ("building", "capped"):
        assert empty_report[axis] == []


# 3b. status-root precedence + expanduser on default
def test_status_root_precedence_and_expanduser(tmp_path, monkeypatch):
    """Pin ``_resolve_status_root``: explicit flag, then env var, then the home default.

    Also checks that a literal ``~`` in the env var is expanded.
    """
    env_var = "RECOIL_DASHBOARD_STATUS_ROOT"

    def resolved(flag):
        return str(rd._resolve_status_root(flag))

    monkeypatch.delenv(env_var, raising=False)

    # (i) neither flag nor env: the default, with the home directory expanded
    assert resolved(None) == str(Path.home() / ".recoil" / "dispatch-runs")

    # (ii) an explicit flag is used verbatim
    assert resolved(str(tmp_path)) == str(tmp_path)

    # (iii) the env var applies when no flag is given
    env_root = tmp_path / "envroot"
    monkeypatch.setenv(env_var, str(env_root))
    assert resolved(None) == str(env_root)

    # (iv) the flag beats the env var
    flag_root = tmp_path / "flagroot"
    assert resolved(str(flag_root)) == str(flag_root)

    # (v) a literal ~ coming from the env var is expanded
    monkeypatch.setenv(env_var, "~/somefixture")
    assert resolved(None) == str(Path.home() / "somefixture")


def test_default_status_root_in_json_is_resolved(tmp_path, monkeypatch, capsys):
    """``--json`` with no flag/env reports the resolved default status root.

    The home directory is redirected into ``tmp_path`` before ``main`` runs so
    that exercising the default root does not point at the developer's real
    ``~/.recoil`` tree.
    """
    monkeypatch.delenv("RECOIL_DASHBOARD_STATUS_ROOT", raising=False)
    # Path.home()/expanduser consult HOME on POSIX and USERPROFILE on Windows.
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("USERPROFILE", str(tmp_path))
    monkeypatch.setattr(rd, "_run_gh", lambda args: json.dumps([]))

    # NOTE(review): presumably rd expands "~" at call time, in which case the
    # default now lives under tmp_path. Comparing against the resolver keeps
    # this test valid even if rd caches the default at import — confirm.
    expected = rd._resolve_status_root(None)

    rd.main(["--json"])
    out = json.loads(capsys.readouterr().out)

    assert out["status_root"] == str(expected)
    reported = Path(out["status_root"])
    # "Resolved" means the emitted value is a real path, not a literal "~/...".
    assert reported.is_absolute()
    assert reported.parts[-2:] == (".recoil", "dispatch-runs")


# 4. PR join — all match modes + no-match, + body field requested + --limit 100
def test_pr_join_all_modes(tmp_path):
    """Open PRs are joined to an issue via branch, title token, body token, or not at all.

    Also checks that gh is invoked exactly once, that the value following
    ``--json`` mentions ``body``, and that ``"100"`` is among the arguments.
    """
    write_run(tmp_path, "run-A", state="ATTEMPT_RUNNING", branch="claude/REC-11-foo", issue="REC-11")
    write_run(tmp_path, "run-B", state="ATTEMPT_RUNNING", branch="x", issue="REC-44")

    def open_pr(number, title, body, head):
        return {
            "number": number,
            "title": title,
            "body": body,
            "headRefName": head,
            "state": "OPEN",
            "mergeStateStatus": "CLEAN",
            "url": f"u{number}",
        }

    canned_prs = [
        open_pr(1, "t", "", "claude/REC-11-foo"),  # head branch matches run-A
        open_pr(2, "Closes REC-22", "", "other"),  # token in the title
        open_pr(3, "nope", "fixes Closes REC-33 here", "z"),  # token in the body
        open_pr(4, "REC-44 work", "", "w"),  # matches run-B's issue
        open_pr(5, "unmatched", "", "q"),  # nothing to join on
    ]
    seam = gh_returning(canned_prs)

    result = report(tmp_path, seam)

    joined = {pr["number"]: pr for pr in result["open_prs"]}
    expected_issue = {1: "REC-11", 2: "REC-22", 3: "REC-33", 4: "REC-44"}
    for number, issue in expected_issue.items():
        assert joined[number]["joined_issue"] == issue
    assert joined[5]["joined_issue"] is None

    # The single gh call requests the body field and passes the 100 limit.
    assert len(seam.calls) == 1
    gh_args = seam.calls[0]
    json_fields = gh_args[gh_args.index("--json") + 1]
    assert "body" in json_fields
    assert "100" in gh_args


# 4b. PR join includes terminal CONVERGED runs (gate finding REC-229)
def test_pr_join_includes_terminal_converged_run(tmp_path):
    """A converged run stays out of the build axes but still feeds the open-PR join.

    The PR's title and body carry no REC token and its head branch equals the
    run's branch, while the run's ``status.json`` records issue ``REC-77``;
    the joined issue must be ``REC-77``.
    """
    write_run(
        tmp_path,
        "run-conv",
        state="CONVERGED_PR_CREATED",
        branch="claude/REC-77-thing",
        issue="REC-77",
        pr_url="https://github.com/x/pull/5",
    )
    open_pr = {
        "number": 5,
        "title": "dashboard fix",
        "body": "no token here",
        "headRefName": "claude/REC-77-thing",
        "state": "OPEN",
        "mergeStateStatus": "CLEAN",
        "url": "u5",
    }

    result = report(tmp_path, gh_returning([open_pr]))

    # Excluded from both build axes...
    in_building = {row["run_id"] for row in result["building"]}
    assert "run-conv" not in in_building
    in_capped = {row["run_id"] for row in result["capped"]}
    assert "run-conv" not in in_capped
    # ...yet its metadata is still used when joining open PRs.
    joined = {pr["number"]: pr for pr in result["open_prs"]}
    assert joined[5]["joined_issue"] == "REC-77"


# 5. gh unavailable
def test_gh_unavailable(tmp_path):
    """When the gh seam returns ``None`` the report flags it and still lists runs."""
    write_run(tmp_path, "run-A", state="ATTEMPT_RUNNING")

    result = report(tmp_path, gh_failing())

    assert result["gh_unavailable"] is True
    assert result["open_prs"] == []
    in_building = {row["run_id"] for row in result["building"]}
    assert "run-A" in in_building


# 6. unreachable-status-root + working-gh independence
def test_unreachable_status_root_working_gh(tmp_path):
    """Open PRs are still reported when the status root does not exist."""
    absent_root = tmp_path / "nope"
    open_pr = {
        "number": 7,
        "title": "t",
        "body": "",
        "headRefName": "h",
        "state": "OPEN",
        "mergeStateStatus": "CLEAN",
        "url": "u",
    }

    result = report(absent_root, gh_returning([open_pr]))

    assert result["source_unreachable"] is True
    assert len(result["open_prs"]) == 1


# 6b. unreadable status-root (PermissionError/OSError) classified, not crashing
@pytest.mark.parametrize("exc", [PermissionError, OSError])
def test_unreadable_status_root_classified(tmp_path, exc):
    """A scan that raises *exc* marks the source unreachable; open PRs still appear."""
    existing_root = tmp_path / "exists"
    existing_root.mkdir()

    def raising_scan(_root):
        raise exc("boom")

    open_pr = {
        "number": 8,
        "title": "t",
        "body": "",
        "headRefName": "h",
        "state": "OPEN",
        "mergeStateStatus": "CLEAN",
        "url": "u",
    }

    result = rd.build_report(existing_root, gh_returning([open_pr]), scan=raising_scan)

    assert result["source_unreachable"] is True
    assert len(result["open_prs"]) == 1


# 7. human-pane render
def test_human_render(tmp_path, capsys, monkeypatch):
    """The human-readable output names the runs, the OPEN PRS section and the /dashboard hint."""
    write_run(tmp_path, "run-bld", state="ATTEMPT_RUNNING", branch="bb")
    write_run(tmp_path, "run-cap", state="CAPPED_NEEDS_HUMAN")
    open_pr = {
        "number": 3,
        "title": "pr",
        "body": "",
        "headRefName": "bb",
        "state": "OPEN",
        "mergeStateStatus": "CLEAN",
        "url": "u",
    }
    monkeypatch.setattr(rd, "_run_gh", lambda args: json.dumps([open_pr]))

    rd.main(["--status-root", str(tmp_path)])

    rendered = capsys.readouterr().out
    expected_fragments = (
        "run-bld",
        "run-cap",
        "OPEN PRS",
        "BACKLOG/DONE/STALE: run via /dashboard",
    )
    for fragment in expected_fragments:
        assert fragment in rendered


def test_human_render_unreachable_banner(tmp_path, capsys, monkeypatch):
    """The human-readable output shows a SOURCE UNREACHABLE banner for a missing root."""
    monkeypatch.setattr(rd, "_run_gh", lambda args: json.dumps([]))
    absent_root = tmp_path / "missing"

    rd.main(["--status-root", str(absent_root)])

    rendered = capsys.readouterr().out
    assert "SOURCE UNREACHABLE" in rendered


# 8. gh repo-context is cwd-independent
def test_gh_cwd_independent(tmp_path, monkeypatch):
    """``_run_gh`` passes ``cwd=str(rd._REPO_ROOT)`` to ``subprocess.run``, not the process cwd."""
    seen = {}

    class _Completed:
        # Minimal stand-in exposing the two attributes this fake provides.
        returncode = 0
        stdout = "[]"

    def fake_run(cmd, cwd=None, **kwargs):
        seen["cwd"] = cwd
        return _Completed()

    monkeypatch.chdir(tmp_path)
    monkeypatch.setattr(rd.subprocess, "run", fake_run)

    rd._run_gh(["pr", "list"])

    used_cwd = seen["cwd"]
    assert used_cwd == str(rd._REPO_ROOT)
    assert used_cwd != str(tmp_path)


# 9. No Linear call-boundary
def test_no_linear_boundary():
    """The dashboard module's source contains no Linear markers and has no ``linear_client``."""
    module_text = Path(rd.__file__).read_text(encoding="utf-8")
    forbidden_markers = (
        "mcp__linear",
        "import linear",
        "linear_token",
        "api.linear.app",
    )
    for marker in forbidden_markers:
        assert marker not in module_text
    assert not hasattr(rd, "linear_client")
