"""Phase 19 — EventBus unit tests.

Verifies:
  • Ring buffer caps at the declared capacity (only the newest ``ring_capacity`` events are retained).
  • emit_sync schedules onto the bound loop and a subscriber receives it.
  • subscribe(last_event_id=…) replays history after the resume id.
  • emit_sync without a bound loop is a no-op (no exception).
  • Subscribers are removed cleanly when the iterator is closed.

Each async scenario runs on a fresh event loop via the ``_run`` helper (plus
plain threads for the cross-thread cases) — the project doesn't depend on
pytest-asyncio, so the fixtures stay self-contained.
"""
from __future__ import annotations

import asyncio
import threading

from recoil.api.eventbus import BUS, EventBus


def _run(coro):
    return asyncio.new_event_loop().run_until_complete(coro)


def test_emit_sync_delivers_to_subscriber():
    """An on-loop emit_sync reaches a subscriber that is already listening."""

    async def scenario() -> list:
        loop = asyncio.get_running_loop()
        bus = EventBus()
        bus.bind_loop(loop)
        got: list = []

        async def first_event() -> None:
            # Record the first delivered event, then stop iterating.
            async for event in bus.subscribe():
                got.append(event)
                return

        listener = asyncio.create_task(first_event())
        await asyncio.sleep(0)  # let the listener start and subscribe
        bus.emit_sync("info", "test/scope", "hello")
        await asyncio.wait_for(listener, timeout=1.0)
        return got

    got = _run(scenario())
    assert len(got) == 1
    event = got[0]
    assert event.severity == "info"
    assert event.scope == "test/scope"
    assert event.summary == "hello"


def test_emit_sync_from_other_thread_reaches_loop():
    """emit_sync called from a non-loop thread is delivered to a loop subscriber.

    The worker is a plain ``threading.Thread``, so emit_sync has to hand the
    event over to the bus's bound loop for the subscriber to see it.
    """

    async def scenario() -> list:
        bus = EventBus()
        bus.bind_loop(asyncio.get_running_loop())
        received: list = []

        async def consumer() -> None:
            async for ev in bus.subscribe():
                received.append(ev)
                return

        task = asyncio.create_task(consumer())
        await asyncio.sleep(0)
        # Off-loop thread emits — proves the run_coroutine_threadsafe bridge.
        t = threading.Thread(
            target=lambda: bus.emit_sync("warning", "off-thread", "from worker")
        )
        t.start()
        t.join()
        await asyncio.wait_for(task, timeout=1.0)
        return received

    received = _run(scenario())
    # Check the count first so a missing delivery fails as an assertion
    # rather than an IndexError on received[0].
    assert len(received) == 1
    assert received[0].severity == "warning"
    assert received[0].scope == "off-thread"
    assert received[0].summary == "from worker"


def test_ring_buffer_caps_at_capacity():
    """Overflowing the ring keeps only the newest ``ring_capacity`` events."""
    capacity, total = 10, 25

    async def scenario() -> EventBus:
        bus = EventBus(ring_capacity=capacity)
        bus.bind_loop(asyncio.get_running_loop())
        for n in range(total):
            event = bus._build_event("info", "x", f"e{n}", None, None)
            await bus.emit(event)
        return bus

    kept = _run(scenario()).history()
    assert len(kept) == capacity
    # Oldest survivor is e15 (25 - 10); newest is e24.
    assert kept[0].summary == f"e{total - capacity}"
    assert kept[-1].summary == f"e{total - 1}"


def test_history_resume_after_id():
    """history(after_id=X) returns exactly the events appended after X."""

    async def scenario():
        bus = EventBus(ring_capacity=20)
        bus.bind_loop(asyncio.get_running_loop())
        emitted: list[str] = []
        for n in range(5):
            event = bus._build_event("info", "x", f"e{n}", None, None)
            emitted.append(event.id)
            await bus.emit(event)
        return bus, emitted

    bus, emitted = _run(scenario())
    resume_from = emitted[2]
    tail = bus.history(after_id=resume_from)
    assert [e.id for e in tail] == emitted[3:]


