"""Lifecycle for per-project ttyd processes wrapping the Claude CLI."""

from __future__ import annotations

import atexit
import json
import logging
import os
import re
import signal
import socket
import subprocess
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from recoil.api.adapters._ids import validate_project_id
from recoil.api.chat_sessions import ChatSessionsStore, _now_iso
from recoil.api.eventbus import BUS
from recoil.core.paths import projects_root

logger = logging.getLogger(__name__)

router = APIRouter()


@router.get("/ttyd/health")
async def ttyd_health():
    """Report whether the ``ttyd`` binary can be found on PATH.

    Responds with ``{"status": "ok"}`` (HTTP 200) when ``shutil.which``
    locates ``ttyd``. Otherwise responds with HTTP 503 and a JSON body
    carrying ``detail`` and ``install_hint`` so a client can show install
    instructions instead of waiting on a terminal that will never appear.
    """
    from shutil import which

    from fastapi.responses import JSONResponse

    # Happy path first: the binary is resolvable, nothing more to say.
    if which("ttyd"):
        return {"status": "ok"}

    missing_body = {
        "detail": "ttyd not installed",
        "install_hint": "brew install ttyd",
    }
    return JSONResponse(status_code=503, content=missing_body)


# Absolute path to the ttyd_index.html file that sits beside this module.
TTYD_INDEX_PATH = str((Path(__file__).parent / "ttyd_index.html").resolve())

# Inclusive TCP port range scanned when allocating a port for a new ttyd.
_PORT_MIN = 7681
_PORT_MAX = 7700
# Session-id capture: total time to wait for a fresh session JSONL, and the
# sleep between directory polls (both in seconds).
_SESSION_CAPTURE_TIMEOUT_S = 15.0
_SESSION_CAPTURE_POLL_S = 0.5
# Grace periods (seconds) between SIGTERM and SIGKILL.
# NOTE(review): _STOP_GRACE_S is not referenced by any code visible in this
# module; only _STOP_FAST_GRACE_S is used — confirm whether it is still needed.
_STOP_GRACE_S = 3.0
_STOP_FAST_GRACE_S = 0.5
# Token limit reported by /ttyd/context-window and used as the denominator
# for its "pct" field.
_CONTEXT_LIMIT_DEFAULT = 200_000

# Root under which the per-working-directory session folders are looked up.
_CLAUDE_PROJECTS_ROOT = Path.home() / ".claude" / "projects"

# Absolute path to console_mcp_shim.py beside this module; it is launched
# with python3 via the MCP config below.
_SHIM_PATH = str(
    (Path(__file__).parent / "console_mcp_shim.py").resolve()
)
# JSON string passed verbatim as the value of the CLI's --mcp-config option.
_MCP_CONFIG = json.dumps(
    {
        "mcpServers": {
            "recoil-console": {
                "command": "python3",
                "args": [_SHIM_PATH],
            }
        }
    }
)


# Bookkeeping for one spawned ttyd process; stored in _PROCS keyed by project id.
@dataclass
class _ProcRecord:
    # Handle of the spawned ttyd process; its pid doubles as the process-group
    # id for killpg because the process is started with start_new_session=True.
    proc: subprocess.Popen
    # Port reserved for this ttyd instance.
    port: int
    # Resumed session id at spawn time ("" if none), later overwritten with the
    # captured id (or "unknown" when capture times out).
    session_id: str
    # Timestamp string produced by _now_iso() when the record was created.
    started_at: str
    # Project this process belongs to.
    project_id: str


# Live ttyd records keyed by project id.
_PROCS: dict[str, _ProcRecord] = {}
# Guards both _PROCS and _PORTS_RESERVED.
_PROCS_LOCK = threading.Lock()
# Ports handed out by the allocator and not yet released.
_PORTS_RESERVED: set[int] = set()
# Store used to look up, record, and bump per-project chat session ids.
_STORE = ChatSessionsStore()

# Cache for /context-window — keyed by JSONL path, value (mtime, used_tokens).
# Avoids re-reading the file on every 5s ContextBar poll.
_CONTEXT_CACHE: dict[Path, tuple[float, Optional[int]]] = {}
_CONTEXT_CACHE_LOCK = threading.Lock()


def _project_dir(project_id: str) -> Path:
    """Resolve *project_id* to its existing directory under the projects root.

    Raises:
        HTTPException: 400 when ``validate_project_id`` rejects the id
            (its ``ValueError`` message becomes the detail); 404 when the
            resolved path is not a directory.
    """
    try:
        validate_project_id(project_id)
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err)) from err

    candidate = projects_root() / project_id
    if candidate.is_dir():
        return candidate
    raise HTTPException(status_code=404, detail=f"project not found: {project_id}")


