#!/usr/bin/env python3
"""Recoil at-a-glance dashboard — read-only local JOIN of dispatch status.json
files-of-record + `gh pr list`.

Renders the volatile axes JT cannot see at a glance: 🔨 BUILDING / ⚠️ CAPPED /
🔀 OPEN PRS, plus 🧊 STALE/FROZEN and local anomalies. Writes nothing; NEVER
contacts Linear (the /dashboard skill adds the in-session Linear read on top).

The classification is TERMINAL-anchored — it imports the canonical
dispatch_status.TERMINAL_STATES (the single Python SSOT) and derives
`building = not terminal`, so a NEW non-terminal chassis state auto-lands in
BUILDING instead of being silently dropped.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable

# The script lives at recoil/pipeline/tools/recoil_dashboard.py, beside
# dispatch_status.py — make the sibling importable deterministically from
# __file__ (NOT the process cwd).
_TOOLS_DIR = Path(__file__).resolve().parent
if str(_TOOLS_DIR) not in sys.path:
    sys.path.insert(0, str(_TOOLS_DIR))

import dispatch_status  # noqa: E402  (after the deterministic sys.path insert)

# Single Python SSOT: hold a REFERENCE to the canonical terminal set, never a
# re-typed copy (test asserts identity).
TERMINAL_STATES = dispatch_status.TERMINAL_STATES

# The one state collect_local() routes to the CAPPED bucket.
CAPPED_STATE = "CAPPED_NEEDS_HUMAN"
# Runs in these states are listed under STALE regardless of age. The check
# only runs for states that are not in TERMINAL_STATES.
# NOTE(review): confirm ZOMBIE_REAPED is non-terminal, otherwise it can never
# reach that check.
ZOMBIE_STATES = {"ZOMBIE_SUSPECT", "ZOMBIE_REAPED"}

# A non-terminal run with no status progress for this long is FROZEN. Single
# module-level knob (no magic number inline) so it is tunable in one place.
STALE_AGE_SECONDS = 3600  # 1h

# Repo root derived from __file__: …/recoil/pipeline/tools/ → up 3 → repo root.
_REPO_ROOT = _TOOLS_DIR.parent.parent.parent

# Comma-separated field list handed to `gh pr list --json`.
GH_FIELDS = "number,title,body,headRefName,state,mergeStateStatus,url"
# Value handed to `gh pr list --limit`; a reply that fills this cap is flagged
# as possibly truncated.
GH_LIMIT = 100

# Issue token: the literal "REC-" followed by one or more digits (e.g. REC-123).
_REC_RE = re.compile(r"REC-\d+")


def _resolve_status_root(flag_value: str | None) -> Path:
    """Pick the directory that holds the per-run status.json folders.

    The first non-empty source wins: the explicit --status-root value, then
    the RECOIL_DASHBOARD_STATUS_ROOT environment variable, then the default
    ~/.recoil/dispatch-runs. Whatever is chosen goes through expanduser(), so
    a literal "~" never turns into a bogus ./~/.recoil/... path.
    """
    env_value = os.environ.get("RECOIL_DASHBOARD_STATUS_ROOT")
    # `or` short-circuits, so the home directory is only looked up when both
    # the flag and the environment variable are missing or empty.
    chosen = flag_value or env_value or str(Path.home() / ".recoil" / "dispatch-runs")
    return Path(chosen).expanduser()


def _parse_iso(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp string, returning None when it is unusable.

    Args:
        value: Timestamp text as read from a status.json field. A trailing
            "Z" is rewritten to "+00:00" before parsing.

    Returns:
        The parsed datetime (naive when the text carries no UTC offset), or
        None when *value* is missing, empty, not a string, or not valid
        ISO-8601.
    """
    # The value comes straight from JSON, so it may be a number, list or
    # object. Those have no usable .replace() and must be rejected here
    # instead of raising AttributeError into the caller.
    if not isinstance(value, str) or not value:
        return None
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None


def _age_seconds(status: dict[str, Any]) -> int | None:
    """Return whole seconds elapsed since the run's last recorded timestamp.

    Uses ``updated_at`` and falls back to ``created_at`` when the former is
    missing or unparseable. A timestamp without an offset is treated as UTC.
    Returns None when neither field yields a timestamp.
    """
    stamp = _parse_iso(status.get("updated_at"))
    if stamp is None:
        stamp = _parse_iso(status.get("created_at"))
    if stamp is None:
        return None
    # Naive and aware datetimes cannot be subtracted; pin naive ones to UTC.
    aware = stamp if stamp.tzinfo is not None else stamp.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - aware
    return int(elapsed.total_seconds())


