#!/usr/bin/env bash
# session_workspace.sh — parallel-session isolation + lease + handoff launcher.
#
# One external worktree per mutating actor-session, on an issue-keyed actor-prefixed
# branch, where creating-and-pushing that branch to origin IS the atomic cross-host
# claim (server-side CAS). See:
#   consultations/tooling/parallel-session-model-2026-06-03/BUILD_SPEC.md  (Phases 0-2)
#   consultations/tooling/parallel-session-model-2026-06-03/SYNTHESIS_R2.md
#
# Subcommands:
#   observe   [--json]                                   # Phase 0: pure read + one ledger event, exit 0 always
#   create    --actor <claude|codex> --issue REC-NN --slug <slug> [--sid S] [--base origin/main]
#   create    --actor <claude|codex> --adhoc --slug <slug> [--sid S] [--base origin/main]
#   adopt     <same arguments as create>                 # alias of create (resume-by-branch + lease re-claim)
#   close     --worktree <path> [--reason safe|dirty]
#   reap      [--ttl-hours 24] [--dry-run]
#   reap-one  --worktree <path> [--dry-run]
#   heartbeat --worktree <path>                          # refresh the lease TTL (call from long-running sessions)
#   pr-metadata --issue REC-NN --actor <a> --worktree <p> [--data-scope <scope>]
#   converge-codex [--dry-run]
#
# LOCKED DECISIONS baked in (JT 2026-06-03): worktrees OUTSIDE the repo only;
# lease = branch-on-origin (push-to-claim); mutex granularity = per-BRANCH not per-issue;
# dirty close = WIP-commit-and-push (never auto-delete unpushed work); reap = reconcile-on-read.
#
# SAFETY INVARIANTS (hardened after adversarial review 2026-06-03):
#  - Never `--force` a worktree removal: git refuses to drop a dirty/untracked tree, so
#    a failed commit/push can never silently delete work.
#  - close/heartbeat operate ONLY on paths under SESSIONS_ROOT (assert_owned).
#  - adopt RE-CLAIMS the lease (rewrite+push) so the remote lease always names the live holder;
#    if that push is rejected, someone else claimed it -> refuse (sequential double-writer closed).
#  - origin-unreachable fails CLOSED (cannot claim the lease -> do not start mutating work).
# Unset variables are errors (-u) and a pipeline fails if any stage fails
# (pipefail). errexit (-e) is NOT enabled, so command statuses are checked
# explicitly wherever they matter.
set -uo pipefail

# ---- environment / defaults --------------------------------------------------
# Everything below except _SELF and LEASE_FILE can be overridden from the
# caller's environment.
# Directory containing this script; used to locate the enclosing git checkout.
_SELF="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# The only checkout assert_repo accepts as REPO_ROOT (unless SESSION_WS_ALLOW_ANY_REPO=1).
CANONICAL_REPO="${CANONICAL_REPO:-$HOME/CLAUDE_PROJECTS}"
# Top level of the git checkout that holds this script; empty if it cannot be resolved.
REPO_ROOT="${REPO_ROOT:-$(git -C "$_SELF" rev-parse --show-toplevel 2>/dev/null || echo "")}"
# Parent of all session worktrees (layout: $SESSIONS_ROOT/<host>/<worktree-id>).
SESSIONS_ROOT="${SESSIONS_ROOT:-$HOME/Code/recoil-sessions}"
# Append-only JSONL ledger written by log_event.
LEDGER="${LEDGER:-$HOME/Dropbox/Claude_Config/maintenance/parallel-sessions/parallel_sessions.jsonl}"
# Host half of the session identity: scutil LocalHostName, else `hostname -s`, else a placeholder.
HOST="${HOST:-$(scutil --get LocalHostName 2>/dev/null || hostname -s 2>/dev/null || echo unknown-host)}"
# Session half of the session identity: the tmux session name, else a pid-based placeholder.
SID="${SID:-$(tmux display-message -p '#S' 2>/dev/null || echo "no-tmux-$$")}"
# Default lease lifetime in hours; also the default TTL for `reap`.
LEASE_TTL_HRS="${LEASE_TTL_HRS:-24}"
# Short ACTIVITY window (seconds) for the safe-reap live-lease guard — mirrors
# checkout-lease-gate.py LIVE_WINDOW_SEC. This is "a session is actively here NOW",
# distinct from the 24h reap TTL: a just-completed run's heartbeat is minutes old but
# the run is terminal, so the live-guard must use the short window, not LEASE_TTL_HRS,
# or close-on-completion would never reap (it would wait the full TTL).
LIVE_WINDOW_SEC="${LIVE_WINDOW_SEC:-600}"
# NOTE(review): HEARTBEAT_MIN is not read anywhere in this file — presumably the
# heartbeat cadence (minutes) for an external caller; confirm before relying on it.
HEARTBEAT_MIN="${HEARTBEAT_MIN:-30}"
# Name of the lease file written at the top of each session worktree and committed to its branch.
LEASE_FILE=".session-lease.json"

# Emit one diagnostic line on stderr, prefixed with the tool name.
err() {
  printf '%s\n' "session_workspace: $*" >&2
}
# Abort the script (exit 2) unless $2 is a non-empty string of decimal digits.
#   $1 = setting name, used only in the error message
#   $2 = value to check
valid_nonnegative_integer() { # $1=name $2=value
  case "$2" in
    ""|*[!0-9]*) err "invalid $1 '$2' (want non-negative integer)"; exit 2;;
  esac
}
valid_nonnegative_integer LEASE_TTL_HRS "$LEASE_TTL_HRS"
# LIVE_WINDOW_SEC is compared with `[ ... -lt ... ]` in the safe-reap live-lease guard.
# A non-numeric value makes that test error out (evaluating false), which would silently
# skip the guard — so reject it up front, exactly like LEASE_TTL_HRS.
valid_nonnegative_integer LIVE_WINDOW_SEC "$LIVE_WINDOW_SEC"

# Current UTC time as YYYY-MM-DDTHH:MM:SSZ (the lease / ledger timestamp format).
now_iso() {
  date -u '+%Y-%m-%dT%H:%M:%SZ'
}
# Current Unix time in whole seconds.
epoch() {
  date -u '+%s'
}
# UTC timestamp $1 hours from now (default: LEASE_TTL_HRS), in the now_iso format.
ttl_iso() {
  local hours="${1:-$LEASE_TTL_HRS}"
  python3 -c '
import sys, time
deadline = time.time() + int(sys.argv[1]) * 3600
print(time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(deadline)))
' "$hours"
}