def _flatten_cwd(project_dir: Path) -> Path:
    """Return the session directory that corresponds to *project_dir*.

    The resolved absolute path has every ``/``, ``_`` and ``.`` replaced by
    ``-`` and the result is used as a folder name under
    ``_CLAUDE_PROJECTS_ROOT``.
    """
    resolved = project_dir.resolve()
    # Claude Code rewrites `/`, `_`, `.` to `-` for its on-disk session dir name.
    dashed_name = re.sub(r"[/_.]", "-", str(resolved))
    return _CLAUDE_PROJECTS_ROOT / dashed_name


def _allocate_port_locked() -> int:
    """Reserve and return the first usable port in ``_PORT_MIN.._PORT_MAX``.

    Must be called with ``_PROCS_LOCK`` held, since it mutates
    ``_PORTS_RESERVED``. A port qualifies when it is not already reserved,
    nothing accepts a loopback connection on it, and a throwaway socket can
    bind to it.

    Raises:
        HTTPException: 503 when every port in the range is reserved or busy.
    """
    loopback = "127.0.0.1"
    for candidate in range(_PORT_MIN, _PORT_MAX + 1):
        if candidate in _PORTS_RESERVED:
            continue

        # Step 1: a successful connect means some listener already owns it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(0.1)
            try:
                connect_rc = probe.connect_ex((loopback, candidate))
            except OSError:
                connect_rc = 1
        if connect_rc == 0:
            logger.warning(
                "ttyd port %d is busy (external listener); skipping", candidate
            )
            continue

        # Step 2: make sure we could actually bind it ourselves.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as trial:
            try:
                trial.bind((loopback, candidate))
            except OSError:
                continue

        _PORTS_RESERVED.add(candidate)
        return candidate

    logger.error(
        "no ttyd ports available in range %d-%d (all busy/reserved)",
        _PORT_MIN,
        _PORT_MAX,
    )
    raise HTTPException(status_code=503, detail="no ports available")


def _latest_jsonl_mtime(directory: Path) -> tuple[Optional[Path], float]:
    """Find the most recently modified ``*.jsonl`` file directly in *directory*.

    Returns:
        ``(path, mtime)`` of the newest file, or ``(None, 0.0)`` when the
        directory does not exist or holds no ``*.jsonl`` file with a positive
        mtime. Entries whose ``stat`` fails are skipped.
    """
    newest_path: Optional[Path] = None
    newest_mtime = 0.0
    if directory.is_dir():
        for candidate in directory.glob("*.jsonl"):
            try:
                stamp = candidate.stat().st_mtime
            except OSError:
                # File vanished or is unreadable between glob and stat.
                continue
            if stamp > newest_mtime:
                newest_path, newest_mtime = candidate, stamp
    return newest_path, newest_mtime


def _bump_last_used_safely(project_id: str) -> None:
    """Bump the store's last-used marker for *project_id*, never raising.

    Any exception from the store is logged as a warning and swallowed so
    that bookkeeping failures cannot break the calling request.
    """
    try:
        _STORE.bump_last_used(project_id)
    except Exception as err:  # noqa: BLE001
        logger.warning("bump_last_used failed: %s", err)


def _capture_session_id(
    project_id: str, project_dir: Path, baseline_mtime: float
) -> str:
    """Poll the session directory until a JSONL newer than the baseline appears.

    The newest ``*.jsonl`` in the flattened session directory is checked every
    ``_SESSION_CAPTURE_POLL_S`` seconds for up to
    ``_SESSION_CAPTURE_TIMEOUT_S`` seconds. As soon as its mtime exceeds
    *baseline_mtime*, the file stem is taken as the session id, recorded in
    the store (best effort) and returned.

    Returns:
        The captured session id, or ``"unknown"`` after emitting a warning
        event on the bus when the timeout elapses.
    """
    session_dir = _flatten_cwd(project_dir)
    give_up_at = time.monotonic() + _SESSION_CAPTURE_TIMEOUT_S

    while time.monotonic() < give_up_at:
        newest, newest_mtime = _latest_jsonl_mtime(session_dir)
        if newest is None or newest_mtime <= baseline_mtime:
            # Nothing fresh yet; wait and look again.
            time.sleep(_SESSION_CAPTURE_POLL_S)
            continue

        captured = newest.stem
        try:
            _STORE.record_session(project_id, captured)
        except Exception as err:  # noqa: BLE001
            # Persisting is best effort; the id is still returned to the caller.
            logger.warning("record_session failed: %s", err)
        return captured

    timeout_payload = {
        "project_id": project_id,
        "flat_dir": str(session_dir),
        "timeout_s": _SESSION_CAPTURE_TIMEOUT_S,
    }
    BUS.emit_sync(
        "warning",
        "chat/sessions",
        "session id capture timeout",
        payload=timeout_payload,
    )
    return "unknown"