def _run_gh(args: list[str]) -> str | None:
    """Run ``gh <args>`` and return its stdout, or None on failure.

    This is the injectable gh seam. Failure means the process could not be
    started (OSError / ValueError from subprocess) or exited non-zero. The
    command runs with the repo root as its working directory, so the result
    does not depend on where the caller was started.
    """
    command = ["gh", *args]
    try:
        completed = subprocess.run(
            command,
            cwd=str(_REPO_ROOT),
            capture_output=True,
            text=True,
            check=False,  # the exit status is inspected below instead
        )
    except (OSError, ValueError):
        return None
    return completed.stdout if completed.returncode == 0 else None


def _scan_status_root(status_root: Path) -> list[Path]:
    """List every <status-root>/*/status.json path in sorted order.

    Only immediate sub-directories are matched. Nothing is caught here: any
    error raised by the directory walk propagates to the caller, which
    classifies it as source_unreachable.
    """
    found = list(status_root.glob("*/status.json"))
    found.sort()
    return found


def _run_summary(status: dict[str, Any]) -> dict[str, Any]:
    """Project a status.json dict onto the fields the dashboard renders.

    Copies ``run_id``, ``issue``, ``branch``, ``state`` and ``pr_url`` (None
    for any that are absent) and adds the computed ``age_seconds``.
    """
    copied_keys = ("run_id", "issue", "branch", "state", "pr_url")
    summary: dict[str, Any] = {key: status.get(key) for key in copied_keys}
    summary["age_seconds"] = _age_seconds(status)
    return summary


