"""Best-effort project write lease for cross-machine state writes.

The lease file lives at ``<projects_root>/<project>/_pipeline/state/.write_lease.json``
and guards both PassStore and ExecutionStore write entries.

This is explicitly not an atomic mutex across Dropbox-synced machines. Two
machines that both observe an absent or expired lease inside one sync interval
can both acquire it. The contract is narrower: catch the operationally common
hazard where one machine is actively working a project and a second machine
later attempts to write, by making that later write fail loudly unless the
operator overrides it. The residual first-write race is accepted; conflicted
copy detection is a separate follow-up.

The TTL is four hours and there is no heartbeat thread. This is valid because
active store writes happen at least once per take completion, typically minutes
apart, so normal active runs renew the lease well inside the TTL.
"""

from __future__ import annotations

import json
import logging
import os
import socket
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from recoil.core.paths import ProjectPaths

# Module-level logger; used for the lease-override warning.
logger = logging.getLogger(__name__)

# Lease lifetime in seconds (four hours). Written into each lease as ``ttl_s``
# and used as the fallback when a lease file carries no usable ``ttl_s``.
LEASE_TTL_S = 4 * 60 * 60
# An ``acquired_at`` more than this many seconds (five minutes) ahead of the
# local clock is flagged as a clock anomaly rather than a normal fresh lease.
FUTURE_SKEW_TOLERANCE_S = 5 * 60
# Environment variable that, when set to "1", permits taking over a fresh
# lease held by another host.
OVERRIDE_ENV = "RECOIL_STATE_LEASE_OVERRIDE"


class StateLeaseHeldError(RuntimeError):
    """Raised when another host holds a fresh project state write lease.

    ``ensure_write_lease`` raises this when the lease file names a different
    hostname, the lease has not expired, and the override environment
    variable is not set to ``"1"``. The message names the project, the
    holding host, the lease age and TTL, and how to override.
    """


def _lease_path(project: str) -> Path:
    """Return the path of the ``.write_lease.json`` file for ``project``.

    The file sits directly inside the project's state directory as resolved
    by ``ProjectPaths.for_project``.
    """
    state_dir = ProjectPaths.for_project(project).state_dir
    return state_dir / ".write_lease.json"


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)


def _parse_acquired_at(raw: Any) -> datetime | None:
    """Parse a lease ``acquired_at`` value into an aware UTC ``datetime``.

    Args:
        raw: The value read from the lease file. Only ISO 8601 strings are
            accepted; a trailing ``Z`` is understood as UTC and a timestamp
            without any offset is assumed to be UTC.

    Returns:
        The timestamp converted to UTC, or ``None`` when ``raw`` is not a
        string, is not parseable, or cannot be represented in UTC.
    """
    if not isinstance(raw, str):
        return None
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11
    # on, so spell the UTC offset out explicitly.
    value = f"{raw[:-1]}+00:00" if raw.endswith("Z") else raw
    try:
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        # Converting a timestamp near datetime.min/max that carries an offset
        # can leave the representable range and raise OverflowError; treat
        # that like any other unusable value instead of crashing the caller.
        return parsed.astimezone(timezone.utc)
    except (ValueError, OverflowError):
        return None


def _read_lease(path: Path) -> dict[str, Any] | None:
    """Load the lease file at ``path``.

    Args:
        path: Location of the lease JSON file.

    Returns:
        The decoded JSON object, or ``None`` when there is no usable lease:
        the path is not a regular file, the file vanished before it could be
        opened, its content is not valid UTF-8 JSON, or the top-level JSON
        value is not an object.

    Raises:
        OSError: For I/O failures other than the file being missing
            (for example a permission error).
    """
    if not path.is_file():
        return None
    try:
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # The file was removed between the is_file() check and the open.
        return None
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses. A corrupt lease is handled like the non-dict case
        # below: as "no lease", so it cannot block every future write.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable project state write lease %s: %s", path, exc
        )
        return None
    return data if isinstance(data, dict) else None


