xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs | README

xco.h (54688B)


      1 /*
      2  * xco.h — minimal C11 concurrency library.
      3  *
      4  * Four layers in this header, bottom-up:
      5  *
      6  *   xco_mach_t    Generic resumable function. A value that can be
      7  *                 driven forward one step at a time; each step takes a
      8  *                 uintptr_t in, returns one out, and reports whether
      9  *                 the function suspended or finished. Substrate shared
     10  *                 by stack-switching coroutines (xco) and hand-coded
     11  *                 state machines.
     12  *
     13  *   waiter / event / runtime
     14  *                 Pollable event substrate (poll / unpark) with a
     15  *                 single-threaded FIFO ready queue. Concrete events:
     16  *                 latch, countdown, notify, semaphore/mutex, select,
     17  *                 allof, channel, queue, broadcast, cancel, timer,
     18  *                 pairing-heap timer source, timeout, ticker. All
     19  *                 storage caller-provided, no allocation, no atomics.
     20  *
     21  *   xco_task_t    Lifecycle handle for a running xco_step. Bundles a
     22  *                 done latch (fires with the step's return value) and
     23  *                 a cancel latch (the cooperative wind-down signal).
     24  *                 xco_task_group_t fans these in/out across a dynamic
     25  *                 set of tasks. Storage caller-provided; the step
     26  *                 itself lives wherever the caller put it.
     27  *
     28  *   xco_coro_t    Stack-switching coroutine. xco_coro_t embeds xco_mach_t as
     29  *                 its first member, so a coroutine is one concrete
     30  *                 kind of resumable function: generic code holding an
     31  *                 xco_mach_t* works on coroutines and hand-coded state
     32  *                 machines uniformly. Values pass between caller and
     33  *                 coroutine through a single uintptr_t channel; pack
     34  *                 richer data behind a pointer.
     35  *                 Asymmetric: xco_suspend always returns to the most
     36  *                 recent resumer. Resumes nest like function calls.
     37  *                 Thread portability: the platform switch saves only
     38  *                 callee-saved regs (no TLS register, no signal mask),
     39  *                 so a fully-suspended coroutine could in principle be
     40  *                 resumed on another thread. In practice don't: the
     41  *                 runtime/event/waiter substrate is single-threaded
     42  *                 (no atomics), so a coroutine parked on any event is
     43  *                 tethered to that runtime's thread; and user code
     44  *                 silently rebinds errno / _Thread_local / thread-affine
     45  *                 OS handles to whichever thread resumed it.
     46  *
     47  *   xco_cotask_t  xco specialization of xco_task_t. The xco_trampoline calls
     48  *                 fn(&xt->task, arg) and then xco_task_done with the
     49  *                 return value, so xco_wait_or_cancel-style teardown works
     50  *                 without the user wiring anything.
     51  *
     52  * One-waiter invariant. An xco_step, while suspended, is parked on at most
     53  * one event. Multi-wait is composed in the event graph: build a
     54  * select_event (or any future combinator — all-of, timeout, ...) and
     55  * park on that. The xco_step never sees more than one event directly.
     56  * This is what lets a single xco_waker_t live inline in the xco_step
     57  * and a single next/prev pair serve both event waitlists and the
     58  * runtime ready queue (the two list memberships are disjoint in time).
     59  */
     60 
     61 #ifndef XCO_H
     62 #define XCO_H
     63 
     64 #include <assert.h>
     65 #include <stdbool.h>
     66 #include <stddef.h>
     67 #include <stdint.h>
     68 
     69 /* Provides XCO_SIZE, XCO_ALIGN, XCO_STACK_ALIGN, XCO__CTX_SIZE,
     70  * XCO__CTX_ALIGN; resolved by the build to the platform-specific
     71  * copy via the include path (-Iplatform/$(PLATFORM)). */
     72 #include "xco_platform.h"
     73 
     74 /* ====================================================================
     75  * xco_step — generic resumable function interface.
     76  *
     77  * The first-member convention: embed xco_mach_t as the first field of
     78  * your concrete type so a pointer to your type can be passed wherever
     79  * an xco_mach_t * is expected.
     80  *
     81  *   typedef struct {
     82  *       xco_mach_t base;
     83  *       int     phase;
     84  *       ...
     85  *   } parser_t;
     86  *
     87  *   static xco_step_result_t parser_step(xco_mach_t *s, uintptr_t v) {
     88  *       parser_t *p = (parser_t *)s;
     89  *       switch (p->phase) {
     90  *       case 0: p->phase = 1; return (xco_step_result_t){v + 1, XCO_STEP_SUSPENDED};
     91  *       case 1:               return (xco_step_result_t){v * 2, XCO_STEP_DEAD};
     92  *       }
     93  *       __builtin_unreachable();
     94  *   }
     95  *
     96  *   parser_t p = { .base = {.step = parser_step, .status = XCO_STEP_INIT} };
     97  *   xco_step_result_t r = xco_step(&p.base, 0);
     98  * ==================================================================== */
     99 
    100 typedef enum {
    101     XCO_STEP_INIT,        /* created, never stepped */
    102     XCO_STEP_RUNNING,     /* inside step(), or in an active resume chain */
    103     XCO_STEP_SUSPENDED,   /* yielded; resumable */
    104     XCO_STEP_DEAD,        /* function returned */
    105 } xco_mach_status_t;
    106 
    107 typedef struct {
    108     uintptr_t      value;
    109     xco_mach_status_t status;
    110 } xco_step_result_t;
    111 
    112 typedef struct xco_mach xco_mach_t;
    113 typedef xco_step_result_t (*xco_step_fn)(xco_mach_t *s, uintptr_t value);
    114 
    115 struct xco_mach {
    116     xco_step_fn       step;
    117     xco_mach_status_t status;   /* cached; xco_step() syncs from each result */
    118 };
    119 
    120 /* Drive one step. The wrapper updates s->status from the returned
    121  * result so step implementations only need to populate the result. */
    122 static inline xco_step_result_t xco_step(xco_mach_t *s, uintptr_t value) {
    123     xco_step_result_t r = s->step(s, value);
    124     s->status = r.status;
    125     return r;
    126 }
    127 
    128 static inline xco_mach_status_t xco_mach_status(const xco_mach_t *s) {
    129     return s->status;
    130 }
    131 
    132 /* ====================================================================
    133  * Event substrate.
    134  *
    135  * Fused try-or-park: xco_event_poll(e, &out, w) returns true if the
    136  * event is ready (writes the value to *out, w is NOT parked) and false
    137  * otherwise (parks w on the waitlist iff w is non-NULL). The two
    138  * degenerate forms:
    139  *
    140  *   xco_event_poll(e, &v,  NULL)   — pure try (peek; never parks).
    141  *   xco_event_poll(e, NULL, w)     — park-if-not-ready (out discarded).
    142  *
    143  * Fusing closes the try/park TOCTOU window an MT impl would otherwise
    144  * have to internally re-check, and lets each event arm a waiter
    145  * atomically with its readiness check.
    146  *
    147  * Standard usage from a state machine:
    148  *
    149  *   uintptr_t v;
    150  *   if (xco_event_poll(e, &v, &my_waiter.base)) { ... use v ... }
    151  *   else                                        { return SUSPENDED; }
    152  *
    153  * Standard usage from an xco coroutine wrapper:
    154  *
    155  *   uintptr_t v;
    156  *   xco_waker_t sw;
    157  *   xco_waker_init(&sw, rt, &xco_self()->base);
    158  *   if (!xco_event_poll(e, &v, &sw.base)) {
    159  *       xco_suspend(0);
    160  *       (void)xco_event_poll(e, &v, NULL);  // sticky: now ready
    161  *   }
    162  * ==================================================================== */
    163 
    164 /* ---- Waiter ------------------------------------------------------------ */
    165 
    166 typedef struct xco_waiter xco_waiter_t;
    167 struct xco_waiter {
    168     /* Doubly-linked while parked on an event waitlist, so unpark is
    169      * O(1). Reused as the singly-linked next pointer while on the
    170      * runtime ready queue (FIFO, no removal from middle); prev is
    171      * undefined in that state and reset on the next park. */
    172     xco_waiter_t *next;
    173     xco_waiter_t *prev;
    174     /* Fire callback. value is the event's payload at fire time — sticky
    175      * events also store it on themselves, transient events (channels,
    176      * one-shot signals) deliver only here. Waiters that don't care
    177      * about the value just ignore the parameter.
    178      *
    179      * Invoke via xco_waiter_fire (below), not directly: the helper enforces
    180      * the "fire receives a fully detached waiter" contract that makes it
    181      * safe to re-park inside the callback. */
    182     void (*fire)(xco_waiter_t *w, uintptr_t value);
    183 };
    184 
    185 /* Canonical way to invoke a waiter's fire callback. Hands the callback a
    186  * fully detached waiter so the callback (or whatever the resumed step
    187  * does) can re-park on a fresh waitlist without colliding with stale
    188  * link state. Detachers that lead into fire (queue pops, latch drains,
    189  * etc.) don't need to clear prev/next themselves. */
    190 static inline void xco_waiter_fire(xco_waiter_t *w, uintptr_t value) {
    191     w->prev = NULL;
    192     w->next = NULL;
    193     w->fire(w, value);
    194 }
    195 
    196 /* ---- Event ------------------------------------------------------------ */
    197 
    198 typedef struct xco_event xco_event_t;
    199 
    200 typedef struct {
    201     /* Fused try + park. If the event is ready, write its value to *out
    202      * (when out != NULL), do NOT park w, and return true. Otherwise, if
    203      * w != NULL park it on the waitlist and return false; if w == NULL
    204      * just return false without parking. *out is left untouched on the
    205      * not-ready path. */
    206     bool (*poll)(xco_event_t *e, uintptr_t *out, xco_waiter_t *w);
    207     /* Remove w from the waitlist. Idempotent: no-op if not parked. */
    208     void (*unpark)(xco_event_t *e, xco_waiter_t *w);
    209 } xco_event_vtable_t;
    210 
    211 struct xco_event { const xco_event_vtable_t *vt; };
    212 
    213 static inline bool xco_event_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) {
    214     return e->vt->poll(e, out, w);
    215 }
    216 static inline void xco_event_unpark(xco_event_t *e, xco_waiter_t *w) { e->vt->unpark(e, w); }
    217 
    218 /* ---- Runtime ---------------------------------------------------------- */
    219 
    220 /* Forward-declared: the optional timer source attached to the runtime.
    221  * Defined in the timer section below. */
    222 typedef struct xco_timers xco_timers_t;
    223 
    224 typedef struct xco_op xco_op_t;
    225 
    226 typedef struct xco_runtime {
    227     xco_waiter_t  *head, *tail;
    228     xco_timers_t *timers;        /* optional; advanced inside xco_rt_run */
    229     /* Pending ops list (xco_op layer). The runtime never inspects ops; it
    230      * only owns this list-head pair so the host can pull a batch via
    231      * xco_rt_take_ops. op_epoch is bumped each take and recorded on each
    232      * submit; PENDING vs IN_FLIGHT is a generation match (see xco_op). */
    233     xco_op_t     *op_head, *op_tail;
    234     uint64_t      op_epoch;
    235 } xco_runtime_t;
    236 
    237 static inline void xco_rt_init(xco_runtime_t *rt) {
    238     rt->head     = rt->tail = NULL;
    239     rt->timers   = NULL;
    240     rt->op_head  = rt->op_tail = NULL;
    241     rt->op_epoch = 0;
    242 }
    243 
    244 /* Attach (or detach with NULL) a timer source. While attached, xco_rt_run
    245  * advances it each pass with the now value the caller supplied; firing
    246  * timers may enqueue more waiters, which the same xco_rt_run call then drains. */
    247 static inline void xco_rt_attach_timers(xco_runtime_t *rt, xco_timers_t *ts) {
    248     rt->timers = ts;
    249 }
    250 
    251 /* Append w to the ready queue. Used by xco__waker_fire and by anyone
    252  * else that wants a waiter resumed by the scheduler. */
    253 static inline void xco_rt_enqueue(xco_runtime_t *rt, xco_waiter_t *w) {
    254     w->next = NULL;
    255     if (rt->tail) rt->tail->next = w;
    256     else          rt->head       = w;
    257     rt->tail = w;
    258 }
    259 
    260 /* Drain the ready queue, resuming each waker's xco_step, until empty.
    261  * Steps may re-arm on events (and thus leave the queue) or enqueue
    262  * other steps; xco_rt_run keeps going until quiescent. now is forwarded to
    263  * any attached timer source's advance(); pass 0 (or anything) when no
    264  * source is attached. The library never reads a clock — now is always
    265  * caller-supplied. */
    266 void xco_rt_run(xco_runtime_t *rt, uint64_t now);
    267 
    268 /* The canonical bridge between events and the scheduler. When fired,
    269  * stashes the value and enqueues itself onto rt; xco_rt_run pops it and
    270  * calls xco_step(mach, value), so the resumed step receives the event's
    271  * payload directly without a re-try.
    272  *
    273  * Init once, re-park freely. The runtime hands the waiter back fully
    274  * detached (next/prev cleared) before invoking the resumed step, so a
    275  * subscriber that wants to wait on the next event can call xco_event_poll
    276  * directly — no re-init needed unless rt or step changes. */
    277 typedef struct {
    278     xco_waiter_t    base;
    279     xco_runtime_t *rt;
    280     xco_mach_t   *mach;
    281     uintptr_t  resume_value;     /* set by fire, consumed by xco_rt_run */
    282 } xco_waker_t;
    283 
    284 /* Defined in xco.c; declared here so xco_waker_init can install it. */
    285 void xco__waker_fire(xco_waiter_t *w, uintptr_t value);
    286 
    287 static inline void xco_waker_init(xco_waker_t *sw, xco_runtime_t *rt, xco_mach_t *m) {
    288     sw->base.next     = NULL;
    289     sw->base.prev     = NULL;
    290     sw->base.fire     = xco__waker_fire;
    291     sw->rt            = rt;
    292     sw->mach          = m;
    293     sw->resume_value  = 0;
    294 }
    295 
    296 /* ---- Latch ------------------------------------------------------------ */
    297 
    298 /* One-shot sticky event. set() flips the bit, stores the payload, and
    299  * fires every waiter. Subsequent set() calls are ignored. To re-arm,
    300  * reinitialize a fresh latch. */
    301 typedef struct {
    302     xco_event_t   base;
    303     bool      set;
    304     uintptr_t value;
    305     xco_waiter_t  *waiters;
    306 } xco_latch_t;
    307 
    308 /* Defined in xco.c; referenced by xco_latch_init. */
    309 extern const xco_event_vtable_t xco__latch_vt;
    310 
    311 static inline void xco_latch_init(xco_latch_t *l) {
    312     l->base.vt = &xco__latch_vt;
    313     l->set     = false;
    314     l->value   = 0;
    315     l->waiters = NULL;
    316 }
    317 
    318 void xco_latch_set(xco_latch_t *l, uintptr_t value);
    319 
    320 /* ---- Countdown -------------------------------------------------------- */
    321 
    322 /* One-shot fan-in counter. Fires its embedded latch (payload 0) when
    323  * remaining hits 0. xco_countdown_add(n) is legal while remaining > 0;
    324  * xco_countdown_done decrements; both are UB once the latch has fired.
    325  *
    326  * Compose with the standard event API via xco_countdown_event(). */
    327 typedef struct xco_countdown {
    328     xco_latch_t done;
    329     size_t  remaining;
    330 } xco_countdown_t;
    331 
    332 static inline void xco_countdown_init(xco_countdown_t *c, size_t n) {
    333     xco_latch_init(&c->done);
    334     c->remaining = n;
    335     if (n == 0) xco_latch_set(&c->done, 0);
    336 }
    337 
    338 static inline void xco_countdown_add(xco_countdown_t *c, size_t n) {
    339     assert(!c->done.set);
    340     c->remaining += n;
    341 }
    342 
    343 static inline void xco_countdown_done(xco_countdown_t *c) {
    344     assert(c->remaining > 0);
    345     if (--c->remaining == 0) xco_latch_set(&c->done, 0);
    346 }
    347 
    348 static inline xco_event_t *xco_countdown_event(xco_countdown_t *c) { return &c->done.base; }
    349 static inline bool     xco_countdown_fired(const xco_countdown_t *c) { return c->done.set; }
    350 
    351 /* ---- Notify (wake-one / wake-all) ------------------------------------- */
    352 
    353 /* Transient signal with no sticky state. xco_notify_one fires (and detaches)
    354  * the head of a FIFO waitlist; xco_notify_all fires every parked waiter. Both
    355  * are no-ops when the waitlist is empty. Subscribers must re-park to see
    356  * subsequent notifications.
    357  *
    358  * xco_event_poll never reports ready: there is no "ready now" state — a
    359  * subscriber waits for the *next* notify. */
    360 typedef struct xco_notify {
    361     xco_event_t  base;
    362     xco_waiter_t *head, *tail;
    363 } xco_notify_t;
    364 
    365 extern const xco_event_vtable_t xco__notify_vt;
    366 
    367 static inline void xco_notify_init(xco_notify_t *n) {
    368     n->base.vt = &xco__notify_vt;
    369     n->head    = n->tail = NULL;
    370 }
    371 
    372 static inline xco_event_t *xco_notify_event(xco_notify_t *n) { return &n->base; }
    373 
    374 void xco_notify_one(xco_notify_t *n);
    375 void xco_notify_all(xco_notify_t *n);
    376 
    377 /* ---- Semaphore -------------------------------------------------------- */
    378 
    379 /* Counting semaphore. acquire is exposed as xco_event_t (composable with
    380  * select / xco_wait_or_cancel): xco_event_poll succeeds and decrements when
    381  * permits > 0; otherwise the waiter parks FIFO. xco_semaphore_release(n)
    382  * hands one permit to each of up to n waiting waiters (each is fired,
    383  * which the receiver treats as "you got a permit") before adding any
    384  * leftover to the count.
    385  *
    386  * One permit per acquire. Bulk acquire isn't expressible in xco_event_t's
    387  * shape; if you need it, call sequentially. For binary use (mutex-style
    388  * critical section across awaits) init with permits = 1.
    389  *
    390  * Fairness: FIFO at the waitlist. A racing inline xco_event_poll by a fresh
    391  * caller can jump ahead of parked waiters when permits are released
    392  * back to count rather than directly handed off — release prefers
    393  * parked waiters first to avoid that. */
    394 typedef struct xco_semaphore {
    395     xco_event_t  acquire;
    396     size_t   permits;
    397     xco_waiter_t *head, *tail;
    398 } xco_semaphore_t;
    399 
    400 extern const xco_event_vtable_t xco__semaphore_acquire_vt;
    401 
    402 static inline void xco_semaphore_init(xco_semaphore_t *s, size_t initial) {
    403     s->acquire.vt = &xco__semaphore_acquire_vt;
    404     s->permits    = initial;
    405     s->head       = s->tail = NULL;
    406 }
    407 
    408 static inline xco_event_t *xco_semaphore_event(xco_semaphore_t *s) { return &s->acquire; }
    409 
    410 void xco_semaphore_release(xco_semaphore_t *s, size_t n);
    411 
    412 /* ---- Mutex ------------------------------------------------------------ */
    413 
    414 /* Binary semaphore wrapper for vocabulary at call sites. xco_mutex_init is
    415  * xco_semaphore_init(s, 1); the xco_event_t fires once per release; xco_mutex_release
    416  * hands the permit to the next waiter (or returns it to the count). */
    417 typedef xco_semaphore_t xco_mutex_t;
    418 
    419 static inline void     xco_mutex_init   (xco_mutex_t *m) { xco_semaphore_init(m, 1); }
    420 static inline xco_event_t *xco_mutex_event  (xco_mutex_t *m) { return xco_semaphore_event(m); }
    421 static inline void     xco_mutex_release(xco_mutex_t *m) { xco_semaphore_release(m, 1); }
    422 
    423 /* ---- Select / all-of -------------------------------------------------- */
    424 
    425 /* Wait over N input events. Two semantics share the same storage shape,
    426  * so a caller can switch between them by changing only the init call:
    427  *
    428  *   xco_select_event_init   fires when ANY input fires  (any-of)
    429  *   xco_allof_event_init    fires when ALL inputs fire  (all-of)
    430  *
    431  * In both cases done's payload is the index of the input whose firing
    432  * closed the wait — the winner for select, the last-to-fire for allof —
    433  * and inputs[i].value carries each fired input's payload (works
    434  * uniformly for sticky and transient sources, where re-trying the input
    435  * would either succeed or fail).
    436  *
    437  * Composes: a select_event is itself an event. */
    438 
    439 typedef struct xco_select_event xco_select_event_t;
    440 
    441 /* Per-input arming record. Caller-allocated as an array of n alongside
    442  * the select_event. After fire, .value holds whatever the input
    443  * delivered; other fields are internal. */
    444 typedef struct {
    445     xco_waiter_t         w;
    446     xco_event_t        *src;
    447     xco_select_event_t *parent;
    448     uintptr_t       value;       /* captured at fire time */
    449 } xco_select_input_t;
    450 
    451 struct xco_select_event {
    452     xco_latch_t         done;       /* fires with the closing input's index */
    453     xco_select_input_t *inputs;
    454     size_t          n;
    455     size_t          remaining;  /* counts down; done fires at 0
    456                                  * (select: starts at 1, allof: at n) */
    457 };
    458 
    459 /* Initialize as a select (any-of). inputs[] is caller-provided storage
    460  * for n nodes; srcs[] is the array of n input event pointers (read
    461  * once during init). If any input is already ready, the select fires
    462  * immediately and no waiters are parked. Use &s->done.base as the
    463  * resulting xco_event_t. */
    464 void xco_select_event_init(xco_select_event_t *s,
    465                        xco_select_input_t *inputs, size_t n,
    466                        xco_event_t *const *srcs);
    467 
    468 /* Initialize as an allof (all-of). Inputs already ready at init are
    469  * consumed inline (no parking, value captured); if every input is
    470  * ready, done fires immediately. n == 0 fires done with payload 0. */
    471 void xco_allof_event_init(xco_select_event_t *s,
    472                       xco_select_input_t *inputs, size_t n,
    473                       xco_event_t *const *srcs);
    474 
    475 /* Disarm any inputs still parked. Safe to call after fire (no-op) and
    476  * after partial completion (allof). Required before s leaves scope if
    477  * it has not yet fired. */
    478 void xco_select_event_deinit(xco_select_event_t *s);
    479 
    480 
    481 /* ---- Queue ------------------------------------------------------------ */
    482 
    483 /* Bounded FIFO of uintptr_t. Caller provides the ring buffer storage.
    484  * Recv side is exposed as xco_event_t (composable with select). Send side
    485  * is a typed API (carries a value), shaped after the event-poll fusion:
    486  * fused try + park, NULL-waiter degenerates to pure-try.
    487  *
    488  * Three full-buffer policies, fixed at init:
    489  *   XCO_QUEUE_BLOCK         senders park until a receiver makes room.
    490  *   XCO_QUEUE_DROP_NEWEST   xco_queue_send_poll silently discards the new value.
    491  *   XCO_QUEUE_DROP_OLDEST   xco_queue_send_poll evicts the head and pushes new tail.
    492  *
    493  * Senders never park under DROP_* policies — passing a non-NULL qsw is
    494  * only meaningful under XCO_QUEUE_BLOCK. xco_queue_send_unpark is
    495  * idempotent (cancellation-safe).
    496  *
    497  * Direct-handoff: xco_queue_send_poll first checks for a parked receiver and
    498  * delivers inline if present (payload bypasses the buffer), regardless
    499  * of policy.
    500  *
    501  * Rendezvous matrix (BLOCK + cap=0; the xco_chan_* aliases at the bottom
    502  * of this section name this configuration explicitly):
    503  *   send + parked recv    fire recv with value, sender continues inline.
    504  *   send + no recv        sender parks (xco_queue_send_poll); peer pulls later.
    505  *   recv + parked sender  read sender's value, fire sender (delivery
    506  *                         confirmation), receiver continues inline.
    507  *   recv + no sender      receiver parks (xco_event_poll on recv); peer
    508  *                         delivers later.
    509  *
    510  * FIFO order on both waitlists.
    511  *
    512  * Close: optional EOF semantics. After xco_queue_close, xco_queue_send_poll
    513  * returns XCO_QSEND_CLOSED regardless of policy (no delivery), parked
    514  * senders are drained with delivered=false, and parked receivers are
    515  * woken so they can observe XCO_RECV_CLOSED via xco_queue_recv. The recv
    516  * event is "ready" iff a value is available OR the queue is closed —
    517  * receivers must call xco_queue_recv to disambiguate value vs EOF. */
    518 
    519 /* Result of a typed receive on a queue (or chan, which is just a queue). */
    520 typedef enum {
    521     XCO_RECV_GOT,        /* *out holds the delivered value */
    522     XCO_RECV_EMPTY,      /* nothing available right now; caller may park */
    523     XCO_RECV_CLOSED,     /* peer closed and no values remain */
    524 } xco_recv_status_t;
    525 
    526 typedef enum {
    527     XCO_QUEUE_BLOCK,
    528     XCO_QUEUE_DROP_NEWEST,
    529     XCO_QUEUE_DROP_OLDEST,
    530 } xco_queue_policy_t;
    531 
    532 typedef struct xco_queue {
    533     xco_event_t        recv;
    534     uintptr_t     *buf;
    535     size_t         cap, head, len;
    536     xco_queue_policy_t policy;
    537     xco_waiter_t       *send_head, *send_tail;
    538     xco_waiter_t       *recv_head, *recv_tail;
    539     bool           closed;
    540 } xco_queue_t;
    541 
    542 extern const xco_event_vtable_t xco__queue_recv_vt;
    543 
    544 static inline void xco_queue_init(xco_queue_t *q, uintptr_t *buf, size_t cap,
    545                               xco_queue_policy_t policy) {
    546     q->recv.vt    = &xco__queue_recv_vt;
    547     q->buf        = buf;
    548     q->cap        = cap;
    549     q->head       = 0;
    550     q->len        = 0;
    551     q->policy     = policy;
    552     q->send_head  = q->send_tail = NULL;
    553     q->recv_head  = q->recv_tail = NULL;
    554     q->closed     = false;
    555 }
    556 
    557 static inline xco_event_t *xco_queue_recv_event(xco_queue_t *q) { return &q->recv; }
    558 
    559 /* Sender-side waiter for XCO_QUEUE_BLOCK. Same shape as xco_chan_send_waiter_t:
    560  * a waker plus a value slot the receiver / close-drain reads back on the
    561  * park path. `delivered` is set by the closing side: true on a normal
    562  * handoff, false on a close drain. */
    563 typedef struct {
    564     xco_waker_t  sw;
    565     uintptr_t    value;       /* set by xco_queue_send_poll on the park path */
    566     bool         delivered;
    567 } xco_queue_send_waiter_t;
    568 
    569 static inline void xco_queue_send_waiter_init(xco_queue_send_waiter_t *qsw,
    570                                               xco_runtime_t *rt, xco_mach_t *m) {
    571     xco_waker_init(&qsw->sw, rt, m);
    572     qsw->value     = 0;
    573     qsw->delivered = false;
    574 }
    575 
    576 /* Result of xco_queue_send_poll. */
    577 typedef enum {
    578     XCO_QSEND_ACCEPTED,   /* delivered to a parked receiver, buffered, or
    579                              accepted-by-policy (silently dropped under
    580                              DROP_NEWEST, evicted-and-pushed under DROP_OLDEST) */
    581     XCO_QSEND_BLOCKED,    /* BLOCK + full; parked iff qsw != NULL */
    582     XCO_QSEND_CLOSED,     /* queue closed; never parks. Returned regardless
    583                              of policy — closed is closed. */
    584 } xco_queue_send_status_t;
    585 
    586 /* Fused try + park for a sender. Direct-delivers to a parked receiver if
    587  * one is waiting (returns XCO_QSEND_ACCEPTED, qsw not parked). Otherwise:
    588  *
    589  *   XCO_QUEUE_BLOCK + room       buffered → XCO_QSEND_ACCEPTED.
    590  *   XCO_QUEUE_BLOCK + full       XCO_QSEND_BLOCKED; if qsw != NULL the
    591  *                                sender's value is stashed in qsw->value
    592  *                                and qsw is parked.
    593  *   XCO_QUEUE_DROP_NEWEST + full silently drops → XCO_QSEND_ACCEPTED.
    594  *   XCO_QUEUE_DROP_OLDEST + full evicts head, pushes new tail → ACCEPTED.
    595  *   closed (any policy)          XCO_QSEND_CLOSED; never parks.
    596  *
    597  * The two degenerate forms mirror xco_event_poll:
    598  *   xco_queue_send_poll(q, v, NULL)   — pure try (peek; never parks).
    599  *   xco_queue_send_poll(q, v, qsw)    — fused try-or-park (BLOCK only). */
    600 xco_queue_send_status_t xco_queue_send_poll(xco_queue_t *q, uintptr_t value,
    601                                             xco_queue_send_waiter_t *qsw);
    602 
    603 void xco_queue_send_unpark(xco_queue_t *q, xco_queue_send_waiter_t *qsw);
    604 
    605 /* Typed receive. Disambiguates value vs EOF where xco_event_poll cannot:
    606  * returns XCO_RECV_GOT (value popped from the buffer or directly from a
    607  * parked sender), XCO_RECV_CLOSED (closed and drained), or
    608  * XCO_RECV_EMPTY (caller may park). */
    609 xco_recv_status_t xco_queue_recv(xco_queue_t *q, uintptr_t *out);
    610 
    611 /* Close the queue. Idempotent. Drains parked senders (delivered=false)
    612  * and wakes parked receivers. After close, xco_queue_send_poll returns
    613  * XCO_QSEND_CLOSED regardless of policy. */
    614 void xco_queue_close(xco_queue_t *q);
    615 static inline bool xco_queue_is_closed(const xco_queue_t *q) { return q->closed; }
    616 
    617 /* Selectable send op. A per-call object that holds the value, parks on
    618  * the queue (only meaningful under XCO_QUEUE_BLOCK), and exposes
    619  * &op->done.base as the event that fires when the send resolves.
    620  *
    621  * The op embeds a xco_queue_send_waiter_t (so the queue's send list stays
    622  * uniform — receivers read .value at the same offset for both direct
    623  * and op senders) but rewires its fire callback: instead of resuming an
    624  * xco_step, fire sets op->done. Polymorphism via the function pointer.
    625  *
    626  * The done latch's payload is 1 on XCO_QSEND_ACCEPTED (handed to a
    627  * receiver, buffered, or accepted under DROP_*) and 0 on
    628  * XCO_QSEND_CLOSED / close-drain.
    629  *
    630  * Under DROP_* policies the send always resolves inline at init (the
    631  * poll returns ACCEPTED and op->done fires immediately).
    632  *
    633  * Lifecycle: init → wait on &op->done.base → deinit. Always deinit;
    634  * it's a no-op after resolution and unparks the queue-side waiter if not. */
    635 typedef struct {
    636     xco_queue_send_waiter_t qsw;        /* parked on queue; fire overridden */
    637     xco_queue_t           *queue;
    638     xco_latch_t            done;
    639 } xco_queue_send_op_t;
    640 
    641 void xco__queue_send_op_fire(xco_waiter_t *w, uintptr_t value);
    642 
    643 void xco_queue_send_op_init(xco_queue_send_op_t *op, xco_queue_t *q, uintptr_t value);
    644 static inline void xco_queue_send_op_deinit(xco_queue_send_op_t *op) {
    645     if (op->done.set) return;
    646     xco_queue_send_unpark(op->queue, &op->qsw);
    647 }
    648 
    649 /* ---- Channel (alias) -------------------------------------------------- */
    650 
    651 /* Unbuffered rendezvous channel: a queue at cap=0 with XCO_QUEUE_BLOCK
    652  * policy. Senders and receivers wait on each other; whichever arrives
    653  * first parks until its peer shows up. The pending value lives in the
    654  * sender's xco_chan_send_waiter_t for the duration of any wait — no
    655  * per-channel buffer storage.
    656  *
    657  * The xco_chan_* names are thin aliases over the queue API: a chan IS a
    658  * queue. They exist so call sites can name "rendezvous" explicitly
    659  * rather than spelling out cap=0+BLOCK. The queue's recv event, send
    660  * poll, close, recv, and selectable send op all carry over unchanged.
    661  *
    662  * Storage: an xco_chan_t carries the queue's buffer/cap/policy fields
    663  * even though they're inert at cap=0 (~40 bytes of overhead vs a
    664  * dedicated rendezvous struct). Below the noise floor for typical use. */
    665 
    666 typedef xco_queue_t              xco_chan_t;
    667 typedef xco_queue_send_waiter_t  xco_chan_send_waiter_t;
    668 typedef xco_queue_send_status_t  xco_chan_send_status_t;
    669 typedef xco_queue_send_op_t      xco_chan_send_op_t;
    670 
    671 /* Status aliases. Same enumerators, vocabulary at call sites: a chan
    672  * "delivers" rather than "accepts" — but the underlying constants are
    673  * the queue's. */
    674 #define XCO_SEND_DELIVERED  XCO_QSEND_ACCEPTED
    675 #define XCO_SEND_BLOCKED    XCO_QSEND_BLOCKED
    676 #define XCO_SEND_CLOSED     XCO_QSEND_CLOSED
    677 
    678 static inline void xco_chan_init(xco_chan_t *c) {
    679     xco_queue_init(c, NULL, 0, XCO_QUEUE_BLOCK);
    680 }
    681 static inline xco_event_t *xco_chan_recv_event(xco_chan_t *c) {
    682     return xco_queue_recv_event(c);
    683 }
    684 static inline void xco_chan_send_waiter_init(xco_chan_send_waiter_t *csw,
    685                                              xco_runtime_t *rt, xco_mach_t *m) {
    686     xco_queue_send_waiter_init(csw, rt, m);
    687 }
    688 static inline xco_chan_send_status_t xco_chan_send_poll(xco_chan_t *c, uintptr_t value,
    689                                                         xco_chan_send_waiter_t *csw) {
    690     return xco_queue_send_poll(c, value, csw);
    691 }
    692 static inline void xco_chan_send_unpark(xco_chan_t *c, xco_chan_send_waiter_t *csw) {
    693     xco_queue_send_unpark(c, csw);
    694 }
    695 static inline xco_recv_status_t xco_chan_recv(xco_chan_t *c, uintptr_t *out) {
    696     return xco_queue_recv(c, out);
    697 }
    698 static inline void xco_chan_close(xco_chan_t *c) { xco_queue_close(c); }
    699 static inline bool xco_chan_is_closed(const xco_chan_t *c) {
    700     return xco_queue_is_closed(c);
    701 }
    702 static inline void xco_chan_send_op_init(xco_chan_send_op_t *op, xco_chan_t *c, uintptr_t value) {
    703     xco_queue_send_op_init(op, c, value);
    704 }
    705 static inline void xco_chan_send_op_deinit(xco_chan_send_op_t *op) {
    706     xco_queue_send_op_deinit(op);
    707 }
    708 
    709 /* ---- Broadcast (slot) ------------------------------------------------- */
    710 
    711 /* Re-armable signal carrying a "latest value" slot. Subscribers park on
    712  * the event; xco_broadcast_publish stores the new value, fires every parked
    713  * subscriber with that value, and clears the waitlist — subscribers must
    714  * re-park to see further publishes. Subscribers that aren't parked at
    715  * publish time miss that publish but will see the next one. This is the
    716  * coalescing "watch a slot" semantics, not lossless fan-out.
    717  *
    718  * xco_event_poll never reports ready: there is no "ready now" state — a
    719  * subscriber waits for the *next* publish. To read the latest published
    720  * value at any time, use xco_broadcast_value (valid once xco_broadcast_has_value
    721  * returns true).
    722  *
    723  * For lossless multi-consumer delivery, give each subscriber its own
    724  * queue and have the producer write to all of them. */
    725 
    726 typedef struct xco_broadcast {
    727     xco_event_t   base;
    728     bool      has_value;
    729     uintptr_t value;
    730     xco_waiter_t  *waiters;
    731 } xco_broadcast_t;
    732 
    733 extern const xco_event_vtable_t xco__broadcast_vt;
    734 
    735 static inline void xco_broadcast_init(xco_broadcast_t *b) {
    736     b->base.vt   = &xco__broadcast_vt;
    737     b->has_value = false;
    738     b->value     = 0;
    739     b->waiters   = NULL;
    740 }
    741 
    742 static inline xco_event_t  *xco_broadcast_event    (xco_broadcast_t *b)       { return &b->base; }
    743 static inline bool      xco_broadcast_has_value(const xco_broadcast_t *b) { return b->has_value; }
    744 static inline uintptr_t xco_broadcast_value    (const xco_broadcast_t *b) { return b->value; }
    745 
    746 void xco_broadcast_publish(xco_broadcast_t *b, uintptr_t value);
    747 
    748 /* ---- Cancellation ----------------------------------------------------- */
    749 
    750 /* A cancellation token is a sticky latch — these aliases exist for
    751  * vocabulary at call sites. xco_cancel_set fires every parked waiter; the
    752  * idempotency of xco_latch_set means racing cancellers are fine.
    753  *
    754  * Pair a xco_cancel_t with any blocking event via xco_wait_or_cancel to get
    755  * "await X, or be cancelled."
    756  *
    757  * Discipline: cancellation notifies; it never drops in-flight values.
    758  * A cancelled await returns control to its caller, which is responsible
    759  * for draining whatever it owns — deinit a pending chan_send_op so its
    760  * value goes back to the sender, deinit a select_event so input waiters
    761  * detach, drive a cancellable coroutine to XCO_STEP_DEAD before freeing
    762  * its stack. The xco layer does no unwinding; the coroutine cooperates. */
    763 
    764 typedef xco_latch_t xco_cancel_t;
    765 
    766 static inline void     xco_cancel_init(xco_cancel_t *c)         { xco_latch_init(c); }
    767 static inline void     xco_cancel_set(xco_cancel_t *c)          { xco_latch_set(c, 0); }
    768 static inline bool     xco_cancel_is_set(const xco_cancel_t *c) { return c->set; }
    769 static inline xco_event_t *xco_cancel_event(xco_cancel_t *c)        { return &c->base; }
    770 
    771 /* Outcome indices for xco_wait_or_cancel — match the inputs[] order so the
    772  * value the resumer receives from the latched select reads as one of
    773  * these directly. */
    774 enum {
    775     XCO_WAIT_OK        = 0,   /* ev fired; inputs[0].value holds its payload */
    776     XCO_WAIT_CANCELLED = 1,   /* cancel fired before ev */
    777 };
    778 
    779 /* Build a select over (ev, cancel) using caller-provided storage. If
    780  * either is already ready at init the select fast-paths and never parks
    781  * anyone (ev is checked first, so an event that has already resolved
    782  * wins over a concurrent cancel). Treat &out->done.base as the event
    783  * to wait on. Always pair with xco_select_event_deinit before storage
    784  * leaves scope (no-op once fired). */
    785 static inline void xco_wait_or_cancel(xco_select_event_t *out,
    786                                   xco_select_input_t inputs[2],
    787                                   xco_event_t *ev, xco_cancel_t *c) {
    788     xco_event_t *srcs[2] = {ev, xco_cancel_event(c)};
    789     xco_select_event_init(out, inputs, 2, srcs);
    790 }
    791 
    792 /* ---- Timers ----------------------------------------------------------- */
    793 
    794 /* A timer is a sticky event keyed on a u64 deadline. It fires (exactly
    795  * once) when the attached timer source is advanced past that deadline.
    796  * The library never reads a clock; the caller provides `now` to
    797  * xco_timers_advance (or via xco_rt_run).
    798  *
    799  * Storage is pluggable through the timers vtable so callers can swap a
    800  * pairing heap (in-tree, O(log n) amortized everywhere including cancel)
    801  * for a wheel or other structure without touching the timer/timeout
    802  * surface. The timer struct holds the heap link fields inline; the source
    803  * impl interprets them.
    804  *
    805  * Lifecycle:
    806  *   xco_timer_init(t, ts, deadline)  // inserts into ts
    807  *   ... wait on xco_timer_event(t), or compose into select/xco_wait_or_cancel ...
    808  *   xco_timer_deinit(t)              // removes from ts if not yet fired
    809  *
    810  * Fire payload is the deadline. Re-arming = reinit a fresh timer. */
    811 
    812 typedef struct xco_timer xco_timer_t;
    813 
    814 typedef struct {
    815     /* Insert t into the source. t must be initialized but not yet
    816      * inserted; insert sets t's heap link fields. */
    817     void (*insert) (xco_timers_t *ts, xco_timer_t *t);
    818     /* Remove t from the source if currently inserted. Caller must
    819      * ensure t was inserted into this same source. */
    820     void (*cancel) (xco_timers_t *ts, xco_timer_t *t);
    821     /* Fire every timer whose deadline <= now, in deadline order, popping
    822      * each from the source. Each fire drains the timer's waiter list. */
    823     void (*advance)(xco_timers_t *ts, uint64_t now);
    824     /* Return the earliest queued deadline, or UINT64_MAX if no timer
    825      * is queued. */
    826     uint64_t (*peek)(const xco_timers_t *ts);
    827 } xco_timers_vtable_t;
    828 
    829 struct xco_timers {
    830     const xco_timers_vtable_t *vt;
    831     uint64_t now;       /* most recent advance() input; monotonic */
    832 };
    833 
    834 static inline void     xco_timers_insert (xco_timers_t *ts, xco_timer_t *t)   { ts->vt->insert (ts, t); }
    835 static inline void     xco_timers_cancel (xco_timers_t *ts, xco_timer_t *t)   { ts->vt->cancel (ts, t); }
    836 static inline void     xco_timers_advance(xco_timers_t *ts, uint64_t now) {
    837     assert(now >= ts->now);
    838     ts->now = now;
    839     ts->vt->advance(ts, now);
    840 }
    841 static inline uint64_t xco_timers_peek   (const xco_timers_t *ts)             { return ts->vt->peek(ts); }
    842 static inline uint64_t xco_timers_now    (const xco_timers_t *ts)             { return ts->now; }
    843 
    844 /* Concrete timer. Embeds a latch so try/park/unpark and the fire-all
    845  * waitlist handling come for free; the timer source manipulates only
    846  * the heap link fields and triggers the latch on fire. The latch's
    847  * payload after fire is the timer's deadline. */
    848 struct xco_timer {
    849     xco_latch_t   done;            /* fires once, payload = deadline */
    850     uint64_t  deadline;
    851     xco_timers_t *src;             /* source this timer is registered with */
    852     bool      in_heap;         /* true between insert and fire/cancel */
    853     /* Pairing-heap link fields; opaque to anyone but the source impl.
    854      * prev is parent if first child, else previous sibling, else NULL. */
    855     xco_timer_t  *child, *prev, *next;
    856 };
    857 
    858 static inline xco_event_t *xco_timer_event(xco_timer_t *t)        { return &t->done.base; }
    859 static inline bool     xco_timer_fired(const xco_timer_t *t)  { return t->done.set; }
    860 
    861 static inline void xco_timer_init(xco_timer_t *t, xco_timers_t *ts, uint64_t deadline) {
    862     xco_latch_init(&t->done);
    863     t->deadline = deadline;
    864     t->src      = ts;
    865     t->in_heap  = false;
    866     t->child    = NULL;
    867     t->prev     = NULL;
    868     t->next     = NULL;
    869     xco_timers_insert(ts, t);
    870 }
    871 
    872 static inline void xco_timer_deinit(xco_timer_t *t) {
    873     if (t->in_heap) xco_timers_cancel(t->src, t);
    874 }
    875 
    876 /* In-tree timer source: intrusive pairing heap. O(1) amortized insert
    877  * and meld; O(log n) amortized advance and cancel. No per-source
    878  * allocation — the heap is just a root pointer; nodes live in the
    879  * caller's xco_timer_t's. */
    880 typedef struct {
    881     xco_timers_t  base;
    882     xco_timer_t  *root;
    883 } xco_pairing_heap_t;
    884 
    885 extern const xco_timers_vtable_t xco__pairing_heap_vt;
    886 
    887 static inline void xco_pairing_heap_init(xco_pairing_heap_t *h) {
    888     h->base.vt  = &xco__pairing_heap_vt;
    889     h->base.now = 0;
    890     h->root     = NULL;
    891 }
    892 
    893 /* ---- Timeout ---------------------------------------------------------- */
    894 
    895 /* Bundle: a timer that fires a xco_cancel_t on expiration. The natural
    896  * pairing for "await ev, or be cancelled by deadline":
    897  *
    898  *   xco_timeout_t to;
    899  *   xco_timeout_init(&to, ts, now + budget);
    900  *   xco_select_event_t sel; xco_select_input_t inputs[2];
    901  *   xco_wait_or_cancel(&sel, inputs, ev, &to.cancel);
    902  *   ... wait on &sel.done.base ...
    903  *   xco_select_event_deinit(&sel);
    904  *   xco_timeout_deinit(&to);   // safe whether the timer fired or not
    905  *
    906  * The bridge waiter is parked on the timer; when it fires it sets the
    907  * cancel. Bridge fire is idempotent vs xco_cancel_set (a sticky latch). */
    908 typedef struct xco_timeout {
    909     xco_timer_t  timer;
    910     xco_cancel_t cancel;
    911     xco_waiter_t  bridge;
    912 } xco_timeout_t;
    913 
    914 void xco_timeout_init(xco_timeout_t *to, xco_timers_t *ts, uint64_t deadline);
    915 static inline void xco_timeout_deinit(xco_timeout_t *to) {
    916     xco_event_unpark(xco_timer_event(&to->timer), &to->bridge);
    917     xco_timer_deinit(&to->timer);
    918 }
    919 
    920 /* ---- Ticker ----------------------------------------------------------- */
    921 
    922 /* Re-armable transient signal driven by a timer source. Each time the
    923  * underlying timer fires, the ticker computes the next deadline (period
    924  * past the just-fired one, with skip-ahead for catch-up after overflow),
    925  * reinstalls the timer, and fires every parked subscriber with the
    926  * just-fired deadline as the payload. Subscribers that aren't parked at
    927  * a fire miss it (transient — same coalescing semantics as broadcast).
    928  *
    929  *   xco_ticker_init(&t, ts, period, first_deadline);
    930  *   ... wait on xco_ticker_event(&t), re-park to see further ticks ...
    931  *   xco_ticker_deinit(&t);   // cancels the in-flight timer
    932  *
    933  * xco_event_poll never reports ready; subscribers wait for the *next* tick. */
    934 typedef struct xco_ticker {
    935     xco_timer_t   timer;
    936     xco_timers_t *src;
    937     uint64_t  period;
    938     xco_event_t   base;
    939     xco_waiter_t  *waiters;
    940     xco_waiter_t   bridge;       /* internal: parks on xco_timer_event */
    941 } xco_ticker_t;
    942 
    943 extern const xco_event_vtable_t xco__ticker_vt;
    944 
    945 void     xco_ticker_init  (xco_ticker_t *t, xco_timers_t *ts,
    946                        uint64_t period, uint64_t first_deadline);
    947 void     xco_ticker_deinit(xco_ticker_t *t);
    948 static inline xco_event_t *xco_ticker_event(xco_ticker_t *t) { return &t->base; }
    949 
    950 /* ---- Task ------------------------------------------------------------- */
    951 
    952 /* Lifecycle handle for a running xco_step. Bundles a done latch (fires when
    953  * the xco_step returns, payload = its return value) with a cancel latch
    954  * (the canonical signal to ask the xco_step to wind down). The xco_step itself
    955  * is caller-allocated; the task holds a pointer to it.
    956  *
    957  * Who fires done:
    958  *   - Hand-coded state machine: call xco_task_done(t, ret) in the same arm
    959  *     that returns XCO_STEP_DEAD.
    960  *   - xco-backed task (see xco_cotask_t below): the xco_trampoline calls
    961  *     xco_task_done automatically with the coroutine's return value.
    962  *
    963  * Cooperation: cancellation only notifies — the xco_step is responsible for
    964  * draining what it owns and reaching XCO_STEP_DEAD. The task's cancel is a
    965  * normal xco_cancel_t, so the xco_step typically composes xco_wait_or_cancel against
    966  * it on every blocking await.
    967  *
    968  * Joining: callers wait on xco_task_done_event with the standard event API
    969  * (try / park, or compose into select / xco_wait_or_cancel). On fire the
    970  * latch's payload is the xco_step's return value. */
    971 
    972 typedef struct xco_task {
    973     xco_mach_t  *mach;
    974     xco_latch_t   done;
    975     xco_cancel_t  cancel;
    976 } xco_task_t;
    977 
    978 static inline void xco_task_init(xco_task_t *t, xco_mach_t *mach) {
    979     t->mach = mach;
    980     xco_latch_init(&t->done);
    981     xco_cancel_init(&t->cancel);
    982 }
    983 
    984 /* Mark the task complete with its return value. Idempotent (xco_latch_set is). */
    985 static inline void xco_task_done(xco_task_t *t, uintptr_t value) {
    986     xco_latch_set(&t->done, value);
    987 }
    988 
    989 static inline xco_event_t  *xco_task_done_event  (xco_task_t *t)       { return &t->done.base; }
    990 static inline xco_cancel_t *xco_task_cancel      (xco_task_t *t)       { return &t->cancel; }
    991 static inline bool      xco_task_finished    (const xco_task_t *t) { return t->done.set; }
    992 static inline bool      xco_task_is_cancelled(const xco_task_t *t) { return xco_cancel_is_set(&t->cancel); }
    993 static inline xco_mach_t  *xco_task_mach        (xco_task_t *t)       { return t->mach; }
    994 
    995 /* ---- Task group ------------------------------------------------------- */
    996 
    997 /* Fan-in join + fan-out cancel for a dynamic set of tasks. Caller
    998  * provides storage for each per-attachment record (xco_group_attach_t), so
    999  * the group itself does no allocation.
   1000  *
   1001  * xco_task_group_attach(g, t, slot):
   1002  *   xco_countdown_add(g->pending, 1); slot's bridge waiter parks on
   1003  *   xco_task_done_event(t); slot is appended to g's list. When the task's
   1004  *   done fires, the bridge fires: it splices the slot out of g's list
   1005  *   and calls xco_countdown_done(&g->pending). Re-attaching a finished
   1006  *   task is UB.
   1007  *
   1008  * xco_task_group_cancel(g): walks the attachment list and xco_cancel_set's
   1009  * each &slot->task->cancel, then xco_cancel_set's g->cancel. Bodies that
   1010  * compose xco_wait_or_cancel against xco_task_cancel(t) wind down cooperatively;
   1011  * meanwhile, anything waiting on g->cancel observes the group-level
   1012  * signal directly.
   1013  *
   1014  * xco_task_group_join_event(g): fires when every attached task has reached
   1015  * xco_task_done. Compose with select / xco_wait_or_cancel like any event. */
   1016 
   1017 typedef struct xco_group_attach xco_group_attach_t;
   1018 
   1019 typedef struct xco_task_group {
   1020     xco_countdown_t     pending;
   1021     xco_cancel_t        cancel;
   1022     xco_group_attach_t *head, *tail;
   1023 } xco_task_group_t;
   1024 
   1025 struct xco_group_attach {
   1026     xco_waiter_t         bridge;     /* parked on xco_task_done_event(task) */
   1027     xco_task_t         *task;
   1028     xco_task_group_t   *group;
   1029     xco_group_attach_t *next, *prev;
   1030 };
   1031 
   1032 void      xco_task_group_init   (xco_task_group_t *g);
   1033 void      xco_task_group_attach (xco_task_group_t *g, xco_task_t *t,
   1034                              xco_group_attach_t *slot);
   1035 void      xco_task_group_cancel (xco_task_group_t *g);
   1036 static inline xco_event_t  *xco_task_group_join_event   (xco_task_group_t *g) {
   1037     return xco_countdown_event(&g->pending);
   1038 }
   1039 static inline xco_cancel_t *xco_task_group_cancel_handle(xco_task_group_t *g) {
   1040     return &g->cancel;
   1041 }
   1042 
   1043 /* ====================================================================
   1044  * xco — stack-switching coroutines.
   1045  *
   1046  * Teardown: this layer does not unwind a suspended coroutine's stack.
   1047  * Drive a coroutine to return (e.g. by passing a cancel sentinel it
   1048  * is expected to handle) before freeing its stack memory.
   1049  * ==================================================================== */
   1050 
   1051 /* Coroutine entry point. The argument is the value supplied to the
   1052  * first xco_step on this xco. The return value is delivered to the resumer
   1053  * as the final xco_step_result, with status XCO_STEP_DEAD. */
   1054 typedef uintptr_t (*xco_fn)(uintptr_t arg);
   1055 
   1056 /* Coroutine control block. Allocate anywhere — on a stack, in a
   1057  * struct, on the heap. xco_mach_t base is first so xco_coro_t * can be passed
   1058  * directly to xco_step() or any generic xco_mach_t * consumer. The trailing
   1059  * priv storage holds the saved register context and bookkeeping;
   1060  * its contents are private to the implementation. */
   1061 typedef struct xco_coro {
   1062     xco_mach_t base;
   1063     _Alignas(XCO_ALIGN) unsigned char priv[XCO_SIZE];
   1064 } xco_coro_t;
   1065 
   1066 /* Initialize *c to run fn on [stack_base, stack_base + stack_len).
   1067  * stack_base must be XCO_STACK_ALIGN-aligned; the runtime picks the
   1068  * starting SP based on the architecture's stack growth direction.
   1069  * Status after init is XCO_STEP_INIT. */
   1070 void xco_init(xco_coro_t *c, xco_fn fn,
   1071               void *stack_base, size_t stack_len);
   1072 
   1073 /* Suspend the currently running coroutine, returning value to its
   1074  * resumer. Returns the value passed by the next resume. Undefined if
   1075  * called outside a coroutine. */
   1076 uintptr_t xco_suspend(uintptr_t value);
   1077 
   1078 /* The currently running coroutine, or NULL if the caller is not in
   1079  * one. The runtime maintains this for xco_suspend. */
   1080 xco_coro_t *xco_self(void);
   1081 
   1082 /* Resuming a coroutine is just driving its xco_step: callers use
   1083  * xco_step(&c->base, v) directly. Reading status without resuming is
   1084  * xco_mach_status(&c->base). The xco layer adds no separate vocabulary
   1085  * for these — that's the unification with hand-coded state machines.
   1086  * Resuming a coroutine that is not XCO_STEP_INIT or XCO_STEP_SUSPENDED is
   1087  * undefined. */
   1088 
   1089 /* Convenience: init then first-step in one call. */
   1090 static inline xco_step_result_t xco_spawn(xco_coro_t *c, xco_fn fn,
   1091                                        void *stack_base, size_t stack_len,
   1092                                        uintptr_t arg) {
   1093     xco_init(c, fn, stack_base, stack_len);
   1094     return xco_step(&c->base, arg);
   1095 }
   1096 
   1097 /* Cooperative yield. Enqueues self on rt's ready queue and suspends;
   1098  * the next xco_rt_run pass resumes us. Useful for fairness when a coroutine
   1099  * wants to give other ready work a turn between long-running steps.
   1100  * Must be called from inside a coroutine driven by rt. */
   1101 static inline void xco_yield(xco_runtime_t *rt) {
   1102     xco_coro_t *self = xco_self();
   1103     assert(self != NULL);
   1104     xco_waker_t sw;
   1105     xco_waker_init(&sw, rt, &self->base);
   1106     xco_rt_enqueue(rt, &sw.base);
   1107     xco_suspend(0);
   1108 }
   1109 
   1110 /* Await an event from inside a coroutine. The standard poll-suspend
   1111  * dance, in one call. Returns the event's value (delivered by fire on
   1112  * the slow path, by the inline poll on the fast path). Must be called
   1113  * from inside a coroutine driven by rt. */
   1114 static inline uintptr_t xco_await(xco_runtime_t *rt, xco_event_t *e) {
   1115     xco_coro_t *self = xco_self();
   1116     assert(self != NULL);
   1117     uintptr_t v;
   1118     xco_waker_t sw;
   1119     xco_waker_init(&sw, rt, &self->base);
   1120     if (xco_event_poll(e, &v, &sw.base)) return v;
   1121     xco_suspend(0);
   1122     (void)xco_event_poll(e, &v, NULL);
   1123     return v;
   1124 }
   1125 
   1126 /* Await ev or be cancelled by c. Returns true if ev fired (its payload
   1127  * is written to *out, which may be NULL); false if cancelled. The
   1128  * internal select_event is always cleaned up before return.
   1129  *
   1130  * The canonical shape for cooperative work inside a task body — pair
   1131  * with xco_task_cancel(self) on every blocking await. */
   1132 static inline bool xco_await_or_cancel(xco_runtime_t *rt, xco_event_t *ev,
   1133                                        xco_cancel_t *c, uintptr_t *out) {
   1134     xco_select_event_t sel;
   1135     xco_select_input_t inputs[2];
   1136     xco_wait_or_cancel(&sel, inputs, ev, c);
   1137     uintptr_t winner = xco_await(rt, &sel.done.base);
   1138     bool ok = (winner == XCO_WAIT_OK);
   1139     if (ok && out) *out = inputs[XCO_WAIT_OK].value;
   1140     xco_select_event_deinit(&sel);
   1141     return ok;
   1142 }
   1143 
   1144 /* ---- xco-backed task ------------------------------------------------- */
   1145 
   1146 /* xco specialization of xco_task_t. Bundles the user-visible task handle
   1147  * with the xco that runs it; the xco_trampoline calls fn(&xt->task, arg)
   1148  * and then xco_task_done with its return value, so xco_wait_or_cancel-style
   1149  * teardown works without the user wiring anything.
   1150  *
   1151  * The xco_trampoline recovers xt from xco_self() at first entry (container_of
   1152  * on the embedded co), so the first-resume uintptr_t is preserved as
   1153  * fn's arg under normal xco semantics. Subsequent resumes pass values
   1154  * to the coroutine in the usual way.
   1155  *
   1156  * Storage shape: caller allocates xco_cotask_t and a stack. The xco_task_t
   1157  * inside is the handle to wait/cancel on; cancel via xco_cancel_set on
   1158  * &xt->task.cancel and the body is expected to observe it (typically
   1159  * by composing xco_wait_or_cancel against xco_task_cancel(&xt->task)). */
   1160 
   1161 typedef uintptr_t (*xco_cotask_fn)(xco_task_t *t, uintptr_t arg);
   1162 
   1163 typedef struct xco_cotask {
   1164     xco_task_t      task;
   1165     xco_coro_t       co;
   1166     xco_cotask_fn fn;
   1167 } xco_cotask_t;
   1168 
   1169 /* Initialize xt to run fn on the given stack. After this the embedded
   1170  * xco is XCO_STEP_INIT; drive it with xco_step(&xt->co.base, v) or use
   1171  * xco_cotask_spawn for the init-and-first-step convenience. */
   1172 void xco_cotask_init(xco_cotask_t *xt, xco_cotask_fn fn,
   1173                    void *stack_base, size_t stack_len);
   1174 
   1175 /* Convenience: init then first-step in one call. arg is delivered as
   1176  * fn's argument. */
   1177 static inline xco_step_result_t xco_cotask_spawn(xco_cotask_t *xt, xco_cotask_fn fn,
   1178                                             void *stack_base, size_t stack_len,
   1179                                             uintptr_t arg) {
   1180     xco_cotask_init(xt, fn, stack_base, stack_len);
   1181     return xco_step(&xt->co.base, arg);
   1182 }
   1183 
   1184 /* ====================================================================
   1185  * xco_op — generic effect/IO layer.
   1186  *
   1187  * Coroutines submit ops describing work to do; the runtime accumulates
   1188  * them on a pending list; after xco_rt_run quiesces, the host pulls the
   1189  * batch via xco_rt_take_ops, executes them however it likes (io_uring,
   1190  * threads, mocks, replay), and injects completions back via
   1191  * xco_op_complete. The xco library itself reads no clocks and makes no
   1192  * syscalls — that property extends to IO via this layer.
   1193  *
   1194  * An op is just an event with a side-channel describing what to do.
   1195  * Embed xco_op_t as the first member of a kind-specific payload struct;
   1196  * the host pattern-matches on `kind` (an open tag space — the runtime
   1197  * never inspects it).
   1198  *
   1199  * State machine:
   1200  *   PENDING   op->epoch == op->rt->op_epoch, done unset   (on rt's list)
   1201  *   IN_FLIGHT op->epoch != op->rt->op_epoch, done unset   (host has taken)
   1202  *   RESOLVED  done.set                                     (terminal)
   1203  *
   1204  * The epoch counter on the runtime is bumped each xco_rt_take_ops, so
   1205  * "still on the pending list" is a generation match — submit + take are
   1206  * both O(1) and the runtime never walks the batch.
   1207  *
   1208  * Threading: single-threaded, same as the rest of xco. submit, cancel,
   1209  * and complete must all be called on the runtime's thread.
   1210  * ==================================================================== */
   1211 
   1212 typedef enum {
   1213     XCO_OP_PENDING,        /* not used as a fire payload, but conceptual */
   1214     XCO_OP_COMPLETED,      /* host finished; result lives in embedder fields */
   1215     XCO_OP_CANCELLED,      /* resolved without a real result */
   1216 } xco_op_status_t;
   1217 
   1218 struct xco_op {
   1219     xco_latch_t      done;              /* fires once, payload = status */
   1220     xco_op_t        *next, *prev;       /* intrusive on rt's pending list */
   1221     xco_runtime_t   *rt;                /* set on submit; stays set after take */
   1222     uint64_t         epoch;             /* matches rt->op_epoch iff PENDING */
   1223     uint32_t         kind;              /* open tag space; host pattern-matches */
   1224     bool             cancel_requested;  /* advisory to host post-take */
   1225 };
   1226 
   1227 /* True iff op is still on rt's current pending batch (i.e. PENDING).
   1228  * False for IN_FLIGHT or RESOLVED. O(1). */
   1229 static inline bool xco_op_is_pending(const xco_op_t *op) {
   1230     return op->rt && op->epoch == op->rt->op_epoch && !op->done.set;
   1231 }
   1232 
   1233 /* Submit op to rt's pending list. Initializes done, clears
   1234  * cancel_requested, and sets op->rt = rt. The caller pre-populates op->kind
   1235  * and any embedder fields (fd, buf, len, ...) before submit. After submit
   1236  * the awaiter typically waits on &op->done.base — composes with select,
   1237  * xco_wait_or_cancel, etc. */
   1238 void xco_op_submit  (xco_runtime_t *rt, xco_op_t *op);
   1239 
   1240 /* Request cancellation. Behavior depends on state:
   1241  *   PENDING   splice from rt list, fire done(CANCELLED).
   1242  *   IN_FLIGHT set cancel_requested=true (advisory; host decides).
   1243  *   RESOLVED  no-op.
   1244  * Idempotent. */
   1245 void xco_op_cancel  (xco_op_t *op);
   1246 
   1247 /* Host's final word for an op it took via xco_rt_take_ops. Fires done
   1248  * with the given status. Idempotent (the latch is). */
   1249 void xco_op_complete(xco_op_t *op, xco_op_status_t status);
   1250 
   1251 /* Detach the runtime's pending ops list and return its head. Bumps the
   1252  * runtime's op_epoch, transitioning the whole batch to IN_FLIGHT in O(1)
   1253  * (no walk — each op's stored epoch now no longer matches). The host
   1254  * owns the returned list — the next/prev fields are the host's to reuse
   1255  * however it wants for in-flight tracking. Returns NULL if the list is
   1256  * empty.
   1257  *
   1258  * If tail_out is non-NULL, *tail_out receives the list's tail (or NULL
   1259  * for an empty list) — handy for splicing the batch onto a host-side
   1260  * in-flight list in O(1) without walking. */
   1261 xco_op_t *xco_rt_take_ops(xco_runtime_t *rt, xco_op_t **tail_out);
   1262 
   1263 #endif /* XCO_H */