def collect_local(status_root: Path, scan: Callable[[Path], list[Path]] = _scan_status_root) -> dict[str, Any]:
    """Classify every run found under *status_root* from its status.json.

    Read-only local join over status.json files. Filesystem and content
    problems are classified rather than raised: an unreachable root sets
    ``source_unreachable``, and a single unreadable or malformed file becomes
    one entry in ``anomalies``.

    Args:
        status_root: Directory to scan for per-run status.json files.
        scan: Injectable lister returning the status.json paths to read
            (defaults to the real glob-based scanner).

    Returns:
        A dict with ``source_unreachable`` (bool), ``status_root`` (str) and
        five lists: ``building`` (non-terminal runs), ``capped``,
        ``converged`` (terminal runs that have a pr_url), ``stale`` (a view
        over ``building``, not a removal) and ``anomalies``.
    """
    out: dict[str, Any] = {
        "source_unreachable": False,
        "status_root": str(status_root),
        "building": [],
        "capped": [],
        "converged": [],
        "stale": [],
        "anomalies": [],
    }

    # The existence probe sits INSIDE the guard: Path.exists() can itself
    # raise OSError (e.g. PermissionError on an unsearchable parent, depending
    # on the Python version), and that must be classified, not propagated.
    # PermissionError is a subclass of OSError, so one clause covers both.
    try:
        if not status_root.exists():
            out["source_unreachable"] = True
            return out
        status_files = scan(status_root)
    except OSError:
        out["source_unreachable"] = True
        return out

    for sp in status_files:
        run_dir = sp.parent
        malformed = {"run_dir": str(run_dir), "anomaly": "unreadable/malformed status.json"}

        try:
            status = json.loads(sp.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError: the file cannot be read. ValueError: invalid JSON
            # (JSONDecodeError) or undecodable bytes (UnicodeDecodeError).
            out["anomalies"].append(malformed)
            continue
        if not isinstance(status, dict) or "state" not in status:
            out["anomalies"].append(malformed)
            continue

        state = status["state"]
        try:
            is_terminal = state in TERMINAL_STATES
        except TypeError:
            # A JSON list/object state is unhashable; membership in a
            # hash-based TERMINAL_STATES would raise. Treat it as malformed.
            out["anomalies"].append(malformed)
            continue
        summary = _run_summary(status)

        if state == CAPPED_STATE:
            out["capped"].append(summary)
            continue
        if is_terminal:
            # Terminal-but-no-pr_url is the exact ambiguity Phase 3 kills.
            # NOTE(review): the message names CONVERGED_PR_CREATED although
            # this branch handles every state in TERMINAL_STATES; the real
            # state is carried in the "state" key.
            if not status.get("pr_url"):
                out["anomalies"].append(
                    {"run_dir": str(run_dir), "anomaly": "terminal CONVERGED_PR_CREATED but pr_url null/empty",
                     "run_id": status.get("run_id"), "state": state}
                )
            else:
                # Terminal-converged runs are EXCLUDED from BUILDING/CAPPED, but
                # their branch/issue/pr_url metadata is the normal source of the
                # OPEN-PRS join (the reaper marks CONVERGED_PR_CREATED *after*
                # creating the PR). Keep them for join_prs(), not the build axes.
                out["converged"].append(summary)
            continue

        # Non-terminal ⇒ BUILDING (drift guard: building = not terminal).
        out["building"].append(summary)
        # STALE is a cross-cutting VIEW over the non-terminal set, NOT a removal.
        age = summary["age_seconds"]
        if state in ZOMBIE_STATES or (age is not None and age > STALE_AGE_SECONDS):
            out["stale"].append(summary)

    return out


def collect_prs(gh: Callable[[list[str]], str | None]) -> dict[str, Any]:
    """Fetch the open-PR list through the injected gh runner.

    Args:
        gh: Callable that takes the gh argument list and returns stdout, or
            None when the command failed.

    Returns:
        A dict with ``gh_unavailable`` (True when gh failed or its output was
        not parseable JSON), ``prs_truncated`` (True when the reply filled
        the --limit cap, so more PRs may exist) and ``open_prs`` (the list of
        PR dicts; empty when unavailable or when the payload is not a list).
    """
    unavailable: dict[str, Any] = {"gh_unavailable": True, "prs_truncated": False, "open_prs": []}

    raw = gh(["pr", "list", "--limit", str(GH_LIMIT), "--json", GH_FIELDS])
    if raw is None:
        return unavailable
    try:
        payload = json.loads(raw)
    except (ValueError, TypeError):
        # ValueError covers JSONDecodeError; TypeError covers a seam that
        # returned something other than str/bytes.
        return unavailable

    # Normalise BEFORE measuring: valid JSON such as `null` or a number has no
    # len(), and a non-list payload must never count as a full page.
    prs = payload if isinstance(payload, list) else []
    return {
        "gh_unavailable": False,
        "prs_truncated": len(prs) >= GH_LIMIT,
        "open_prs": prs,
    }


def _rec_from(*texts: str | None) -> str | None:
    """Return the first REC-NN token found in *texts*, scanning in order.

    Empty and None entries are skipped. Returns None when no text contains a
    token.
    """
    matches = (_REC_RE.search(text) for text in texts if text)
    return next((found.group(0) for found in matches if found), None)


def join_prs(open_prs: list[dict[str, Any]], local: dict[str, Any]) -> list[dict[str, Any]]:
    """Join open PRs to local runs and report the issue each PR belongs to.

    Precedence (first match wins):
    (1) the PR's headRefName contains a run's branch, which yields that run's
        issue, or else the REC-NN token inside the branch name;
    (2) a REC-NN token found in the PR's headRefName, title or body;
    (3) a run's issue string occurring anywhere in the PR's title, body or
        headRefName.
    Unjoined PRs are kept with joined_issue=None (never dropped).

    Args:
        open_prs: PR dicts as returned by ``gh pr list --json``.
        local: The collect_local() result; its ``building``, ``capped`` and
            ``converged`` lists supply the candidate runs.

    Returns:
        One dict per input PR, in input order, with ``number``, ``title``,
        ``headRefName``, ``mergeStateStatus``, ``url`` and ``joined_issue``.
    """
    runs = local.get("building", []) + local.get("capped", []) + local.get("converged", [])
    # Only non-empty strings can take part in substring matching; any other
    # JSON value would raise TypeError in the `in` tests below.
    branch_by_run = {
        r["branch"]: r for r in runs if isinstance(r.get("branch"), str) and r["branch"]
    }
    # dict.fromkeys de-duplicates while keeping run order, so "first match
    # wins" is deterministic (a set would iterate in hash order).
    issues = list(dict.fromkeys(
        r["issue"] for r in runs if isinstance(r.get("issue"), str) and r["issue"]
    ))

    joined: list[dict[str, Any]] = []
    for pr in open_prs:
        head = pr.get("headRefName") or ""
        rec = None
        # (1) branch match
        for branch, run in branch_by_run.items():
            if branch in head:
                rec = run.get("issue") or _rec_from(branch)
                break
        # (2) REC-NN token from head/title/body
        if rec is None:
            rec = _rec_from(head, pr.get("title"), pr.get("body"))
        # (3) a run's issue appearing inside any PR text. This must be a
        # per-string substring test: `issue in (title, body, head)` would only
        # match when a whole field equals the issue.
        if rec is None:
            haystacks = (pr.get("title") or "", pr.get("body") or "", head)
            rec = next(
                (issue for issue in issues if any(issue in text for text in haystacks)),
                None,
            )
        joined.append(
            {
                "number": pr.get("number"),
                "title": pr.get("title"),
                "headRefName": pr.get("headRefName"),
                "mergeStateStatus": pr.get("mergeStateStatus"),
                "url": pr.get("url"),
                "joined_issue": rec,
            }
        )
    return joined


def build_report(status_root: Path, gh: Callable[[list[str]], str | None],
                 scan: Callable[[Path], list[Path]] = _scan_status_root) -> dict[str, Any]:
    """Assemble the full dashboard report from the local and PR axes.

    Runs the local status.json join, then the gh PR query, and merges them
    into one dict. The ``converged`` runs feed the PR join only; they are not
    part of the returned report.
    """
    local = collect_local(status_root, scan=scan)
    pr_axis = collect_prs(gh)

    # Built step by step so the key order matches the rendered layout:
    # availability flags first, then the build axes, then the joined PRs.
    report: dict[str, Any] = {"source_unreachable": local["source_unreachable"]}
    for key in ("gh_unavailable", "prs_truncated"):
        report[key] = pr_axis[key]
    for key in ("status_root", "building", "capped", "stale", "anomalies"):
        report[key] = local[key]
    report["open_prs"] = join_prs(pr_axis["open_prs"], local)
    return report


def _fmt_age(secs: int | None) -> str:
    """Render an age in seconds as a short label.

    None becomes "?". Below 90 seconds the label is in seconds, below 5400
    seconds (90 minutes) in whole minutes, otherwise in whole hours.
    """
    if secs is None:
        return "?"
    if secs < 90:
        amount, unit = secs, "s"
    elif secs < 5400:
        amount, unit = secs // 60, "m"
    else:
        amount, unit = secs // 3600, "h"
    return f"{amount}{unit}"


def _fmt_run(r: dict[str, Any]) -> str:
    """Format one run summary as an indented dashboard row.

    Columns are run id, issue, branch, state and the age in parentheses. A
    missing or empty issue or branch is shown as "-".
    """
    run_id = r.get("run_id")
    issue = r.get("issue") or "-"
    branch = r.get("branch") or "-"
    state = r.get("state")
    age = _fmt_age(r.get("age_seconds"))
    return f"  {run_id}  {issue}  {branch}  {state}  ({age})"


def render_human(report: dict[str, Any]) -> str:
    """Render a build_report() dict as the plain-text dashboard.

    Sections appear in a fixed order: an optional bold SOURCE UNREACHABLE
    banner, BUILDING, CAPPED, STALE/FROZEN (only when non-empty), OPEN PRS,
    LOCAL ANOMALIES (only when non-empty) and a closing pointer to /dashboard.

    Args:
        report: Dict with the keys produced by build_report().

    Returns:
        The dashboard as one newline-joined string, with no trailing newline.
    """
    lines: list[str] = []
    unreachable = report["source_unreachable"]
    if unreachable:
        lines.append(f"\033[1m⚠️ SOURCE UNREACHABLE — build axes unknown "
                     f"(status-root: {report['status_root']})\033[0m")

    # BUILDING and CAPPED share one layout. The "(none …)" note is suppressed
    # when the source is unreachable, because "none" would then be a guess.
    for header, key, empty_note in (
        ("🔨 BUILDING", "building", "  (none building)"),
        ("⚠️ CAPPED", "capped", "  (none capped)"),
    ):
        lines.append(header)
        if report[key]:
            lines.extend(_fmt_run(r) for r in report[key])
        elif not unreachable:
            lines.append(empty_note)

    if report["stale"]:
        lines.append("🧊 STALE/FROZEN")
        lines.extend(_fmt_run(r) for r in report["stale"])

    lines.append("🔀 OPEN PRS")
    if report["gh_unavailable"]:
        lines.append("  (gh unavailable)")
    elif report["open_prs"]:
        if report["prs_truncated"]:
            # The count comes from GH_LIMIT, so the warning cannot drift from
            # the --limit actually passed to gh.
            lines.append(f"  ⚠️ PR list may be truncated (≥{GH_LIMIT} open PRs)")
        for pr in report["open_prs"]:
            lines.append(f"  #{pr.get('number')}  {pr.get('title')}  {pr.get('headRefName')}  "
                         f"{pr.get('mergeStateStatus')}  [{pr.get('joined_issue') or 'unjoined'}]")
    else:
        lines.append("  (no open PRs)")

    if report["anomalies"]:
        lines.append("❗ LOCAL ANOMALIES")
        for a in report["anomalies"]:
            lines.append(f"  {a.get('run_dir')}  {a.get('anomaly')}")

    lines.append("BACKLOG/DONE/STALE: run via /dashboard (needs in-session Linear read)")
    return "\n".join(lines)


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: build the report and print it.

    Args:
        argv: Argument list to parse; None lets argparse read sys.argv.

    Returns:
        Always 0. Unreachable sources are reported in the output rather than
        through the exit status.
    """
    cli = argparse.ArgumentParser(description="Recoil at-a-glance read-only build/PR dashboard.")
    cli.add_argument("--status-root", default=None)
    cli.add_argument("--json", action="store_true", dest="as_json")
    opts = cli.parse_args(argv)

    report = build_report(_resolve_status_root(opts.status_root), _run_gh)

    # --json emits the raw report with stable key order; otherwise the
    # human-readable layout is printed.
    rendered = json.dumps(report, indent=2, sort_keys=True) if opts.as_json else render_human(report)
    print(rendered)
    return 0


# Script entry: run main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
