"""FsWatcher — wraps watchdog.Observer, debounces, emits via EventBroker.

Load-bearing constraints:
- Watcher root is projects_root() (from core.paths). NEVER use PIPELINE_ROOT
  or RECOIL_ROOT — canonical refs live under projects_root(), not
  under the code tree. The wrong root produces zero events and silently
  fails the Phase 2 acceptance gate.
- _meta/ exclusion is load-bearing — sidecar writes would otherwise create
  a self-triggering loop (op writes manifest → watcher fires → UI refetches
  → cycle).
- 200ms debounce collapses fast editor saves (IntelliJ temp+rename pattern).
- 15s heartbeat keeps EventSource connections alive through idle periods.
"""

from __future__ import annotations

import logging
import os
import threading
import time
import uuid
from pathlib import Path
from typing import Iterable, Optional

from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from .events import FsEvent, FsEventType
from .pubsub import EventBroker

logger = logging.getLogger(__name__)


# Path components that disqualify an event (and are skipped by the startup
# snapshot walk). A path is excluded when ANY of its components equals one
# of these names.
#
# "_meta" is LOAD-BEARING — see the module docstring: sidecar writes under
# _meta/ would otherwise re-trigger the watcher in a loop.
EXCLUDED_PATH_PARTS = frozenset(
    (
        "_thumbs",
        "_exploration",
        "_meta",
        ".DS_Store",
        "__pycache__",
        ".git",
    )
)

# Per-path debounce window, in milliseconds.
DEBOUNCE_MS = 200

# Interval between HEARTBEAT events, in seconds.
HEARTBEAT_SECONDS = 15


