from __future__ import annotations

import json
import os
import socket
from datetime import timedelta

import pytest

from recoil.execution.state_lease import (
    LEASE_TTL_S,
    OVERRIDE_ENV,
    StateLeaseHeldError,
    _now,
    ensure_write_lease,
)


def _project_root(tmp_path):
    """Build a throwaway projects root holding one project directory.

    Creates ``<tmp_path>/projects``, writes a ``.recoil-data-root`` file into
    it, and creates an empty ``leaseproj`` directory underneath.

    Returns:
        A ``(projects_root, project_dir)`` tuple of paths.
    """
    projects_root = tmp_path / "projects"
    projects_root.mkdir()

    sentinel = projects_root / ".recoil-data-root"
    sentinel.write_text("recoil-data-root\n", encoding="utf-8")

    project_dir = projects_root / "leaseproj"
    project_dir.mkdir()
    return projects_root, project_dir


def _lease_path(project_root):
    """Return the path of the write-lease JSON file under *project_root*."""
    state_dir = project_root / "_pipeline" / "state"
    return state_dir / ".write_lease.json"


def _write_lease(project_root, *, hostname, acquired_at, ttl_s=LEASE_TTL_S):
    """Seed the project's lease file with a hand-built lease record.

    Args:
        project_root: Project directory; the lease lands at ``_lease_path()``.
        hostname: Value stored under the ``hostname`` key.
        acquired_at: Datetime whose ``isoformat()`` is stored as ``acquired_at``.
        ttl_s: Value stored under ``ttl_s`` (defaults to ``LEASE_TTL_S``).

    Returns:
        The path of the lease file that was written.
    """
    lease_file = _lease_path(project_root)
    # The state directory does not exist in a freshly created project.
    lease_file.parent.mkdir(parents=True, exist_ok=True)

    record = {
        "hostname": hostname,
        "pid": 123,  # fixed placeholder pid for the seeded record
        "acquired_at": acquired_at.isoformat(),
        "ttl_s": ttl_s,
    }
    lease_file.write_text(json.dumps(record), encoding="utf-8")
    return lease_file


@pytest.fixture()
def project(monkeypatch, tmp_path):
    """Provide a ``(project_name, project_dir)`` pair backed by ``tmp_path``.

    Points ``RECOIL_PROJECTS_ROOT`` at a freshly built projects root and
    removes the override variable from the environment if it is set.
    """
    project_name = "leaseproj"
    projects_root, project_dir = _project_root(tmp_path)

    monkeypatch.setenv("RECOIL_PROJECTS_ROOT", str(projects_root))
    # Start each test with the override variable unset; tests opt in explicitly.
    monkeypatch.delenv(OVERRIDE_ENV, raising=False)

    return project_name, project_dir


def test_acquire_when_absent(project):
    """With no lease file present, acquiring writes one for this process."""
    name, project_dir = project

    ensure_write_lease(name)

    raw = _lease_path(project_dir).read_text(encoding="utf-8")
    written = json.loads(raw)
    assert written["hostname"] == socket.gethostname()
    assert written["pid"] == os.getpid()
    assert written["ttl_s"] == LEASE_TTL_S


def test_renew_when_own(project):
    """A lease already recorded for this host is rewritten with a new timestamp."""
    name, project_dir = project
    previous_stamp = _now() - timedelta(minutes=1)
    lease_file = _write_lease(
        project_dir,
        hostname=socket.gethostname(),
        acquired_at=previous_stamp,
    )

    ensure_write_lease(name)

    renewed = json.loads(lease_file.read_text(encoding="utf-8"))
    assert renewed["hostname"] == socket.gethostname()
    # The seeded record carries a placeholder pid; renewal must store ours.
    assert renewed["pid"] == os.getpid()
    assert renewed["acquired_at"] != previous_stamp.isoformat()


