#!/usr/bin/env bash
# Serialized dispatch run reaper/retry actor.
#
# Timing knobs:
# - DEAD_THRESHOLD_S defaults to 2400s (40 min) and remains env-overridable.
#   It is the conservative heartbeat-staleness outer bound: once an attempt's
#   heartbeat is older than this, liveness_status reports stale_dead even when
#   the reaper cannot otherwise observe the orchestrator. This is the fallback
#   for truly unobservable runs, not the normal fast-recovery path.
# - FRESH_DEAD_GRACE_S defaults to 300s (5 min) and controls Phase 3's
#   ambiguous-dead escalation. A fresh heartbeat whose pid/tmux evidence is
#   neither provably alive nor provably confirmed-dead stays in fresh_dead until
#   this grace has elapsed, then escalates to ZOMBIE_SUSPECT.
# - launchd StartInterval is 120s for the installed reaper, so each state
#   transition normally advances on the next two-minute tick. Expected
#   ambiguous-dead recovery is roughly FRESH_DEAD_GRACE_S plus one tick to mark
#   ZOMBIE_SUSPECT, one more tick to mark ZOMBIE_REAPED, and then the retry
#   backoff: attempt 1 retries immediately on that tick, attempt 2 waits 120s,
#   later attempts wait 600s. The stale_dead fallback can still take up to
#   DEAD_THRESHOLD_S plus the same SUSPECT->REAPED->backoff cycle.
# Unset variables are errors and a pipeline fails if any stage fails. `-e` is
# not enabled, so a failing command does not by itself abort the sweep.
set -uo pipefail

# Directory holding this script; sibling tools are resolved relative to it.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
STATUS_TOOL="$SCRIPT_DIR/dispatch_status.py"
NOTIFY_TOOL="${NOTIFY_TOOL:-$SCRIPT_DIR/dispatch_notify.sh}"
SESSION_WORKSPACE="${SESSION_WORKSPACE:-$HOME/CLAUDE_PROJECTS/recoil/pipeline/tools/session_workspace.sh}"
WORKTREE_REAP_TTL_HOURS="${WORKTREE_REAP_TTL_HOURS:-24}"
# Re-dispatch uses the WORKTREE's own harness (derived per-run in
# launch_orchestrator), so the reaper can live in a stable, branch-independent
# home (~/.recoil/bin) without bundling the 1745-line orchestrator and without
# depending on whatever branch the shared checkout is sitting on. REAPER_HARNESS
# overrides this (used by the test stub).
HARNESS_OVERRIDE="${REAPER_HARNESS:-}"
RUNS_ROOT="${DISPATCH_RUNS_ROOT:-$HOME/.recoil/dispatch-runs}"
REAPER_LOCK="${REAPER_LOCK:-$HOME/.recoil/dispatch-reaper.lock}"

# Echo VALUE when it is a non-negative decimal integer, otherwise warn on
# stderr and echo FALLBACK.
# Arguments: $1 - knob name (for the warning), $2 - candidate value,
#            $3 - fallback value
# Why: the two thresholds below are compared with `[ a -lt b ]` / `[ a -ge b ]`.
# A non-integer override makes those tests error out (treated as false), so
# heartbeat_fresh would fail for every heartbeat and every run would be
# reported stale_dead.
reaper_uint_or_default() {
  local name="$1" value="$2" fallback="$3"
  case "$value" in
    '' | *[!0-9]*)
      printf '%s dispatch_reaper: ignoring non-integer %s=%s; using %s\n' \
        "$(date -u '+%Y-%m-%dT%H:%M:%SZ')" "$name" "$value" "$fallback" >&2
      printf '%s\n' "$fallback"
      ;;
    *)
      printf '%s\n' "$value"
      ;;
  esac
}

DEAD_THRESHOLD_S="$(reaper_uint_or_default DEAD_THRESHOLD_S "${DEAD_THRESHOLD_S:-2400}" 2400)"
FRESH_DEAD_GRACE_S="$(reaper_uint_or_default FRESH_DEAD_GRACE_S "${FRESH_DEAD_GRACE_S:-300}" 300)"
REAPER_DRY_RUN="${REAPER_DRY_RUN:-0}"
WATCH_INTERVAL_S="${WATCH_INTERVAL_S:-60}"

# Space-delimited (with leading/trailing space) so membership can be tested
# with a `*" $state "*` pattern.
TERMINAL_STATES=" CONVERGED_PR_CREATED CAPPED_NEEDS_HUMAN "

# Write one diagnostic line to stderr, prefixed with a UTC timestamp and the
# "dispatch_reaper:" tag.
# Arguments: message words (expanded via "$*").
log() {
  local stamp
  stamp="$(date -u '+%Y-%m-%dT%H:%M:%SZ')"
  printf '%s dispatch_reaper: %s\n' "$stamp" "$*" >&2
}

# Print the value found at a dotted key path inside a JSON file.
# Arguments: $1 - JSON file, $2 - key path ("a.b.c")
# Outputs:   "true"/"false" for booleans, an empty line for null or a missing
#            key (including a path that walks through a non-object), otherwise
#            Python's str() of the value.
# Returns:   python3's exit status (non-zero when the file cannot be read or
#            parsed).
json_get() {
  local file="$1" key="$2"
  python3 - "$file" "$key" <<'PY'
import json
import sys

path, dotted = sys.argv[1], sys.argv[2]
with open(path, "r", encoding="utf-8") as handle:
    node = json.load(handle)

# Walk one path segment at a time; anything that is not an object ends the walk.
for segment in dotted.split("."):
    if not isinstance(node, dict):
        node = None
        break
    node = node.get(segment)

if node is None:
    rendered = ""
elif node is True:
    rendered = "true"
elif node is False:
    rendered = "false"
else:
    rendered = str(node)
print(rendered)
PY
}

