"""Phase 19 — process-local in-memory EventBus.

Architecture:
  • Single ``BUS`` instance per FastAPI process.
  • Ring buffer of the last 500 events for SSE Last-Event-ID resume.
  • async pub/sub via ``asyncio.Queue`` (one queue per subscriber).
  • Sync→async bridge: ``emit_sync()`` assigns the event id and appends to
    the ring synchronously, then (when called off-loop) schedules the live
    fan-out onto the bound asyncio loop via
    ``asyncio.run_coroutine_threadsafe``. This is the entry point for
    non-async code paths (the ``dispatch.py`` hook runs on the engine's
    worker thread, not the FastAPI event loop).

Wire-up:
  • ``main.py`` calls ``BUS.bind_loop(asyncio.get_running_loop())`` from
    its FastAPI ``startup`` event — so the loop binding is established
    before any HTTP route fires.
  • Tests bind a loop manually (see ``test_eventbus.py``).

Constraints (per spec):
  • One bus per process. We do NOT support multi-bus fanout.
  • No per-subscriber filtering (clients filter on their side).
  • Ring buffer is 500 events; older entries drop on the floor.
  • ``emit_sync`` must NEVER raise — engine code paths cannot be blocked
    by the event bus.
"""
from __future__ import annotations

import asyncio
import logging
import sys
import threading
from collections import deque
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Deque, Optional

from recoil.api.schemas.engine import (
    SCHEMA_VERSION,
    EngineEvent,
    EventSeverity,
)

# Module-level logger; the bus uses it to report dropped events.
logger = logging.getLogger(__name__)


# Default number of events kept in the replay ring (``EventBus.__init__``).
_RING_CAPACITY = 500
# Bound on each subscriber's queue; an event is dropped for a subscriber
# whose queue is already full rather than blocking the emitter.
_SUBSCRIBER_QUEUE_MAXSIZE = 200


