"""Tests for fs_watcher.pubsub — EventBroker with ring-buffered history."""

from __future__ import annotations

import queue
import threading
import time

import pytest

from recoil.pipeline._lib.fs_watcher.events import FsEvent, FsEventType
from recoil.pipeline._lib.fs_watcher.pubsub import EventBroker


def _make_event(event_id: str, path: str = "projects/test/file.txt") -> FsEvent:
    """Build a MODIFIED ``FsEvent`` fixture belonging to project ``"test"``.

    Only ``event_id`` and ``path`` vary between calls; every other field is a
    fixed placeholder so the tests can concentrate on broker behaviour.
    """
    return FsEvent(
        # Identity and classification
        event_id=event_id,
        event_type=FsEventType.MODIFIED,
        project="test",
        # Location
        path=path,
        src_path=None,
        is_directory=False,
        # Asset linkage (unused by these tests)
        asset_type=None,
        asset_id=None,
        # Content metadata
        size_bytes=100,
        sha256=None,
        # Timestamps
        mtime=1700000000.0,
        ts=1700000001.0,
    )


def test_subscribe_returns_id_and_queue():
    """subscribe() yields a ``sub_``-prefixed string id and a ``queue.Queue``."""
    subscriber_id, inbox = EventBroker().subscribe()

    assert isinstance(subscriber_id, str)
    assert subscriber_id.startswith("sub_")
    assert isinstance(inbox, queue.Queue)


def test_publish_delivers_to_all_subscribers():
    """A single publish is fanned out to every subscribed queue."""
    broker = EventBroker()
    # Only the queues matter here; the subscriber ids are discarded.
    _, first_inbox = broker.subscribe()
    _, second_inbox = broker.subscribe()

    broker.publish(_make_event("evt_1"))

    for inbox in (first_inbox, second_inbox):
        assert inbox.get(timeout=1).event_id == "evt_1"


def test_publish_appends_to_history_ring_buffer():
    """Publishing past ``history_size`` keeps only the newest events."""
    broker = EventBroker(history_size=5)
    for n in range(7):
        broker.publish(_make_event(f"evt_{n}"))

    retained = broker.replay_since(None)

    # Seven events into five slots: evt_0 and evt_1 have been evicted.
    assert len(retained) == 5
    oldest, newest = retained[0], retained[-1]
    assert oldest.event_id == "evt_2"
    assert newest.event_id == "evt_6"


def test_replay_since_with_known_cursor_returns_events_after():
    """A cursor still in history yields exactly the events published after it."""
    broker = EventBroker(history_size=10)
    for n in range(5):
        broker.publish(_make_event(f"evt_{n}"))

    # Cursor sits on evt_2, so only evt_3 and evt_4 are newer.
    newer = broker.replay_since("evt_2")

    assert [event.event_id for event in newer] == ["evt_3", "evt_4"]


def test_replay_since_with_unknown_cursor_returns_none():
    """A cursor that has rolled out of the ring is reported as ``None`` (a gap)."""
    broker = EventBroker(history_size=3)
    for n in range(5):
        broker.publish(_make_event(f"evt_{n}"))

    # Only evt_2..evt_4 remain; evt_0 has been evicted from the ring.
    result = broker.replay_since("evt_0")

    assert result is None


def test_replay_since_with_up_to_date_cursor_returns_empty_list():
    """A cursor on the newest event yields ``[]`` (caught up), never ``None``."""
    broker = EventBroker(history_size=10)
    for n in range(5):
        broker.publish(_make_event(f"evt_{n}"))

    # evt_4 is the latest event, so there is nothing newer to replay.
    newer = broker.replay_since("evt_4")

    assert newer == []  # an empty list, as opposed to the None "gap" marker


def test_replay_since_none_returns_full_history():
    """Passing ``None`` as the cursor replays everything still in history."""
    broker = EventBroker(history_size=10)
    for n in range(3):
        broker.publish(_make_event(f"evt_{n}"))

    everything = broker.replay_since(None)

    assert [event.event_id for event in everything] == ["evt_0", "evt_1", "evt_2"]


def test_unsubscribe_removes_subscriber():
    """Once unsubscribed, a queue receives nothing from later publishes."""
    broker = EventBroker()
    subscriber_id, inbox = broker.subscribe()

    broker.unsubscribe(subscriber_id)
    broker.publish(_make_event("evt_1"))

    # The publish happened after unsubscribe, so nothing was delivered.
    assert inbox.empty()