# Print the Unix epoch (integer seconds) of an ISO-8601 timestamp stored under
# a top-level key of a JSON file.
# Arguments: $1 - JSON file, $2 - top-level key
# Outputs:   the epoch, or 0 when the timestamp cannot be determined: file
#            missing/unreadable, invalid JSON, top level not an object, key
#            absent/empty, or value not parseable as ISO-8601.
# Notes:     a trailing "Z" is accepted; timestamps without an offset are
#            treated as UTC. Always printing an integer matters because callers
#            feed the result straight into `$(( now - epoch ))` and
#            `[ "$epoch" -gt 0 ]`, which break on an empty string.
json_epoch() {
  local file="$1" key="$2"
  python3 - "$file" "$key" <<'PY'
import datetime as dt
import json
import sys


def to_epoch(path, key):
    try:
        with open(path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, ValueError):
        # Unreadable file, bad encoding, or malformed JSON.
        return 0
    if not isinstance(data, dict):
        return 0
    value = data.get(key)
    if not value:
        return 0
    text = str(value)
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = dt.datetime.fromisoformat(text)
    except ValueError:
        return 0
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    try:
        return int(parsed.timestamp())
    except (OverflowError, OSError, ValueError):
        return 0


print(to_epoch(sys.argv[1], sys.argv[2]))
PY
}

# Print the current Unix time as whole seconds.
now_epoch() {
  python3 -c 'import time; print(int(time.time()))'
}

# Print the arguments as a single line of POSIX-shell-quoted words (Python's
# shlex.quote applied to each argument, joined by single spaces).
shell_quote() {
  python3 -c '
import shlex
import sys

print(" ".join(map(shlex.quote, sys.argv[1:])))
' "$@"
}

# Succeed when the heartbeat's updated_at is at least FRESH_DEAD_GRACE_S
# seconds older than NOW.
# Globals:   FRESH_DEAD_GRACE_S (read)
# Arguments: $1 - heartbeat JSON path, $2 - current epoch seconds
# Returns:   0 when the grace has elapsed; non-zero when it has not, when the
#            heartbeat file is missing, or when its timestamp is unusable.
fresh_dead_grace_elapsed() {
  local heartbeat="$1" now="$2"
  local beat_epoch elapsed
  if [ ! -f "$heartbeat" ]; then
    return 1
  fi
  beat_epoch="$(json_epoch "$heartbeat" updated_at)"
  if ! [ "$beat_epoch" -gt 0 ]; then
    return 1
  fi
  elapsed=$((now - beat_epoch))
  [ "$elapsed" -ge "$FRESH_DEAD_GRACE_S" ]
}

# Print the directory path for one attempt of a run:
# "<run_dir>/attempt-NNN", with the attempt number zero-padded to 3 digits.
# Arguments: $1 - run directory, $2 - attempt number
attempt_dir_for() {
  local base_dir="$1"
  local ordinal="$2"
  printf '%s/attempt-%03d\n' "$base_dir" "$ordinal"
}

# Best-effort high-priority notification for a run via NOTIFY_TOOL.
# Reaper notifications are terminal/attention events (converged PR, capped,
# re-dispatch), so they go out at high priority, distinct from the silent
# low-priority per-phase progress pings the harness emits.
# Globals:   NOTIFY_TOOL (read)
# Arguments: $1 - run directory, $2 - event name, $3 - message text
# Returns:   always 0; tool output and failures are discarded.
notify_run() {
  local run_dir="$1" event="$2" message="$3"
  local -a notify_args=(
    notify
    --run-dir "$run_dir"
    --event "$event"
    --message "$message"
    --priority high
  )
  bash "$NOTIFY_TOOL" "${notify_args[@]}" >/dev/null 2>&1 || :
}

# Best-effort call of NOTIFY_TOOL's `project-dirty` subcommand for a run.
# Globals:   NOTIFY_TOOL (read)
# Arguments: $1 - run directory
# Returns:   always 0; tool output and failures are discarded.
project_dirty() {
  local run_dir="$1"
  local -a tool_args=(project-dirty --run-dir "$run_dir")
  bash "$NOTIFY_TOOL" "${tool_args[@]}" >/dev/null 2>&1 || :
}

# Ask STATUS_TOOL to move a run to a new state.
# Globals:   STATUS_TOOL (read)
# Arguments: $1 - run directory, $2 - target state,
#            remaining - extra flags passed through to `transition` verbatim
# Returns:   the status tool's exit status; its stdout is discarded.
transition_state() {
  local run_dir="$1" state="$2"
  shift 2
  local -a tool_args=(transition --run-dir "$run_dir" --state "$state" "$@")
  python3 "$STATUS_TOOL" "${tool_args[@]}" >/dev/null
}

# Print the failure signature for a terminal-status file.
# Uses the file's own `failure_signature` field when it is non-empty and not
# the literal string "null"; otherwise asks STATUS_TOOL to compute one.
# Globals:   STATUS_TOOL (read)
# Arguments: $1 - terminal status JSON path
# Returns:   0 when the recorded signature is used, else the status tool's
#            exit status.
terminal_signature() {
  local terminal="$1"
  local recorded
  recorded="$(json_get "$terminal" failure_signature 2>/dev/null || true)"
  case "$recorded" in
    '' | null) ;;
    *)
      printf '%s\n' "$recorded"
      return 0
      ;;
  esac
  python3 "$STATUS_TOOL" signature --terminal-status "$terminal"
}

