
HITL queue

When a policy returns requires_hitl=true, the runtime parks the attempted action in HITLQueue — a SQLite-backed async queue — and the agent waits for an out-of-band approve/deny decision. This page covers how the queue actually behaves: the four states, the timeout semantics, the background expiry loop, and the failure modes you need to plan for in production.

The four states

Every queued row moves through ActionStatus:

| State | Set by |
|---|---|
| PENDING | enqueue() (initial state) |
| APPROVED | approve(action_id, decided_by) |
| DENIED | deny(action_id, decided_by, reason) |
| TIMED_OUT | expire_old() background sweep, or wait_for_decision() timeout |

PENDING is the only mutable state. approve(), deny(), and expiry all guard their update with WHERE status = 'PENDING', so a race between two approvers (or between an approver and the timeout sweep) can't double-decide a row: whichever update lands first wins, and the other changes nothing.
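The guard behaves like a compare-and-set on the row. Here is a minimal illustration using the standard-library sqlite3 module. The table name `actions`, its columns, and the `decide()` helper are hypothetical and do not reflect the queue's actual schema.

```python
import sqlite3


def decide(conn: sqlite3.Connection, action_id: str, new_status: str, decided_by: str) -> bool:
    """Hypothetical compare-and-set: only a PENDING row can move to a final state."""
    cur = conn.execute(
        "UPDATE actions SET status = ?, decided_by = ? "
        "WHERE id = ? AND status = 'PENDING'",
        (new_status, decided_by, action_id),
    )
    conn.commit()
    # rowcount is 1 if this call won the race, 0 if the row was already decided.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")  # fine for this single-connection demo
conn.execute("CREATE TABLE actions (id TEXT PRIMARY KEY, status TEXT, decided_by TEXT)")
conn.execute("INSERT INTO actions VALUES ('act_1', 'PENDING', NULL)")
conn.commit()

first = decide(conn, "act_1", "APPROVED", "alice@example.com")
second = decide(conn, "act_1", "TIMED_OUT", "system")  # loses: row is no longer PENDING
print(first, second)  # True False
```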

Setup

```python
from kitelogik.anchor.queue import HITLQueue

queue = HITLQueue(db_path="hitl.db")    # ":memory:" is rejected
await queue.setup()                      # one-time table creation
```

In-memory SQLite is explicitly rejected. Every async call hops to a fresh thread (asyncio.to_thread), and a :memory: database exists only for the connection that created it, so state would silently disappear between calls. Use a real file path; in tests, something like os.path.join(tempfile.mkdtemp(), "hitl.db").
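The per-connection behavior is easy to see with the standard-library sqlite3 module directly. This sketch is independent of HITLQueue: it opens two connections to :memory: and two connections to the same temporary file, then checks what the second connection of each pair can see.

```python
import os
import sqlite3
import tempfile

# Two connections to ":memory:" are two separate, empty databases.
a = sqlite3.connect(":memory:")
b = sqlite3.connect(":memory:")
a.execute("CREATE TABLE t (x INTEGER)")
a.commit()
in_memory_tables = b.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
print(in_memory_tables)  # []

# Two connections to the same file path share one database.
path = os.path.join(tempfile.mkdtemp(), "hitl.db")
c = sqlite3.connect(path)
d = sqlite3.connect(path)
c.execute("CREATE TABLE t (x INTEGER)")
c.commit()
file_tables = d.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
print(file_tables)  # [('t',)]
```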

The database runs in WAL journal mode (journal_mode=WAL), which lets readers proceed concurrently with a single writer.
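For reference, here is how WAL mode is enabled and checked with the standard-library sqlite3 module. This illustrates the SQLite mechanism only and is not HITLQueue's setup() code. The file name and table are placeholders.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "wal-demo.db")
conn = sqlite3.connect(path)
conn.execute("CREATE TABLE t (x INTEGER)")

# The pragma returns the journal mode actually in effect after the change.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(mode)  # wal

# WAL is persistent: a new connection to the same file is already in WAL mode.
other = sqlite3.connect(path)
other_mode = other.execute("PRAGMA journal_mode").fetchone()[0]
print(other_mode)  # wal
```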

Enqueue → wait → decide

The full agent-side loop:

```python
from datetime import datetime, timezone

from kitelogik.anchor.models import PendingAction


async def run_gated_action(queue, context, tool):
    """Enqueue a gated call, wait for the operator, then act on the decision."""
    action = PendingAction(
        id="",                                # auto-generated if empty
        session_id=context.session_id,
        tool_name="approve_refund",
        args={"customer_id": "cust_001", "amount": 500},
        risk_tier="TRANSACTIONAL_HIGH",
        created_at=datetime.now(timezone.utc),
    )

    action_id = await queue.enqueue(action)   # returns the assigned id

    # Block until decided (or timeout)
    final = await queue.wait_for_decision(action_id, timeout_seconds=300.0)

    if final.status.value == "APPROVED":
        return await tool(...)                # the gated tool call; arguments elided
    elif final.status.value == "DENIED":
        raise RuntimeError(f"Operator denied: {final.denial_reason}")
    else:  # TIMED_OUT
        raise TimeoutError("No operator decision within 5 minutes")
```

wait_for_decision waits on an asyncio.Event keyed on action_id. approve() and deny() set that event, so the agent unblocks immediately instead of waiting for a poll interval. On timeout, the row is marked TIMED_OUT in the same transaction.
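The wakeup-plus-timeout pattern can be modeled in a few lines of asyncio. `ToyWaiter` is a hypothetical in-memory stand-in with no SQLite and no persistence. It shows only the mechanism: a decision sets the event, the waiter uses asyncio.wait_for, and a timeout goes through the same PENDING guard as a decision.

```python
import asyncio


class ToyWaiter:
    """Hypothetical in-memory model of the wakeup channel (not HITLQueue)."""

    def __init__(self) -> None:
        self.status: dict[str, str] = {}
        self.events: dict[str, asyncio.Event] = {}

    def enqueue(self, action_id: str) -> None:
        self.status[action_id] = "PENDING"
        self.events[action_id] = asyncio.Event()

    def _decide(self, action_id: str, new_status: str) -> bool:
        # Same guard as the real queue: only PENDING entries can change.
        if self.status.get(action_id) != "PENDING":
            return False
        self.status[action_id] = new_status
        self.events[action_id].set()  # wake any waiter immediately
        return True

    def approve(self, action_id: str) -> bool:
        return self._decide(action_id, "APPROVED")

    async def wait_for_decision(self, action_id: str, timeout_seconds: float) -> str:
        try:
            await asyncio.wait_for(self.events[action_id].wait(), timeout_seconds)
        except asyncio.TimeoutError:
            self._decide(action_id, "TIMED_OUT")  # no-op if already decided
        return self.status[action_id]


async def main() -> tuple:
    w = ToyWaiter()
    w.enqueue("a1")
    w.enqueue("a2")
    # Approve a1 shortly after the wait starts; nobody decides a2.
    asyncio.get_running_loop().call_later(0.01, w.approve, "a1")
    approved = await w.wait_for_decision("a1", timeout_seconds=5.0)
    timed_out = await w.wait_for_decision("a2", timeout_seconds=0.05)
    return approved, timed_out


result = asyncio.run(main())
print(result)  # ('APPROVED', 'TIMED_OUT')
```

Because the timeout path reuses the guard, an approval that has already been recorded when the timeout fires still wins, and the waiter sees APPROVED.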

Decide out-of-band

Your approver UI / Slack handler / on-call dashboard calls approve() or deny():

```python
# Approve
ok = await queue.approve("act_a3f9...", decided_by="alice@example.com")

# Deny with a reason that propagates back to the agent
ok = await queue.deny(
    "act_a3f9...",
    decided_by="alice@example.com",
    reason="Refund exceeds quarterly budget — escalate to manager",
)
```

Both return True only when the row was still PENDING. False means the row had already been decided or had expired; it's fine to surface that as a no-op in your UI.
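An approver-side handler mostly needs to turn that boolean into a message. `handle_decision` and `_StubQueue` are hypothetical. The stub stands in for HITLQueue so the example runs on its own, and only the approve()/deny() call shapes come from this page.

```python
import asyncio


async def handle_decision(queue, action_id: str, user: str,
                          approve: bool, reason: str = "") -> str:
    """Hypothetical approver handler: returns a status line for the UI."""
    if approve:
        ok = await queue.approve(action_id, decided_by=user)
    else:
        ok = await queue.deny(action_id, decided_by=user, reason=reason)
    if not ok:
        # The row was no longer PENDING: decided by someone else, or expired.
        return f"{action_id}: already decided or expired, nothing changed"
    verb = "approved" if approve else "denied"
    return f"{action_id}: {verb} by {user}"


class _StubQueue:
    """Stand-in for HITLQueue: each id can be decided exactly once."""

    def __init__(self, pending_ids):
        self.pending = set(pending_ids)

    async def approve(self, action_id, decided_by):
        if action_id not in self.pending:
            return False
        self.pending.remove(action_id)
        return True

    async def deny(self, action_id, decided_by, reason):
        return await self.approve(action_id, decided_by)


async def demo():
    q = _StubQueue({"act_1"})
    first = await handle_decision(q, "act_1", "alice@example.com", approve=True)
    second = await handle_decision(q, "act_1", "bob@example.com", approve=False,
                                   reason="too late")
    return first, second


first, second = asyncio.run(demo())
print(first)   # act_1: approved by alice@example.com
print(second)  # act_1: already decided or expired, nothing changed
```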

List pending work

```python
pending = await queue.get_pending()      # ordered by created_at ASC
for action in pending:
    print(f"{action.id}  {action.tool_name}  {action.risk_tier}  "
          f"queued {action.created_at}")
```

Use this for the operator dashboard's "queue depth" panel and for the per-action approval card.
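A queue-depth panel could be built on a helper like the one below, which performs a single refresh. It reports how many rows are pending, how long the oldest has waited (the first row, given the created_at ASC ordering), and which ids are new since the last poll. `dashboard_snapshot` is hypothetical, and `_StubQueue` plus the SimpleNamespace rows stand in for HITLQueue and PendingAction.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


async def dashboard_snapshot(queue, seen: set) -> dict:
    """Hypothetical helper for one dashboard refresh (not part of kitelogik)."""
    pending = await queue.get_pending()              # ordered by created_at ASC
    now = datetime.now(timezone.utc)
    new = [a for a in pending if a.id not in seen]   # arrived since the last poll
    seen.update(a.id for a in new)
    return {
        "depth": len(pending),
        # With ASC ordering, the first row has been waiting the longest.
        "oldest_wait": (now - pending[0].created_at) if pending else timedelta(0),
        "new_ids": [a.id for a in new],
    }


class _StubQueue:
    """Stand-in for HITLQueue that returns canned rows."""

    def __init__(self, rows):
        self.rows = rows

    async def get_pending(self):
        return list(self.rows)


now = datetime.now(timezone.utc)
stub = _StubQueue([
    SimpleNamespace(id="act_1", created_at=now - timedelta(minutes=4)),
    SimpleNamespace(id="act_2", created_at=now - timedelta(minutes=1)),
])
seen: set = set()
first = asyncio.run(dashboard_snapshot(stub, seen))
second = asyncio.run(dashboard_snapshot(stub, seen))
print(first["depth"], first["new_ids"], second["new_ids"])  # 2 ['act_1', 'act_2'] []
```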

Timeouts: explicit vs background

There are two timeout mechanisms; both default to 300 seconds (5 minutes) so a wait and the sweep agree on the cutoff.

Per-call timeout — wait_for_decision(timeout_seconds=...)

The caller's wait_for_decision blocks on its own asyncio.Event. On timeout, that single row is marked TIMED_OUT in the DB with decided_by="system", denial_reason="Approval timeout exceeded", and the call returns the now-TIMED_OUT action.

Background sweep — expire_old() / start_expiry_task()

The sweep catches rows whose wait_for_decision caller died, was killed, or never started. Without a sweep, those rows would stay PENDING forever and clutter the operator dashboard.

```python
expiry_task = await queue.start_expiry_task(
    check_interval_seconds=30.0,
    action_timeout_seconds=300,
)
# ... on shutdown:
expiry_task.cancel()
```

The returned asyncio.Task is the loop; cancel it on shutdown.
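cancel() only requests cancellation. Awaiting the task afterwards lets it actually finish before the event loop shuts down. The example demonstrates this pattern using a hypothetical `periodic()` loop in place of the one start_expiry_task() returns.

```python
import asyncio
import contextlib


async def periodic(sweep, interval: float) -> None:
    """Hypothetical stand-in for the loop behind start_expiry_task()."""
    while True:
        sweep()
        await asyncio.sleep(interval)


async def main() -> int:
    calls = []
    task = asyncio.create_task(periodic(lambda: calls.append(1), interval=0.01))
    await asyncio.sleep(0.05)  # the application runs for a while

    # Shutdown: request cancellation, then wait for the task to finish.
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    assert task.cancelled()
    return len(calls)


call_count = asyncio.run(main())
print(call_count > 0)  # True
```

Applied to the real queue, the shutdown step would be expiry_task.cancel() followed by awaiting expiry_task inside contextlib.suppress(asyncio.CancelledError).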

expire_old() also sets the in-process asyncio.Event for any expired actions, so a wait_for_decision caller that's still running wakes up immediately rather than waiting out its own timeout.
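A sweep with that behavior might look like the following. This is a hypothetical sketch, not the shipped expire_old(). The `actions` table, its columns, and the choice to store created_at as Unix epoch seconds are all assumptions. The sweep reuses the PENDING guard so that a row an approver decided mid-sweep is left alone, and it sets local events only for the rows it actually expired.

```python
import asyncio
import sqlite3
import time


def expire_old_sketch(conn: sqlite3.Connection, events: dict, timeout_seconds: float) -> list:
    """Hypothetical sweep: expire stale PENDING rows, then wake local waiters."""
    cutoff = time.time() - timeout_seconds
    stale = [row[0] for row in conn.execute(
        "SELECT id FROM actions WHERE status = 'PENDING' AND created_at < ?",
        (cutoff,),
    ).fetchall()]
    expired = []
    for action_id in stale:
        # Same guard as approve()/deny(): skip rows decided since the SELECT.
        cur = conn.execute(
            "UPDATE actions SET status = 'TIMED_OUT' "
            "WHERE id = ? AND status = 'PENDING'",
            (action_id,),
        )
        if cur.rowcount == 1:
            expired.append(action_id)
    conn.commit()
    for action_id in expired:
        event = events.get(action_id)
        if event is not None:
            event.set()  # a waiter in this process wakes up now
    return expired


async def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE actions (id TEXT PRIMARY KEY, status TEXT, created_at REAL)")
    now = time.time()
    conn.executemany("INSERT INTO actions VALUES (?, ?, ?)", [
        ("old", "PENDING", now - 600),     # past the 300 s cutoff
        ("fresh", "PENDING", now),         # still within the window
        ("done", "APPROVED", now - 600),   # already decided: never touched
    ])
    conn.commit()
    events = {"old": asyncio.Event(), "fresh": asyncio.Event()}
    expired = expire_old_sketch(conn, events, timeout_seconds=300)
    return expired, events["old"].is_set(), events["fresh"].is_set()


result = asyncio.run(demo())
print(result)  # (['old'], True, False)
```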

Failure-mode handling in the expiry task

The background loop is built defensively because a silently broken expiry task means HITL timeouts stop firing, which in turn means queued actions accumulate forever:

  • sqlite3.DatabaseError is caught and logged (with consecutive_failures count). The loop continues.
  • After 5 consecutive failures, the loop emits a CRITICAL log pointing at the audit store / DB connection. The counter resets so the alert fires again on a future burst rather than only once per process lifetime.
  • An unexpected crash of the task triggers a CRITICAL log via an add_done_callback hook: "HITL expiry task crashed unexpectedly — HITL timeout enforcement is disabled until restart."
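All three behaviors can be sketched in plain asyncio. Everything in this example (`expiry_loop`, `_on_done`, the `hitl_sketch` logger name, and the message text) is hypothetical and only illustrates the technique. The loop catches sqlite3.DatabaseError, which also covers OperationalError such as "database is locked", and counts consecutive failures. It re-arms the alert once it fires. add_done_callback reports any crash the loop didn't catch.

```python
import asyncio
import contextlib
import logging
import sqlite3

log = logging.getLogger("hitl_sketch")  # hypothetical logger name
MAX_CONSECUTIVE_FAILURES = 5


async def expiry_loop(sweep, interval: float) -> None:
    consecutive_failures = 0
    while True:
        try:
            sweep()
            consecutive_failures = 0
        except sqlite3.DatabaseError:
            consecutive_failures += 1
            log.error("sweep failed (consecutive_failures=%d)", consecutive_failures)
            if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
                log.critical("sweep keeps failing; check the DB connection")
                consecutive_failures = 0  # re-arm so a later burst alerts again
        await asyncio.sleep(interval)


def _on_done(task: asyncio.Task) -> None:
    # Runs however the task ends; only an uncaught exception counts as a crash.
    if not task.cancelled() and task.exception() is not None:
        log.critical("expiry task crashed; timeouts are no longer enforced")


class _Collect(logging.Handler):
    """Records level names so the demo can count them."""

    def __init__(self):
        super().__init__()
        self.levels = []

    def emit(self, record):
        self.levels.append(record.levelname)


async def demo():
    attempts = 0

    def sweep():
        nonlocal attempts
        attempts += 1
        if attempts <= 5:
            raise sqlite3.OperationalError("database is locked")
        if attempts == 8:
            raise ValueError("bug in sweep")  # not a DatabaseError: escapes the loop

    task = asyncio.create_task(expiry_loop(sweep, interval=0))
    task.add_done_callback(_on_done)
    with contextlib.suppress(ValueError):
        await task
    await asyncio.sleep(0)  # give the done callback a chance to run


collector = _Collect()
log.addHandler(collector)
log.setLevel(logging.INFO)
asyncio.run(demo())
print(collector.levels.count("ERROR"), collector.levels.count("CRITICAL"))  # 5 2
```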

Hook your alerting at the CRITICAL level on the kitelogik.anchor.queue logger and you'll get paged before stale queue rows become a real problem.
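With the standard logging module, that hook can be a handler whose level is CRITICAL, attached to the kitelogik.anchor.queue logger. `PageOnCritical` and the `page` callable are hypothetical. Replace them with whatever client your paging system provides.

```python
import logging


class PageOnCritical(logging.Handler):
    """Hypothetical handler: forward CRITICAL records to a paging callable."""

    def __init__(self, page):
        super().__init__(level=logging.CRITICAL)  # ignore anything below CRITICAL
        self.page = page                          # your function that sends the page

    def emit(self, record: logging.LogRecord) -> None:
        self.page(f"[{record.name}] {record.getMessage()}")


pages = []
queue_logger = logging.getLogger("kitelogik.anchor.queue")
queue_logger.addHandler(PageOnCritical(pages.append))

queue_logger.error("transient sweep failure")  # below CRITICAL: not paged
queue_logger.critical("expiry task crashed")   # paged
print(pages)  # ['[kitelogik.anchor.queue] expiry task crashed']
```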

What persists vs. what's per-process

| Lives in SQLite | Lives in dict[str, asyncio.Event] (per-process) |
|---|---|
| The row itself, including final state and decided_by | The wakeup channel for wait_for_decision |
| All 4 statuses survive restart | Lost on restart: a wait that was in flight on the old process won't unblock on the new one |

If a process restarts mid-wait, the new process can call get_status() to see the eventual decision, but it won't get a push notification through the in-memory event. After reconnecting, your runtime should re-issue wait_for_decision against the same action_id; the timeout-driven path still works.

What you'd build yourself

  • Multi-tenant isolation. The bundled queue is single-tenant. Per-tenant queues live in your backend.
  • Postgres backend. The default is SQLite + WAL — fine for the throughput HITL implies (operator clicks). High-throughput / HA needs a Postgres-backed HITLQueue implementation.
  • Operator UI. The queue ships an API; the approver tool — Slack handler, internal dashboard, web UI — is yours to build.
  • Webhook notifications. No push on enqueue(); poll get_pending() from your UI, or wire your own webhook layer on top of the queue.

See also

  • Risk tiers & HITL: when policies return requires_hitl=true and what the runtime does with it
  • Audit trail export: hitl_queued, hitl_approved, hitl_denied, hitl_timeout outcomes propagate to the audit log
  • agent_lifecycle.rego: the OSS lifecycle policy that produces HITL decisions

Released under the Apache 2.0 License.