def _build_ttyd_args(
    port: int, project_dir: Path, resume_session_id: Optional[str]
) -> list[str]:
    """Assemble the argv for a writable ttyd wrapping the ``claude`` CLI.

    Args:
        port: Port passed to ttyd via ``-p``.
        project_dir: Directory handed to the CLI via ``--add-dir``.
        resume_session_id: When truthy, appended as ``--resume <id>``.

    Returns:
        The argument list, starting with ``"ttyd"``.
    """
    # 2026-05-11: dropped `--index TTYD_INDEX_PATH`. ttyd 1.7.7 with --index
    # serves ONLY the custom HTML — none of its bundled xterm.js renderer is
    # exposed at /css/index.css, /inline.bundle.js, etc. So the iframe loaded
    # a page with a title + a keystroke bubbler and NO terminal. ttyd_index.html
    # is preserved for reference; if we need keystroke escape later, we'll
    # need a different injection path (ttyd doesn't support both custom HTML
    # AND its own assets in one process).
    #
    # NOTE(review): no bind-interface option is passed to ttyd here — confirm
    # that ttyd's default bind address is acceptable for a writable terminal.
    ttyd_part = ["ttyd", "--writable", "-p", str(port)]
    claude_part = [
        "claude",
        "--add-dir",
        str(project_dir),
        "--mcp-config",
        _MCP_CONFIG,
    ]
    if resume_session_id:
        claude_part += ["--resume", resume_session_id]
    return ttyd_part + claude_part


def _kill_proc_record(rec: _ProcRecord, *, grace_s: float = _STOP_FAST_GRACE_S) -> None:
    """Terminate a ttyd process group: SIGTERM, wait *grace_s*, then SIGKILL.

    Signals go to the whole process group (``os.killpg``) using the ttyd pid
    as the group id. If the group is already gone the function returns
    immediately. Other ``OSError``s from ``killpg`` are logged and the
    escalation continues. After SIGKILL the process gets one more second to
    exit; a warning is logged if it still has not.

    Args:
        rec: Record of the process to stop.
        grace_s: Seconds to wait after SIGTERM before escalating. The default
            is the short grace period meant for fast project switches.
    """
    group_id = rec.proc.pid

    def _signal_group(sig: int, failure_fmt: str) -> bool:
        # Returns False only when the process group no longer exists.
        try:
            os.killpg(group_id, sig)
        except ProcessLookupError:
            return False
        except OSError as err:
            logger.warning(failure_fmt, group_id, err)
        return True

    if not _signal_group(signal.SIGTERM, "killpg SIGTERM failed for pid=%s: %s"):
        return
    try:
        rec.proc.wait(timeout=grace_s)
    except subprocess.TimeoutExpired:
        pass  # still alive after the grace period — escalate below
    else:
        return

    if not _signal_group(signal.SIGKILL, "killpg SIGKILL failed for pid=%s: %s"):
        return
    try:
        rec.proc.wait(timeout=1.0)
    except subprocess.TimeoutExpired:
        logger.warning("ttyd pid=%s did not exit after SIGKILL", group_id)


def _emit_project_switch(new_project: str, killed: list[str]) -> None:
    """Publish an info event listing the projects whose ttyds were killed.

    Args:
        new_project: Project id that is now active.
        killed: Project ids whose ttyd processes were terminated.
    """
    details = {"new_project": new_project, "killed": killed}
    BUS.emit_sync(
        "info",
        "chat/ttyd",
        "killed stale ttyds on project switch",
        payload=details,
    )