def test_slow_consumer_gets_dropped_silently():
    """If a subscriber's queue is full, the broker drops them on next publish.

    The subscriber's queue is replaced with a ``maxsize=2`` queue through the
    broker's private ``_subscribers`` mapping, so the full state is reached
    after two publishes instead of filling a default-sized queue.
    """
    broker = EventBroker()
    sid, _ = broker.subscribe()  # the original queue is replaced just below

    # Swap in a tiny queue for test speed.
    tiny_q = queue.Queue(maxsize=2)
    broker._subscribers[sid] = tiny_q  # pyright: ignore[reportAttributeAccessIssue]

    # Fill the tiny queue to capacity.
    broker.publish(_make_event("evt_1"))
    broker.publish(_make_event("evt_2"))
    # Precondition: the swapped-in queue really received both events and is
    # now at capacity; otherwise the drop below would prove nothing.
    assert tiny_q.full()

    broker.publish(_make_event("evt_3"))  # This drops the subscriber

    # Subscriber should be removed
    assert sid not in broker._subscribers  # pyright: ignore[reportAttributeAccessIssue]


def test_subscribe_with_replay_atomic():
    """subscribe_with_replay returns a live queue together with the replay list.

    Intended contract: everything published BEFORE the call shows up in the
    returned replay list, and everything published AFTER the call returns is
    delivered to the returned queue, so no event falls between the two.

    Three scenarios are exercised, each against a fresh broker:
      1. ``last_event_id=None``: the replay is the full history and the
         queue receives subsequent live events.
      2. ``last_event_id`` is a known cursor: the replay holds only the
         events after the cursor and the queue receives live events.
      3. ``last_event_id`` is an unknown cursor: the replay is ``None``
         (true cursor gap) yet the queue still receives live events.
    """
    # --- Scenario 1: first connect, no cursor ---------------------------
    broker = EventBroker(history_size=10)
    for n in range(3):
        broker.publish(_make_event(f"evt_{n}"))

    subscriber_id, inbox, backlog = broker.subscribe_with_replay(None)
    assert backlog is not None
    assert [event.event_id for event in backlog] == ["evt_0", "evt_1", "evt_2"]

    # History is handed back via the replay list, so the queue starts empty.
    assert inbox.empty()

    # A live publish must arrive through the queue.
    broker.publish(_make_event("evt_3"))
    assert inbox.get(timeout=1).event_id == "evt_3"
    broker.unsubscribe(subscriber_id)

    # --- Scenario 2: reconnect with a cursor still in history -----------
    broker = EventBroker(history_size=10)
    for n in range(5):
        broker.publish(_make_event(f"evt_{n}"))

    subscriber_id, inbox, backlog = broker.subscribe_with_replay("evt_2")
    assert backlog is not None
    assert [event.event_id for event in backlog] == ["evt_3", "evt_4"]

    # Replay and live delivery are separate channels.
    assert inbox.empty()

    broker.publish(_make_event("evt_5"))
    assert inbox.get(timeout=1).event_id == "evt_5"
    broker.unsubscribe(subscriber_id)

    # --- Scenario 3: reconnect with a cursor that has rolled out --------
    broker = EventBroker(history_size=3)
    for n in range(5):
        broker.publish(_make_event(f"evt_{n}"))
    # History now holds evt_2, evt_3, evt_4 only; evt_0 is gone.

    subscriber_id, inbox, backlog = broker.subscribe_with_replay("evt_0")
    assert backlog is None  # true cursor gap

    # The gap does not prevent live delivery.
    assert inbox.empty()
    broker.publish(_make_event("evt_5"))
    assert inbox.get(timeout=1).event_id == "evt_5"
    broker.unsubscribe(subscriber_id)


def test_concurrent_publish_is_thread_safe():
    """Events published from many threads at once all end up in the history."""
    thread_count = 10
    events_per_thread = 50
    # history_size exceeds the 500 events published, so nothing is evicted.
    broker = EventBroker(history_size=1000)

    def publish_batch(thread_index: int) -> None:
        """Publish this thread's share of uniquely named events."""
        for seq in range(events_per_thread):
            broker.publish(_make_event(f"evt_{thread_index}_{seq}"))

    publishers = [
        threading.Thread(target=publish_batch, args=(index,))
        for index in range(thread_count)
    ]
    for publisher in publishers:
        publisher.start()
    for publisher in publishers:
        publisher.join()

    recorded = broker.replay_since(None)
    assert len(recorded) == thread_count * events_per_thread  # 10 × 50 = 500