def test_history_resume_with_unknown_id_returns_full_history():
    """An ``after_id`` that is not in the ring falls back to the full history."""

    async def scenario() -> EventBus:
        bus = EventBus(ring_capacity=20)
        bus.bind_loop(asyncio.get_running_loop())
        for i in range(3):
            await bus.emit(bus._build_event("info", "x", f"e{i}", None, None))
        return bus

    bus = _run(scenario())
    full = bus.history(after_id="ev_does_not_exist")
    assert len(full) == 3
    # Compare contents, not just the count, so a partial or reordered
    # fallback cannot pass.
    assert [e.id for e in full] == [e.id for e in bus.history()]


def test_emit_sync_without_bound_loop_is_noop():
    """emit_sync on a bus that never had bind_loop() called must not raise."""
    unbound = EventBus()
    unbound.emit_sync("info", "x", "no loop")
    # With no loop to bridge to, emit() is never reached, so the ring
    # stays empty.
    assert unbound.history() == []


def test_subscribe_replays_history_then_streams_live():
    """A new subscriber first sees buffered history, then live events."""

    async def scenario() -> list:
        bus = EventBus(ring_capacity=10)
        bus.bind_loop(asyncio.get_running_loop())
        # Seed the ring with two events before anyone subscribes.
        for label in ("h0", "h1"):
            await bus.emit(bus._build_event("info", "x", label, None, None))
        summaries: list = []

        async def collect(limit: int) -> None:
            async for event in bus.subscribe():
                summaries.append(event.summary)
                if len(summaries) >= limit:
                    return

        collector = asyncio.create_task(collect(3))
        await asyncio.sleep(0)
        # This emit happens only after the subscription has started.
        bus.emit_sync("info", "x", "live")
        await asyncio.wait_for(collector, timeout=1.0)
        return summaries

    assert _run(scenario()) == ["h0", "h1", "live"]


def test_unsubscribe_on_iter_close():
    """Closing a subscribe() generator unregisters its subscriber."""

    async def scenario() -> tuple[int, int]:
        bus = EventBus()
        bus.bind_loop(asyncio.get_running_loop())
        assert len(bus._subscribers) == 0

        stream = bus.subscribe()
        # Start pulling the first item. This registers the subscription and,
        # with an empty ring, parks waiting for a live event.
        pending = asyncio.create_task(stream.__anext__())
        await asyncio.sleep(0)
        registered = len(bus._subscribers)
        bus.emit_sync("info", "x", "tick")
        await asyncio.wait_for(pending, timeout=1.0)
        # Explicit close runs the generator's finally block.
        await stream.aclose()
        remaining = len(bus._subscribers)
        return registered, remaining

    registered, remaining = _run(scenario())
    assert registered == 1
    assert remaining == 0


def test_singleton_bus_reset_for_tests_clears_state():
    """_reset_for_tests() empties the module-level BUS and resets its id counter."""
    singleton = BUS
    singleton._reset_for_tests()
    # Ring, subscriber list and id counter must all be back to their
    # initial values.
    assert singleton.history() == []
    assert singleton._subscribers == []
    assert singleton._next_id == 0


# ── Debug R3 — cross-thread emit_sync ordering ─────────────────────────────