# Append one event record (a single JSON line) to the ledger. The ledger is only
# ever opened in append mode; existing lines are never rewritten.
#   $1=event_type  $2=issue  $3=branch  $4=worktree  $5=payload (JSON text)
# Empty / "null" values for $2-$4 are stored as JSON null. A payload that does not
# parse as JSON is stored as {"raw": <text>}.
log_event() { # $1=event_type  $2=issue  $3=branch  $4=worktree  $5=payload_json
  local kind="$1" issue="${2:-null}" branch="${3:-null}" wt="${4:-null}" payload="${5:-{\}}"
  local ledger_dir
  ledger_dir="$(dirname "$LEDGER")"
  mkdir -p "$ledger_dir" 2>/dev/null || true
  # All fields travel through the environment so no shell value is ever spliced
  # into the Python source.
  ETYPE="$kind" ISSUE="$issue" BRANCH="$branch" WT="$wt" PAYLOAD="$payload" \
  HOST="$HOST" SID="$SID" ACTOR="${ACTOR:-null}" CWD="$(pwd)" RR="$REPO_ROOT" \
  TS="$(now_iso)" PHASE="${PHASE_N:-1}" \
  python3 - "$LEDGER" <<'PY'
import hashlib, json, os, sys

env = os.environ


def nullable(value):
    return None if value in (None, "", "null") else value


event = {
    "schema_version": 1,
    "event_ts": env["TS"],
    "event_type": env["ETYPE"],
    "phase": int(env.get("PHASE", "1")),
    "host": env.get("HOST"),
    "sid": env.get("SID"),
    "actor": nullable(env.get("ACTOR")),
    "cwd": env.get("CWD"),
    "repo_root": nullable(env.get("RR")),
    "issue": nullable(env.get("ISSUE")),
    "branch": nullable(env.get("BRANCH")),
    "worktree": nullable(env.get("WT")),
}
raw_payload = env.get("PAYLOAD")
try:
    event["payload"] = json.loads(raw_payload or "{}")
except Exception:
    event["payload"] = {"raw": raw_payload}
id_source = event["event_ts"] + str(event["sid"]) + event["event_type"] + str(event["branch"])
event["event_id"] = "sha256:" + hashlib.sha256(id_source.encode()).hexdigest()
with open(sys.argv[1], "a") as out:
    out.write(json.dumps(event, separators=(",", ":")) + "\n")
PY
}

# Decode a lease blob into "sid|host|expires_epoch|heartbeat_epoch".
# The blob is passed as $1 (never on stdin). Host is part of the identity: the same
# tmux SID on two machines is NOT the same session. Anything that cannot be decoded
# yields "||0|0"; an individual timestamp that cannot be parsed yields 0.
parse_lease() {
  local blob="${1:-}"
  python3 - "$blob" <<'PY'
import calendar, json, sys, time

STAMP = "%Y-%m-%dT%H:%M:%SZ"


def to_epoch(text):
    try:
        return calendar.timegm(time.strptime(text, STAMP))
    except Exception:
        return 0


try:
    lease = json.loads(sys.argv[1])
    expires = to_epoch(lease.get("expires_at", ""))
    beat = to_epoch(lease.get("heartbeat", ""))
    print(f"{lease.get('sid','')}|{lease.get('host','')}|{expires}|{beat}")
except Exception:
    print("||0|0")
PY
}

# ---- validation / assertions -------------------------------------------------
# Abort (exit 2) unless the launcher is running against an acceptable repo layout:
#  - REPO_ROOT was resolved,
#  - REPO_ROOT is the canonical checkout (waived when SESSION_WS_ALLOW_ANY_REPO=1),
#  - SESSIONS_ROOT is not REPO_ROOT itself or anything beneath it.
assert_repo() {
  if [ -z "$REPO_ROOT" ]; then
    err "REPO_ROOT not resolved (not in a git repo?)"; exit 2
  fi
  if [ "${SESSION_WS_ALLOW_ANY_REPO:-0}" != "1" ]; then
    if [ "$REPO_ROOT" != "$CANONICAL_REPO" ]; then
      err "REPO_ROOT ($REPO_ROOT) must be $CANONICAL_REPO (CLAUDE.md:9). (tests set SESSION_WS_ALLOW_ANY_REPO=1)"; exit 2
    fi
  fi
  # Trailing slash on the subject makes SESSIONS_ROOT == REPO_ROOT match too.
  if [[ "$SESSIONS_ROOT/" == "$REPO_ROOT"/* ]]; then
    err "SESSIONS_ROOT ($SESSIONS_ROOT) must be OUTSIDE the repo ($REPO_ROOT) — CLAUDE.md:9"; exit 2
  fi
}
# Refuse to touch any path the launcher does not own.
# Returns 0 when $1 lies inside SESSIONS_ROOT; otherwise reports and exits 2.
# Both sides are compared after physical resolution (`pwd -P`) when they exist, so a
# symlink inside SESSIONS_ROOT that points elsewhere is refused.
# A path that cannot be resolved is compared textually. Such a path must not contain
# a '..' component: a string like "$SESSIONS_ROOT/missing/../../elsewhere" would
# otherwise pass the prefix check while naming a location outside the root.
assert_owned() { # $1 = path
  local p root
  root="$(cd "$SESSIONS_ROOT" 2>/dev/null && pwd -P || echo "$SESSIONS_ROOT")"
  if ! p="$(cd "$1" 2>/dev/null && pwd -P)"; then
    p="$1"
    case "/$p/" in
      */../*) err "REFUSE: $1 cannot be resolved and contains a '..' component — will not touch a non-owned checkout"; exit 2;;
    esac
  fi
  case "$p/" in
    "$root"/*) return 0;;
    *) err "REFUSE: $1 is not under SESSIONS_ROOT ($SESSIONS_ROOT) — will not touch a non-owned checkout"; exit 2;;
  esac
}
# Recursively delete $1, but only after assert_owned has accepted it (assert_owned
# exits the shell on refusal, so the rm is never reached for a non-owned path).
# Used only as the fallback when `git worktree remove` refuses.
rm_rf_owned() { # $1 = path
  local victim="$1"
  assert_owned "$victim"
  rm -rf "$victim"
}
# Validate a value that becomes part of the session identity and of a worktree path
# (SID, HOST). Exits 2 on: empty, '/', '..', any whitespace or control character,
# or '|'.
#   $1 = display name for the error message   $2 = value
# Two cases need explicit handling:
#  - newline: grep works line by line, so the character-class check below can never
#    see a newline inside the value — it is rejected in the case statement instead;
#  - '|': parse_lease emits "sid|host|expires|heartbeat", so a '|' inside sid/host
#    shifts every field and the session can no longer recognise its own lease.
valid_session_component() { # $1=display name  $2=value
  local name="$1" value="$2"
  [ -n "$value" ] || { err "invalid $name (empty)"; exit 2; }
  case "$value" in
    */*|*..*|*$'\n'*) err "invalid $name '$value' (must not contain '/', '..', whitespace, or control chars)"; exit 2;;
    *'|'*) err "invalid $name '$value' (must not contain '|' — it is the lease field separator)"; exit 2;;
  esac
  if printf '%s' "$value" | LC_ALL=C grep -q '[[:space:][:cntrl:]]'; then
    err "invalid $name '$value' (must not contain '/', '..', whitespace, or control chars)"; exit 2
  fi
}
# Validate the arguments of an issue-keyed `create`; exits 2 on the first bad one.
#   $1 = actor  (claude | codex)
#   $2 = issue  (REC-<digits>)
#   $3 = slug   (lowercase letters, digits and '-', not starting with '-')
# Matching uses [[ =~ ]] so the anchors apply to the WHOLE value. A line-oriented
# grep accepts any value in which just one line matches, e.g. $'REC-1\nanything'
# or $'ok\n../../x'.
valid_inputs() { # $1=actor $2=issue $3=slug
  local issue_re='^REC-[0-9]+$' slug_re='^[a-z0-9][a-z0-9-]*$'
  case "$1" in claude|codex) :;; *) err "invalid --actor '$1' (want claude|codex)"; exit 2;; esac
  [[ "$2" =~ $issue_re ]] || { err "invalid --issue '$2' (want REC-NN)"; exit 2; }
  [[ "$3" =~ $slug_re ]] || { err "invalid --slug '$3' (want [a-z0-9-])"; exit 2; }
}
# Validate the arguments of `create --adhoc`; exits 2 on the first bad one.
#   $1 = actor (claude | codex)
#   $2 = slug  (lowercase letters, digits and '-', not starting with '-')
# The slug is matched with [[ =~ ]] so the anchors cover the whole value; a
# line-oriented grep would accept a multi-line value whose first line is valid.
valid_adhoc_inputs() { # $1=actor $2=slug
  local slug_re='^[a-z0-9][a-z0-9-]*$'
  case "$1" in claude|codex) :;; *) err "invalid --actor '$1' (want claude|codex)"; exit 2;; esac
  [[ "$2" =~ $slug_re ]] || { err "invalid --slug '$2' (want [a-z0-9-])"; exit 2; }
}