def _evict_other_projects_locked(active_project_id: str) -> list[_ProcRecord]:
    """Drop every record except *active_project_id*'s from the global tables.

    Caller must hold ``_PROCS_LOCK``. Each removed record's port is released
    from ``_PORTS_RESERVED``. No signal is sent here: the returned records
    are meant to be killed by the caller after the lock is released, so that
    blocking ``wait()`` calls never happen under the lock.

    Returns:
        The removed records, in the order they were stored in ``_PROCS``.
    """
    stale_ids = [pid for pid in _PROCS if pid != active_project_id]
    removed: list[_ProcRecord] = []
    for stale_id in stale_ids:
        record = _PROCS.pop(stale_id)
        _PORTS_RESERVED.discard(record.port)
        removed.append(record)
    return removed


def _kill_all_ttyds() -> None:
    """Forget and terminate every tracked ttyd process.

    The global tables are snapshotted and cleared under ``_PROCS_LOCK``; the
    processes are then killed outside the lock with a one-second grace
    period each. A failure for one record is logged and does not stop the
    cleanup of the others.
    """
    with _PROCS_LOCK:
        doomed = [*_PROCS.values()]
        _PROCS.clear()
        _PORTS_RESERVED.clear()

    for record in doomed:
        try:
            _kill_proc_record(record, grace_s=1.0)
        except Exception as err:  # noqa: BLE001
            logger.warning(
                "cleanup failed for project=%s: %s", record.project_id, err
            )


# Terminate all tracked ttyd processes when the interpreter exits normally.
atexit.register(_kill_all_ttyds)


# SIGINT/SIGTERM dispositions in effect when this module is imported. They are
# captured before this module installs its own handler so that handler can
# chain to them after cleaning up.
_PREV_HANDLERS: dict[int, object] = {
    signal.SIGINT: signal.getsignal(signal.SIGINT),
    signal.SIGTERM: signal.getsignal(signal.SIGTERM),
}


def _signal_handler(signum, frame):  # type: ignore[no-untyped-def]
    """Kill every tracked ttyd, then honour the signal's previous disposition.

    After cleanup the handler defers to whatever was installed for *signum*
    when this module was imported:

    * a Python-level handler is called with the same ``(signum, frame)``;
      ``SystemExit``/``KeyboardInterrupt`` raised by it propagate, any other
      exception is logged and swallowed;
    * ``SIG_DFL`` is restored and the signal is re-delivered to this process,
      so the default action (e.g. termination on SIGTERM) still happens
      instead of the signal being silently absorbed by this handler;
    * ``SIG_IGN`` or an unknown disposition: nothing further is done.

    Args:
        signum: Number of the signal being handled.
        frame: Interrupted stack frame, forwarded to a chained handler.
    """
    _kill_all_ttyds()
    prev = _PREV_HANDLERS.get(signum)

    if callable(prev) and prev not in (signal.SIG_DFL, signal.SIG_IGN):
        try:
            prev(signum, frame)
        except SystemExit:
            raise
        except Exception as exc:  # noqa: BLE001
            logger.warning("chained signal handler failed: %s", exc)
        return

    if prev == signal.SIG_DFL:
        # Installing this handler replaced the default action. Without
        # re-delivery a SIGTERM would only clean up the ttyds and leave the
        # process running. Restore the default and send the signal again.
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)


# Install the cleanup handler for SIGINT and SIGTERM at import time.
try:
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)
except (ValueError, OSError):
    # Not on the main thread (e.g. some test runners) — atexit still runs.
    pass


# Request body shared by POST /ttyd/start and POST /ttyd/stop.
class _ProjectRequest(BaseModel):
    # Id of the project whose ttyd process the request targets.
    project_id: str


