"""Events adapter — reads recoil/_dispatch_logs/receipts.jsonl (CP-5 audit log)
and projects.list_legacy_project_warnings() and emits EngineEvent rows.

Phase 16 contract:
  list_events(severities=None, scope_prefix=None, since_id=None, limit=200) → list[EngineEvent]

This adapter is READ-ONLY. The receipts log is append-only by the engine; we
do tail-style reads.
"""
from __future__ import annotations

import json
import logging
import os
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Optional

from recoil.api.adapters.projects import list_legacy_project_warnings
from recoil.api.fallback_bridge import emit_fallback
from recoil.api.schemas.engine import (
    SCHEMA_VERSION,
    EngineEvent,
    EventSeverity,
)
from recoil.core.paths import RECOIL_ROOT

logger = logging.getLogger(__name__)

# Per-process memo of byte-offsets that have already fired the
# receipts_log_corrupt_line_skip fallback. Without this, every /api/events
# request re-reads the whole receipts.jsonl from offset 0 and re-fires the
# fallback for every corrupt line on every request, saturating the 500-event
# BUS ring with duplicates and crowding out other engine telemetry. Bounded
# growth: corrupt lines are rare and the file is append-only, so this set
# only grows when a NEW corrupt line lands. Reset on uvicorn restart by
# design (matches the FALLBACK_COUNTERS lifecycle).
_seen_corrupt_offsets: set[int] = set()


def reset_seen_corrupt_offsets_for_tests() -> None:
    """Test-only hook. Production never calls this."""
    _seen_corrupt_offsets.clear()


def _receipts_path() -> Path:
    override = os.environ.get("RECOIL_RECEIPTS_LOG")
    if override:
        return Path(override).expanduser()
    return RECOIL_ROOT / "_dispatch_logs" / "receipts.jsonl"


def _severity_from_receipt(rcpt: dict[str, Any]) -> EventSeverity:
    rr = rcpt.get("run_result") or {}
    if rr.get("error"):
        return "failure"
    if rr.get("success") is False:
        return "failure"
    final_state = (rr.get("metadata") or {}).get("final_state")
    if final_state == "failed":
        return "failure"
    if final_state == "fallback":
        return "fallback"
    if rr.get("success") or final_state in ("succeeded", "complete", "ok"):
        return "success"
    return "info"


def _scope_from_receipt(rcpt: dict[str, Any]) -> str:
    parts = []
    project = rcpt.get("project")
    if project:
        parts.append(str(project))
    ep = rcpt.get("episode")
    if ep is not None:
        parts.append(f"ep{int(ep):02d}" if isinstance(ep, int) else str(ep))
    shot = rcpt.get("shot_id")
    if shot:
        parts.append(str(shot))
    if not parts:
        return "engine"
    return " / ".join(parts)


def _summary_from_receipt(rcpt: dict[str, Any]) -> str:
    modality = rcpt.get("modality") or "?"
    rr = rcpt.get("run_result") or {}
    err = rr.get("error")
    if err:
        return f"{modality}: {str(err)[:120]}"
    final_state = (rr.get("metadata") or {}).get("final_state") or rr.get("status") or "ok"
    return f"{modality}: {final_state}"


def _ts_from_receipt(rcpt: dict[str, Any]) -> datetime:
    raw = rcpt.get("timestamp_utc") or rcpt.get("timestamp")
    if isinstance(raw, str):
        try:
            return datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            pass
    if isinstance(raw, (int, float)):
        try:
            return datetime.fromtimestamp(float(raw), tz=timezone.utc)
        except (OSError, ValueError):
            pass
    # Fall back to provenance.finished_us if present.
    prov = rcpt.get("provenance") or {}
    finished_us = prov.get("finished_us")
    if isinstance(finished_us, (int, float)):
        try:
            return datetime.fromtimestamp(float(finished_us) / 1_000_000.0, tz=timezone.utc)
        except (OSError, ValueError):
            pass
    return datetime.now(tz=timezone.utc)


def _collapse_eval_score(eval_scores: dict[str, Any]) -> Optional[float]:
    """Collapse the multi-panel ``eval_scores`` dict to a single representative
    float for the activity-pane (Tenet 6) surface.

    Returns the mean of all non-None ``panel_score`` values — matching the
    reduction used by ``Take.compute_aggregate_score`` so both surfaces show
    the same number on multi-panel receipts.
    Returns None when there are no panels, no panels with a numeric score,
    or the input is malformed.
    """
    if not isinstance(eval_scores, dict) or not eval_scores:
        return None
    scores = [
        float(card["panel_score"])
        for card in eval_scores.values()
        if isinstance(card, dict)
        and isinstance(card.get("panel_score"), (int, float))
        and not isinstance(card.get("panel_score"), bool)
    ]
    return sum(scores) / len(scores) if scores else None


