"""Atomic write SSOT — Law 1, Law 5.

Phase 20 home for the atomic write/append helpers. tmp file → fsync (EINVAL-
tolerant for cloud-synced dirs like Dropbox/iCloud) → os.replace. JSONL
append uses fcntl advisory lock for concurrent-writer safety.

The 5 prior local impls (pipeline/core/persistence.py, api/adapters/beats.py,
workspace/verdict.py, workspace/sidecar.py, core/sidecar_writer.py) collapse
into this module. Each prior site gets a thin import + delegation in the
phase that swaps it.
"""
from __future__ import annotations

import contextlib
import errno
import fcntl
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_text(path: Path, content: str, *, encoding: str = "utf-8") -> None:
    """Atomic POSIX write: tmp → fsync (EINVAL-tolerant) → os.replace.

    Tolerates fsync EINVAL on cloud-synced dirs (Dropbox / iCloud); the
    os.replace below is still atomic. Crash recovery loses at most the
    in-flight write.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=path.parent, prefix=f".{path.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding=encoding) as f:
            f.write(content)
            try:
                f.flush()
                os.fsync(f.fileno())
            except OSError as e:
                if e.errno != errno.EINVAL:
                    raise
        os.replace(tmp_path, path)
    except Exception:
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise


def atomic_write_bytes(path: Path, data: bytes) -> None:
    """Atomic POSIX byte write: tmp → fsync (EINVAL-tolerant) → os.replace."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=path.parent, prefix=f".{path.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            try:
                f.flush()
                os.fsync(f.fileno())
            except OSError as e:
                if e.errno != errno.EINVAL:
                    raise
        os.replace(tmp_path, path)
    except Exception:
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise


def atomic_write_json(path: Path, data: Any, *, indent: int = 2) -> None:
    """JSON-serialize via json.dumps + atomic_write_text."""
    atomic_write_text(path, json.dumps(data, indent=indent, ensure_ascii=False))


@contextlib.contextmanager
def with_sidecar_lock(media_path: Path):
    """Acquire the per-sidecar fcntl.flock for a critical section.

    R6 Phase 1 — shared helper for both pipeline sidecar writes and workspace
    sidecar set_status calls. Key on a hidden sibling file
    `.{media_path.name}.json.lock` (matches the existing convention at
    `recoil/workspace/sidecar.py:143-145` and `recoil/core/sidecar_writer.py:68-74`
    so cross-module concurrent writes interlock via fcntl.flock).

    The lock file is intentionally NOT unlinked on exit — unlinking races with
    concurrent acquirers and the workspace's existing lock convention does not
    unlink either. Lock files are sub-1KB; cumulative cost is negligible.

    Releases the flock on every exit path (including exceptions).
    """
    lock_path = media_path.parent / f".{media_path.name}.json.lock"
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    lock_fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        yield
    finally:
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
        os.close(lock_fd)


def jsonl_append_locked(path: Path, record: dict) -> None:
    """Append one JSON line to a JSONL file with fcntl advisory lock.

    Cross-process safe: the lock blocks concurrent writers from
    interleaving partial lines.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    line = json.dumps(record, ensure_ascii=False) + "\n"
    with open(path, "a", encoding="utf-8") as f:
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            f.write(line)
            f.flush()
            try:
                os.fsync(f.fileno())
            except OSError:
                pass
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)


__all__ = [
    "atomic_write_bytes",
    "atomic_write_text",
    "atomic_write_json",
    "jsonl_append_locked",
    "with_sidecar_lock",
]