class _DebounceHandler(FileSystemEventHandler):
    """Watchdog handler that filters, debounces per-path, and publishes to an EventBroker.

    Each incoming watchdog event passes through these gates, in order:

    1. Root containment — events whose path lies outside every watched root
       are dropped (macOS FSEvents sometimes reports the PARENT of a watched
       directory).
    2. Cold-start backlog suppression — on macOS, scheduling an Observer on
       a directory tree makes FSEvents replay CREATED events for every
       pre-existing path. CREATED events for paths present in the startup
       snapshot are dropped. MODIFIED events are never filtered this way.
    3. Exclusion — any path containing a component listed in
       EXCLUDED_PATH_PARTS is dropped.
    4. Debounce — repeated events for the same path inside DEBOUNCE_MS are
       collapsed, EXCEPT when the event flips the path between "deleted"
       and "not deleted". Such transitions always publish, so consumers are
       never left believing a re-created file is gone (or a removed file
       still exists).
    """

    # Upper bound on remembered debounce entries. When exceeded, the whole
    # memory is cleared; the next few events may bypass debouncing briefly,
    # which is acceptable.
    _MAX_DEBOUNCE_ENTRIES = 10000

    def __init__(
        self,
        broker: EventBroker,
        existing_paths: Optional[set[str]] = None,
        roots: Optional[tuple[str, ...]] = None,
    ):
        """Create a handler.

        Args:
            broker: Destination for published FsEvent objects.
            existing_paths: Paths that existed on disk when the watcher
                started. Copied, so the caller's set is never mutated.
            roots: Watch-root path strings. When empty or None, the
                root-containment check is skipped entirely.
        """
        self._broker = broker
        # path -> monotonic timestamp (ms) of the last event that was let
        # through for that path. Protected by self._lock.
        self._last_seen: dict[str, float] = {}
        # Subset of self._last_seen keys whose last let-through event was a
        # deletion. Protected by self._lock.
        self._last_deleted: set[str] = set()
        self._lock = threading.Lock()
        # Paths that existed on disk when the watcher started — a CREATED
        # event for one of these is an FSEvents backlog replay and must be
        # suppressed so consumers only see genuinely new paths.
        #
        # Mutable set, protected by self._lock. Paths are removed when a
        # DELETED or MOVED event fires, so that if the file is later
        # re-created with the same name, the new CREATED event is not
        # filtered as a stale backlog replay.
        self._existing_paths: set[str] = set(existing_paths) if existing_paths else set()
        # Watch roots — events outside every root are dropped.
        self._roots: tuple[str, ...] = roots or ()

    def _is_excluded(self, path: str) -> bool:
        """Return True if any component of *path* is in EXCLUDED_PATH_PARTS."""
        return not EXCLUDED_PATH_PARTS.isdisjoint(Path(path).parts)

    def _is_under_root(self, path: str) -> bool:
        """Return True if *path* equals, or lies below, one of the watch roots.

        Always True when no roots were configured. The comparison is purely
        textual (no filesystem access); trailing separators are ignored on
        both sides, and the platform separator is used so the prefix test
        is not tied to "/".
        """
        if not self._roots:
            return True
        separators = os.sep + (os.altsep or "")
        candidate = path.rstrip(separators)
        for root in self._roots:
            prefix = root.rstrip(separators)
            if candidate == prefix or candidate.startswith(prefix + os.sep):
                return True
        return False

    def _should_emit(self, path: str, event_type: str = "") -> bool:
        """Decide whether an event for *path* passes exclusion and debounce.

        Args:
            path: Path the event is keyed on.
            event_type: Watchdog event type string. Only "deleted" is
                treated specially; the default "" behaves like any
                non-delete event.

        Returns:
            False if the path is excluded, or if an event for the same path
            was let through less than DEBOUNCE_MS ago AND that event had the
            same deleted/not-deleted state as this one. True otherwise, in
            which case the debounce memory is updated.
        """
        if self._is_excluded(path):
            return False
        now_ms = time.monotonic() * 1000
        is_delete = event_type == "deleted"
        with self._lock:
            # None (not 0.0) marks "never seen": time.monotonic() has an
            # undefined reference point, so a small clock value must not
            # make the very first event look like a repeat.
            last = self._last_seen.get(path)
            if (
                last is not None
                and now_ms - last < DEBOUNCE_MS
                and (path in self._last_deleted) == is_delete
            ):
                return False
            # Cap the debounce memory to prevent unbounded growth over
            # long-running processes.
            if len(self._last_seen) > self._MAX_DEBOUNCE_ENTRIES:
                self._last_seen.clear()
                self._last_deleted.clear()
            self._last_seen[path] = now_ms
            if is_delete:
                self._last_deleted.add(path)
            else:
                self._last_deleted.discard(path)
        return True

    def on_any_event(self, wd_event: FileSystemEvent) -> None:
        """Filter one watchdog event and publish it to the broker if it survives.

        Exceptions raised while building or publishing the FsEvent are
        logged and swallowed so a bad event cannot propagate out of the
        handler.
        """
        # For MOVED events, key everything on the destination path.
        path_to_check = getattr(wd_event, "dest_path", None) or wd_event.src_path

        if not self._is_under_root(path_to_check):
            return

        wd_event_type = getattr(wd_event, "event_type", "")

        with self._lock:
            # A DELETED or MOVED event invalidates the startup snapshot
            # entry, so a later CREATED for the same path is NOT mistaken
            # for backlog. Tools that write via atomic rename (temp + mv)
            # generate delete+create pairs.
            if wd_event_type in ("deleted", "moved"):
                self._existing_paths.discard(path_to_check)
                # For moved events, also prune src_path since it's gone.
                src = getattr(wd_event, "src_path", None)
                if src and src != path_to_check:
                    self._existing_paths.discard(src)
            # A CREATED event for a path that already existed at startup is
            # a backlog replay. Checked under the lock because the set is
            # mutated above.
            is_backlog_replay = (
                wd_event_type == "created"
                and path_to_check in self._existing_paths
            )
        if is_backlog_replay:
            return

        if not self._should_emit(path_to_check, wd_event_type):
            return
        try:
            broker_ts = time.time()
            event_id = f"evt_{int(broker_ts * 1000)}_{uuid.uuid4().hex[:6]}"
            ev = FsEvent.from_watchdog(wd_event, broker_ts=broker_ts, event_id=event_id)
            self._broker.publish(ev)
        except Exception:
            logger.exception("FsWatcher handler failed to emit event")