def _write_lease(path: Path, hostname: str, now: datetime) -> None:
    """Write a fresh lease for ``hostname`` to ``path``.

    The payload (``hostname``, this process's ``pid``, ``acquired_at`` as the
    ISO form of ``now``, and ``ttl_s``) is serialized to a temporary file in
    the same directory and then moved over ``path`` with ``os.replace``, so a
    local reader never observes a partially written lease.

    Args:
        path: Destination lease file; missing parent directories are created.
        hostname: Host name recorded as the lease holder.
        now: Timestamp recorded as ``acquired_at``.

    Raises:
        OSError: If the directory, temporary file, or final rename fails.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "hostname": hostname,
        "pid": os.getpid(),
        "acquired_at": now.isoformat(),
        "ttl_s": LEASE_TTL_S,
    }
    # The temp file is created beside the target so os.replace() stays on
    # one filesystem.
    fd, tmp = tempfile.mkstemp(
        dir=str(path.parent), prefix=".tmp_write_lease_", suffix=".json"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp, str(path))
    except BaseException:
        # BaseException rather than Exception: an interrupt or interpreter
        # exit mid-write must not leave a stray temp file in the state
        # directory either.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


def _held_message(
    *,
    project: str,
    holder: str,
    age_s: float,
    ttl_s: int,
    clock_anomaly: bool,
) -> str:
    """Build the error text for a lease held by another host.

    Args:
        project: Project whose lease is held.
        holder: Host name recorded in the lease.
        age_s: Lease age in seconds, rendered rounded to whole seconds.
        ttl_s: Lease time-to-live in seconds.
        clock_anomaly: When true, a sentence noting that ``acquired_at`` lies
            more than five minutes in the future is included.

    Returns:
        A single-line message that ends with the override instructions.
    """
    pieces = [
        f"PROJECT STATE WRITE LEASE HELD for {project!r} by host {holder!r} ",
        f"(age {age_s:.0f}s, ttl {ttl_s}s). ",
        "Refusing to write because another machine may be actively updating this project.",
    ]
    if clock_anomaly:
        pieces.append(
            " Clock anomaly: acquired_at is more than 5 minutes in the future."
        )
    pieces.append(
        f" Set {OVERRIDE_ENV}=1 only after verifying the other machine is idle."
    )
    return "".join(pieces)


def ensure_write_lease(project: str) -> None:
    """Take or refresh this host's best-effort write lease on ``project``.

    The lease is rewritten with a fresh timestamp when no lease exists, when
    the existing one has expired (or carries an unparseable ``acquired_at``),
    or when it already names this hostname. If a different host holds an
    unexpired lease, the lease is only taken over when
    ``RECOIL_STATE_LEASE_OVERRIDE=1`` is set, and that takeover is logged as
    a warning.

    Args:
        project: Project whose state directory holds the lease file.

    Raises:
        StateLeaseHeldError: Another host holds an unexpired lease and the
            override variable is not set to ``"1"``.
    """

    lease_file = _lease_path(project)
    this_host = socket.gethostname()
    current_time = _now()

    existing = _read_lease(lease_file)
    if existing is None:
        _write_lease(lease_file, this_host, current_time)
        return

    holder = str(existing.get("hostname") or "<unknown>")

    # Trust the TTL stored in the lease only when it is a positive integer.
    ttl_s = existing.get("ttl_s", LEASE_TTL_S)
    if not (isinstance(ttl_s, int) and ttl_s > 0):
        ttl_s = LEASE_TTL_S

    stamp = _parse_acquired_at(existing.get("acquired_at"))
    if stamp is not None:
        age_s = (current_time - stamp).total_seconds()
        # A timestamp too far in the future is flagged and never counts as
        # expired, so it is reported to the operator rather than taken over.
        clock_anomaly = age_s < -FUTURE_SKEW_TOLERANCE_S
        expired = not clock_anomaly and age_s > ttl_s
    else:
        # Without a usable timestamp the lease is considered expired.
        age_s, expired, clock_anomaly = float("inf"), True, False

    held_by_other = holder != this_host and not expired
    if held_by_other:
        if os.environ.get(OVERRIDE_ENV) != "1":
            raise StateLeaseHeldError(
                _held_message(
                    project=project,
                    holder=holder,
                    age_s=age_s,
                    ttl_s=ttl_s,
                    clock_anomaly=clock_anomaly,
                )
            )
        logger.warning(
            "Overriding project state write lease for %s held by %s; %s=1",
            project,
            holder,
            OVERRIDE_ENV,
        )

    _write_lease(lease_file, this_host, current_time)


# Public API of this module: the lease entry point and the error it raises.
# The underscore-prefixed helpers above are deliberately not exported.
__all__ = [
    "StateLeaseHeldError",
    "ensure_write_lease",
]