# Succeed when the heartbeat file exists, carries an updated_at timestamp, and
# that timestamp is less than DEAD_THRESHOLD_S seconds older than NOW.
# Globals:   DEAD_THRESHOLD_S (read)
# Arguments: $1 - heartbeat JSON path, $2 - current epoch seconds
heartbeat_fresh() {
  local heartbeat="$1" now="$2"
  local stamp stamp_epoch elapsed
  if [ ! -f "$heartbeat" ]; then
    return 1
  fi
  stamp="$(json_get "$heartbeat" updated_at 2>/dev/null || true)"
  if [ -z "$stamp" ]; then
    return 1
  fi
  # An unparseable timestamp collapses to epoch 0, which is rejected below.
  stamp_epoch="$(json_epoch "$heartbeat" updated_at 2>/dev/null || echo 0)"
  elapsed=$((now - stamp_epoch))
  [ "$stamp_epoch" -gt 0 ] && [ "$elapsed" -lt "$DEAD_THRESHOLD_S" ]
}

# Succeed when a tmux session with the given name exists.
# Arguments: $1 - session name
# Returns:   non-zero for an empty name, when tmux is not available, or when
#            `tmux has-session` reports no such session.
tmux_session_exists() {
  local session="$1"
  if [ -z "$session" ]; then
    return 1
  fi
  if ! command -v tmux >/dev/null 2>&1; then
    return 1
  fi
  tmux has-session -t "$session" >/dev/null 2>&1
}

# Succeed only when the process recorded in the heartbeat is demonstrably the
# same orchestrator and still running. Every check must pass:
#   - heartbeat file exists and records a purely numeric pid
#   - `kill -0` can signal that pid
#   - the pid's current `ps lstart` equals the recorded orchestrator_start_time
#     (guards against pid reuse)
#   - the pid's command line is non-empty, contains ps_match (when recorded),
#     and contains the run id
#   - the recorded tmux session exists
# Arguments: $1 - heartbeat JSON path, $2 - run id
pid_identity_alive() {
  local heartbeat="$1" run_id="$2"
  local hb_pid hb_start hb_match hb_session live_start live_cmd
  if [ ! -f "$heartbeat" ]; then
    return 1
  fi
  hb_pid="$(json_get "$heartbeat" orchestrator_pid 2>/dev/null || true)"
  hb_start="$(json_get "$heartbeat" orchestrator_start_time 2>/dev/null || true)"
  hb_match="$(json_get "$heartbeat" ps_match 2>/dev/null || true)"
  hb_session="$(json_get "$heartbeat" tmux_session 2>/dev/null || true)"

  # Empty or non-numeric pid: nothing to probe.
  case "$hb_pid" in
    '' | *[!0-9]*) return 1 ;;
  esac
  if ! kill -0 "$hb_pid" 2>/dev/null; then
    return 1
  fi

  live_start="$(ps -p "$hb_pid" -o lstart= 2>/dev/null | sed 's/^ *//;s/ *$//')"
  if [ -z "$live_start" ] || [ "$live_start" != "$hb_start" ]; then
    return 1
  fi

  live_cmd="$(ps -p "$hb_pid" -o command= 2>/dev/null || true)"
  if [ -z "$live_cmd" ]; then
    return 1
  fi
  if [ -n "$hb_match" ]; then
    case "$live_cmd" in
      *"$hb_match"*) ;;
      *) return 1 ;;
    esac
  fi
  case "$live_cmd" in
    *"$run_id"*) ;;
    *) return 1 ;;
  esac

  if ! tmux_session_exists "$hb_session"; then
    return 1
  fi
  return 0
}

# Succeed only when there is positive evidence the orchestrator is gone:
# the heartbeat records a numeric pid and a tmux session name, tmux is
# available, the session no longer exists, and the pid cannot be signalled.
# Any missing piece of evidence yields non-zero ("not confirmed").
# Arguments: $1 - heartbeat JSON path
orchestrator_confirmed_dead() {
  local heartbeat="$1"
  local hb_pid hb_session
  if [ ! -f "$heartbeat" ]; then
    return 1
  fi
  hb_pid="$(json_get "$heartbeat" orchestrator_pid 2>/dev/null || true)"
  hb_session="$(json_get "$heartbeat" tmux_session 2>/dev/null || true)"
  case "$hb_pid" in
    '' | *[!0-9]*) return 1 ;;
  esac
  if [ -z "$hb_session" ]; then
    return 1
  fi
  if ! command -v tmux >/dev/null 2>&1; then
    return 1
  fi
  # A surviving session or a signallable pid both count as "not confirmed dead".
  if tmux has-session -t "$hb_session" >/dev/null 2>&1; then
    return 1
  fi
  if kill -0 "$hb_pid" 2>/dev/null; then
    return 1
  fi
  return 0
}

# Classify the liveness of one attempt of a run and print exactly one of:
#   alive                - heartbeat fresh and pid identity verified
#   fresh_confirmed_dead - heartbeat fresh, orchestrator provably gone
#   fresh_dead           - heartbeat fresh, evidence ambiguous
#   stale_dead           - heartbeat missing or not fresh
# Arguments: $1 - run directory, $2 - attempt number, $3 - current epoch seconds
# Note: heartbeat freshness is evaluated once and reused. Re-evaluating it per
# branch costs extra python3 spawns and can give inconsistent answers if the
# heartbeat file is rewritten between checks.
liveness_status() {
  local run_dir="$1" attempt="$2" now="$3"
  local status_file="$run_dir/status.json" run_id attempt_dir heartbeat
  run_id="$(json_get "$status_file" run_id)"
  attempt_dir="$(attempt_dir_for "$run_dir" "$attempt")"
  heartbeat="$attempt_dir/heartbeat.json"
  if ! heartbeat_fresh "$heartbeat" "$now"; then
    printf '%s\n' "stale_dead"
  elif pid_identity_alive "$heartbeat" "$run_id"; then
    printf '%s\n' "alive"
  elif orchestrator_confirmed_dead "$heartbeat"; then
    printf '%s\n' "fresh_confirmed_dead"
  else
    printf '%s\n' "fresh_dead"
  fi
}