@router.post("/ttyd/start")
def ttyd_start(body: _ProjectRequest) -> dict[str, object]:
    """Start (or reuse) the ttyd process for a project.

    Only one project is kept active at a time: records for all other projects
    are evicted and their process groups killed. If a live ttyd already
    exists for the requested project its port and session id are returned
    unchanged. Otherwise a port is reserved, ttyd is spawned in its own
    session, and the call blocks until the session id is captured (or the
    capture times out and ``"unknown"`` is used).

    Args:
        body: Request carrying the target ``project_id``.

    Returns:
        ``{"port": <int>, "session_id": <str>}``.

    Raises:
        HTTPException: 400/404 from project validation, 503 when no port is
            available, 500 when the ``ttyd`` executable cannot be found.
    """
    project_id = body.project_id
    project_dir = _project_dir(project_id)

    # Inside the lock: idempotency check + port reservation only. Spawn
    # happens outside so concurrent /start calls for different projects
    # don't serialize on subprocess.Popen.
    stale: list[_ProcRecord] = []
    reuse_resp: dict | None = None
    alloc_error: HTTPException | None = None
    with _PROCS_LOCK:
        # Single-active-project policy: evict records for any other project.
        # Actual process termination (killpg) happens OUTSIDE the lock below
        # so blocking wait() calls don't stall other _PROCS_LOCK acquirers.
        stale = _evict_other_projects_locked(project_id)

        existing = _PROCS.get(project_id)
        if existing and existing.proc.poll() is None:
            _bump_last_used_safely(project_id)
            reuse_resp = {"port": existing.port, "session_id": existing.session_id}
        elif existing:
            # Previous ttyd for this project has exited: forget it.
            _PORTS_RESERVED.discard(existing.port)
            _PROCS.pop(project_id, None)
        if reuse_resp is None:
            try:
                port = _allocate_port_locked()
            except HTTPException as exc:
                # Defer the failure: the evicted records are no longer in
                # _PROCS, so they must still be killed below or their
                # processes would be orphaned.
                alloc_error = exc

    # Kill stale processes outside the lock using killpg (process group) so the
    # Claude CLI child is also terminated, not just the ttyd parent (audit H4).
    killed_others = [rec.project_id for rec in stale]
    for rec in stale:
        _kill_proc_record(rec, grace_s=_STOP_FAST_GRACE_S)

    if alloc_error is not None:
        if killed_others:
            _emit_project_switch(project_id, killed_others)
        raise alloc_error

    # Emit also outside the lock to avoid holding _PROCS_LOCK during BUS I/O.
    if reuse_resp is not None:
        if killed_others:
            _emit_project_switch(project_id, killed_others)
        return reuse_resp

    try:
        resume_session_id = _STORE.get_session(project_id)
        flat_dir = _flatten_cwd(project_dir)
        _, baseline_mtime = _latest_jsonl_mtime(flat_dir)
        args = _build_ttyd_args(port, project_dir, resume_session_id)
        # NOTE(review): no cwd= is passed to Popen, while the session lookup
        # directory is derived from project_dir — confirm the child's working
        # directory is what the session-id capture expects.
        try:
            proc = subprocess.Popen(  # noqa: S603
                args,
                start_new_session=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except FileNotFoundError as exc:
            raise HTTPException(status_code=500, detail="ttyd not installed") from exc
    except Exception:
        # Whatever went wrong before the process was registered, give the
        # reserved port back so it is not lost until restart.
        with _PROCS_LOCK:
            _PORTS_RESERVED.discard(port)
        raise

    rec = _ProcRecord(
        proc=proc,
        port=port,
        session_id=resume_session_id or "",
        started_at=_now_iso(),
        project_id=project_id,
    )
    with _PROCS_LOCK:
        _PROCS[project_id] = rec

    # Blocks (polling) until a fresh session file shows up or the timeout hits.
    captured = _capture_session_id(project_id, project_dir, baseline_mtime)
    rec.session_id = captured
    if resume_session_id:
        _bump_last_used_safely(project_id)
    if killed_others:
        _emit_project_switch(project_id, killed_others)
    return {"port": rec.port, "session_id": rec.session_id}


@router.post("/ttyd/stop")
def ttyd_stop(body: _ProjectRequest) -> dict[str, bool]:
    """Stop the ttyd process tracked for a project, if there is one.

    The record is removed and its port released under the lock; the process
    group is then killed outside the lock with the default grace period.

    Returns:
        ``{"ok": True}`` whether or not a process was being tracked.
    """
    with _PROCS_LOCK:
        removed = _PROCS.pop(body.project_id, None)
        if removed is not None:
            _PORTS_RESERVED.discard(removed.port)

    if removed is not None:
        _kill_proc_record(removed)
    return {"ok": True}


@router.get("/ttyd/status")
def ttyd_status(project_id: str = Query(...)) -> dict[str, object]:
    """Report whether a ttyd is running for *project_id*.

    A tracked record whose process has exited is removed (and its port
    released) as a side effect.

    Returns:
        A dict with ``running``, ``port``, ``session_id`` and ``started_at``.
        The last three are ``None`` when nothing is running; an empty session
        id is also reported as ``None``.
    """
    with _PROCS_LOCK:
        live = _PROCS.get(project_id)
        if live is not None and live.proc.poll() is not None:
            # Process exited behind our back: drop the stale bookkeeping.
            _PORTS_RESERVED.discard(live.port)
            _PROCS.pop(project_id, None)
            live = None

    if live is None:
        return {
            "running": False,
            "port": None,
            "session_id": None,
            "started_at": None,
        }
    return {
        "running": True,
        "port": live.port,
        "session_id": live.session_id or None,
        "started_at": live.started_at,
    }


def _assistant_input_tokens(raw_line: bytes) -> Optional[int]:
    """Return ``message.usage.input_tokens`` if *raw_line* is an assistant record.

    Returns ``None`` for blank lines, undecodable or invalid JSON, JSON values
    that are not objects, records whose ``type`` is not ``"assistant"``, and
    records lacking an integer ``input_tokens`` under ``message.usage``.
    """
    raw_line = raw_line.strip()
    if not raw_line:
        return None
    try:
        doc = json.loads(raw_line)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
    # A line can be valid JSON without being an object (null, a number, a
    # list, a string); calling .get() on those would raise AttributeError.
    if not isinstance(doc, dict) or doc.get("type") != "assistant":
        return None
    msg = doc.get("message")
    if not isinstance(msg, dict):
        return None
    usage = msg.get("usage")
    if not isinstance(usage, dict):
        return None
    tokens = usage.get("input_tokens")
    return tokens if isinstance(tokens, int) else None


def _read_last_assistant_input_tokens(jsonl_path: Path) -> Optional[int]:
    """Backward-scan a session JSONL for the most recent assistant input_tokens.

    The file is read from the end in 64 KiB chunks so that only its tail is
    touched in the common case. Lines that cannot be interpreted as an
    assistant record with an integer ``input_tokens`` are skipped.

    Args:
        jsonl_path: Path of the JSONL file to scan.

    Returns:
        The ``input_tokens`` value of the last matching line, or ``None`` when
        no line matches or the file cannot be read.
    """
    chunk_size = 64 * 1024
    try:
        with jsonl_path.open("rb") as f:
            f.seek(0, os.SEEK_END)
            position = f.tell()
            # Bytes of a line whose beginning lies in a chunk not yet read.
            pending = b""
            while position > 0:
                read_size = min(chunk_size, position)
                position -= read_size
                f.seek(position)
                lines = (f.read(read_size) + pending).split(b"\n")
                # Unless we reached the start of the file, the first piece may
                # be a partial line: hold it back until the next chunk.
                pending = lines.pop(0) if position > 0 else b""
                for line in reversed(lines):
                    tokens = _assistant_input_tokens(line)
                    if tokens is not None:
                        return tokens
    except OSError:
        return None
    return None


def _cached_input_tokens(jsonl_path: Path, mtime: float) -> Optional[int]:
    """Return the last assistant ``input_tokens`` for a file, memoised by mtime.

    A cached value is reused only when it was stored for exactly the same
    *mtime*; otherwise the file is re-scanned and the cache entry replaced.
    The file read happens outside the cache lock.
    """
    with _CONTEXT_CACHE_LOCK:
        cached = _CONTEXT_CACHE.get(jsonl_path)

    if cached is not None:
        cached_mtime, cached_used = cached
        if cached_mtime == mtime:
            return cached_used

    fresh_used = _read_last_assistant_input_tokens(jsonl_path)
    with _CONTEXT_CACHE_LOCK:
        _CONTEXT_CACHE[jsonl_path] = (mtime, fresh_used)
    return fresh_used


@router.get("/ttyd/context-window")
def ttyd_context_window(project_id: str = Query(...)) -> dict[str, object]:
    """Report context usage taken from the project's newest session JSONL.

    Returns:
        ``{"used", "limit", "pct"}`` where ``limit`` is always
        ``_CONTEXT_LIMIT_DEFAULT`` and ``pct`` is ``used / limit``. ``used``
        and ``pct`` are ``None`` when the project cannot be resolved, no
        session file exists, or no token count can be read from it. This
        endpoint never raises for an unknown or invalid project.
    """
    limit = _CONTEXT_LIMIT_DEFAULT
    unknown = {"used": None, "limit": limit, "pct": None}

    try:
        project_dir = _project_dir(project_id)
    except HTTPException:
        return unknown

    newest, newest_mtime = _latest_jsonl_mtime(_flatten_cwd(project_dir))
    if newest is None:
        return unknown

    used = _cached_input_tokens(newest, newest_mtime)
    if used is None:
        return unknown

    return {
        "used": used,
        "limit": _CONTEXT_LIMIT_DEFAULT,
        "pct": used / _CONTEXT_LIMIT_DEFAULT,
    }


# Only the FastAPI router is exported by `from <module> import *`.
__all__ = ["router"]
