"""Tests for fs_watcher.transports.sse — framework-agnostic SSE generator."""

from __future__ import annotations

import threading
import time

import pytest

from recoil.pipeline._lib.fs_watcher.events import FsEvent, FsEventType
from recoil.pipeline._lib.fs_watcher.pubsub import EventBroker
from recoil.pipeline._lib.fs_watcher.transports.sse import format_sse_frame, sse_stream


def _make_event(event_id: str, event_type: FsEventType = FsEventType.MODIFIED) -> FsEvent:
    return FsEvent(
        event_id=event_id,
        event_type=event_type,
        path="projects/test/file.txt",
        project="test",
        asset_type=None,
        asset_id=None,
        src_path=None,
        is_directory=False,
        size_bytes=100,
        sha256=None,
        mtime=1700000000.0,
        ts=1700000001.0,
    )


def test_format_sse_frame_structure():
    """An SSE frame is `id: X\\nevent: Y\\ndata: {...}\\n\\n` encoded as bytes."""
    ev = _make_event("evt_1", FsEventType.CREATED)
    frame = format_sse_frame(ev)
    assert isinstance(frame, bytes)
    text = frame.decode("utf-8")
    assert text.startswith("id: evt_1\n")
    assert "event: created\n" in text
    assert "data: " in text
    assert text.endswith("\n\n")


def test_sse_stream_initial_connect_skips_history():
    """On initial connect (last_event_id=None), sse_stream does NOT replay history.

    This prevents a render storm on every page load: if the ring buffer has 500
    events and we replayed them all, the client would call _render() 500 times
    concurrently and DDoS the server.
    """
    broker = EventBroker(history_size=10)
    # Pre-populate with 5 events
    for i in range(5):
        broker.publish(_make_event(f"hist_{i}"))

    gen = sse_stream(broker, last_event_id=None)

    # First yield should come from a LIVE event published after subscription,
    # not from history. Publish from another thread and verify the generator
    # yields only the live event.
    import threading
    import time
    received: list[bytes] = []

    def consumer():
        try:
            for frame in gen:
                received.append(frame)
                if len(received) >= 1:
                    break
        except Exception:
            pass

    t = threading.Thread(target=consumer, daemon=True)
    t.start()
    time.sleep(0.1)  # let the generator subscribe

    broker.publish(_make_event("live_1"))
    t.join(timeout=2.0)
    gen.close()

    assert len(received) == 1
    assert b"live_1" in received[0]
    # And none of the historical events should have been yielded
    for i in range(5):
        assert f"hist_{i}".encode() not in received[0]


def test_sse_stream_reconnect_with_cursor_replays_after_cursor():
    """On reconnect with last_event_id, sse_stream yields events after the cursor."""
    broker = EventBroker(history_size=10)
    for i in range(5):
        broker.publish(_make_event(f"hist_{i}"))

    gen = sse_stream(broker, last_event_id="hist_1")

    # Should yield hist_2, hist_3, hist_4 (3 events) then wait for live
    frames: list[bytes] = []
    for _ in range(3):
        frames.append(next(gen))

    assert b"hist_2" in frames[0]
    assert b"hist_3" in frames[1]
    assert b"hist_4" in frames[2]

    gen.close()


def test_sse_stream_cursor_gap_yields_synthetic_event():
    """If client's cursor has fallen out of the ring, yield a cursor_gap event.

    This signals the client to force-refresh its entire state instead of
    silently staying out of sync.
    """
    broker = EventBroker(history_size=3)
    # Publish 10 events — ring now holds only evt_7, evt_8, evt_9
    for i in range(10):
        broker.publish(_make_event(f"evt_{i}"))

    # Client had cursor evt_0, but ring has rolled past
    gen = sse_stream(broker, last_event_id="evt_0")
    first_frame = next(gen)
    assert b"event: cursor_gap" in first_frame
    gen.close()


def test_sse_stream_up_to_date_cursor_does_not_yield_cursor_gap():
    """Reconnect with a cursor that is the MOST RECENT event in the ring.

    This is the common idle-reconnect case: the client disconnects briefly
    (tab refocus, network blip, heartbeat cycle) and reconnects with its
    last-seen event id, which happens to be the newest event in the ring.
    `replay_since` returns [] (empty list — up to date), NOT None (cursor
    gap). `sse_stream` must NOT yield a cursor_gap frame — it should just
    fall through to subscribe for live events.
    """
    broker = EventBroker(history_size=10)
    for i in range(5):
        broker.publish(_make_event(f"evt_{i}"))

    gen = sse_stream(broker, last_event_id="evt_4")  # cursor = most recent

    # The generator should not yield any history frames (client is up to date)
    # and should not yield a cursor_gap. It should then block on the live
    # subscription. We publish a live event from another thread to unblock.
    import threading
    received: list[bytes] = []

    def consumer():
        try:
            for frame in gen:
                received.append(frame)
                if len(received) >= 1:
                    break
        except Exception:
            pass

    t = threading.Thread(target=consumer, daemon=True)
    t.start()
    time.sleep(0.1)  # let consumer subscribe

    broker.publish(_make_event("live_1"))
    t.join(timeout=2.0)
    gen.close()

    assert len(received) == 1
    assert b"live_1" in received[0]
    # No cursor_gap should have been yielded
    assert b"cursor_gap" not in received[0]


def test_sse_stream_honors_last_event_id_cursor():
    """With a cursor, sse_stream replays only events after that cursor."""
    broker = EventBroker(history_size=10)
    for i in range(5):
        broker.publish(_make_event(f"evt_{i}"))

    gen = sse_stream(broker, last_event_id="evt_2")

    # Should yield evt_3 and evt_4 (2 events), then wait for live
    frame_3 = next(gen)
    frame_4 = next(gen)
    assert b"evt_3" in frame_3
    assert b"evt_4" in frame_4

    gen.close()


def test_sse_stream_yields_live_events_after_history():
    """After history replay, sse_stream yields live events as they're published."""
    broker = EventBroker()
    gen = sse_stream(broker, last_event_id=None)

    received: list[bytes] = []
    stop = threading.Event()

    def consumer():
        for frame in gen:
            if stop.is_set():
                break
            received.append(frame)
            if len(received) >= 2:
                break

    t = threading.Thread(target=consumer, daemon=True)
    t.start()

    # Give the consumer time to subscribe
    time.sleep(0.1)

    broker.publish(_make_event("live_1"))
    broker.publish(_make_event("live_2"))

    t.join(timeout=2.0)
    stop.set()
    gen.close()

    assert len(received) == 2
    assert b"live_1" in received[0]
    assert b"live_2" in received[1]


# NOTE: test_sse_stream_unsubscribes_on_close was removed during Opus Round 2
# review (R2-C1). With the Gemini C1 fix (skip history on last_event_id=None),
# calling next(gen) after the publish-before-subscribe pattern would block
# forever in sub_queue.get(timeout=30) — the published event was lost because
# it was published BEFORE subscription. The unsubscribe-on-close behavior is
# implicitly verified by the other tests that call gen.close() after
# advancing the generator into the subscribe phase (e.g., the threaded consumer
# in test_sse_stream_initial_connect_skips_history).
