"""Phase 19 — SSE routes smoke tests.

Full SSE wire reads are exercised by the curl-based validation gate, not
here — TestClient buffers chunked streams and would deadlock against
``EventSourceResponse`` (which never closes from the server side).

These unit tests cover the testable surface:
  • The /api/events/stream route is registered on the FastAPI app.
  • The SSE generator yields the SSE-shape dicts {id, event, data:json}
    when fed BUS events directly.
  • Mutation routes land EngineEvent rows on BUS, observable via
    BUS.history() — the same data path the SSE endpoint streams.
"""
from __future__ import annotations

import asyncio
import json

import pytest
from fastapi.testclient import TestClient

from recoil.api.eventbus import BUS
from recoil.api.main import app
from recoil.api.sse_routes import _event_generator


@pytest.fixture
def client():
    BUS._reset_for_tests()
    with TestClient(app) as c:
        yield c


def test_stream_route_is_registered() -> None:
    paths = [r.path for r in app.routes]  # type: ignore[attr-defined]
    assert "/api/events/stream" in paths


def test_stream_replays_ring_history_via_last_event_id() -> None:
    # Pre-populate the ring with three events.
    async def setup() -> list[str]:
        BUS._reset_for_tests()
        BUS.bind_loop(asyncio.get_running_loop())
        BUS.emit_sync("info", "test", "e0")
        BUS.emit_sync("info", "test", "e1")
        BUS.emit_sync("info", "test", "e2")
        return [e.id for e in BUS.history()]

    ids = asyncio.new_event_loop().run_until_complete(setup())

    # Without Last-Event-ID, history is all 3.
    full = BUS.history()
    assert [e.id for e in full] == ids
    # With Last-Event-ID set to the second id, only the third remains.
    after = BUS.history(after_id=ids[1])
    assert [e.id for e in after] == [ids[2]]


def test_event_generator_emits_sse_shape() -> None:
    # Drive the SSE generator with one event and confirm the shape.
    async def run() -> dict:
        BUS._reset_for_tests()
        BUS.bind_loop(asyncio.get_running_loop())
        BUS.emit_sync("warning", "test", "shape")
        gen = _event_generator(last_event_id=None)
        out = await gen.__anext__()
        await gen.aclose()
        return out

    out = asyncio.new_event_loop().run_until_complete(run())
    assert set(out.keys()) == {"id", "event", "data"}
    assert out["event"] == "engine_event"
    body = json.loads(out["data"])
    assert body["severity"] == "warning"
    assert body["scope"] == "test"
    assert body["summary"] == "shape"


def test_mutation_route_event_lands_on_bus(client: TestClient) -> None:
    # Mutation should land an event on BUS that any SSE subscriber would see.
    BUS._reset_for_tests()
    client.post("/api/proposals/prop_007/reject")
    history = BUS.history()
    assert len(history) == 1
    ev = history[0]
    assert ev.payload["proposal_id"] == "prop_007"
    assert ev.payload["decision"] == "rejected"


# ── Debug R2 — query-param fallback for cross-page-reload resume ────────────


def test_event_generator_honors_query_param_resume() -> None:
    """SSE generator yields only events strictly after the resume id.

    Avoids the SSE TestClient deadlock by driving _event_generator directly
    with the same id-string the route would extract from ?lastEventId=.
    """
    async def run() -> list[dict]:
        BUS._reset_for_tests()
        BUS.bind_loop(asyncio.get_running_loop())
        BUS.emit_sync("info", "test", "first")
        BUS.emit_sync("info", "test", "second")
        BUS.emit_sync("info", "test", "third")
        ids = [e.id for e in BUS.history()]
        gen = _event_generator(last_event_id=ids[1])
        out: list[dict] = []
        # Only ids[2] is strictly after ids[1]; gen would block waiting for
        # more, so collect history-replay then close.
        first_after = await gen.__anext__()
        out.append(first_after)
        await gen.aclose()
        return out

    out = asyncio.new_event_loop().run_until_complete(run())
    assert len(out) == 1
    body = json.loads(out[0]["data"])
    assert body["summary"] == "third"


def test_stream_route_query_param_takes_effect_through_subscribe() -> None:
    """End-to-end at the BUS layer: subscribe with after_id behaves correctly.

    Covers the path the route stitches: the route extracts ``lastEventId``
    from query (when no header), passes it to ``_event_generator`` which
    forwards to ``BUS.subscribe`` — same code path as header-driven resume.
    """
    async def run() -> list[str]:
        BUS._reset_for_tests()
        BUS.bind_loop(asyncio.get_running_loop())
        BUS.emit_sync("info", "test", "a")
        BUS.emit_sync("info", "test", "b")
        BUS.emit_sync("info", "test", "c")
        ids = [e.id for e in BUS.history()]
        gen = _event_generator(last_event_id=ids[0])
        replayed: list[str] = []
        # Two events should be replayed (b and c) before the generator
        # blocks waiting for new ones.
        for _ in range(2):
            entry = await gen.__anext__()
            replayed.append(json.loads(entry["data"])["summary"])
        await gen.aclose()
        return replayed

    summaries = asyncio.new_event_loop().run_until_complete(run())
    assert summaries == ["b", "c"]


