HITL queue
When a policy returns requires_hitl=true, the runtime parks the attempted action in HITLQueue — a SQLite-backed async queue — and the agent waits for an out-of-band approve/deny decision. This page covers how the queue actually behaves: the four states, the timeout semantics, the background expiry loop, and the failure modes you need to plan for in production.
The four states
Every queued row moves through ActionStatus:
| State | Set by |
|---|---|
| PENDING | enqueue() (initial state) |
| APPROVED | approve(action_id, decided_by) |
| DENIED | deny(action_id, decided_by, reason) |
| TIMED_OUT | expire_old() background sweep, or wait_for_decision() timeout |
A PENDING row is the only mutable state — approve / deny / expiry all guard with WHERE status = 'PENDING', so a race between two approvers (or an approver + the timeout sweep) can't double-decide a row.
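The guard is ordinary SQL. An UPDATE matches only while the row is still PENDING, and the affected-row count tells the caller whether it won. The sketch below uses plain sqlite3 and a hypothetical hitl_actions table; the real HITLQueue schema and column names may differ. It runs on a single connection, so :memory: is harmless here even though HITLQueue rejects it.

```python
import sqlite3

# Hypothetical schema for illustration only; HITLQueue's real table may differ.
# Single connection, so ":memory:" is safe in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hitl_actions ("
    " id TEXT PRIMARY KEY, status TEXT NOT NULL, decided_by TEXT)"
)
conn.execute("INSERT INTO hitl_actions VALUES ('act_1', 'PENDING', NULL)")
conn.commit()


def decide(conn: sqlite3.Connection, action_id: str, new_status: str, who: str) -> bool:
    """Move a row out of PENDING; return True only if this call won the race."""
    cur = conn.execute(
        "UPDATE hitl_actions SET status = ?, decided_by = ? "
        "WHERE id = ? AND status = 'PENDING'",
        (new_status, who, action_id),
    )
    conn.commit()
    # rowcount is 1 if the guard matched, 0 if the row was already decided.
    return cur.rowcount == 1


first = decide(conn, "act_1", "APPROVED", "alice@example.com")
second = decide(conn, "act_1", "DENIED", "bob@example.com")
print(first, second)  # True False
```

The second approver gets False and the row keeps the first decision. This is the same contract approve() and deny() expose.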
Setup
```python
from kitelogik.anchor.queue import HITLQueue

queue = HITLQueue(db_path="hitl.db")  # ":memory:" is rejected
await queue.setup()  # one-time table creation
```

In-memory SQLite is explicitly rejected. Every async call hops to a fresh thread (asyncio.to_thread), and a :memory: database is private to the connection that opened it, so state would silently disappear. Use a real file path. For tests, use tempfile.mkdtemp() + "/hitl.db".
The database runs with journal_mode=WAL, which lets readers proceed concurrently with a single writer.
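The :memory: restriction is easy to reproduce with the standard library. Each sqlite3.connect(":memory:") call opens a separate, empty database, and the queue would hit exactly that if each threaded call opened its own connection. The second half of the sketch shows the file-backed alternative with WAL enabled. The temporary path and the table t are placeholders.

```python
import os
import sqlite3
import tempfile

# Two connections to ":memory:" are two unrelated databases.
a = sqlite3.connect(":memory:")
a.execute("CREATE TABLE t (x INTEGER)")
b = sqlite3.connect(":memory:")
tables_in_b = b.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
print(tables_in_b)  # []; the table created through `a` does not exist for `b`

# A real file path is shared by every connection that opens it.
db_path = os.path.join(tempfile.mkdtemp(), "hitl.db")
c1 = sqlite3.connect(db_path)
# The PRAGMA returns the journal mode actually in effect.
mode = c1.execute("PRAGMA journal_mode=WAL").fetchone()[0]
c1.execute("CREATE TABLE t (x INTEGER)")
c1.commit()
c2 = sqlite3.connect(db_path)
tables_in_c2 = c2.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
print(mode, tables_in_c2)  # wal [('t',)]
```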
Enqueue → wait → decide
The full agent-side loop:
```python
from datetime import datetime, timezone

from kitelogik.anchor.models import PendingAction

# `queue` is the HITLQueue from Setup; `context` and `tool` come from your agent code.
action = PendingAction(
    id="",  # auto-generated if empty
    session_id=context.session_id,
    tool_name="approve_refund",
    args={"customer_id": "cust_001", "amount": 500},
    risk_tier="TRANSACTIONAL_HIGH",
    created_at=datetime.now(timezone.utc),
)
action_id = await queue.enqueue(action)  # returns the assigned id

# Block until decided (or timeout)
final = await queue.wait_for_decision(action_id, timeout_seconds=300.0)

if final.status.value == "APPROVED":
    result = await tool(...)
elif final.status.value == "DENIED":
    raise RuntimeError(f"Operator denied: {final.denial_reason}")
else:  # TIMED_OUT
    raise TimeoutError("No operator decision within 5 minutes")
```

wait_for_decision uses an asyncio.Event keyed on action_id. approve() and deny() set that event, so the agent unblocks as soon as a decision lands instead of waiting for a poll interval. On timeout, the row is marked TIMED_OUT as part of the same call.
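The wakeup pattern described above can be sketched with nothing but asyncio. ToyDecisionBoard is a hypothetical, dict-backed stand-in with no SQLite and no threads. It only shows how one asyncio.Event per action id lets a waiter block under asyncio.wait_for while a decider calls set(). It is not HITLQueue's implementation.

```python
import asyncio


class ToyDecisionBoard:
    """Hypothetical in-memory stand-in showing the event-per-action wakeup pattern."""

    def __init__(self) -> None:
        self._events: dict[str, asyncio.Event] = {}
        self._status: dict[str, str] = {}

    def enqueue(self, action_id: str) -> None:
        self._status[action_id] = "PENDING"
        self._events[action_id] = asyncio.Event()

    def decide(self, action_id: str, status: str) -> bool:
        if self._status.get(action_id) != "PENDING":
            return False  # already decided or expired
        self._status[action_id] = status
        self._events[action_id].set()  # wake the waiter immediately
        return True

    async def wait_for_decision(self, action_id: str, timeout_seconds: float) -> str:
        try:
            await asyncio.wait_for(self._events[action_id].wait(), timeout_seconds)
        except asyncio.TimeoutError:
            # Expire only if nobody decided in the meantime.
            if self._status[action_id] == "PENDING":
                self._status[action_id] = "TIMED_OUT"
        return self._status[action_id]


async def demo() -> tuple[str, str]:
    board = ToyDecisionBoard()
    board.enqueue("act_1")
    board.enqueue("act_2")
    # An "operator" approves act_1 shortly after the wait starts; act_2 is never decided.
    asyncio.get_running_loop().call_later(0.01, board.decide, "act_1", "APPROVED")
    approved = await board.wait_for_decision("act_1", timeout_seconds=1.0)
    timed_out = await board.wait_for_decision("act_2", timeout_seconds=0.05)
    return approved, timed_out


print(asyncio.run(demo()))  # ('APPROVED', 'TIMED_OUT')
```

The re-check before marking TIMED_OUT does the same job as the WHERE status = 'PENDING' guard. It keeps a decision that lands right as the timer fires from being overwritten.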
Decide out-of-band
Your approver UI / Slack handler / on-call dashboard calls approve() or deny():
```python
# Approve
ok = await queue.approve("act_a3f9...", decided_by="alice@example.com")

# Deny with a reason that propagates back to the agent
ok = await queue.deny(
    "act_a3f9...",
    decided_by="alice@example.com",
    reason="Refund exceeds quarterly budget; escalate to manager",
)
```

Both return True only when the row was still PENDING. False means "already decided or expired", and it is safe to surface as a no-op in your UI.
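In an approver UI, the boolean return is enough to tell a successful click from a stale one. The handler below is a hypothetical sketch. handle_decision and the reply strings are invented for illustration, and the only assumption about the queue is the approve()/deny() signatures shown above. A small fake queue stands in for HITLQueue so the sketch runs on its own.

```python
import asyncio
from typing import Optional, Protocol


class DecisionQueue(Protocol):
    # Mirrors the approve()/deny() call shapes used above.
    async def approve(self, action_id: str, decided_by: str) -> bool: ...
    async def deny(self, action_id: str, decided_by: str, reason: str) -> bool: ...


async def handle_decision(
    queue: DecisionQueue, action_id: str, user: str, reason: Optional[str] = None
) -> str:
    """Approve when reason is None, otherwise deny; return a message for the UI."""
    if reason is None:
        ok = await queue.approve(action_id, decided_by=user)
    else:
        ok = await queue.deny(action_id, decided_by=user, reason=reason)
    if not ok:
        # Someone else decided first, or the row already timed out: not an error.
        return f"{action_id} was already decided or expired; nothing to do."
    return f"{action_id} {'approved' if reason is None else 'denied'} by {user}."


class FakeQueue:
    """Test double: each id can be decided exactly once."""

    def __init__(self) -> None:
        self.pending = {"act_1"}

    async def approve(self, action_id: str, decided_by: str) -> bool:
        return self._take(action_id)

    async def deny(self, action_id: str, decided_by: str, reason: str) -> bool:
        return self._take(action_id)

    def _take(self, action_id: str) -> bool:
        if action_id in self.pending:
            self.pending.discard(action_id)
            return True
        return False


async def demo() -> list[str]:
    q = FakeQueue()
    return [
        await handle_decision(q, "act_1", "alice@example.com"),
        await handle_decision(q, "act_1", "bob@example.com", reason="over budget"),
    ]


for line in asyncio.run(demo()):
    print(line)
```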
List pending work
```python
pending = await queue.get_pending()  # ordered by created_at ASC
for action in pending:
    print(f"{action.id} {action.tool_name} {action.risk_tier} "
          f"queued {action.created_at}")
```

Use this for the operator dashboard's "queue depth" panel and for the per-action approval card.
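Queue depth and the age of the oldest pending row are a natural pair of numbers for that panel. queue_summary and PendingRow below are hypothetical. PendingRow only mirrors the fields the loop above reads (id, tool_name, risk_tier, created_at), so the same function would accept the actions returned by get_pending().

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class PendingRow:
    # Hypothetical stand-in with the fields used in the loop above.
    id: str
    tool_name: str
    risk_tier: str
    created_at: datetime


def queue_summary(pending: list[PendingRow], now: datetime) -> dict:
    """Depth, age of the oldest row in seconds, and a per-tier breakdown."""
    if not pending:
        return {"depth": 0, "oldest_age_seconds": 0.0, "by_tier": {}}
    # min() avoids depending on the input order.
    oldest = min(row.created_at for row in pending)
    return {
        "depth": len(pending),
        "oldest_age_seconds": (now - oldest).total_seconds(),
        "by_tier": dict(Counter(row.risk_tier for row in pending)),
    }


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
rows = [
    PendingRow("act_1", "approve_refund", "TRANSACTIONAL_HIGH", now - timedelta(minutes=4)),
    PendingRow("act_2", "approve_refund", "TRANSACTIONAL_HIGH", now - timedelta(minutes=1)),
]
print(queue_summary(rows, now))
# {'depth': 2, 'oldest_age_seconds': 240.0, 'by_tier': {'TRANSACTIONAL_HIGH': 2}}
```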
Timeouts: explicit vs background
There are two timeout mechanisms; both default to 300 seconds (5 minutes) so a wait and the sweep agree on the cutoff.
Per-call timeout — wait_for_decision(timeout_seconds=...)
The caller's wait_for_decision blocks on its own asyncio.Event. On timeout, that single row is marked TIMED_OUT in the DB with decided_by="system", denial_reason="Approval timeout exceeded", and the call returns the now-TIMED_OUT action.
Background sweep — expire_old() / start_expiry_task()
The sweep catches rows whose wait_for_decision caller died, was killed, or never started. Without it, those rows would stay PENDING forever and clutter the operator dashboard.
```python
expiry_task = await queue.start_expiry_task(
    check_interval_seconds=30.0,
    action_timeout_seconds=300,
)
# ... on shutdown:
expiry_task.cancel()
```

The returned asyncio.Task is the loop itself; cancel it on shutdown.
expire_old() also sets the in-process asyncio.Event for any expired actions, so a wait_for_decision caller that's still running wakes up immediately rather than burning to its own timeout.
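A sweep with that behavior can be sketched in a few lines. This sketch is not expire_old()'s actual code and rests on three assumptions. The hitl_actions table is hypothetical. created_at is stored as Unix seconds so the cutoff is a simple comparison. The events dict stands in for the per-process wakeup map. Each UPDATE reuses the PENDING guard, so a row approved between the SELECT and the UPDATE is left alone.

```python
import asyncio
import sqlite3
import time


def expire_old(
    conn: sqlite3.Connection,
    events: dict[str, asyncio.Event],
    action_timeout_seconds: float,
    now: float,
) -> list[str]:
    """Mark stale PENDING rows TIMED_OUT and wake local waiters.

    Call this on the event loop thread: asyncio.Event is not thread-safe.
    """
    cutoff = now - action_timeout_seconds
    candidates = [
        row[0]
        for row in conn.execute(
            "SELECT id FROM hitl_actions WHERE status = 'PENDING' AND created_at < ?",
            (cutoff,),
        )
    ]
    expired: list[str] = []
    for action_id in candidates:
        # Same guard as approve/deny: a row decided since the SELECT is skipped.
        cur = conn.execute(
            "UPDATE hitl_actions SET status = 'TIMED_OUT', decided_by = 'system' "
            "WHERE id = ? AND status = 'PENDING'",
            (action_id,),
        )
        if cur.rowcount == 1:
            expired.append(action_id)
    conn.commit()
    for action_id in expired:
        event = events.get(action_id)
        if event is not None:
            event.set()  # a waiter in this process returns now, not at its own timeout
    return expired


# Demo data: one stale row, one fresh row, one already-decided row.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hitl_actions ("
    " id TEXT PRIMARY KEY, status TEXT NOT NULL, decided_by TEXT, created_at REAL)"
)
now = time.time()
conn.executemany(
    "INSERT INTO hitl_actions VALUES (?, ?, NULL, ?)",
    [("old", "PENDING", now - 600), ("fresh", "PENDING", now - 10), ("done", "APPROVED", now - 900)],
)
conn.commit()
events = {"old": asyncio.Event()}
expired = expire_old(conn, events, action_timeout_seconds=300, now=now)
print(expired, events["old"].is_set())  # ['old'] True
```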
Failure-mode handling in the expiry task
The background loop is built defensively because a silently broken expiry task means HITL timeouts stop firing, which in turn means queued actions accumulate forever:
- sqlite3.DatabaseError is caught and logged (with a consecutive_failures count), and the loop continues.
- After 5 consecutive failures, the loop emits a CRITICAL log pointing at the audit store / DB connection. The counter then resets, so the alert fires again on a future burst rather than only once per process lifetime.
- An unexpected crash of the task triggers a CRITICAL log via the add_done_callback hook: "HITL expiry task crashed unexpectedly — HITL timeout enforcement is disabled until restart."
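The three behaviors above map onto a try/except around each sweep plus a done callback on the task. The sketch below is hypothetical. The function names echo this page, the log messages are invented rather than copied from the library, and sweep is a plain synchronous callable instead of a threaded database call. The demo feeds it ten database errors followed by one non-database exception.

```python
import asyncio
import logging
import sqlite3
from typing import Callable

log = logging.getLogger("kitelogik.anchor.queue")
MAX_CONSECUTIVE_FAILURES = 5  # threshold described in the list above


async def expiry_loop(sweep: Callable[[], None], check_interval_seconds: float) -> None:
    """Run sweep() forever; survive DB errors, alert on repeated failures."""
    consecutive_failures = 0
    while True:
        try:
            sweep()
            consecutive_failures = 0
        except sqlite3.DatabaseError:
            consecutive_failures += 1
            log.error("expiry sweep failed (consecutive_failures=%d)", consecutive_failures)
            if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                log.critical("expiry sweep failing repeatedly; check the DB connection")
                consecutive_failures = 0  # re-arm so a later burst alerts again
        await asyncio.sleep(check_interval_seconds)


def _on_done(task: "asyncio.Task[None]") -> None:
    """Done callback: anything other than a clean cancel means the loop died."""
    if task.cancelled():
        return
    if task.exception() is not None:
        log.critical("expiry task crashed; HITL timeouts are no longer enforced")


def start_expiry_task(
    sweep: Callable[[], None], check_interval_seconds: float = 30.0
) -> "asyncio.Task[None]":
    task = asyncio.create_task(expiry_loop(sweep, check_interval_seconds))
    task.add_done_callback(_on_done)
    return task


class CaptureHandler(logging.Handler):
    """Collects records in memory so the demo can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


capture = CaptureHandler()
log.addHandler(capture)
log.propagate = False  # keep the demo quiet


async def demo() -> int:
    calls = 0

    def flaky_sweep() -> None:
        nonlocal calls
        calls += 1
        if calls <= 10:
            raise sqlite3.OperationalError("database is locked")  # a DatabaseError subclass
        raise ValueError("bug in sweep")  # not a DatabaseError, so it kills the loop

    task = start_expiry_task(flaky_sweep, check_interval_seconds=0)
    await asyncio.wait({task})  # _on_done was registered first, so it has run by now
    return calls


calls = asyncio.run(demo())
criticals = [r.getMessage() for r in capture.records if r.levelno == logging.CRITICAL]
print(calls, len(criticals))  # 11 3
```

Two CRITICAL records come from the failure counter (after failures 5 and 10), and the third comes from the done callback when the ValueError ends the task.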
Hook your alerting at the CRITICAL level on the kitelogik.anchor.queue logger and you'll get paged before stale queue rows become a real problem.
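With the standard logging module, that hook is a handler attached to the kitelogik.anchor.queue logger with its level set to CRITICAL. PagerHandler below is a hypothetical placeholder for your paging integration. It only records alert strings so you can see the filtering.

```python
import logging


class PagerHandler(logging.Handler):
    """Hypothetical bridge to a paging system; here it just records alerts."""

    def __init__(self) -> None:
        super().__init__(level=logging.CRITICAL)  # ignore anything below CRITICAL
        self.alerts: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        # Replace this with a call to your pager / incident tool.
        self.alerts.append(f"{record.name}: {record.getMessage()}")


pager = PagerHandler()
queue_log = logging.getLogger("kitelogik.anchor.queue")
queue_log.addHandler(pager)

# Simulated records: only the CRITICAL one reaches the pager.
queue_log.error("expiry sweep failed")
queue_log.critical("expiry task crashed")
print(pager.alerts)  # ['kitelogik.anchor.queue: expiry task crashed']
```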
What persists vs. what's per-process
| Lives in SQLite | Lives in dict[str, asyncio.Event] (per-process) |
|---|---|
| The row itself, including final state and decided_by | The wakeup channel for wait_for_decision |
| All 4 statuses survive restart | Lost on restart: a wait that was in flight on the old process won't unblock on the new one |
If a process restarts mid-wait, the new process can call get_status() to see the eventual decision, but it won't get a push notification through the in-memory event. Your runtime should re-issue wait_for_decision against the same action_id after reconnecting. The timeout-driven path still works.
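When you re-issue the wait, one option is to pass only what is left of the original window, so a restart doesn't extend the agent's total wait. This is a suggestion, not documented library behavior. remaining_timeout is a hypothetical helper, and it assumes your runtime can still read the action's created_at after the restart.

```python
from datetime import datetime, timedelta, timezone


def remaining_timeout(created_at: datetime, timeout_seconds: float, now: datetime) -> float:
    """Seconds left of the original approval window, never negative."""
    elapsed = (now - created_at).total_seconds()
    return max(0.0, timeout_seconds - elapsed)


created = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
after_restart = created + timedelta(seconds=120)
print(remaining_timeout(created, 300.0, after_restart))  # 180.0

# Hypothetical use after reconnecting:
# final = await queue.wait_for_decision(
#     action_id, timeout_seconds=remaining_timeout(created_at, 300.0, datetime.now(timezone.utc))
# )
```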
What you'd build yourself
- Multi-tenant isolation. The bundled queue is single-tenant. Per-tenant queues live in your backend.
- Postgres backend. The default is SQLite + WAL, which is fine for the throughput HITL implies (operator clicks). High-throughput / HA deployments need a Postgres-backed HITLQueue implementation.
- Operator UI. The queue ships an API; the approver tool (Slack handler, internal dashboard, web UI) is yours to build.
- Webhook notifications. There is no push on enqueue(); poll get_pending() from your UI, or wire your own webhook layer on top of the queue.
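A polling layer mostly comes down to remembering which ids you have already announced. The sketch below is hypothetical. poll_pending, diff_new and the notify callback are illustrative names, fetch_ids stands in for something like listing the ids from get_pending(), and the rounds argument exists only so the demo terminates.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


def diff_new(seen: set[str], current_ids: Iterable[str]) -> list[str]:
    """Return ids not seen before; forget ids that are no longer pending."""
    current = list(current_ids)
    new = [i for i in current if i not in seen]
    seen.clear()
    seen.update(current)
    return new


async def poll_pending(
    fetch_ids: Callable[[], Awaitable[list[str]]],
    notify: Callable[[str], None],
    interval_seconds: float,
    rounds: int,  # demo only; a real poller would loop until cancelled
) -> None:
    seen: set[str] = set()
    for _ in range(rounds):
        for action_id in diff_new(seen, await fetch_ids()):
            notify(action_id)
        await asyncio.sleep(interval_seconds)


# With a real queue, fetch_ids might be:
#     async def fetch_ids(): return [a.id for a in await queue.get_pending()]
snapshots = iter([["act_1"], ["act_1", "act_2"], ["act_2"], ["act_2", "act_3"]])


async def fake_fetch() -> list[str]:
    return next(snapshots)


notified: list[str] = []
asyncio.run(poll_pending(fake_fetch, notified.append, interval_seconds=0, rounds=4))
print(notified)  # ['act_1', 'act_2', 'act_3']
```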
Related
- Risk tiers & HITL: when policies return requires_hitl=true and what the runtime does with it
- Audit trail export: hitl_queued, hitl_approved, hitl_denied and hitl_timeout outcomes propagate to the audit log
- agent_lifecycle.rego: the OSS lifecycle policy that produces HITL decisions