# Hand the freshly launched orchestrator to the shared bound-watcher script
# (SSOT - the same entrypoint /dispatch uses for attempt 1). Per the original
# design note, the watcher writes the initial heartbeat synchronously and
# self-daemonizes its WATCH_INTERVAL_S refresh loop, so the caller's later
# `transition ATTEMPT_RUNNING` never races ahead of first liveness.
# Globals:   SCRIPT_DIR, WATCH_INTERVAL_S (read)
# Arguments: $1 run dir, $2 attempt, $3 orchestrator pid, $4 pid start time,
#            $5 tmux session name, $6 build log path
# Returns:   the watcher script's exit status.
start_heartbeat_watcher() {
  local run_dir="$1" attempt="$2" pid="$3" start_time="$4" session="$5" log_path="$6"
  local watcher="$SCRIPT_DIR/dispatch_heartbeat_watch.sh"
  WATCH_INTERVAL_S="$WATCH_INTERVAL_S" bash "$watcher" \
    "$run_dir" \
    "$attempt" \
    "$pid" \
    "$start_time" \
    "$session" \
    "$log_path"
}

# Print the tmux session name for an attempt: "<run_id>-attempt-NNN", where
# run_id comes from the run's status.json and NNN is the zero-padded attempt.
# Arguments: $1 - run directory, $2 - attempt number
session_name_for() {
  local run_dir="$1" attempt="$2"
  local id
  id="$(json_get "$run_dir/status.json" run_id)"
  printf '%s-attempt-%03d\n' "$id" "$attempt"
}

# Start (or adopt) the tmux-hosted orchestrator for the run's current attempt,
# bind a heartbeat watcher to it, and move the run to ATTEMPT_RUNNING.
# Globals:   REAPER_DRY_RUN, HARNESS_OVERRIDE (read)
# Arguments: $1 - run directory, $2 - human-readable reason (used in the
#            re-dispatch notification)
# Returns:   0 on success (or dry run); 1 when tmux is unavailable (the run is
#            also capped), when the session cannot be created, or when the
#            pane pid / its start time cannot be determined.
launch_orchestrator() {
  local run_dir="$1" reason="$2"
  local status_file="$run_dir/status.json" attempt worktree spec attempt_dir log_path session cmd pid start_time
  attempt="$(json_get "$status_file" attempt)"
  worktree="$(json_get "$status_file" worktree)"
  spec="$(json_get "$status_file" spec)"
  attempt_dir="$(attempt_dir_for "$run_dir" "$attempt")"
  mkdir -p "$attempt_dir"
  log_path="$attempt_dir/build-log.md"
  session="$(session_name_for "$run_dir" "$attempt")"

  # Dry run: record the transition but never touch tmux.
  if [ "$REAPER_DRY_RUN" = "1" ]; then
    log "DRY_RUN: would launch attempt $attempt for $(json_get "$status_file" run_id)"
    transition_state "$run_dir" ATTEMPT_RUNNING
    return 0
  fi

  if ! command -v tmux >/dev/null 2>&1; then
    log "tmux unavailable for $run_dir"
    transition_state "$run_dir" CAPPED_NEEDS_HUMAN
    notify_run "$run_dir" capped "needs human: tmux unavailable"
    return 1
  fi

  # Only create the session when it does not already exist; an existing
  # session with this name is adopted as-is.
  if ! tmux has-session -t "$session" >/dev/null 2>&1; then
    # Default to the worktree's own harness unless an override is configured.
    local harness="${HARNESS_OVERRIDE:-$worktree/recoil/pipeline/tools/harness_orchestrator.sh}"
    cmd="cd $(shell_quote "$worktree") && exec env DISPATCH_RUN_DIR=$(shell_quote "$run_dir") DISPATCH_ATTEMPT=$(shell_quote "$attempt") bash $(shell_quote "$harness") --coder codex --dir $(shell_quote "$worktree") --log $(shell_quote "$log_path") --no-codex-spec-review $(shell_quote "$spec")"
    tmux new-session -d -s "$session" "$cmd" || return 1
  fi

  pid="$(tmux display-message -p -t "$session" "#{pane_pid}" 2>/dev/null || true)"
  if [ -z "$pid" ]; then
    log "could not determine orchestrator pid for $session"
    return 1
  fi
  # The pane process may not be visible to ps immediately; poll for up to ~5s.
  start_time=""
  for _ in 1 2 3 4 5; do
    start_time="$(ps -p "$pid" -o lstart= 2>/dev/null | sed 's/^ *//;s/ *$//')"
    [ -n "$start_time" ] && break
    sleep 1
  done
  if [ -z "$start_time" ]; then
    log "could not determine orchestrator start time for $pid"
    return 1
  fi
  # Control flow is unchanged when the watcher fails (the run still moves to
  # ATTEMPT_RUNNING), but the failure is now logged instead of being silently
  # dropped: without a heartbeat the next sweep will see this attempt as dead.
  if ! start_heartbeat_watcher "$run_dir" "$attempt" "$pid" "$start_time" "$session" "$log_path"; then
    log "heartbeat watcher failed to start for $session (attempt $attempt); continuing"
  fi
  transition_state "$run_dir" ATTEMPT_RUNNING
  notify_run "$run_dir" re-dispatch "re-dispatching $attempt/$(json_get "$status_file" max_attempts): $reason"
  return 0
}