def _receipt_to_event(rcpt: dict[str, Any]) -> Optional[EngineEvent]:
    rid = rcpt.get("receipt_id") or rcpt.get("id")
    if not rid:
        return None
    rr = rcpt.get("run_result") or {}
    metadata = rr.get("metadata") or {}
    detail = rr.get("error") if rr.get("error") else None
    # failure_mode evolved: receipts have surfaced it both at the top level
    # (`rcpt["failure_mode"]`) and inside `run_result.metadata.failure_mode`.
    # Probe both so older receipts still light up the activity-pane.
    failure_mode = rcpt.get("failure_mode")
    if failure_mode is None:
        failure_mode = metadata.get("failure_mode")
    # eval_scores is top-level on CP-9 receipts but probe run_result too
    # for forward-compat with any future migration.
    eval_scores = rcpt.get("eval_scores")
    if not eval_scores:
        eval_scores = rr.get("eval_scores") or {}
    episode = rcpt.get("episode")
    if episode is not None:
        try:
            episode = int(episode)
        except (TypeError, ValueError):
            episode = None
    return EngineEvent(
        schema_version=SCHEMA_VERSION,
        id=str(rid),
        ts=_ts_from_receipt(rcpt),
        severity=_severity_from_receipt(rcpt),
        scope=_scope_from_receipt(rcpt),
        summary=_summary_from_receipt(rcpt),
        detail=str(detail) if detail else None,
        payload={
            "modality": rcpt.get("modality"),
            "caller_id": rcpt.get("caller_id"),
            "model": (rcpt.get("provenance") or {}).get("model"),
            # Tenet 6 surface fields. Single-quoted literals are intentional —
            # the bugfix-sprint-post-overhaul spec greps for these exact
            # forms; do not normalize quoting on save.
            'failure_mode': failure_mode,
            'final_state': metadata.get("final_state"),
            'cost_usd': metadata.get("cost_usd"),
            'eval_score': _collapse_eval_score(eval_scores),
            'shot_id': rcpt.get("shot_id"),
            'episode': episode,
            'project': rcpt.get("project"),
        },
    )


def _iter_receipts() -> Iterable[dict[str, Any]]:
    path = _receipts_path()
    if not path.exists():
        return
    try:
        with path.open("r", encoding="utf-8") as fh:
            while True:
                offset = fh.tell()
                raw = fh.readline()
                if not raw:
                    break
                line = raw.strip()
                if not line:
                    continue
                try:
                    yield json.loads(line)
                except json.JSONDecodeError as exc:
                    if offset not in _seen_corrupt_offsets:
                        _seen_corrupt_offsets.add(offset)
                        emit_fallback(
                            "receipts_log_corrupt_line_skip",
                            scope="api/events",
                            payload={
                                "line_preview": line[:120],
                                "error": str(exc),
                                "offset": offset,
                            },
                        )
                    continue
    except OSError as exc:
        logger.warning("receipts.jsonl unreadable: %s", exc)
        return


def _legacy_project_events() -> list[EngineEvent]:
    """Emit one warning event per legacy project (missing global_bible.json)."""
    out: list[EngineEvent] = []
    for warn in list_legacy_project_warnings():
        out.append(
            EngineEvent(
                schema_version=SCHEMA_VERSION,
                id=f"legacy-project-{warn.slug}",
                ts=datetime.now(tz=timezone.utc),
                severity="warning",
                scope=f"projects/{warn.slug}",
                summary=f"Legacy project format — rebuild required ({warn.reason})",
                detail=warn.reason,
                payload={"project_slug": warn.slug, "kind": "legacy_project_format"},
            )
        )
    return out


def list_events(
    severities: Optional[list[str]] = None,
    scope_prefix: Optional[str] = None,
    since_id: Optional[str] = None,
    limit: int = 200,
) -> list[EngineEvent]:
    """Read receipts + legacy warnings and emit EngineEvents.

    Phase 16: tail the last `limit` receipts (after applying filters).
    `since_id` cuts the stream at a known event id (returns events strictly
    AFTER that id in file order). Receipt ordering is JSONL-append, so newer
    rows are at the bottom — we read all then truncate to `limit`.

    Debug R1 fix — receipts are streamed through a bounded ``deque`` so
    memory usage is O(limit), not O(file_size). For a multi-million-line
    receipts log this is the difference between a few hundred KB and
    several GB resident.
    """
    severity_set = set(severities) if severities else None

    # Stream filtered receipts into a bounded deque — keeps memory O(cap)
    # instead of O(file_size). Cap is 2× ``limit`` to leave headroom for
    # the since_id cut + final filter pass.
    cap = max(limit * 2, limit + 1) if limit > 0 else None
    receipt_events: deque[EngineEvent] = deque(maxlen=cap)
    for rcpt in _iter_receipts():
        ev = _receipt_to_event(rcpt)
        if ev is None:
            continue
        # Pre-filter to maximize useful headroom in the deque.
        if severity_set and ev.severity not in severity_set:
            continue
        if scope_prefix and not ev.scope.startswith(scope_prefix):
            continue
        receipt_events.append(ev)

    # Legacy warnings always lead the list (ts is generated at read time).
    legacy_events = list(_legacy_project_events())
    if severity_set:
        legacy_events = [e for e in legacy_events if e.severity in severity_set]
    if scope_prefix:
        legacy_events = [e for e in legacy_events if e.scope.startswith(scope_prefix)]

    events: list[EngineEvent] = legacy_events + list(receipt_events)

    # since_id: drop everything up to and including a matching id.
    if since_id:
        idx = -1
        for i, ev in enumerate(events):
            if ev.id == since_id:
                idx = i
        if idx >= 0:
            events = events[idx + 1 :]

    # Tail — newest entries are last; return the last `limit` so newest-first
    # is preserved at the response boundary.
    if limit > 0 and len(events) > limit:
        events = events[-limit:]

    return events


__all__ = ["list_events"]
