"""EventBroker — thread-safe in-process pub/sub with ring-buffered history.

- Subscribers get a queue; broker publishes synchronously to all queues.
- History ring buffer keeps the last N events for reconnect replay via
  Last-Event-ID cursor. Default N=500.
- Slow consumers (full queue) are dropped silently — they can reconnect
  and replay from the ring buffer.
- Thread-safe via RLock.
"""

from __future__ import annotations

import queue
import threading
import uuid
from collections import deque
from typing import Optional

from .events import FsEvent


_SUBSCRIBER_QUEUE_MAXSIZE = 1000


class EventBroker:
    """Thread-safe in-process broker with ring-buffered history."""

    def __init__(self, history_size: int = 500):
        self._lock = threading.RLock()
        self._subscribers: dict[str, queue.Queue] = {}
        self._history: deque[FsEvent] = deque(maxlen=history_size)

    def subscribe(self, subscriber_id: Optional[str] = None) -> tuple[str, queue.Queue]:
        """Register a new subscriber. Returns (sid, queue).

        The subscriber reads FsEvent instances from the returned queue.
        Call unsubscribe(sid) when done to free the slot.

        The queue may also receive `None` as a poison pill when the broker
        drops this subscriber as a slow consumer (see publish()). Consumers
        MUST check for None and break out of their drain loop when received.
        """
        sid = subscriber_id or f"sub_{uuid.uuid4().hex[:8]}"
        # Queue carries Optional[FsEvent] — None is the poison pill
        q: queue.Queue[Optional[FsEvent]] = queue.Queue(maxsize=_SUBSCRIBER_QUEUE_MAXSIZE)
        with self._lock:
            self._subscribers[sid] = q
        return sid, q

    def unsubscribe(self, subscriber_id: str) -> None:
        """Remove a subscriber. Idempotent — unknown ids are silently ignored."""
        with self._lock:
            self._subscribers.pop(subscriber_id, None)

    def publish(self, event: FsEvent) -> None:
        """Publish an event to all subscribers and append to history.

        Slow consumers (queue full) are dropped — their queue is drained and
        poisoned with None. The consumer's drain loop (CallbackTransport,
        sse_stream) MUST check for None and break cleanly. This causes the
        HTTP connection to close and the browser's EventSource to auto-reconnect,
        which then resumes from the ring buffer via the Last-Event-ID cursor.

        Without the poison pill, dropped subscribers would block forever on
        queue.get() and leak the HTTP connection indefinitely.

        Lock discipline: the dict mutations (history append, subscriber pop)
        happen INSIDE the lock. Drain+poison of dead queues happens OUTSIDE
        the lock — draining can take O(1000) iterations per dropped consumer
        and holding the broker lock for that long would block every other
        publish. Collect-then-release-then-drain is the correct pattern.
        """
        dead_queues: list[tuple[str, queue.Queue]] = []
        with self._lock:
            self._history.append(event)
            for sid, q in self._subscribers.items():
                try:
                    q.put_nowait(event)
                except queue.Full:
                    dead_queues.append((sid, q))
            for sid, _ in dead_queues:
                self._subscribers.pop(sid, None)

        # Drain and poison dropped queues OUTSIDE the lock to minimize
        # contention. The subscriber has already been removed from
        # _subscribers above, so no other publish will target these
        # queues. Safe to touch them lock-free.
        for _, q in dead_queues:
            try:
                while True:
                    q.get_nowait()
            except queue.Empty:
                pass
            try:
                q.put_nowait(None)  # poison pill
            except queue.Full:
                # Shouldn't happen after drain, but don't raise — the
                # consumer will eventually time out via get(timeout=30)
                # and the queue will be GC'd.
                pass

    def subscribe_with_replay(
        self,
        last_event_id: Optional[str],
        subscriber_id: Optional[str] = None,
    ) -> tuple[str, "queue.Queue", Optional[list[FsEvent]]]:
        """Atomically subscribe AND replay history to close the race window.

        Without this, a naive sequence of `replay = replay_since(cursor); sid, q = subscribe()`
        has a gap: any event published between the two calls is lost — it's
        not in the returned replay list, and the subscription queue didn't
        exist when publish tried to deliver it.

        This method installs the subscriber queue and builds the replay list
        under a SINGLE lock acquisition, guaranteeing that any concurrent
        publish is either:
          - Visible in the returned replay list (if it happened before this
            call), OR
          - Delivered to the subscriber queue (if it happens after this call
            returns, because the queue is already installed).

        Return:
          (sid, queue, replayed)
          where replayed is:
            - list[FsEvent] — events after the cursor (possibly empty if client
              is up-to-date)
            - None — cursor not found in ring (true cursor gap)
            - full history list — if last_event_id is None (initial connect
              with explicit replay request — rare; callers generally skip
              history on initial connect)
        """
        import queue as _queue
        sid = subscriber_id or f"sub_{uuid.uuid4().hex[:8]}"
        q: _queue.Queue[Optional[FsEvent]] = _queue.Queue(maxsize=_SUBSCRIBER_QUEUE_MAXSIZE)

        with self._lock:
            # Install subscriber FIRST so any publish that happens after we
            # release the lock is delivered to the queue.
            self._subscribers[sid] = q

            # Build replay list from current history under the same lock.
            if last_event_id is None:
                replayed: Optional[list[FsEvent]] = list(self._history)
            else:
                found = False
                out: list[FsEvent] = []
                for ev in self._history:
                    if found:
                        out.append(ev)
                    elif ev.event_id == last_event_id:
                        found = True
                replayed = out if found else None

        return sid, q, replayed

    def replay_since(self, last_event_id: Optional[str]) -> Optional[list[FsEvent]]:
        """Return events after the given event_id cursor.

        Return semantics:
        - `last_event_id=None`: returns the full ring buffer (list, possibly empty).
        - Cursor found in ring: returns a list of events strictly AFTER the
          cursor. May be empty if the cursor is the most recent event — this
          means the client is up-to-date, NOT a cursor gap.
        - Cursor NOT found in ring (rolled past): returns None. Callers interpret
          None as a true cursor gap and should force-resync via a cursor_gap
          event or full refresh.

        This distinction matters: heartbeats flow through the ring every ~15s,
        so a client that reconnects quickly almost always has a cursor that
        exists in the ring but has no newer events. Returning [] for "up to
        date" vs None for "gap" lets sse_stream avoid spurious cursor_gap
        frames on every idle reconnect.
        """
        with self._lock:
            if last_event_id is None:
                return list(self._history)

            found = False
            out: list[FsEvent] = []
            for ev in self._history:
                if found:
                    out.append(ev)
                elif ev.event_id == last_event_id:
                    found = True
            return out if found else None