# Check that the run's worktree is safe to reuse, then hard-reset it to the
# run's last_validated_commit.
# Checks, in order: the worktree directory exists, its checked-out branch
# equals the branch recorded in status.json, and verify_lease_owner accepts
# the lease. A failed directory/branch/reset check caps the run
# (CAPPED_NEEDS_HUMAN) and sends a notification; a failed lease check just
# propagates failure from verify_lease_owner.
# Arguments: $1 - run directory
# Returns:   0 after a successful reset, 1 otherwise.
verify_and_reset_worktree() {
  local run_dir="$1"
  local status_file="$run_dir/status.json"
  local tree want_branch pinned_commit head_branch
  tree="$(json_get "$status_file" worktree)"
  want_branch="$(json_get "$status_file" branch)"
  pinned_commit="$(json_get "$status_file" last_validated_commit)"

  if ! [ -d "$tree" ]; then
    log "worktree missing for $run_dir: $tree"
    transition_state "$run_dir" CAPPED_NEEDS_HUMAN
    notify_run "$run_dir" capped "needs human: worktree missing"
    return 1
  fi

  head_branch="$(git -C "$tree" rev-parse --abbrev-ref HEAD 2>/dev/null || true)"
  if [ "$head_branch" != "$want_branch" ]; then
    log "branch mismatch for $run_dir: expected $want_branch got ${head_branch:-unknown}"
    transition_state "$run_dir" CAPPED_NEEDS_HUMAN
    notify_run "$run_dir" capped "needs human: branch mismatch"
    return 1
  fi

  if ! verify_lease_owner "$run_dir" "$tree"; then
    return 1
  fi

  if git -C "$tree" reset --hard "$pinned_commit" >/dev/null; then
    return 0
  fi
  log "reset failed for $run_dir at $pinned_commit"
  transition_state "$run_dir" CAPPED_NEEDS_HUMAN
  notify_run "$run_dir" capped "needs human: reset failed"
  return 1
}

# Decide whether this host may act on a worktree guarded by
# "<worktree>/.session-lease.json".
# No lease file means nothing to check (success). Otherwise the embedded
# Python prints one verdict:
#   invalid - lease unreadable, not JSON, or expires_at not ISO-8601
#   expired - expires_at is in the past (offset-less values are taken as UTC)
#   ours    - unexpired and `host` equals this machine's short host name
#             (`scutil --get LocalHostName` when it yields a name, else the
#             first label of the socket hostname)
#   other   - unexpired but held by a different host
# Anything except "ours" is logged, caps the run (CAPPED_NEEDS_HUMAN) and sends
# a notification.
# Arguments: $1 - run directory, $2 - worktree path
# Returns:   0 when there is no lease or the lease is ours, 1 otherwise.
verify_lease_owner() {
  local run_dir="$1" worktree="$2"
  local lease_file="$worktree/.session-lease.json"
  local verdict
  if [ ! -f "$lease_file" ]; then
    return 0
  fi
  verdict="$(python3 - "$lease_file" <<'PY'
import datetime as dt
import json
import socket
import subprocess
import sys


def short_hostname():
    try:
        name = subprocess.check_output(
            ["scutil", "--get", "LocalHostName"],
            text=True,
            stderr=subprocess.DEVNULL,
        ).strip()
    except Exception:
        name = ""
    return name or socket.gethostname().split(".")[0]


def lease_verdict(path):
    try:
        with open(path, "r", encoding="utf-8") as handle:
            lease = json.load(handle)
    except Exception:
        return "invalid"

    stamp = str(lease.get("expires_at") or "")
    if stamp.endswith("Z"):
        stamp = stamp[:-1] + "+00:00"
    try:
        deadline = dt.datetime.fromisoformat(stamp)
    except Exception:
        return "invalid"
    if deadline.tzinfo is None:
        deadline = deadline.replace(tzinfo=dt.timezone.utc)

    if deadline < dt.datetime.now(dt.timezone.utc):
        return "expired"
    if str(lease.get("host") or "") == short_hostname():
        return "ours"
    return "other"


print(lease_verdict(sys.argv[1]))
PY
)"
  if [ "$verdict" = "ours" ]; then
    return 0
  fi
  if [ "$verdict" = "expired" ]; then
    log "lease expired for $run_dir"
  else
    log "lease owner check failed for $run_dir: $verdict"
  fi
  transition_state "$run_dir" CAPPED_NEEDS_HUMAN
  notify_run "$run_dir" capped "needs human: lease owner mismatch"
  return 1
}