# Build a worktree directory name: <issue>--<actor>--<slug>--<UTC stamp>-<unique>.
# The trailing part is minted fresh on every call and is deliberately NOT derived
# from the ambient SID (which may have leaked from another session) — that was the
# REC-98-labeled-REC-88 scramble bug. SID remains the lease/ledger identity only.
# <unique> is the first 8 characters of a lowercased uuidgen value, or the shell's
# pid when uuidgen is not installed.
derive_worktree_id() { # $1=issue $2=actor $3=slug
  local stamp suffix
  stamp="$(date -u +%Y%m%dT%H%M%SZ)"
  suffix="$$"   # uuidgen-absent fallback: date + pid; NEVER the ambient SID
  if command -v uuidgen >/dev/null 2>&1; then
    suffix="$(uuidgen | tr 'A-Z' 'a-z' | cut -c1-8)"
  fi
  printf '%s--%s--%s--%s-%s\n' "$1" "$2" "$3" "$stamp" "$suffix"
}

# ---- git helpers -------------------------------------------------------------
# Succeeds iff `git ls-remote origin` works from REPO_ROOT; all output is discarded.
origin_reachable() {
  git -C "$REPO_ROOT" ls-remote origin >/dev/null 2>&1
}
# Succeeds iff `git ls-remote --heads origin refs/heads/<$1>` prints anything.
# A failing ls-remote prints nothing and therefore reads as "not on origin".
branch_on_origin() { # $1=branch ; exact ref, fixed-string
  local refs
  refs="$(git -C "$REPO_ROOT" ls-remote --heads origin "refs/heads/$1" 2>/dev/null)"
  [ -n "$refs" ]
}
# Fetch origin's <$1> into refs/remotes/origin/<$1> using an explicit refspec.
# Quiet; returns git's exit status.
fetch_branch() { # $1=branch ; update the remote-tracking ref explicitly
  local refspec="refs/heads/$1:refs/remotes/origin/$1"
  git -C "$REPO_ROOT" fetch origin "$refspec" --quiet 2>/dev/null
}
# Refresh refs/remotes/origin/main from origin with an explicit refspec.
# Quiet; returns git's exit status.
fetch_origin_main() {
  local refspec="refs/heads/main:refs/remotes/origin/main"
  git -C "$REPO_ROOT" fetch origin "$refspec" --quiet 2>/dev/null
}
# Print the lease file as stored on the remote-tracking ref origin/<$1>.
# Prints nothing when the ref or the file is missing; always returns 0.
read_origin_lease() { # $1=branch -> prints blob (may be empty)
  if ! git -C "$REPO_ROOT" show "origin/$1:$LEASE_FILE" 2>/dev/null; then
    return 0
  fi
}
# Print the path of every worktree of REPO_ROOT whose checked-out branch is exactly
# refs/heads/<$1> (nothing if none). Reads `git worktree list --porcelain`, where a
# "worktree <path>" line precedes that worktree's "branch <ref>" line.
worktree_for_branch() { # $1=branch -> prints worktree path holding it (or empty)
  local wanted="refs/heads/$1" line path=""
  git -C "$REPO_ROOT" worktree list --porcelain 2>/dev/null | while IFS= read -r line; do
    case "$line" in
      "worktree "*) path="${line#worktree }" ;;
      "branch "*)
        if [ "${line#branch }" = "$wanted" ]; then
          printf '%s\n' "$path"
        fi
        ;;
    esac
  done
}
# Classify the lease recorded on origin for branch $1. Prints exactly one of:
#   ours               the lease names this SID and this HOST
#   free               branch absent on origin, no lease blob, or lease expired
#   other:<sid>@<host> a different session holds an unexpired lease
# Reads the remote-tracking ref, so callers should fetch_branch first.
lease_holder_status() { # $1=branch
  local branch="$1" blob sid host expires beat
  if ! branch_on_origin "$branch"; then
    echo free; return
  fi
  blob="$(read_origin_lease "$branch")"
  if [ -z "$blob" ]; then
    echo free; return
  fi
  IFS='|' read -r sid host expires beat <<<"$(parse_lease "$blob")"
  # Identity is the (sid, host) pair; it is checked before expiry.
  if [ "$sid" = "$SID" ] && [ "$host" = "$HOST" ]; then
    echo ours; return
  fi
  if [ "${expires:-0}" -lt "$(epoch)" ]; then
    echo free
  else
    echo "other:${sid}@${host}"
  fi
}
# Write a fresh lease for this SID/HOST into worktree $1, commit it on branch $2 and
# push the branch to origin. The push is what claims or refreshes the lease.
#   $1 = worktree   $2 = branch   $3 = action tag for the commit message
#   $4 = lease lifetime in hours (optional, default LEASE_TTL_HRS)
# Returns the push exit status, or 1 without pushing when the lease could not be
# rendered, written or committed.
# The lease text is rendered BEFORE the file is touched. If ttl_iso or the renderer
# fails, writing straight into the file would commit and push an empty lease or one
# with an empty expiry; both parse as free/expired, so another session could adopt
# the branch while this one believes it holds it.
write_lease_push() { # $1=wt $2=branch $3=action-tag $4=ttl_hrs(optional)
  local wt="$1" branch="$2" tag="$3" ttl="${4:-$LEASE_TTL_HRS}" exp lease
  exp="$(ttl_iso "$ttl")" || exp=""
  if [ -z "$exp" ]; then
    err "could not compute lease expiry for $branch — NOT claiming"; return 1
  fi
  lease="$(python3 - "$SID" "$HOST" "$(now_iso)" "$exp" <<'PY'
import json,sys
print(json.dumps({"sid":sys.argv[1],"host":sys.argv[2],"heartbeat":sys.argv[3],"expires_at":sys.argv[4]}))
PY
)" || lease=""
  if [ -z "$lease" ]; then
    err "could not render lease for $branch — NOT claiming"; return 1
  fi
  if ! printf '%s\n' "$lease" > "$wt/$LEASE_FILE"; then
    err "could not write $wt/$LEASE_FILE — NOT claiming"; return 1
  fi
  git -C "$wt" add "$LEASE_FILE" >/dev/null 2>&1
  git -C "$wt" commit -q -m "lease: $tag $branch by $SID" >/dev/null 2>&1 || true
  # the commit may have failed silently (identity/hook/etc). Refuse to "claim" a branch
  # whose lease file is not actually committed — an empty/uncommitted lease is adoptable by anyone.
  if [ -n "$(git -C "$wt" status --porcelain "$LEASE_FILE" 2>/dev/null)" ]; then
    err "lease commit failed for $branch (lease file uncommitted) — NOT claiming"; return 1
  fi
  # Plain (non-force) push: origin rejects it if someone else advanced the branch.
  git -C "$wt" push -u origin "$branch" --quiet 2>/dev/null
}