# ── Debug R5 — empty header must beat query param (header precedence absolute) ──


def test_empty_header_does_not_fall_back_to_query_param() -> None:
    """Regression for Debug R5.

    Pre-fix, ``stream_events`` used truthiness:
        effective = last_event_id if last_event_id else lastEventId
    so an empty ``Last-Event-ID:`` header (value ``""``) was treated as
    falsy and the route silently fell back to ``?lastEventId=``. That
    violated the documented "header wins" precedence — the client
    explicitly sent the header (empty = "start fresh"), and the route
    should honour that intent rather than resume from a stale query param.

    Post-fix uses ``is not None`` for the header check: an empty-string
    header still beats the query param.

    We verify the route logic by stubbing ``EventSourceResponse`` to
    capture which generator was constructed, then inspecting the
    ``last_event_id`` arg the route forwarded into ``_event_generator``.
    Driving the route through ``EventSourceResponse`` end-to-end via
    TestClient deadlocks (per the module docstring), so we exercise the
    handler function directly.
    """
    import recoil.api.sse_routes as sse_mod

    captured: dict[str, object] = {}

    def fake_generator(last_event_id):
        captured["forwarded"] = last_event_id

        async def _gen():
            if False:
                yield  # pragma: no cover

        return _gen()

    class FakeResponse:
        def __init__(self, gen) -> None:
            self.gen = gen

    async def call_handler() -> None:
        # Empty-string header (client sent ``Last-Event-ID:``) AND a stale
        # query param. Header wins — even when empty.
        await sse_mod.stream_events(
            last_event_id="",
            lastEventId="ev_00000001",
        )

    monkey_event_gen = sse_mod._event_generator
    monkey_response = sse_mod.EventSourceResponse
    sse_mod._event_generator = fake_generator  # type: ignore[assignment]
    sse_mod.EventSourceResponse = FakeResponse  # type: ignore[assignment]
    try:
        asyncio.new_event_loop().run_until_complete(call_handler())
    finally:
        sse_mod._event_generator = monkey_event_gen  # type: ignore[assignment]
        sse_mod.EventSourceResponse = monkey_response  # type: ignore[assignment]

    # Header value (empty string) was forwarded — NOT the query param's
    # ev_00000001. Old truthiness-based fallback would have passed
    # "ev_00000001" here, resuming from a stale id the client never sent.
    assert captured["forwarded"] == "", (
        f"Empty header must beat query param; got {captured['forwarded']!r}"
    )


def test_present_header_beats_query_param() -> None:
    """Sanity for Debug R5 fix: non-empty header still beats query param.

    Mirrors the empty-header test but with a real header value. Verifies
    the ``is not None`` switch didn't accidentally invert the precedence.
    """
    import recoil.api.sse_routes as sse_mod

    captured: dict[str, object] = {}

    def fake_generator(last_event_id):
        captured["forwarded"] = last_event_id

        async def _gen():
            if False:
                yield  # pragma: no cover

        return _gen()

    class FakeResponse:
        def __init__(self, gen) -> None:
            self.gen = gen

    async def call_handler() -> None:
        await sse_mod.stream_events(
            last_event_id="ev_00000007",
            lastEventId="ev_00000001",
        )

    monkey_event_gen = sse_mod._event_generator
    monkey_response = sse_mod.EventSourceResponse
    sse_mod._event_generator = fake_generator  # type: ignore[assignment]
    sse_mod.EventSourceResponse = FakeResponse  # type: ignore[assignment]
    try:
        asyncio.new_event_loop().run_until_complete(call_handler())
    finally:
        sse_mod._event_generator = monkey_event_gen  # type: ignore[assignment]
        sse_mod.EventSourceResponse = monkey_response  # type: ignore[assignment]

    assert captured["forwarded"] == "ev_00000007"


def test_query_param_used_when_header_absent() -> None:
    """Sanity for Debug R5 fix: query param still works when no header.

    The fix only changes ``last_event_id if last_event_id else …`` to
    ``last_event_id if last_event_id is not None else …`` — so
    ``last_event_id=None`` (header absent) must still fall back to the
    query param. Easy to break by flipping the condition.
    """
    import recoil.api.sse_routes as sse_mod

    captured: dict[str, object] = {}

    def fake_generator(last_event_id):
        captured["forwarded"] = last_event_id

        async def _gen():
            if False:
                yield  # pragma: no cover

        return _gen()

    class FakeResponse:
        def __init__(self, gen) -> None:
            self.gen = gen

    async def call_handler() -> None:
        await sse_mod.stream_events(
            last_event_id=None,
            lastEventId="ev_00000042",
        )

    monkey_event_gen = sse_mod._event_generator
    monkey_response = sse_mod.EventSourceResponse
    sse_mod._event_generator = fake_generator  # type: ignore[assignment]
    sse_mod.EventSourceResponse = FakeResponse  # type: ignore[assignment]
    try:
        asyncio.new_event_loop().run_until_complete(call_handler())
    finally:
        sse_mod._event_generator = monkey_event_gen  # type: ignore[assignment]
        sse_mod.EventSourceResponse = monkey_response  # type: ignore[assignment]

    assert captured["forwarded"] == "ev_00000042"