other_live_supervisor_exists() {
  local run_dir="$1" now="$2" status_file="$run_dir/status.json"
  local worktree branch other other_status other_state other_attempt
  worktree="$(json_get "$status_file" worktree)"
  branch="$(json_get "$status_file" branch)"
  [ -d "$RUNS_ROOT" ] || return 1
  for other in "$RUNS_ROOT"/*; do
    [ -d "$other" ] || continue
    [ "$other" = "$run_dir" ] && continue
    other_status="$other/status.json"
    [ -f "$other_status" ] || continue
    other_state="$(json_get "$other_status" state 2>/dev/null || true)"
    case "$TERMINAL_STATES" in *" $other_state "*) continue ;; esac
    if [ "$(json_get "$other_status" worktree 2>/dev/null || true)" = "$worktree" ] || [ "$(json_get "$other_status" branch 2>/dev/null || true)" = "$branch" ]; then
      other_attempt="$(json_get "$other_status" attempt 2>/dev/null || echo 1)"
      [ "$(liveness_status "$other" "$other_attempt" "$now")" = "alive" ] && return 0
    fi
  done
  return 1
}

# Start the next attempt for a run, subject to the attempt cap and backoff.
# Flow:
#   1. attempt >= max_attempts -> cap the run and notify.
#   2. Backoff keyed on the current attempt (1: none, 2: 120s, later: 600s),
#      measured from status.json's updated_at; not yet elapsed -> do nothing.
#   3. Worktree must verify/reset cleanly, and no other live supervisor may
#      share it (otherwise cap the run).
#   4. `retry-start` via STATUS_TOOL (with --zombie, or with the failure
#      signature); failure caps the run.
#   5. Launch the orchestrator (launch failure is tolerated here).
# Globals:   STATUS_TOOL (read)
# Arguments: $1 run dir, $2 reason, $3 failure signature, $4 "true" when the
#            retry follows a zombie reap, $5 current epoch seconds
# Returns:   always 0.
begin_retry() {
  local run_dir="$1" reason="$2" signature="$3" zombie="$4" now="$5"
  local status_file="$run_dir/status.json"
  local attempt_no attempt_cap last_update backoff elapsed
  local -a retry_args
  attempt_no="$(json_get "$status_file" attempt)"
  attempt_cap="$(json_get "$status_file" max_attempts)"
  if [ "$attempt_no" -ge "$attempt_cap" ]; then
    transition_state "$run_dir" CAPPED_NEEDS_HUMAN
    notify_run "$run_dir" capped "needs human: attempt cap reached"
    return 0
  fi

  if [ "$attempt_no" = "1" ]; then
    backoff=0
  elif [ "$attempt_no" = "2" ]; then
    backoff=120
  else
    backoff=600
  fi
  last_update="$(json_epoch "$status_file" updated_at)"
  elapsed=$((now - last_update))
  # `now` is captured once at scan start; a same-scan state transition (e.g.
  # ZOMBIE_SUSPECT->ZOMBIE_REAPED) writes a fresh `updated_at` a few ms later,
  # so the elapsed time can go negative. Clamp to 0 - a negative value means
  # "updated in the future" (clock artifact), and must not defer a zero-backoff
  # retry.
  if [ "$elapsed" -lt 0 ]; then
    elapsed=0
  fi
  if [ "$elapsed" -lt "$backoff" ]; then
    return 0
  fi

  if ! verify_and_reset_worktree "$run_dir"; then
    return 0
  fi
  if other_live_supervisor_exists "$run_dir" "$now"; then
    transition_state "$run_dir" CAPPED_NEEDS_HUMAN
    notify_run "$run_dir" capped "needs human: another live supervisor exists"
    return 0
  fi

  retry_args=(retry-start --run-dir "$run_dir")
  if [ "$zombie" = "true" ]; then
    retry_args+=(--zombie)
  else
    retry_args+=(--failure-signature "$signature")
  fi
  if ! python3 "$STATUS_TOOL" "${retry_args[@]}" >/dev/null; then
    transition_state "$run_dir" CAPPED_NEEDS_HUMAN
    notify_run "$run_dir" capped "needs human: retry cap reached"
    return 0
  fi
  launch_orchestrator "$run_dir" "$reason" || true
}

# Finish a launch for a run that is already in the pending-retry state: verify
# and reset the worktree, refuse (cap the run) when another live supervisor
# shares it, otherwise launch the orchestrator.
# Arguments: $1 run dir, $2 reason, $3 current epoch seconds
# Returns:   always 0 (launch failure is tolerated).
continue_pending_launch() {
  local run_dir="$1" reason="$2" now="$3"
  if ! verify_and_reset_worktree "$run_dir"; then
    return 0
  fi
  if ! other_live_supervisor_exists "$run_dir" "$now"; then
    launch_orchestrator "$run_dir" "$reason" || true
    return 0
  fi
  transition_state "$run_dir" CAPPED_NEEDS_HUMAN
  notify_run "$run_dir" capped "needs human: another live supervisor exists"
  return 0
}

# Succeed when SESSION_WORKSPACE is an executable file. Otherwise log why the
# reap is being skipped and, when a run directory was supplied, send a
# `worktree-reap-skipped` notification for it.
# Globals:   SESSION_WORKSPACE (read)
# Arguments: $1 - run directory to notify (optional, may be empty)
#            $2 - context label for the log line (default "worktree reap")
# Returns:   0 when usable, 1 otherwise.
session_workspace_ready() {
  local run_dir="${1:-}" context="${2:-worktree reap}"
  local problem="SESSION_WORKSPACE missing/non-executable: $SESSION_WORKSPACE"
  [ -x "$SESSION_WORKSPACE" ] && return 0
  log "$context skipped: $problem"
  if [ -n "$run_dir" ]; then
    notify_run "$run_dir" worktree-reap-skipped "worktree reap skipped: $problem"
  fi
  return 1
}

# Try to remove the worktree of a finished run via
# `SESSION_WORKSPACE reap-one --worktree <path>`.
# Nothing happens when status.json names no worktree, the directory is already
# gone, or SESSION_WORKSPACE is unusable. When reap-one fails, the worktree is
# kept and a `worktree-kept` notification is sent.
# Globals:   SESSION_WORKSPACE (read)
# Arguments: $1 - run directory
reap_run_worktree() {
  local run_dir="$1" tree
  tree="$(json_get "$run_dir/status.json" worktree 2>/dev/null || true)"
  if [ -z "$tree" ] || [ ! -d "$tree" ]; then
    return 0
  fi
  if ! session_workspace_ready "$run_dir" "completion worktree reap"; then
    return 0
  fi
  if ! bash "$SESSION_WORKSPACE" reap-one --worktree "$tree" >/dev/null 2>&1; then
    log "kept worktree for $run_dir (unsafe to reap): $tree"
    notify_run "$run_dir" worktree-kept "worktree retained (unpushed/dirty work): $tree"
  else
    log "reaped completed worktree for $run_dir: $tree"
  fi
}

# Act on an attempt's terminal_status.json according to STATUS_TOOL's
# classification:
#   CONVERGED                   -> CONVERGED_PR_CREATED (+ pr url/commit),
#                                  notify, reap the worktree
#   DETERMINISTIC|CAPPED_BUDGET -> CAPPED_NEEDS_HUMAN (with the failure
#                                  signature when one is available), notify,
#                                  reap the worktree
#   TRANSIENT                   -> begin_retry with the failure signature
#   anything else               -> logged, no state change
# A failed classify call is logged and the run is left untouched for this
# sweep. Both of those cases used to be completely silent, which made a run
# stuck on an unexpected classification invisible in the reaper log.
# Globals:   STATUS_TOOL (read)
# Arguments: $1 run dir, $2 terminal status JSON path, $3 current epoch seconds
handle_terminal_status() {
  local run_dir="$1" terminal="$2" now="$3"
  local classification sig pr_url commit
  if ! classification="$(python3 "$STATUS_TOOL" classify --run-dir "$run_dir" --terminal-status "$terminal")"; then
    log "classify failed for $run_dir ($terminal); no action taken"
    return 0
  fi
  case "$classification" in
    CONVERGED)
      pr_url="$(json_get "$terminal" pr_url 2>/dev/null || true)"
      commit="$(json_get "$terminal" commit 2>/dev/null || true)"
      transition_state "$run_dir" CONVERGED_PR_CREATED --pr-url "$pr_url" --commit "$commit"
      notify_run "$run_dir" converged "converged: PR created"
      reap_run_worktree "$run_dir"
      ;;
    DETERMINISTIC|CAPPED_BUDGET)
      sig="$(terminal_signature "$terminal" 2>/dev/null || true)"
      if [ -n "$sig" ]; then
        transition_state "$run_dir" CAPPED_NEEDS_HUMAN --failure-signature "$sig"
      else
        transition_state "$run_dir" CAPPED_NEEDS_HUMAN
      fi
      notify_run "$run_dir" capped "needs human: $classification"
      reap_run_worktree "$run_dir"
      ;;
    TRANSIENT)
      sig="$(terminal_signature "$terminal" 2>/dev/null || true)"
      begin_retry "$run_dir" "transient failure" "$sig" "false" "$now"
      ;;
    *)
      log "unrecognized classification '$classification' for $run_dir; no action taken"
      ;;
  esac
}

# Act on a terminal_status.json found for an attempt whose orchestrator is no
# longer alive. A CONVERGED classification is honoured exactly as in the live
# case; every other classification caps the run (no retry), carrying the
# failure signature when one is available.
# Globals:   STATUS_TOOL (read)
# Arguments: $1 run dir, $2 terminal status JSON path
# Returns:   0 when classification itself fails; otherwise the status of the
#            final worktree reap.
handle_dead_terminal_status() {
  local run_dir="$1" terminal="$2"
  local verdict signature pr_link head_commit
  local -a cap_args
  verdict="$(python3 "$STATUS_TOOL" classify --run-dir "$run_dir" --terminal-status "$terminal")" || return 0
  if [ "$verdict" = "CONVERGED" ]; then
    pr_link="$(json_get "$terminal" pr_url 2>/dev/null || true)"
    head_commit="$(json_get "$terminal" commit 2>/dev/null || true)"
    transition_state "$run_dir" CONVERGED_PR_CREATED --pr-url "$pr_link" --commit "$head_commit"
    notify_run "$run_dir" converged "converged: PR created"
    reap_run_worktree "$run_dir"
  else
    signature="$(terminal_signature "$terminal" 2>/dev/null || true)"
    cap_args=(CAPPED_NEEDS_HUMAN)
    if [ -n "$signature" ]; then
      cap_args+=(--failure-signature "$signature")
    fi
    transition_state "$run_dir" "${cap_args[@]}"
    notify_run "$run_dir" capped "needs human: terminal status after dead orchestrator"
    reap_run_worktree "$run_dir"
  fi
}

# Advance one run directory by at most one step of the reaper state machine.
# Arguments: $1 - run directory (must contain status.json to be considered)
#            $2 - current epoch seconds, as captured by the caller
# Every path that gets past the initial guards finishes by calling
# project_dirty for the run.
handle_run() {
  local run_dir="$1" now="$2"
  local status_file="$run_dir/status.json" state attempt attempt_dir terminal created_epoch wall live started_grace_epoch heartbeat
  # Directories without a readable status/state are ignored.
  [ -f "$status_file" ] || return 0
  state="$(json_get "$status_file" state 2>/dev/null || true)"
  [ -n "$state" ] || return 0
  # Runs already in a terminal state only get the project-dirty call.
  case "$TERMINAL_STATES" in *" $state "*) project_dirty "$run_dir"; return 0 ;; esac
  attempt="$(json_get "$status_file" attempt 2>/dev/null || echo 1)"
  attempt_dir="$(attempt_dir_for "$run_dir" "$attempt")"
  terminal="$attempt_dir/terminal_status.json"

  # Liveness of the current attempt: alive, fresh_confirmed_dead, fresh_dead,
  # or stale_dead.
  live="$(liveness_status "$run_dir" "$attempt" "$now")"
  # Phase 1: states that are handled completely here and never reach the
  # budget / terminal-status checks below.
  case "$state" in
    RETRY_PENDING)
      # A pending retry whose orchestrator is in fact alive is simply marked
      # running; otherwise the launch is (re)attempted.
      if [ "$live" = "alive" ]; then
        transition_state "$run_dir" ATTEMPT_RUNNING
      else
        continue_pending_launch "$run_dir" "resume pending retry" "$now"
      fi
      project_dirty "$run_dir"
      return 0
      ;;
    ZOMBIE_REAPED)
      # Already reaped on an earlier sweep: keep trying to start the retry
      # (begin_retry enforces cap and backoff) unless something is alive.
      if [ "$live" != "alive" ]; then
        begin_retry "$run_dir" "zombie reaped" "" "true" "$now"
      fi
      project_dirty "$run_dir"
      return 0
      ;;
    STARTED)
      # Retry only once `started_grace_until` is set, has passed, and no live
      # orchestrator is observed. json_epoch yields 0 for a missing or
      # unparseable timestamp, which the `-gt 0` guard treats as "no deadline".
      started_grace_epoch="$(json_epoch "$status_file" started_grace_until)"
      if [ "$started_grace_epoch" -gt 0 ] && [ "$now" -gt "$started_grace_epoch" ] && [ "$live" != "alive" ]; then
        begin_retry "$run_dir" "startup zombie" "" "true" "$now"
      fi
      project_dirty "$run_dir"
      return 0
      ;;
  esac

  # Phase 2: wall-clock budget. Elapsed time is measured from created_at and
  # reported as 0 when created_at is unavailable. A non-zero exit from
  # budget-check caps the run.
  created_epoch="$(json_epoch "$status_file" created_at)"
  wall=$((now - created_epoch))
  [ "$created_epoch" -gt 0 ] || wall=0
  if ! python3 "$STATUS_TOOL" budget-check --run-dir "$run_dir" --set-wall-used "$wall" >/dev/null; then
    transition_state "$run_dir" CAPPED_NEEDS_HUMAN
    notify_run "$run_dir" capped "needs human: budget exceeded"
    project_dirty "$run_dir"
    return 0
  fi

  # Phase 3a: a running attempt that left both a terminal status and a
  # heartbeat but whose orchestrator is not alive goes through the
  # "dead orchestrator" handler (converge or cap, never retry).
  heartbeat="$attempt_dir/heartbeat.json"
  if [ "$state" = "ATTEMPT_RUNNING" ] && [ -f "$terminal" ] && [ -f "$heartbeat" ] && [ "$live" != "alive" ]; then
    handle_dead_terminal_status "$run_dir" "$terminal"
    project_dirty "$run_dir"
    return 0
  fi

  # Phase 3b: any other terminal status is classified normally (which may
  # start a retry for transient failures).
  if [ -f "$terminal" ]; then
    handle_terminal_status "$run_dir" "$terminal" "$now"
    project_dirty "$run_dir"
    return 0
  fi

  # Phase 3c: no terminal status yet, so act purely on liveness.
  case "$state" in
    ATTEMPT_RUNNING)
      case "$live" in
        alive) ;;
        fresh_dead)
          # Ambiguous evidence: only escalate once FRESH_DEAD_GRACE_S has
          # passed since the heartbeat was last updated.
          heartbeat="$attempt_dir/heartbeat.json"
          if fresh_dead_grace_elapsed "$heartbeat" "$now"; then
            transition_state "$run_dir" ZOMBIE_SUSPECT
            notify_run "$run_dir" zombie-suspect "zombie suspect: ambiguous dead beyond grace"
          fi
          ;;
        fresh_confirmed_dead|stale_dead)
          # Positive evidence of death, or a heartbeat past DEAD_THRESHOLD_S:
          # escalate immediately.
          transition_state "$run_dir" ZOMBIE_SUSPECT
          notify_run "$run_dir" zombie-suspect "zombie suspect: no live orchestrator"
          ;;
      esac
      ;;
    ZOMBIE_SUSPECT)
      # Second observation of a dead orchestrator: mark the run reaped and try
      # to start the retry in the same sweep. A suspect that is now observed
      # alive is left in ZOMBIE_SUSPECT by this branch.
      heartbeat="$attempt_dir/heartbeat.json"
      if [ "$live" = "fresh_confirmed_dead" ] || [ "$live" = "stale_dead" ] || { [ "$live" = "fresh_dead" ] && fresh_dead_grace_elapsed "$heartbeat" "$now"; }; then
        transition_state "$run_dir" ZOMBIE_REAPED
        begin_retry "$run_dir" "zombie reaped" "" "true" "$now"
      fi
      ;;
  esac
  project_dirty "$run_dir"
}

# One reaper sweep.
# Takes the REAPER_LOCK via /usr/bin/shlock (exiting 0 right away when the lock
# cannot be acquired), removes the lock on exit, runs handle_run for every
# directory under RUNS_ROOT, then asks SESSION_WORKSPACE to reap worktrees
# with the configured WORKTREE_REAP_TTL_HOURS.
# Globals:   REAPER_LOCK, RUNS_ROOT, SESSION_WORKSPACE,
#            WORKTREE_REAP_TTL_HOURS (read)
main() {
  local lock_dir now run_dir sweep_notify_run
  lock_dir="$(dirname "$REAPER_LOCK")"
  mkdir -p "$lock_dir"
  /usr/bin/shlock -f "$REAPER_LOCK" -p "$$" >/dev/null 2>&1 || exit 0
  trap 'rm -f "$REAPER_LOCK"' EXIT
  if [ ! -d "$RUNS_ROOT" ]; then
    exit 0
  fi
  # A single timestamp is shared by every run handled in this sweep.
  now="$(now_epoch)"
  for run_dir in "$RUNS_ROOT"/*; do
    if [ ! -d "$run_dir" ]; then
      continue
    fi
    # Remember the first run directory; it is the one notified if the periodic
    # reap below has to be skipped.
    if [ -z "${sweep_notify_run:-}" ]; then
      sweep_notify_run="$run_dir"
    fi
    if ! handle_run "$run_dir" "$now"; then
      log "scan failed for $run_dir"
    fi
  done
  if session_workspace_ready "${sweep_notify_run:-}" "periodic worktree reap"; then
    bash "$SESSION_WORKSPACE" reap --ttl-hours "$WORKTREE_REAP_TTL_HOURS" >/dev/null 2>&1 || true
  fi
}

# Entry point: run one sweep. Command-line arguments are forwarded to main,
# which does not currently read them.
main "$@"