# Give the lease back: overwrite the lease file in worktree $1 with an entry whose
# sid is empty and whose expiry is the Unix epoch, commit it and push, so the branch
# is immediately adoptable by the next session.
#   $1 = worktree   $2 = branch (used in the commit message and diagnostics)
# Returns the push exit status, or 1 without pushing if the lease file is still
# uncommitted after the commit attempt.
release_lease() { # $1=wt $2=branch
  local tree="$1" ref="$2" lease_path pending
  lease_path="$tree/$LEASE_FILE"
  python3 - "$HOST" "$(now_iso)" > "$lease_path" <<'PY'
import json, sys

host, beat = sys.argv[1], sys.argv[2]
print(json.dumps({"sid": "", "host": host, "heartbeat": beat, "expires_at": "1970-01-01T00:00:00Z"}))
PY
  git -C "$tree" add "$LEASE_FILE" >/dev/null 2>&1
  git -C "$tree" commit -q -m "lease: release $ref by $SID" >/dev/null 2>&1 || true
  pending="$(git -C "$tree" status --porcelain "$LEASE_FILE" 2>/dev/null)"
  if [ -n "$pending" ]; then
    err "lease release commit failed for $ref (lease file uncommitted) — NOT releasing"
    return 1
  fi
  git -C "$tree" push --quiet 2>/dev/null
}

# ---- observe (Phase 0) -------------------------------------------------------
# observe [--json] — Phase 0. Read-only snapshot of the checkout the caller is
# standing in; records exactly one "observe" ledger event and always returns 0.
# With --json the payload is also printed on stdout.
# Nothing here claims, creates or removes anything; it only reports what a
# create/claim would run into.
cmd_observe() {
  PHASE_N=0   # read by log_event (global on purpose)
  local json_mode="${1:-}" cwd repo_here cur_branch issue would_branch="null"
  local exists="false" held_json="null" wtconf="false" collision="none" mutating="false"
  cwd="$(pwd)"; repo_here="$(git rev-parse --show-toplevel 2>/dev/null || echo "")"
  cur_branch="$(git rev-parse --abbrev-ref HEAD 2>/dev/null || echo "")"
  # Standing in the shared checkout itself is reported as a collision class.
  if [ -n "$repo_here" ] && [ -n "$REPO_ROOT" ] && [ "$repo_here" = "$REPO_ROOT" ]; then
    mutating="true"; collision="same_host_double_checkout"
  fi
  # First REC-<n> token in the branch name (case-insensitive), if any.
  issue="$(printf '%s' "$cur_branch" | grep -oiE 'REC-[0-9]+' | head -1 || true)"
  if [ -n "$issue" ] && [ -n "$repo_here" ]; then
    would_branch="$cur_branch"
    if git -C "$repo_here" ls-remote --heads origin "refs/heads/$cur_branch" 2>/dev/null | grep -q .; then
      exists="true"
      local blob; blob="$(git -C "$repo_here" show "origin/$cur_branch:$LEASE_FILE" 2>/dev/null || true)"
      if [ -n "$blob" ]; then held_json="$blob"; fi
    fi
    # same-host: is this branch checked out in more than one worktree?
    # Whole-line (-x) fixed-string match on the porcelain "branch <ref>" line: a
    # substring match would also count sibling branches that merely share this
    # branch name as a prefix (e.g. REC-1-a vs REC-1-ab).
    if [ "$(git -C "$repo_here" worktree list --porcelain 2>/dev/null | grep -cxF -- "branch refs/heads/$cur_branch")" -gt 1 ]; then wtconf="true"; fi
  fi
  local payload
  payload="$(HELD="$held_json" python3 - "$cwd" "$would_branch" "$exists" "$collision" "$mutating" "$wtconf" <<'PY'
import json,os,sys
cwd,wb,exists,coll,mut,wtc=sys.argv[1:7]
held=os.environ.get("HELD","null")
try: held=json.loads(held) if held not in("","null") else None
except Exception: held=None
print(json.dumps({
  "would_create_worktree": None,
  "would_claim_branch": None if wb=="null" else wb,
  "branch_exists_on_origin": exists=="true",
  "branch_held_by": held,
  "same_host_checkout_conflict": wtc=="true",
  "mutating_shared_checkout": mut=="true",
  "collision_class": coll,
  "enforcement": "NONE (phase 0 observe-only)"}))
PY
)"
  log_event observe "${issue:-null}" "$would_branch" null "$payload"
  [ "$json_mode" = "--json" ] && echo "$payload"
  return 0
}