class FsWatcher:
    """Watches one or more filesystem roots, publishing events to an EventBroker.

    Besides filesystem events, a HEARTBEAT event is published every
    HEARTBEAT_SECONDS from a daemon thread while the watcher is running.

    Usage:
        broker = EventBroker()
        watcher = FsWatcher(roots=[projects_root()], broker=broker)
        watcher.start()
        # ... use broker ...
        watcher.stop()
    """

    def __init__(
        self,
        roots: Iterable[Path],
        broker: Optional[EventBroker] = None,
    ):
        """Create a watcher; nothing is observed until start() is called.

        Args:
            roots: Directories to watch. Each is converted to a Path.
            broker: Broker to publish into. A new EventBroker is created
                when omitted (or falsy).
        """
        self._roots = [Path(r) for r in roots]
        self.broker = broker or EventBroker()
        # Non-None only while an Observer has been successfully started.
        self._observer: Optional[Observer] = None
        self._heartbeat_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

    @staticmethod
    def _resolved_str(path: str) -> Optional[str]:
        """Return the resolved form of *path* as a string, or None on failure.

        RuntimeError is caught alongside OSError because Path.resolve()
        raises RuntimeError for symlink loops on Python versions before
        3.13; one looping symlink must not abort the whole startup.
        """
        try:
            return str(Path(path).resolve())
        except (OSError, RuntimeError):
            return None

    @staticmethod
    def _watch_root_strings(roots: Iterable[Path]) -> list[str]:
        """Return each root in raw form followed by its resolved form.

        Both forms are included so macOS /var vs /private/var symlink
        differences don't cause false negatives in the in-root check.
        """
        root_strings: list[str] = []
        for root in roots:
            raw = str(root)
            root_strings.append(raw)
            resolved = FsWatcher._resolved_str(raw)
            if resolved is not None:
                root_strings.append(resolved)
        return root_strings

    def _snapshot_existing_paths(self, roots: Iterable[Path]) -> set[str]:
        """Collect every directory and file path currently under *roots*.

        The walk never descends into directories named in
        EXCLUDED_PATH_PARTS and skips files with such names. Every path is
        stored in BOTH raw (os.walk) and resolved (Path.resolve) form
        because macOS may report watchdog events under /private/var/...
        while os.walk of a /var/... root returns /var/..., and vice versa.
        """
        existing: set[str] = set()

        def remember(path: str) -> None:
            existing.add(path)
            resolved = self._resolved_str(path)
            if resolved is not None:
                existing.add(resolved)

        for root in roots:
            try:
                for dirpath, dirnames, filenames in os.walk(str(root)):
                    # Prune excluded dirs in-place so os.walk skips them.
                    dirnames[:] = [d for d in dirnames if d not in EXCLUDED_PATH_PARTS]
                    remember(dirpath)
                    for fn in filenames:
                        if fn not in EXCLUDED_PATH_PARTS:
                            remember(str(Path(dirpath) / fn))
            except OSError:
                logger.exception("FsWatcher failed to snapshot root: %s", root)
        return existing

    def start(self) -> None:
        """Start the watchdog Observer and heartbeat thread.

        No-op if already started. Roots that do not exist are logged and
        skipped. If scheduling or starting the Observer raises, the
        exception propagates and the watcher is left in the not-started
        state, so start() can be retried.
        """
        if self._observer is not None:
            return  # already started

        # Decide ONCE which roots exist, so the snapshot, the in-root
        # filter, and the scheduled watches all agree.
        live_roots: list[Path] = []
        for root in self._roots:
            if root.exists():
                live_roots.append(root)
            else:
                logger.warning("FsWatcher root does not exist: %s", root)

        # The snapshot lets the handler suppress the FSEvents cold-start
        # backlog; it is a one-time cost at startup.
        handler = _DebounceHandler(
            self.broker,
            existing_paths=self._snapshot_existing_paths(live_roots),
            roots=tuple(self._watch_root_strings(live_roots)),
        )

        # Build the observer in a local and publish it to self only after
        # it is actually running, so a failure here cannot leave a dead
        # observer that makes later start() calls return early.
        observer = Observer()
        for root in live_roots:
            observer.schedule(handler, str(root), recursive=True)
            logger.info("FsWatcher scheduled root: %s", root)
        observer.start()
        self._observer = observer

        self._stop_event.clear()
        self._heartbeat_thread = threading.Thread(
            target=self._heartbeat_loop, daemon=True, name="FsWatcher-heartbeat"
        )
        self._heartbeat_thread.start()

    def stop(self) -> None:
        """Stop the observer and heartbeat thread cleanly.

        Safe to call when not started and safe to call repeatedly. Joins
        are bounded (5s for the observer, 2s for the heartbeat thread).
        """
        # Set first so the heartbeat loop wakes from its wait immediately.
        self._stop_event.set()
        if self._observer is not None:
            self._observer.stop()
            self._observer.join(timeout=5.0)
            self._observer = None
        if self._heartbeat_thread is not None:
            self._heartbeat_thread.join(timeout=2.0)
            self._heartbeat_thread = None

    def _heartbeat_loop(self) -> None:
        """Emit a HEARTBEAT event every HEARTBEAT_SECONDS until stopped."""
        # Event.wait returns True once the stop event is set, ending the loop.
        while not self._stop_event.wait(HEARTBEAT_SECONDS):
            now = time.time()
            hb = FsEvent(
                event_id=f"hb_{int(now * 1000)}",
                event_type=FsEventType.HEARTBEAT,
                path="",
                project=None,
                asset_type=None,
                asset_id=None,
                src_path=None,
                is_directory=False,
                size_bytes=None,
                sha256=None,
                mtime=None,
                ts=now,
            )
            try:
                self.broker.publish(hb)
            except Exception:
                # A failing subscriber must not kill the heartbeat thread.
                logger.exception("FsWatcher heartbeat publish failed")