class EventBus:
    """Process-local pub/sub with bounded ring-buffer history.

    The bus is single-process. Multi-process deployments (gunicorn workers)
    would emit on one process and miss on the others — Phase 19 does NOT
    address that. The console runs as a single uvicorn worker by design.

    Locking discipline: ``_ring``, ``_subscribers``, ``_next_id`` and
    ``_pending_fanout`` are only touched while holding ``_lock``. The
    critical sections contain no awaits and no I/O; delivery into
    subscriber queues always happens after the lock has been released.
    """

    def __init__(self, ring_capacity: int = _RING_CAPACITY) -> None:
        """Create an empty bus with no loop bound.

        ``ring_capacity`` bounds the replay history: once the ring is full,
        each append evicts the oldest event.
        """
        # Replay history; ``deque(maxlen=...)`` drops the oldest on overflow.
        self._ring: Deque[EngineEvent] = deque(maxlen=ring_capacity)
        # One bounded queue per live ``subscribe()`` iterator.
        self._subscribers: list[asyncio.Queue[EngineEvent]] = []
        # Loop that cross-thread fan-out is marshalled onto (see bind_loop).
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Counter behind the ``ev_NNNNNNNN`` ids; bumped under ``_lock``.
        self._next_id: int = 0
        # Guards _next_id increment + ring-append + history-snapshot /
        # listener-registration / pending-fanout atomicity. Taken from sync
        # paths (emit_sync, _build_event, history) AND from async paths
        # (subscribe, emit, _broadcast_locked). A threading.Lock is used
        # because worker threads contend with the loop thread; the critical
        # sections are narrow (no I/O, no awaits inside), so briefly blocking
        # the loop thread is acceptable. Debug R9: id-assign and ring-append
        # are ALWAYS in the same lock acquisition (see
        # ``_build_event_locked``).
        self._lock = threading.Lock()
        # Debug R4: ids of events whose cross-thread fan-out coroutine has
        # been scheduled but has not yet run. ``subscribe()`` excludes these
        # from its history snapshot — they will arrive via the live queue
        # when the broadcast coro fires. This closes the window where
        # ``subscribe()`` could interpose between cross-thread ring-append
        # (which has happened) and broadcast (which hasn't), and end up
        # with the event delivered twice (once from history, once from
        # the live queue). See ``_broadcast_locked`` and ``subscribe``.
        self._pending_fanout: set[str] = set()

    # ── loop binding ────────────────────────────────────────────────────
    def bind_loop(self, loop: asyncio.AbstractEventLoop) -> None:
        """Bind the asyncio loop used by ``emit_sync`` to schedule emits.

        Idempotent — re-binding to the same loop is a no-op. Re-binding to
        a DIFFERENT loop replaces the previous binding (test pattern: each
        test installs its own loop).
        """
        self._loop = loop

    # ── publishing ──────────────────────────────────────────────────────
    @staticmethod
    def _fan_out(
        queues: list[asyncio.Queue[EngineEvent]],
        event: EngineEvent,
        overflow_msg: str = "subscriber queue full; dropping event %s",
    ) -> None:
        """Offer ``event`` to every queue in ``queues`` without blocking.

        ``queues`` is expected to be a snapshot taken under ``_lock``; this
        helper takes no lock itself. ``asyncio.Queue`` is not thread-safe,
        so callers must be running on the loop thread that owns the queues.

        A full queue means that subscriber has fallen behind: the event is
        dropped for that subscriber only, and ``overflow_msg`` is logged at
        warning level with the event id as its single ``%s`` argument.
        """
        for queue in queues:
            try:
                queue.put_nowait(event)
            except asyncio.QueueFull:
                logger.warning(overflow_msg, event.id)

    async def emit(self, event: EngineEvent) -> None:
        """Append to ring + fan out to live subscribers.

        Subscriber queues are bounded (maxsize=200) — if a subscriber is
        slow we drop events on the floor and log a warning. The ring is the
        truncation point that matters for resume; live subscribers either
        keep up or fall behind.

        Lock spans ring append + subscriber snapshot so a concurrent
        ``subscribe()`` cannot register a listener between ring append and
        fan-out (which would yield the same event twice).
        """
        with self._lock:
            self._ring.append(event)
            subscribers_snapshot = list(self._subscribers)
        self._fan_out(subscribers_snapshot, event)

    async def _broadcast_locked(self, event: EngineEvent) -> None:
        """Fan out an already-appended event to live subscribers.

        Used by the cross-thread ``emit_sync`` path (Debug R3): the worker
        thread appends to the ring under the lock to preserve id/append
        order, then schedules THIS coroutine on the loop to deliver the
        event to listener queues. Re-appending here would double-write the
        ring — so this is a fan-out-only variant of ``emit``.

        Debug R4: discards the event id from ``_pending_fanout`` under the
        same lock that snapshots subscribers — so ``subscribe()``'s
        history-snapshot/listener-register critical section sees a
        consistent view (event is either still pending and excluded from
        history but included in live fan-out, or already broadcast and
        included in history but excluded from live fan-out). This closes
        the double-delivery window from R3.
        """
        with self._lock:
            subscribers_snapshot = list(self._subscribers)
            self._pending_fanout.discard(event.id)
        self._fan_out(subscribers_snapshot, event)

    def emit_sync(
        self,
        severity: EventSeverity,
        scope: str,
        summary: str,
        *,
        detail: Optional[str] = None,
        payload: Optional[dict[str, Any]] = None,
    ) -> None:
        """Sync entry point — build an event, record it, and fan it out.

        Two delivery paths:
          • Caller IS on the bound loop (FastAPI handler thread): assign
            the id and append to the ring under the lock, then deliver to
            the live subscriber queues synchronously before returning. The
            synchronous append guarantees ``BUS.history()`` reflects the
            event before the function returns.
          • Caller is on a DIFFERENT thread (engine worker via dispatch.py
            hook): assign the id, append to the ring and mark the id as
            pending under the lock, then use ``run_coroutine_threadsafe``
            to marshal ``_broadcast_locked`` onto the bound loop.

        Must NEVER raise. If the loop isn't bound yet (e.g. dispatch fires
        before FastAPI startup), or the bound loop is already closed, the
        event is dropped with a debug-level log line: it still consumes an
        id but is not appended to the ring. Any other failure is reported
        on stderr, and even that report is best-effort. The engine's hook
        also gates on RECOIL_EVENTBUS_ENABLED, so this path is reached only
        when the operator opted in.

        Bug-fix (Debug R9): id-assignment, ring-append, and (cross-thread)
        ``_pending_fanout``-add MUST happen inside a SINGLE lock acquisition.
        If the lock were released between id-assign and ring-append, a
        second thread could bump-and-append id N+1 before this thread
        appended id N, leaving the ring out of order (``[ev_N+1, ev_N]``)
        and violating ``_history_after_locked``'s invariant. Hence
        ``_build_event_locked`` is called inside the same
        ``with self._lock:`` block that does the ring-append.
        """
        try:
            loop = self._loop
            if loop is None:
                # Build the event under the lock purely so the log line gets
                # a stable id. No ring entry is made.
                with self._lock:
                    event = self._build_event_locked(
                        severity, scope, summary, detail, payload,
                    )
                logger.debug("emit_sync called before loop bound; dropping %s", event.id)
                return

            # Detect "are we on the bound loop right now?"
            try:
                running = asyncio.get_running_loop()
            except RuntimeError:
                running = None

            if running is loop:
                # Same-loop path: id-assign + ring-append + subscriber-snapshot
                # all under ONE lock acquisition. This closes the R9 window
                # (id-bump and ring-append must be a single critical section)
                # AND the original R1 subscribe-races-emit window (snapshot of
                # listeners is taken under the same lock that appends).
                with self._lock:
                    event = self._build_event_locked(
                        severity, scope, summary, detail, payload,
                    )
                    self._ring.append(event)
                    subscribers_snapshot = list(self._subscribers)
                # We are on the loop thread, so the queues can be fed directly.
                self._fan_out(
                    subscribers_snapshot,
                    event,
                    "subscriber queue full; dropping %s",
                )
                return

            if loop.is_closed():
                # Stale binding — drop. Common in test suites that exit
                # the lifespan context (closes the loop) without rebinding.
                # Build under lock first so we have an id for logging,
                # but DO NOT append to ring.
                with self._lock:
                    event = self._build_event_locked(
                        severity, scope, summary, detail, payload,
                    )
                logger.debug("emit_sync called with closed loop; dropping %s", event.id)
                return

            # Cross-thread path.
            #
            # Debug R3: two worker threads can land their coroutines on the
            # loop in either order, so appending to the ring from the
            # coroutine would let ring order diverge from id order. The
            # append therefore happens HERE, under the same lock as the id
            # assignment; only the fan-out to listeners is deferred to the
            # loop (queue consumers don't depend on cross-listener ordering).
            #
            # Debug R4: with fan-out deferred, ``subscribe()`` could run
            # between this append and ``_broadcast_locked`` — seeing the
            # event in history AND receiving it again from its live queue.
            # Marking the id in ``_pending_fanout`` under the same lock lets
            # ``subscribe()`` exclude it from history; ``_broadcast_locked``
            # clears the mark under the lock that snapshots subscribers. So
            # an event reaches a given subscriber via history or via the
            # live queue — never both, never neither.
            #
            # Debug R9: id-assign + ring-append + _pending_fanout-add are
            # ALL under ONE lock acquisition (see docstring).
            with self._lock:
                event = self._build_event_locked(
                    severity, scope, summary, detail, payload,
                )
                self._ring.append(event)
                self._pending_fanout.add(event.id)

            # Finding F5: create the coroutine as a named local FIRST so
            # that, if scheduling raises before the loop adopts it, it can
            # be closed explicitly instead of being orphaned with a
            # "coroutine ... was never awaited" RuntimeWarning.
            # ``create_task`` is NOT usable here: this code runs off the
            # bound loop, so the coroutine must cross the thread boundary
            # via ``run_coroutine_threadsafe``.
            broadcast_coro = self._broadcast_locked(event)
            try:
                asyncio.run_coroutine_threadsafe(broadcast_coro, loop)
            except Exception:
                # Debug R5: scheduling can fail (e.g. the loop closes
                # between the is_closed() check and this call during
                # lifespan teardown). Without cleanup the id would stay in
                # ``_pending_fanout`` forever — leaking memory and hiding
                # the event from every future ``subscribe()`` history
                # replay. Close the coroutine, clear the mark, and let the
                # outer guard report the failure.
                broadcast_coro.close()
                with self._lock:
                    self._pending_fanout.discard(event.id)
                raise
        except Exception as exc:  # noqa: BLE001
            # Last-resort guard. Engine path must not be blocked by bus errors.
            try:
                sys.stderr.write(f"[eventbus] emit_sync failed: {exc}\n")
            except Exception:  # noqa: BLE001
                # stderr may be None/closed, or ``exc`` may fail to format.
                # Nothing safe is left to do; swallowing here is what keeps
                # the "never raises" contract.
                pass

    def _build_event_locked(
        self,
        severity: EventSeverity,
        scope: str,
        summary: str,
        detail: Optional[str],
        payload: Optional[dict[str, Any]],
    ) -> EngineEvent:
        """Build an event under a caller-owned lock.

        CALLER MUST HOLD ``self._lock``. Bumps ``_next_id`` and returns a
        fresh ``EngineEvent`` whose id is ``ev_`` followed by the counter
        zero-padded to eight digits and whose ``ts`` is the current UTC
        time. Used by ``emit_sync`` so that id-assignment, ring-append, and
        (cross-thread) ``_pending_fanout``-add all happen in a SINGLE atomic
        critical section — see Debug R9 commentary in ``emit_sync``.

        The counter is bumped before the event is constructed, so an id is
        consumed even if ``EngineEvent(...)`` raises.
        """
        self._next_id += 1
        current_id = self._next_id
        ev_id = f"ev_{current_id:08d}"
        return EngineEvent(
            schema_version=SCHEMA_VERSION,
            id=ev_id,
            ts=datetime.now(tz=timezone.utc),
            severity=severity,
            scope=scope,
            summary=summary,
            detail=detail,
            payload=payload,
        )

    def _build_event(
        self,
        severity: EventSeverity,
        scope: str,
        summary: str,
        detail: Optional[str],
        payload: Optional[dict[str, Any]],
    ) -> EngineEvent:
        """Public-test-compatible wrapper.

        Acquires the lock and delegates to ``_build_event_locked``. Used by
        tests that build events outside the ``emit_sync`` flow (e.g. to seed
        the ring via ``await bus.emit(bus._build_event(...))``). Production
        ``emit_sync`` does NOT use this entry point — it calls
        ``_build_event_locked`` directly under its own lock so id-assignment
        is atomic with ring-append.
        """
        with self._lock:
            return self._build_event_locked(severity, scope, summary, detail, payload)

    # ── subscription ────────────────────────────────────────────────────
    def history(self, after_id: Optional[str] = None) -> list[EngineEvent]:
        """Return ring contents, optionally truncated AFTER a given id.

        If ``after_id`` matches an event in the ring, returns events
        strictly newer. If it doesn't match (e.g. the resume id was
        already evicted) returns the full ring — clients can't tell
        the difference but get the closest-possible resume.

        The returned list is a copy taken under the lock; mutating it does
        not affect the ring.
        """
        with self._lock:
            if after_id is None:
                return list(self._ring)
            return self._history_after_locked(after_id)

    def _history_after_locked(self, after_id: str) -> list[EngineEvent]:
        """Return events strictly after ``after_id``. CALLER MUST HOLD ``self._lock``.

        Scans the ring oldest-first for the first event whose id equals
        ``after_id``; when no event matches, the whole ring is returned.
        """
        events = list(self._ring)
        for i, ev in enumerate(events):
            if ev.id == after_id:
                return events[i + 1 :]
        return events

    async def subscribe(
        self,
        last_event_id: Optional[str] = None,
    ) -> AsyncIterator[EngineEvent]:
        """Async iterator yielding ring history then live events forever.

        Caller cancels the iteration by closing the SSE response. The
        generator's ``finally`` removes the subscriber queue. Being an
        async generator, nothing below runs (no snapshot, no registration)
        until the iterator is first advanced.

        Bug-fix (Debug R1): queue is bounded (maxsize=200) so a stalled
        subscriber cannot OOM the process. ``emit`` drops events on
        QueueFull rather than blocking.

        Bug-fix (Debug R1): history snapshot + listener registration are
        held under the same lock that ``emit`` / same-loop ``emit_sync``
        use when appending to the ring. This closes the window where an
        event fired between snapshotting history and registering the
        listener could either be missed OR delivered twice.

        Bug-fix (Debug R4): events whose cross-thread fan-out coroutine
        has been scheduled but not yet run (``_pending_fanout``) are
        excluded from the history snapshot — they will arrive via the
        live queue once ``_broadcast_locked`` fires. Without this filter,
        a subscriber registering between cross-thread ring-append and
        broadcast would receive the event twice (once from history,
        once from the queue).
        """
        q: asyncio.Queue[EngineEvent] = asyncio.Queue(
            maxsize=_SUBSCRIBER_QUEUE_MAXSIZE,
        )
        # Atomic: snapshot history AND register the listener under the same
        # lock. Any emit serializes against this critical section, so the
        # snapshot reflects exactly the events the listener won't receive
        # and the listener receives every event after that point.
        with self._lock:
            if last_event_id is None:
                raw_snapshot = list(self._ring)
            else:
                raw_snapshot = self._history_after_locked(last_event_id)
            # R4: drop events still in flight via cross-thread fan-out.
            if self._pending_fanout:
                history_snapshot = [
                    ev for ev in raw_snapshot
                    if ev.id not in self._pending_fanout
                ]
            else:
                history_snapshot = raw_snapshot
            self._subscribers.append(q)
        try:
            # Replay first, then block on the live queue indefinitely.
            for ev in history_snapshot:
                yield ev
            while True:
                ev = await q.get()
                yield ev
        finally:
            with self._lock:
                try:
                    self._subscribers.remove(q)
                except ValueError:  # pragma: no cover
                    # Already gone, e.g. ``_reset_for_tests`` cleared the list.
                    pass

    # ── test hooks ──────────────────────────────────────────────────────
    def _reset_for_tests(self) -> None:
        """Test-only — clear ring, subscribers, and counter. Loop stays bound.

        Runs under ``_lock`` like every other mutation of this state, so a
        concurrent ``emit_sync`` from a worker thread cannot interleave
        with a half-finished reset. Must not be called while already
        holding ``_lock`` (the lock is not re-entrant).
        """
        with self._lock:
            self._ring.clear()
            self._subscribers.clear()
            self._next_id = 0
            self._pending_fanout.clear()


# Process-singleton bus: the one shared instance, created at import time
# with the default ring capacity. No loop is bound until ``bind_loop`` is
# called on it.
BUS = EventBus()


# Public names exported by this module.
__all__ = ["EventBus", "BUS"]
