"""CallbackTransport — subscribe to a broker and invoke a callable per event.

Primarily used in tests. The watcher library ships this so consumers can
wire up a synchronous callback without implementing queue-draining themselves.
"""

from __future__ import annotations

import queue
import threading
from typing import Callable, Optional

from ..events import FsEvent
from ..pubsub import EventBroker


class CallbackTransport:
    """Subscribes to an EventBroker and invokes a callback for each event.

    Runs a background daemon thread that drains the subscriber queue and
    calls the provided callback synchronously per event. If the callback
    raises, the exception is swallowed and logged (so one bad consumer
    doesn't kill the transport).

    ``start()`` and ``stop()`` are both idempotent, and the transport can be
    restarted after it has been stopped. ``stop()`` may be called from inside
    the callback itself; in that case the drain thread is not joined (a
    thread cannot join itself) and simply exits once the callback returns.
    """

    def __init__(
        self,
        broker: EventBroker,
        callback: Callable[[FsEvent], None],
    ):
        """Store the broker and callback; nothing is subscribed until start().

        Args:
            broker: Object providing ``subscribe()`` (returning a
                ``(subscriber_id, queue)`` pair) and ``unsubscribe(sid)``.
            callback: Callable invoked on the drain thread, once per event.
        """
        self._broker = broker
        self._callback = callback
        self._sid: Optional[str] = None
        self._queue: Optional[queue.Queue] = None
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Subscribe to the broker and start the drain thread.

        Calling ``start()`` while already started is a no-op, so a repeated
        call cannot leak a second subscription or a second drain thread.
        """
        if self._sid is not None:
            return
        sid, events = self._broker.subscribe()
        # Each run gets its own stop event and hands the thread its own queue
        # reference. A previous drain thread that outlived a timed-out join
        # therefore can never be revived by, or steal events from, this run.
        stop_event = threading.Event()
        self._sid = sid
        self._queue = events
        self._stop_event = stop_event
        self._thread = threading.Thread(
            target=self._drain_loop,
            args=(events, stop_event),
            daemon=True,
        )
        self._thread.start()

    def stop(self) -> None:
        """Stop the drain thread and unsubscribe.

        Safe to call repeatedly and safe to call before ``start()``. Waits up
        to two seconds for the drain thread to exit, unless called from the
        drain thread itself.
        """
        self._stop_event.set()
        sid, self._sid = self._sid, None
        if sid is not None:
            self._broker.unsubscribe(sid)
        # Poison the queue so the thread wakes up immediately instead of
        # waiting for its next poll timeout.
        events, self._queue = self._queue, None
        if events is not None:
            try:
                events.put_nowait(None)  # type: ignore[arg-type]
            except queue.Full:
                # The loop still notices stop_event on its next poll.
                pass
        thread, self._thread = self._thread, None
        # Thread.join() raises RuntimeError when a thread joins itself, which
        # happens if the callback calls stop(); skip the join in that case.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2.0)

    def _drain_loop(
        self,
        events: queue.Queue,
        stop_event: threading.Event,
    ) -> None:
        """Deliver events from *events* to the callback until told to stop.

        Exits when *stop_event* is set or a ``None`` poison item is dequeued.
        Exceptions raised by the callback are logged and otherwise ignored.
        """
        import logging

        log = logging.getLogger(__name__)
        while not stop_event.is_set():
            try:
                # Short timeout so stop_event is re-checked even if the
                # poison item could not be enqueued.
                ev = events.get(timeout=0.1)
            except queue.Empty:
                continue
            if ev is None:  # poison from stop()
                break
            try:
                self._callback(ev)
            except Exception:
                # Swallow callback exceptions — don't kill the transport
                log.exception("CallbackTransport callback raised")