# ---- create ------------------------------------------------------------------
# create / adopt — obtain the worktree for a session branch and claim its lease.
#   --actor <claude|codex>  (--issue REC-NN | --adhoc)  --slug <slug>
#   [--sid S] [--base origin/main]
# Branch is <actor>/<issue>-<slug>, or <actor>/<slug> with --adhoc.
# Outcomes:
#   0  created a new branch+worktree, or adopted/resumed an existing one
#      (the lease was re-written and pushed in every case)
#   1  refused: a different live session holds the branch, the claim/adopt push was
#      rejected, or a local checkout of the branch has un-pushed/uncommitted work
#   2  bad arguments, repo-layout violation, origin unreachable, worktree add failed
# The push performed by write_lease_push is the claim: a rejected push means
# another session got there first.
cmd_create() {
  PHASE_N=1
  local actor="" issue="" slug="" base="origin/main" adhoc=0
  while [ $# -gt 0 ]; do case "$1" in
    --actor) actor="$2"; shift 2;; --issue) issue="$2"; shift 2;;
    --slug) slug="$2"; shift 2;; --sid) SID="$2"; shift 2;;
    --adhoc) adhoc=1; shift;;
    --base) base="$2"; shift 2;; *) err "create: unknown arg $1"; exit 2;; esac; done
  if [ "$adhoc" = "1" ]; then
    [ -z "$issue" ] || { err "create: --adhoc and --issue are mutually exclusive"; exit 2; }
    [ -n "$actor" ] && [ -n "$slug" ] || { err "create --adhoc needs --actor --slug"; exit 2; }
    valid_adhoc_inputs "$actor" "$slug"
  else
    [ -n "$actor" ] && [ -n "$issue" ] && [ -n "$slug" ] || { err "create needs --actor --issue --slug"; exit 2; }
    valid_inputs "$actor" "$issue" "$slug"
  fi
  # SID and HOST end up in the lease and HOST in the worktree path: validate both.
  valid_session_component --sid "$SID"
  valid_session_component HOST "$HOST"
  ACTOR="$actor"; assert_repo
  local branch wt_issue
  if [ "$adhoc" = "1" ]; then
    branch="${actor}/${slug}"
    wt_issue="adhoc"
  else
    branch="${actor}/${issue}-${slug}"
    wt_issue="$issue"
  fi
  # Resume-by-branch FIRST (Decision 12): an existing worktree for this branch is
  # found by branch, never by recomputing the directory name (the fresh id differs
  # each create). Only mint a new fresh dir id when no existing worktree is adopted.
  local wt existing
  existing="$(worktree_for_branch "$branch")"
  if [ -n "$existing" ]; then
    # An existing checkout outside SESSIONS_ROOT is never reused as the session worktree.
    case "$existing/" in "$SESSIONS_ROOT"/*) wt="$existing";; *) wt="$SESSIONS_ROOT/$HOST/$(derive_worktree_id "$wt_issue" "$actor" "$slug")";; esac
  else
    wt="$SESSIONS_ROOT/$HOST/$(derive_worktree_id "$wt_issue" "$actor" "$slug")"
  fi
  case "$wt/" in "$REPO_ROOT"/*) err "refuse: worktree path inside repo"; exit 2;; esac

  origin_reachable || { err "origin unreachable — failing CLOSED (cannot verify/claim the lease, so will not start mutating work)"; exit 2; }
  fetch_origin_main || { err "fetch origin/main failed — failing CLOSED (cannot create a fresh worktree base)"; exit 2; }
  # Every spelling of main is normalised to origin/main; any other base is rejected.
  case "$base" in
    ""|main|origin/main|refs/heads/main|refs/remotes/origin/main) base="origin/main";;
    *) err "create: unsupported --base '$base' (worktrees must start from fresh origin/main)"; exit 2;;
  esac
  mkdir -p "$(dirname "$wt")"
  fetch_branch "$branch" || true

  # idempotent resume: this session's worktree already present on the right branch.
  # Re-verify against ORIGIN (another host may have adopted it after our lease expired),
  # then re-claim via push CAS — a rejected push means we lost the branch.
  if [ -e "$wt/.git" ]; then
    local have; have="$(git -C "$wt" rev-parse --abbrev-ref HEAD 2>/dev/null || echo "")"
    if [ "$have" = "$branch" ]; then
      if branch_on_origin "$branch"; then
        local cblob chold chost cexp chb
        cblob="$(read_origin_lease "$branch")"
        if [ -n "$cblob" ]; then IFS='|' read -r chold chost cexp chb <<EOF
$(parse_lease "$cblob")
EOF
        else chold=""; chost=""; cexp=0; fi
        if ! { [ "$chold" = "$SID" ] && [ "$chost" = "$HOST" ]; } && [ "${cexp:-0}" -ge "$(epoch)" ]; then
          err "REFUSED: $branch is now held by a different live session (sid=$chold host=$chost) — your local worktree is stale"; exit 1
        fi
      fi
      if write_lease_push "$wt" "$branch" "resume"; then
        log_event create "$issue" "$branch" "$wt" '{"action":"adopted-existing"}'
        echo "adopted $branch (worktree already present): $wt"; return 0
      fi
      err "REFUSED: lost $branch (resume re-claim push rejected — another session advanced it)"; exit 1
    fi
  fi

  if ! branch_on_origin "$branch"; then
    # first claimant
    git -C "$REPO_ROOT" worktree add --quiet "$wt" -b "$branch" "$base" 2>/dev/null \
      || { err "worktree add failed for $branch"; exit 2; }
    if write_lease_push "$wt" "$branch" "claim"; then
      log_event create "$issue" "$branch" "$wt" '{"action":"created"}'
      echo "created $branch  worktree: $wt"; return 0
    fi
    # lost the ls-remote -> push race: back out and fall through to adopt/refuse
    # safe: pristine just-created worktree (base + lease file only); no user work to lose
    # If git refuses the removal (e.g. the lease file never got committed) the directory
    # is deleted by hand; the registration under .git/worktrees must then be pruned,
    # otherwise `branch -D` below and every later `worktree add` for this branch fail
    # against the stale entry.
    git -C "$REPO_ROOT" worktree remove "$wt" 2>/dev/null \
      || { rm_rf_owned "$wt"; git -C "$REPO_ROOT" worktree prune >/dev/null 2>&1 || true; }
    git -C "$REPO_ROOT" branch -D "$branch" >/dev/null 2>&1 || true
    err "branch taken during claim race; resolving holder..."
    fetch_branch "$branch" || true
  fi

  # branch exists -> adopt vs refuse
  fetch_branch "$branch" || true
  local blob holder hhost hexp hbeat now payload
  blob="$(read_origin_lease "$branch")"
  if [ -n "$blob" ]; then IFS='|' read -r holder hhost hexp hbeat <<EOF
$(parse_lease "$blob")
EOF
  else holder=""; hhost=""; hexp=0; fi
  now="$(epoch)"
  # adopt only if THIS exact session (sid AND host) holds it, or the lease has expired.
  if { [ "$holder" = "$SID" ] && [ "$hhost" = "$HOST" ]; } || [ "${hexp:-0}" -lt "$now" ]; then
    # free a stale LOCAL checkout of this branch first (else git refuses 2nd checkout)
    local other; other="$(worktree_for_branch "$branch")"
    if [ -n "$other" ] && [ "$other" != "$wt" ]; then
      if [ -z "$(git -C "$other" status --porcelain --ignored 2>/dev/null)" ] && [ -z "$(git -C "$other" log "origin/$branch..HEAD" --oneline 2>/dev/null)" ]; then
        git -C "$REPO_ROOT" worktree remove "$other" 2>/dev/null || true
      else
        err "REFUSE: $branch is checked out with un-pushed/uncommitted work at $other"; exit 1
      fi
    fi
    git -C "$REPO_ROOT" worktree add --quiet "$wt" -B "$branch" "origin/$branch" 2>/dev/null \
      || { err "adopt: worktree add failed"; exit 2; }
    # RE-CLAIM atomically: rewrite lease + push. If rejected, someone else claimed -> refuse.
    if write_lease_push "$wt" "$branch" "adopt"; then
      # The previous holder comes from a lease stored on origin, i.e. from another
      # party: let a JSON encoder quote it instead of splicing it into a JSON string.
      payload="$(python3 -c 'import json,sys; print(json.dumps({"action":"adopted","prev_holder":sys.argv[1]}))' "$holder")"
      log_event create "$issue" "$branch" "$wt" "$payload"
      echo "adopted $branch  worktree: $wt  (prev holder: ${holder:-none})"; return 0
    else
      # safe: pristine just-created worktree (base + lease file only); no user work to lose
      # (same stale-registration cleanup as in the claim-race back-out above)
      git -C "$REPO_ROOT" worktree remove "$wt" 2>/dev/null \
        || { rm_rf_owned "$wt"; git -C "$REPO_ROOT" worktree prune >/dev/null 2>&1 || true; }
      log_event create "$issue" "$branch" null '{"action":"refused-lost-adopt-race"}'
      err "REFUSED: lost the adopt race for $branch (another session claimed it first)"; exit 1
    fi
  else
    payload="$(python3 -c 'import json,sys; print(json.dumps({"action":"refused","live_holder":sys.argv[1]}))' "$holder")"
    log_event create "$issue" "$branch" null "$payload"
    err "REFUSED: $branch is held by a different live session (sid=$holder). Single-writer guarantee."; exit 1
  fi
}

# ---- close -------------------------------------------------------------------
# close --worktree <path> [--reason safe|dirty] — end a session.
# Clean and fully pushed (and not --reason dirty): release the lease on origin, then
# remove the worktree. Otherwise: WIP-commit everything, push, and remove the
# worktree only after confirming nothing is left uncommitted or unpushed.
# The worktree is never force-removed; whenever something cannot be confirmed it is
# kept and the command exits 1.
# Exit: 0 closed and removed · 1 kept (reason on stderr) · 2 bad arguments / not owned.
cmd_close() {
  PHASE_N=1
  local wt="" reason=""
  while [ $# -gt 0 ]; do case "$1" in
    --worktree) wt="$2"; shift 2;; --reason) reason="$2"; shift 2;; *) err "close: unknown arg $1"; exit 2;; esac; done
  [ -n "$wt" ] && [ -d "$wt" ] || { err "close needs an existing --worktree"; exit 2; }
  assert_repo; assert_owned "$wt"
  local branch dirty unpushed payload
  branch="$(git -C "$wt" rev-parse --abbrev-ref HEAD 2>/dev/null || echo "")"
  # --ignored: ignored files count as "something is present" too.
  dirty="$(git -C "$wt" status --porcelain --ignored 2>/dev/null)"
  unpushed="$(git -C "$wt" log "origin/$branch..HEAD" --oneline 2>/dev/null || echo "")"
  if [ -z "$dirty" ] && [ -z "$unpushed" ] && [ "$reason" != "dirty" ]; then
    fetch_branch "$branch" || true
    local st; st="$(lease_holder_status "$branch")"
    # "${st#other:}" differs from "$st" exactly when st starts with "other:".
    if [ "${st#other:}" != "$st" ]; then
      # branch is owned by a different live session now; our local worktree is stale.
      # remove OUR worktree, do NOT touch their lease.
      if git -C "$REPO_ROOT" worktree remove "$wt" 2>/dev/null; then
        log_event close "null" "$branch" "$wt" '{"reason":"safe","action":"removed-stale-not-released"}'
        echo "closed: removed your stale worktree; $branch is now held by ${st#other:} (their lease left intact)"; return 0
      fi
      err "close: could not remove stale worktree $wt"; exit 1
    fi
    # release the lease (push an expired lease) so the next session can adopt immediately.
    # If the release PUSH fails (origin unreachable), do NOT remove + falsely claim release —
    # keep the worktree as the retry handle (work is already pushed, so nothing is lost).
    if ! release_lease "$wt" "$branch"; then
      log_event close "null" "$branch" "$wt" '{"reason":"safe","action":"kept-release-push-failed"}'
      err "close: lease-release push FAILED (origin unreachable?) — KEEPING $wt; re-run close when origin is reachable ($branch stays leased until then)"; exit 1
    fi
    # NON-force remove: git refuses if anything is actually present.
    if git -C "$REPO_ROOT" worktree remove "$wt" 2>/dev/null; then
      log_event close "null" "$branch" "$wt" '{"reason":"safe","action":"released-removed"}'
      echo "closed (safe): $branch lease released (next session can adopt), worktree removed"; return 0
    fi
    err "close: worktree not clean enough to remove safely — keeping $wt"; exit 1
  fi
  # dirty/unpushed: WIP-commit-and-push; VERIFY before removing; never lose work.
  fetch_branch "$branch" || true
  local st; st="$(lease_holder_status "$branch")"
  if [ "${st#other:}" != "$st" ]; then
    # The holder string comes from a lease stored on origin, i.e. from another party:
    # let a JSON encoder quote it instead of splicing it into a JSON string.
    payload="$(python3 -c 'import json,sys; print(json.dumps({"reason":"dirty","action":"kept-foreign-holder","holder":sys.argv[1]}))' "${st#other:}")"
    log_event close "null" "$branch" "$wt" "$payload"
    err "close: REFUSED dirty WIP push for $branch — held by different live session (${st#other:}); KEEPING $wt"; exit 1
  fi
  git -C "$wt" add -A >/dev/null 2>&1
  git -C "$wt" commit -q -m "WIP: session close $SID" >/dev/null 2>&1 || true
  git -C "$wt" push --quiet 2>/dev/null || true
  # Re-measure after the attempt. A failing `git log` yields "x", i.e. counts as
  # unpushed, so an unverifiable state keeps the worktree.
  unpushed="$(git -C "$wt" log "origin/$branch..HEAD" --oneline 2>/dev/null || echo "x")"
  dirty="$(git -C "$wt" status --porcelain --ignored 2>/dev/null)"
  if [ -z "$unpushed" ] && [ -z "$dirty" ]; then
    release_lease "$wt" "$branch" || { err "close (dirty): WIP pushed but lease-release push FAILED — KEEPING $wt; re-run close when origin reachable"; exit 1; }
    git -C "$REPO_ROOT" worktree remove "$wt" 2>/dev/null \
      && { log_event close "null" "$branch" "$wt" '{"reason":"dirty","action":"pushed-wip-removed"}'; echo "closed (dirty): WIP pushed to $branch (survives handoff), worktree removed"; return 0; }
  fi
  log_event close "null" "$branch" "$wt" '{"reason":"dirty","action":"kept-unpushed"}'
  err "close: could not confirm WIP is committed AND pushed — KEEPING worktree $wt (refusing to delete unpushed work)"; exit 1
}

# ---- heartbeat ---------------------------------------------------------------
# heartbeat --worktree <path> — refresh this session's lease on the worktree's branch.
# Arguments other than --worktree are silently skipped.
# Exit: 0 refreshed · 1 another live session holds the branch, or the push was
# rejected · 2 missing/non-directory worktree, not owned, no branch, origin unreachable.
cmd_heartbeat() {
  local wt="" branch holder_state
  while [ $# -gt 0 ]; do
    if [ "$1" = "--worktree" ]; then
      wt="$2"; shift 2
    else
      shift
    fi
  done
  if [ -z "$wt" ] || [ ! -d "$wt" ]; then
    err "heartbeat needs --worktree"; exit 2
  fi
  assert_repo
  assert_owned "$wt"
  branch="$(git -C "$wt" rev-parse --abbrev-ref HEAD 2>/dev/null || echo "")"
  if [ -z "$branch" ]; then
    err "heartbeat: no branch at $wt"; exit 2
  fi
  if ! origin_reachable; then
    err "heartbeat: origin unreachable — cannot verify/refresh lease"; exit 2
  fi
  fetch_branch "$branch" || true
  holder_state="$(lease_holder_status "$branch")"
  # An "other:<sid>@<host>" answer means a different live session owns the lease.
  if [ "${holder_state#other:}" != "$holder_state" ]; then
    err "heartbeat REFUSED: $branch is held by a different live session (${holder_state#other:}) — not reclaiming"; exit 1
  fi
  if ! write_lease_push "$wt" "$branch" "heartbeat"; then
    err "heartbeat: push rejected — another session may hold $branch"; exit 1
  fi
  echo "heartbeat: $branch lease refreshed (+${LEASE_TTL_HRS}h)"
}

# ---- reap (reconcile-on-read, NO daemon) ------------------------------------
# reap [--ttl-hours N] [--dry-run] — sweep every worktree directly under
# $SESSIONS_ROOT/<host>/ and remove those whose lease is past the TTL and that
# safe_reap_worktree judges removable. Prints one "reap: <action> ..." line per worktree:
#   removed       the tree was removed
#   would-remove  --dry-run only: the tree WOULD be removed (nothing was touched)
#   kept-unsafe   past TTL but a safety guard refused; the reason is appended and
#                 also written to the ledger
#   kept-live     lease not yet past the TTL, or held by this very session
# Always returns 0 (bad arguments exit 2).
cmd_reap() {
  PHASE_N=1
  local ttl="$LEASE_TTL_HRS" dry=""
  while [ $# -gt 0 ]; do case "$1" in
    --ttl-hours)
      case "${2:-}" in ""|*[!0-9]*) err "invalid --ttl-hours"; exit 2;; esac
      ttl="$2"; shift 2;;
    --dry-run) dry="1"; shift;; *) err "reap: unknown arg $1"; exit 2;; esac; done
  assert_repo
  [ -d "$SESSIONS_ROOT" ] || { echo "reap: no sessions root ($SESSIONS_ROOT)"; return 0; }
  local found=0 wt
  while IFS= read -r wt; do
    [ -e "$wt/.git" ] || continue
    found=1
    local branch blob holder hhost hexp hbeat now eligible action reason reason_json
    branch="$(git -C "$wt" rev-parse --abbrev-ref HEAD 2>/dev/null || echo "")"
    # Eligibility here is TTL/lease ONLY. ALL dirty/unpushed/pushed/branch-name safety is
    # delegated to the single safe_reap_worktree predicate (SSOT) so the periodic sweep and
    # reap-one agree exactly — e.g. a build-log.md-only tree is removable in BOTH paths.
    blob="$(git -C "$wt" show "HEAD:$LEASE_FILE" 2>/dev/null || true)"
    IFS='|' read -r holder hhost hexp hbeat <<EOF
$(parse_lease "$blob")
EOF
    now="$(epoch)"
    # Expiry is re-derived from the heartbeat and the requested TTL (the expiry stored
    # in the lease is used only when there is no heartbeat); TTL 0 expires everything.
    if [ "$ttl" -eq 0 ]; then
      hexp=0
    elif [ "${hbeat:-0}" -gt 0 ]; then
      hexp=$((hbeat + ttl * 3600))
    fi
    # eligible = expired AND not held-live by THIS exact session (sid AND host)
    if [ "${hexp:-0}" -lt "$now" ] && ! { [ "$holder" = "$SID" ] && [ "$hhost" = "$HOST" ]; }; then
      eligible=1
    else
      eligible=0
    fi
    if [ "$eligible" = "1" ]; then
      # safe_reap_worktree runs in a command substitution, so its own output (and
      # any `exit` inside it) stays contained; its text is used only as the reason.
      if [ -n "$dry" ]; then
        # A dry run removes nothing, so it must not be reported as "removed".
        if reason="$(LEASE_TTL_HRS="$ttl" safe_reap_worktree "$wt" --dry-run 2>&1)"; then action="would-remove"; reason=""; else action="kept-unsafe"; fi
      else
        if reason="$(LEASE_TTL_HRS="$ttl" safe_reap_worktree "$wt" 2>&1)"; then action="removed"; reason=""; else action="kept-unsafe"; fi
      fi
    else
      action="kept-live"
    fi
    if [ "$action" = "kept-unsafe" ]; then
      # Flatten the (possibly multi-line) guard output to one line.
      reason="$(printf '%s' "$reason" | tr '\n' ' ' | sed 's/[[:space:]]*$//')"
      echo "reap: $action  $branch  ($wt) -- $reason"
      reason_json="$(python3 - "$reason" <<'PY'
import json,sys
print(json.dumps({"action":"kept-unsafe","reason":sys.argv[1]}))
PY
)"
      log_event reap "null" "$branch" "$wt" "$reason_json"
    else
      echo "reap: $action  $branch  ($wt)"
    fi
  done < <(find "$SESSIONS_ROOT" -mindepth 2 -maxdepth 2 \( -type d -o -type l \) 2>/dev/null)
  [ "$found" -eq 1 ] || echo "reap: no worktrees under $SESSIONS_ROOT"
  return 0
}

# ---- safe reap (one worktree) -----------------------------------------------
# The SINGLE safe-removal predicate. Returns 0 and removes the tree iff EVERY
# guard passes; otherwise returns nonzero and KEEPS the tree (warn recovery).
# Background-safe: never WIP-commits, never --force, never rm -rf. Decision 4/6/7/8.
#   $1 = worktree path      $2 = "--dry-run" (optional): run every guard, remove nothing
# Returns: 0 removed (or would be removed, in dry-run) · 1 kept (reason on stderr) ·
#          2 no path given. NOTE: assert_owned EXITS the shell (status 2) for a path
#          outside SESSIONS_ROOT, so call this in a subshell if that must not
#          terminate the caller.
# Guards, in order: owned path -> is a worktree -> session branch name -> no
# recently-heartbeating lease -> origin/<branch> fetchable and present -> no unpushed
# commits -> nothing in the tree except a top-level build-log.md.
safe_reap_worktree() { # $1=worktree  [$2=--dry-run]
  local wt="$1" dry=""
  [ "${2:-}" = "--dry-run" ] && dry="1"
  [ -n "$wt" ] || { err "safe_reap: no worktree path"; return 2; }
  assert_owned "$wt"                                  # refuses anything outside SESSIONS_ROOT
  [ -e "$wt/.git" ] || { err "safe_reap: keep $wt (not a worktree)"; return 1; }
  local branch
  branch="$(git -C "$wt" rev-parse --abbrev-ref HEAD 2>/dev/null || echo "")"
  # STRICT regex (not a shell glob): `REC-` must be followed by ONE-OR-MORE DIGITS then `-`.
  # A glob like `REC-[0-9]*-*` would also accept non-numeric ids (e.g. `codex/REC-1abc-x`),
  # weakening the safety predicate; `[[ =~ ]]` with `+` enforces all-digits.
  if ! [[ "$branch" =~ ^(claude|codex)/REC-[0-9]+- ]]; then
    err "safe_reap: keep $wt (branch '$branch' not a session branch)"; return 1
  fi
  # Live-lease guard: refuse removal only if the worktree's lease is held-live by a session
  # ACTIVE within the short LIVE_WINDOW_SEC (heartbeat recency), NOT the 24h reap TTL. Using the
  # TTL here would keep every just-completed run's worktree for a full day and defeat
  # close-on-completion; a terminal run whose heartbeat is older than the activity window is
  # safe to reap (the unpushed/dirty guards below still protect any in-flight work).
  local blob holder hhost hexp hbeat now
  # Prefer the lease file in the working tree; fall back to the committed copy at HEAD.
  blob="$(cat "$wt/$LEASE_FILE" 2>/dev/null || git -C "$wt" show "HEAD:$LEASE_FILE" 2>/dev/null || true)"
  IFS='|' read -r holder hhost hexp hbeat <<EOF
$(parse_lease "$blob")
EOF
  now="$(epoch)"
  if [ -n "$holder" ] && [ "${hbeat:-0}" -gt 0 ] && [ "$((now - hbeat))" -lt "$LIVE_WINDOW_SEC" ]; then
    err "safe_reap: keep $wt (lease held-live by ${holder:-unknown}@${hhost:-unknown}; heartbeat within ${LIVE_WINDOW_SEC}s) — recovery: inspect $wt or wait for the session to go quiet"
    return 1
  fi
  # origin/<branch> must exist and fetch successfully — else we cannot
  # prove the commits are pushed, so KEEP.
  if ! fetch_branch "$branch"; then
    err "safe_reap: keep $wt (fetch origin/$branch failed — cannot confirm pushed)"
    return 1
  fi
  if ! git -C "$wt" rev-parse --verify "origin/$branch" >/dev/null 2>&1; then
    err "safe_reap: keep $wt (no origin/$branch — cannot confirm pushed)"; return 1
  fi
  # Unpushed-commit guard: count-based (stricter + test-friendly than --oneline parse).
  # A failing rev-list yields "UNKNOWN", which is != "0" and therefore keeps the tree.
  local unpushed_count
  unpushed_count="$(git -C "$wt" rev-list --count "origin/$branch..HEAD" 2>/dev/null || echo UNKNOWN)"
  if [ "$unpushed_count" != "0" ]; then
    err "safe_reap: keep $wt (unpushed commits=$unpushed_count) — recovery: push or close --reason dirty"
    return 1
  fi
  # Real-dirty guard: only an EXACT top-level untracked/ignored build-log.md is disposable.
  local status real_dirty="" line
  status="$(git -C "$wt" status --porcelain --untracked-files=all --ignored=matching 2>/dev/null)"
  # Every status line other than the two exact build-log.md forms counts as real work.
  while IFS= read -r line; do
    [ -n "$line" ] || continue
    case "$line" in
      "?? build-log.md"|"!! build-log.md") : ;;   # disposable cruft
      *) real_dirty="${real_dirty}${line}
" ;;
    esac
  done <<EOF
$status
EOF
  if [ -n "$real_dirty" ]; then
    err "safe_reap: keep $wt (real dirty/untracked work present) — recovery: inspect $wt"
    return 1
  fi
  if [ -n "$dry" ]; then
    echo "safe_reap: WOULD remove  $branch  ($wt)"
    return 0
  fi
  # Delete the one disposable file first so the non-force removal below is not
  # refused because of it.
  [ -e "$wt/build-log.md" ] && rm -f "$wt/build-log.md"
  if git -C "$REPO_ROOT" worktree remove "$wt" 2>/dev/null; then   # NEVER --force
    log_event reap "null" "$branch" "$wt" '{"action":"safe-removed"}'
    echo "safe_reap: removed  $branch  ($wt)"
    return 0
  fi
  err "safe_reap: keep $wt (git worktree remove refused — not clean enough)"   # never rm -rf
  return 1
}

# ---- reap-one (close-on-completion entry point) -----------------------------
# reap-one --worktree <path> [--dry-run] — run the safe-removal predicate on a single
# worktree (the close-on-completion entry point). The exit status is whatever
# safe_reap_worktree returns; bad or missing arguments exit 2.
cmd_reap_one() {
  PHASE_N=1
  local wt="" dry=""
  while [ $# -gt 0 ]; do
    case "$1" in
      --worktree) wt="$2"; shift 2 ;;
      --dry-run)  dry="--dry-run"; shift ;;
      *) err "reap-one: unknown arg $1"; exit 2 ;;
    esac
  done
  if [ -z "$wt" ]; then
    err "reap-one needs --worktree"; exit 2
  fi
  assert_repo
  # The flag is forwarded only when it was given; no empty argument is passed.
  safe_reap_worktree "$wt" ${dry:+"$dry"}
}

# ---- pr-metadata -------------------------------------------------------------
# pr-metadata --issue REC-NN --actor <a> --worktree <p> [--data-scope <s>]
# Print the session metadata block for a pull-request description: a marker comment
# followed by five "key: value" lines. Missing or empty values print as "null";
# unrecognised arguments are skipped.
cmd_pr_metadata() {
  local issue="" actor="" wt="" scope="null"
  while [ $# -gt 0 ]; do
    case "$1" in
      --issue)      issue="$2"; shift 2 ;;
      --actor)      actor="$2"; shift 2 ;;
      --worktree)   wt="$2"; shift 2 ;;
      --data-scope) scope="$2"; shift 2 ;;
      *) shift ;;
    esac
  done
  printf '%s\n' \
    "<!-- session_workspace -->" \
    "sid: $SID" \
    "actor: ${actor:-null}" \
    "worktree: ${wt:-null}" \
    "linear_issue: ${issue:-null}" \
    "data_scope: ${scope:-null}"
}

# ---- converge-codex (supervised one-time) -----------------------------------
# converge-codex [--dry-run] — supervised, one-time: replace the independent clone at
# $CONVERGE_TARGET (default ~/Code/recoil-codex) with a worktree of REPO_ROOT on the
# branch the clone currently has checked out.
# Aborts (exit 1) without touching anything if the clone has dirty or ignored files,
# no upstream, unpushed commits on any local branch, or stashes.
# The clone is MOVED ASIDE rather than deleted up front and is only deleted once
# `git worktree add` has succeeded. If the add fails (branch unknown to REPO_ROOT,
# branch already checked out elsewhere, ...) the clone is moved back and nothing is
# recorded — a failed add must not destroy the directory or write a tombstone.
cmd_converge_codex() {
  local dry="" target="${CONVERGE_TARGET:-$HOME/Code/recoil-codex}"
  [ "${1:-}" = "--dry-run" ] && dry="1"
  assert_repo
  # A worktree has a .git FILE (gitdir pointer); an independent clone has a .git DIRECTORY.
  if [ -f "$target/.git" ]; then echo "already converged: $target is a worktree (gitdir pointer)"; return 0; fi
  if [ ! -d "$target/.git" ]; then echo "no independent clone at $target — nothing to converge"; return 0; fi
  local dirty unpushed unpushed_any prev backup
  dirty="$(git -C "$target" status --porcelain --ignored 2>/dev/null)"
  # no-upstream MUST be treated as "cannot confirm pushed" -> abort (do not rm unpushed commits)
  if ! git -C "$target" rev-parse --abbrev-ref '@{u}' >/dev/null 2>&1; then
    err "ABORT: $target has no upstream — cannot confirm its commits are pushed (refusing to destroy work)"; exit 1
  fi
  # A failing git log yields "UNKNOWN", which is non-empty and therefore aborts below.
  unpushed="$(git -C "$target" log '@{u}..' --oneline 2>/dev/null || echo "UNKNOWN")"
  unpushed_any="$(git -C "$target" log --branches --not --remotes --oneline 2>/dev/null || echo "UNKNOWN")"
  prev="$(git -C "$target" rev-parse --abbrev-ref HEAD 2>/dev/null || echo origin/main)"
  if [ -n "$dirty" ] || [ -n "$unpushed" ]; then
    err "ABORT: $target has dirty/unpushed work — commit+push first (refusing to destroy work)"; exit 1
  fi
  if [ -n "$unpushed_any" ]; then
    err "ABORT: $target has local branch commits absent from origin — push or delete them first (refusing to destroy work)"; exit 1
  fi
  if [ -n "$(git -C "$target" stash list 2>/dev/null)" ]; then
    err "ABORT: $target has stashed work — pop/drop the stash first (refusing to destroy work)"; exit 1
  fi
  if [ -n "$dry" ]; then echo "would converge: rm -rf $target ; git -C $REPO_ROOT worktree add $target $prev"; return 0; fi
  # Park the clone next to its original location until the worktree exists.
  backup="${target%/}.pre-converge.$$"
  if [ -e "$backup" ] || ! mv -- "$target" "$backup"; then
    err "ABORT: could not set $target aside as $backup — nothing was changed"; exit 1
  fi
  if ! git -C "$REPO_ROOT" worktree add "$target" "$prev"; then
    if [ ! -e "$target" ] && mv -- "$backup" "$target"; then
      err "ABORT: git worktree add $target $prev failed — original clone restored at $target"
    else
      err "ABORT: git worktree add $target $prev failed — original clone preserved at $backup"
    fi
    exit 1
  fi
  rm -rf -- "$backup"
  mkdir -p "$REPO_ROOT/_archive"
  echo "$(now_iso)  ~/Code/recoil-codex converged from independent clone -> main-repo worktree (parallel-session-model Phase 2)" \
    >> "$REPO_ROOT/_archive/recoil-codex-converged.tombstone"
  echo "converged: $target is now a worktree of $REPO_ROOT (was branch $prev)"
}

# ---- dispatch ----------------------------------------------------------------
# Route the first argument to its subcommand handler; the remaining arguments are
# passed through untouched. An unknown or missing subcommand prints usage and exits 2.
sub="${1:-}"
if [ $# -gt 0 ]; then
  shift
fi
case "$sub" in
  observe)
    cmd_observe "$@" ;;
  create|adopt)
    # adopt (REC-257) is a first-class alias: create already resumes-by-branch
    # and re-claims the lease.
    cmd_create "$@" ;;
  close)
    cmd_close "$@" ;;
  reap)
    cmd_reap "$@" ;;
  reap-one)
    cmd_reap_one "$@" ;;
  heartbeat)
    cmd_heartbeat "$@" ;;
  pr-metadata)
    cmd_pr_metadata "$@" ;;
  converge-codex)
    cmd_converge_codex "$@" ;;
  *)
    err "usage: session_workspace.sh {observe|create|adopt|close|reap|reap-one|heartbeat|pr-metadata|converge-codex} ..."
    exit 2 ;;
esac