def test_raise_when_other_host_fresh(project, monkeypatch):
    """A one-minute-old lease from another host blocks acquisition.

    The error message must mention the holder's hostname, the word ``age``
    and the name of the override environment variable.
    """
    name, project_dir = project
    monkeypatch.setattr(socket, "gethostname", lambda: "this-host")
    one_minute_ago = _now() - timedelta(minutes=1)
    _write_lease(project_dir, hostname="other-host", acquired_at=one_minute_ago)

    with pytest.raises(StateLeaseHeldError) as excinfo:
        ensure_write_lease(name)

    text = str(excinfo.value)
    for fragment in ("other-host", "age", OVERRIDE_ENV):
        assert fragment in text


def test_pass_when_other_host_expired(project, monkeypatch):
    """A foreign lease older than ``LEASE_TTL_S`` is taken over by this host."""
    name, project_dir = project
    monkeypatch.setattr(socket, "gethostname", lambda: "this-host")
    # One second past the TTL.
    expired_stamp = _now() - timedelta(seconds=LEASE_TTL_S + 1)
    lease_file = _write_lease(
        project_dir,
        hostname="other-host",
        acquired_at=expired_stamp,
    )

    ensure_write_lease(name)

    taken_over = json.loads(lease_file.read_text(encoding="utf-8"))
    assert taken_over["hostname"] == "this-host"


def test_override_env(project, monkeypatch, caplog):
    """Setting the override variable lets this host replace a fresh foreign lease.

    Also checks that the override is reported in the captured log output.
    """
    name, project_dir = project
    monkeypatch.setattr(socket, "gethostname", lambda: "this-host")
    monkeypatch.setenv(OVERRIDE_ENV, "1")
    one_minute_ago = _now() - timedelta(minutes=1)
    lease_file = _write_lease(
        project_dir,
        hostname="other-host",
        acquired_at=one_minute_ago,
    )

    ensure_write_lease(name)

    replaced = json.loads(lease_file.read_text(encoding="utf-8"))
    assert replaced["hostname"] == "this-host"
    assert "Overriding project state write lease" in caplog.text


def test_future_skew_tolerance(project, monkeypatch):
    """Future-dated foreign leases are held; large skew is called out.

    A lease stamped four minutes ahead raises the ordinary held-lease error,
    while one stamped six minutes ahead raises with ``Clock anomaly`` in the
    message and leaves the file on disk owned by the other host.
    """
    name, project_dir = project
    monkeypatch.setattr(socket, "gethostname", lambda: "this-host")

    # Case 1: four minutes ahead -- no anomaly wording expected.
    slightly_ahead = _now() + timedelta(minutes=4)
    lease_file = _write_lease(
        project_dir,
        hostname="other-host",
        acquired_at=slightly_ahead,
    )

    with pytest.raises(StateLeaseHeldError) as excinfo:
        ensure_write_lease(name)

    assert "Clock anomaly" not in str(excinfo.value)

    # Case 2: six minutes ahead -- still refused, now flagged as an anomaly.
    far_ahead = _now() + timedelta(minutes=6)
    _write_lease(project_dir, hostname="other-host", acquired_at=far_ahead)

    with pytest.raises(StateLeaseHeldError) as excinfo:
        ensure_write_lease(name)

    assert "Clock anomaly" in str(excinfo.value)
    on_disk = json.loads(lease_file.read_text(encoding="utf-8"))
    assert on_disk["hostname"] == "other-host"


def test_atomic_write_no_partial_file_on_crash_injection(project, monkeypatch):
    """A failure in ``os.replace`` leaves neither a lease nor a temp file.

    ``os.replace`` as seen by the lease module is swapped for a stub that
    always raises; the error must propagate and the state directory must
    contain no lease file and no ``.tmp_write_lease_*.json`` leftovers.
    """
    name, project_dir = project

    def _failing_replace(_src, _dst):
        raise RuntimeError("crash injection")

    monkeypatch.setattr("recoil.execution.state_lease.os.replace", _failing_replace)

    with pytest.raises(RuntimeError, match="crash injection"):
        ensure_write_lease(name)

    lease_file = _lease_path(project_dir)
    assert not lease_file.exists()
    leftovers = list(lease_file.parent.glob(".tmp_write_lease_*.json"))
    assert leftovers == []