def test_cross_thread_emit_sync_preserves_id_append_order():
    """Two worker threads concurrently calling emit_sync produce a ring
    where the i-th element's id has the i-th increment value.

    Pre-Debug-R3, ``_build_event`` assigned the id under the lock, but the
    actual ``self.emit(event)`` ring-append happened ASYNC on the loop.
    Two cross-thread emits could therefore land on the loop in either
    order, breaking ``_history_after_locked``'s "higher index = later id"
    assumption.

    Post-fix: the cross-thread path appends to the ring under the same lock
    that holds the id assignment. Under the old code this test fails with
    non-zero probability; under the fixed code it is deterministic.
    """
    import re

    async def scenario() -> EventBus:
        bus = EventBus(ring_capacity=1000)
        bus.bind_loop(asyncio.get_running_loop())

        n_per_thread = 50
        n_threads = 2
        total = n_per_thread * n_threads
        ready = threading.Barrier(n_threads)

        def worker(tag: str) -> None:
            ready.wait()  # maximize contention
            for i in range(n_per_thread):
                bus.emit_sync("info", "race", f"{tag}-{i}")

        threads = [
            threading.Thread(target=worker, args=(f"t{n}",))
            for n in range(n_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        # Yield once so any callbacks the worker threads scheduled onto the
        # loop get a chance to run. There are no subscribers, so no fan-out
        # is expected; this is belt-and-braces.
        await asyncio.sleep(0)
        # Sanity: every emitted event made it into the ring.
        assert len(bus.history()) == total
        return bus

    bus = _run(scenario())
    history = bus.history()
    # 1) ids are unique.
    ids = [ev.id for ev in history]
    assert len(set(ids)) == len(ids)
    # 2) ids are monotonically increasing in append order. Validate the id
    #    format explicitly, so a malformed id fails with a readable message
    #    instead of an AttributeError on a ``None`` match.
    id_re = re.compile(r"^ev_(\d{8})$")
    nums: list[int] = []
    for eid in ids:
        match = id_re.match(eid)
        assert match is not None, f"unexpected event id format: {eid!r}"
        nums.append(int(match.group(1)))
    assert nums == sorted(nums), (
        f"ring append order does not match id order: {nums[:10]}..."
    )
    # 3) and they're contiguous (no gaps) — every id from 1..N is present.
    assert nums == list(range(1, len(nums) + 1))


def test_cross_thread_emit_sync_history_after_resume_is_well_ordered():
    """``history(after_id=…)`` returns only events strictly newer than the
    resume id, even when the ring was filled by racing threads. This relies
    on ring-append order matching id order."""

    def id_number(event_id: str) -> int:
        # "ev_00000042" -> 42
        return int(event_id.split("_")[1])

    async def scenario() -> tuple[EventBus, str]:
        bus = EventBus(ring_capacity=1000)
        bus.bind_loop(asyncio.get_running_loop())

        per_thread = 30
        start_line = threading.Barrier(2)

        def producer(label: str) -> None:
            start_line.wait()
            for k in range(per_thread):
                bus.emit_sync("info", "race", f"{label}-{k}")

        workers = [
            threading.Thread(target=producer, args=(f"t{n}",))
            for n in range(2)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        await asyncio.sleep(0)
        # Resume from the 11th-newest event so exactly 10 remain after it.
        return bus, bus.history()[-11].id

    bus, resume_id = _run(scenario())
    newer = bus.history(after_id=resume_id)
    assert len(newer) == 10
    floor = id_number(resume_id)
    assert all(id_number(ev.id) > floor for ev in newer)


# ── Debug R4 — cross-thread emit_sync × concurrent subscribe(): no double delivery ──


def test_cross_thread_emit_no_double_delivery_during_subscribe():
    """Regression for Debug R4.

    R3's fix preserved id/append order under lock for ring-append, but split
    the fan-out OUTSIDE the lock. That left a window in which a worker
    thread's cross-thread ``emit_sync`` could:
      1. acquire lock, append event to ring, release lock, schedule
         ``_broadcast_locked`` coro on the loop;
      2. before the broadcast coro runs, a concurrent ``subscribe()`` on the
         loop locks, snapshots ring (sees event), registers as listener,
         unlocks;
      3. broadcast coro runs: snapshots listeners (includes new one),
         delivers event.

    Result: the subscriber receives the event TWICE — once via history
    replay, once via the live queue.

    R4 fix: the cross-thread ``emit_sync`` records the event id in
    ``_pending_fanout`` under the same lock as the ring-append.
    ``subscribe()`` excludes ``_pending_fanout`` ids from its history
    snapshot under the same lock. ``_broadcast_locked`` discards the id
    from the set under the same lock that snapshots subscribers. Net:
    every event is delivered exactly once — either via history (already
    fanned out) or via live queue (still pending), never both.

    This test races several worker threads emitting cross-thread against
    loop subscribers that register while the burst is in progress. Without
    R4, the per-subscriber duplicate-id check at the end fires under
    contention.

    NOTE(review): only duplicates are asserted; missing deliveries (loss)
    are not checked here.
    """

    # Strategy: emit cross-thread continuously while spawning many
    # subscribers on the loop. A cross-thread emit_sync works in two stages.
    # The ring-append happens immediately on the worker thread (under the
    # lock, with the id marked in _pending_fanout). The matching broadcast
    # coro runs only later, on the loop. Any subscribe() that executes
    # between those two stages sees the event in the ring. Without R4's
    # _pending_fanout exclusion, such a subscriber receives the event via
    # history replay AND via the live queue once the broadcast runs.

    async def scenario() -> list[tuple[int, list[str]]]:
        bus = EventBus(ring_capacity=10000)
        bus.bind_loop(asyncio.get_running_loop())

        n_emits_per_thread = 40
        n_emit_threads = 3
        n_subscribers = 25  # spawned over the lifetime of the emit burst
        total_emits = n_emits_per_thread * n_emit_threads

        # One (subscriber index, received ids) entry per consumer, appended
        # when the consumer exits for any reason (including cancellation).
        per_subscriber_received: list[tuple[int, list[str]]] = []
        emit_done = asyncio.Event()

        async def consumer(idx: int) -> None:
            ids: list[str] = []
            try:
                async for ev in bus.subscribe():
                    ids.append(ev.id)
                    # If we've gathered everything emitted so far AND
                    # emit is done, stop.
                    if emit_done.is_set() and len(ids) >= total_emits:
                        break
                    # Generous cap so a misbehaving test doesn't hang.
                    if len(ids) > 4 * total_emits:
                        break
            finally:
                per_subscriber_received.append((idx, ids))

        ready = threading.Barrier(n_emit_threads)

        def emit_thread(tag: str) -> None:
            ready.wait()  # maximize contention between emit threads
            for i in range(n_emits_per_thread):
                bus.emit_sync("info", "race", f"{tag}-{i}")

        threads = [
            threading.Thread(target=emit_thread, args=(f"t{n}",))
            for n in range(n_emit_threads)
        ]
        for t in threads:
            t.start()

        # Race: spawn subscribers WHILE emits are landing on the ring.
        # Each subscriber's critical section may run between a
        # cross-thread ring-append and the matching broadcast coro,
        # exposing the R4 double-delivery window.
        consumer_tasks = []
        for idx in range(n_subscribers):
            consumer_tasks.append(asyncio.create_task(consumer(idx)))
            # Yield once per spawn so subscribers actually get scheduled
            # interleaved with the broadcast coros arriving from threads.
            await asyncio.sleep(0)

        # join() blocks the loop thread until every emitter has finished;
        # broadcast coros scheduled from the threads meanwhile queue up on
        # the loop.
        for t in threads:
            t.join()
        emit_done.set()

        # Best-effort drain: give the queued broadcast coros time to run.
        await asyncio.sleep(0.05)

        # Cancel any still-pending consumer tasks (they may be waiting
        # for more events than will arrive given small per-subscriber
        # windows).
        for task in consumer_tasks:
            if not task.done():
                task.cancel()
        for task in consumer_tasks:
            try:
                await task
            # CancelledError is expected for the tasks cancelled above;
            # TimeoutError is listed defensively.
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass

        return per_subscriber_received

    results = _run(scenario())

    # The R4 invariant: NO subscriber sees the same event id twice.
    # Record at most one (subscriber, duplicated id) sample per subscriber.
    duplicates_found: list[tuple[int, str]] = []
    for idx, ids in results:
        seen: set[str] = set()
        for ev_id in ids:
            if ev_id in seen:
                duplicates_found.append((idx, ev_id))
                break
            seen.add(ev_id)

    assert not duplicates_found, (
        f"R4 regression: {len(duplicates_found)} subscriber(s) received "
        f"duplicate event ids (sample: {duplicates_found[:5]})"
    )


# ── Debug R5 — _pending_fanout cleanup on cross-thread schedule failure ─────


def test_pending_fanout_cleaned_up_when_schedule_fails():
    """Regression for Debug R5.

    The cross-thread ``emit_sync`` path adds the event id to
    ``_pending_fanout`` under the lock, then calls
    ``asyncio.run_coroutine_threadsafe(self._broadcast_locked(event), loop)``.
    Pre-fix, that call could raise — e.g. the loop closes between the
    ``loop.is_closed()`` check and the schedule call, a real race during
    lifespan teardown. The outer ``except Exception`` then swallowed the
    error and the id stayed in ``_pending_fanout`` forever. Two failure
    modes followed:
      1. unbounded memory growth on every failed schedule;
      2. every later ``subscribe()`` excluded that id from history replay
         (R4's filter treats ``_pending_fanout`` as in-flight), even though
         no broadcast would ever deliver it live, so the event was
         permanently lost to resume paths.

    Post-fix, the schedule call has its own try/except that discards the id
    from ``_pending_fanout`` under the lock before re-raising into the outer
    guard.

    The test swaps ``asyncio.run_coroutine_threadsafe`` for a raising stub,
    emits from a worker thread, and checks that ``_pending_fanout`` is empty
    afterwards.
    """
    import recoil.api.eventbus as eb_mod

    async def scenario() -> tuple[set[str], int]:
        bus = EventBus()
        bus.bind_loop(asyncio.get_running_loop())

        def failing_schedule(coro, loop):
            # Close the coroutine first so no "coroutine never awaited"
            # RuntimeWarning escapes (pytest -W error would surface it).
            coro.close()
            raise RuntimeError("simulated schedule failure (loop closing)")

        real_schedule = eb_mod.asyncio.run_coroutine_threadsafe
        eb_mod.asyncio.run_coroutine_threadsafe = failing_schedule  # type: ignore[assignment]
        try:
            # Emitting from a worker thread forces the cross-thread branch.
            worker = threading.Thread(
                target=lambda: bus.emit_sync("info", "r5", "schedule fails"),
            )
            worker.start()
            worker.join()
        finally:
            eb_mod.asyncio.run_coroutine_threadsafe = real_schedule  # type: ignore[assignment]

        # The ring-append precedes the schedule call, so the event is in the
        # ring. The failed schedule must have taken its id back out of
        # ``_pending_fanout``.
        return set(bus._pending_fanout), len(bus.history())

    leaked, ring_len = _run(scenario())
    assert leaked == set(), (
        f"R5 regression: _pending_fanout leaked after schedule failure: "
        f"{leaked!r}"
    )
    # Sanity: the event is still in the ring, and therefore reachable via
    # history replay now that the pending-fanout filter no longer hides it.
    assert ring_len == 1


def test_pending_fanout_cleanup_lets_subsequent_subscribe_replay_event():
    """R5 follow-on: after schedule failure + cleanup, ``subscribe()``
    must replay the orphaned event from history.

    Pre-fix, the id sat in ``_pending_fanout`` permanently, so
    ``subscribe()``'s R4 history filter excluded it forever — the event
    was effectively unrecoverable.

    Post-fix, the cleanup discards the id on schedule failure, returning
    the event to the history-replay set.

    The replay read has a timeout. If the event is wrongly filtered out,
    nothing else is ever emitted, so an unbounded read would hang the
    suite instead of failing the assertion below.
    """
    import recoil.api.eventbus as eb_mod

    async def scenario() -> list[str]:
        bus = EventBus()
        bus.bind_loop(asyncio.get_running_loop())

        original = eb_mod.asyncio.run_coroutine_threadsafe

        def boom(coro, loop):
            # Close the coroutine so it is not reported as never awaited.
            coro.close()
            raise RuntimeError("simulated schedule failure")

        eb_mod.asyncio.run_coroutine_threadsafe = boom  # type: ignore[assignment]

        try:
            t = threading.Thread(
                target=lambda: bus.emit_sync("info", "r5", "orphaned"),
            )
            t.start()
            t.join()
        finally:
            eb_mod.asyncio.run_coroutine_threadsafe = original  # type: ignore[assignment]

        # A fresh subscriber should replay the orphaned event from history.
        # Pull exactly one item directly from the generator; the ring holds
        # only this one event.
        replayed: list[str] = []
        gen = bus.subscribe()
        try:
            ev = await asyncio.wait_for(gen.__anext__(), timeout=1.0)
            replayed.append(ev.summary)
        except (StopAsyncIteration, asyncio.TimeoutError):
            # Leave ``replayed`` empty; the assertion below reports it.
            pass
        finally:
            await gen.aclose()
        return replayed

    replayed = _run(scenario())
    assert replayed == ["orphaned"], (
        f"R5 regression: orphaned event was filtered out of history replay "
        f"by stale _pending_fanout entry; got {replayed!r}"
    )


# ── Debug R9 — id-assign + ring-append are atomic under thread contention ──


def test_id_to_ring_order_atomic_under_thread_contention():
    """Regression for Debug R9.

    Pre-fix, the cross-thread ``emit_sync`` path called ``_build_event``,
    which acquired ``self._lock`` to bump ``_next_id`` and RELEASED the
    lock before returning. ``emit_sync`` then re-acquired the lock to do
    the ring-append. Between those two critical sections a second thread
    could:

      1. Acquire lock, bump ``_next_id`` (gets N+1)
      2. Append to ring  (event N+1 lands first)
      3. Release lock

    Thread #1 then re-acquires the lock and appends event N. The ring now
    contains ``[..., ev_N+1, ev_N]`` — out of order.

    This breaks ``_history_after_locked``'s "higher ring index = later id"
    invariant, which SSE Last-Event-ID resume relies on. A client that
    received ev_N+1 and reconnects with ``Last-Event-ID: ev_N+1`` silently
    misses ev_N. The linear scan finds ev_N+1's index and slices everything
    after it, but ev_N sits BEHIND ev_N+1 in the ring, so from the slicer's
    perspective it has already been consumed.

    Post-fix: id-assign and ring-append happen inside a SINGLE lock
    acquisition (``_build_event_locked`` is called inside ``emit_sync``'s
    ``with self._lock:`` block). The race window is gone.

    To make the regression deterministic (the GIL alone makes the buggy
    interleaving probabilistic), we monkey-patch ``_build_event`` on the
    bus instance. Thread A, after receiving its id, parks until thread B's
    ``emit_sync`` call has fully RETURNED, i.e. B's ring-append is done.
    Under the bug this forces ring order ``[N+1, N]``. Under the fix the
    patch never runs, because production ``emit_sync`` no longer routes
    through ``_build_event``. A then finishes first and releases B, and
    the ring is naturally ordered.

    NOTE: this test relies on the implementation detail that the post-fix
    cross-thread path uses ``_build_event_locked`` directly and DOES NOT
    call ``_build_event``. If a future refactor reintroduces the wrapper
    in the cross-thread hot path, the patched blocker would re-trigger
    and this test would also need updating. Acceptable trade for a
    deterministic R9 regression.
    """

    async def scenario() -> EventBus:
        bus = EventBus(ring_capacity=10000)
        bus.bind_loop(asyncio.get_running_loop())

        # Forced sequence under the bug:
        #   1. Thread A enters _build_event, gets id=1 (lock released) and parks.
        #   2. Thread B runs a COMPLETE emit_sync (id=2 + ring-append).
        #   3. A resumes and appends id=1.
        # Result: ring = [ev_2, ev_1], i.e. the bug.
        a_in_build = threading.Event()
        b_done = threading.Event()
        original_build = bus._build_event
        thread_a_name = "emit-A"

        def slow_build_for_a(*args, **kwargs):
            # Only reached if emit_sync routes through _build_event (pre-fix
            # code). Thread A parks right after its id is assigned; every
            # other caller passes straight through.
            ev = original_build(*args, **kwargs)
            if threading.current_thread().name == thread_a_name:
                a_in_build.set()
                # Bounded wait so a broken thread B cannot hang the test.
                b_done.wait(timeout=0.5)
            return ev

        bus._build_event = slow_build_for_a  # type: ignore[method-assign]

        def emit_a():
            try:
                bus.emit_sync("info", "race", "a-event")
            finally:
                # Post-fix the patched builder never runs, so release B here
                # rather than leaving it to sit out its full timeout.
                a_in_build.set()

        def emit_b():
            # Wait until A is parked between id-assign and ring-append (bug),
            # or has finished its emit entirely (fix).
            a_in_build.wait(timeout=1.0)
            try:
                bus.emit_sync("info", "race", "b-event")
            finally:
                # Signal only after B's emit_sync has returned, i.e. after its
                # ring-append. This is what makes the bug reproduction
                # deterministic.
                b_done.set()

        ta = threading.Thread(target=emit_a, name=thread_a_name)
        tb = threading.Thread(target=emit_b, name="emit-B")
        ta.start()
        tb.start()
        ta.join()
        tb.join()
        await asyncio.sleep(0)
        return bus

    bus = _run(scenario())
    history = bus.history()
    # Sanity: both events landed.
    assert len(history) == 2

    # Under the bug: history = [ev_00000002 (b), ev_00000001 (a)] — out of order.
    # Under the fix: history = [ev_00000001 (a), ev_00000002 (b)] — in order.
    # The fix never calls _build_event from emit_sync, so the patched blocker
    # cannot fire, and id-assign + ring-append serialize through the single lock.
    ids = [int(ev.id.replace("ev_", "")) for ev in history]
    assert ids == sorted(ids), (
        f"R9 regression: ring append order does not match id order. "
        f"history (in ring order): {[(ev.id, ev.summary) for ev in history]!r}"
    )
