xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs | README

xco.c (44826B)


      1 /*
      2  * xco.c — implementation for xco.h.
      3  *
      4  * Two parts:
      5  *   - Event substrate (runtime, latch, semaphore, select/allof, queue
      6  *     (chan is a typedef alias for the cap=0+BLOCK case), broadcast,
      7  *     notify, timer, pairing heap, timeout, ticker, task group). Each
      8  *     event type is a small struct with a static vtable.
      9  *     Waitlists are intrusive linked lists; fire-all detaches the whole
     10  *     list before iterating so callbacks can do anything, including
     11  *     re-park or unpark sibling waiters, without iterator hazards.
     12  *
     13  *   - Stack-switching coroutines (xco). The platform layer
     14  *     (arch/<name>/xco_arch.c) supplies the register save/restore
     15  *     primitive (xco_platform_switch) and the initial-context setup
     16  *     (xco_platform_init). Everything else — the state machine, the
     17  *     resume chain, the xco_trampoline wrapping user fn entry/exit, and
     18  *     the xco_self() TLS pointer — lives here.
     19  *
     20  * This translation unit is architecture-neutral. The platform context
     21  * type is forward-declared (in xco_platform.h) and only ever referred
     22  * to by pointer, so its actual size and layout never cross into this
     23  * file. We reserve raw space for it inside xco_impl_t using
     24  * XCO__CTX_SIZE / XCO__CTX_ALIGN from the arch's xco_arch.h, and cast
     25  * to xco_platform_ctx_t * when calling into the platform layer.
     26  */
     27 
     28 #include "xco.h"
     29 #include "xco_platform_internal.h"
     30 
     31 #include <assert.h>
     32 #include <stddef.h>
     33 #include <stdint.h>
     34 
     35 /* ====================================================================
     36  * Runtime
     37  * ==================================================================== */
     38 
     39 /* xco_rt_init and xco_rt_enqueue are defined inline in xco.h. */
     40 
     41 static xco_waiter_t *xco_rt_dequeue(xco_runtime_t *rt) {
     42     xco_waiter_t *w = rt->head;
     43     if (!w) return NULL;
     44     rt->head = w->next;
     45     if (!rt->head) rt->tail = NULL;
     46     /* Hand the waiter back fully detached. Every fire path already clears
     47      * prev before firing (and we just walked off the ready-queue), so
     48      * w->prev is NULL in practice — making it explicit here means
     49      * waker users can re-park without re-init, regardless of which
     50      * fire path resumed them. */
     51     w->next = NULL;
     52     w->prev = NULL;
     53     return w;
     54 }
     55 
     56 void xco_rt_run(xco_runtime_t *rt, uint64_t now) {
     57     /* The runtime ready queue holds only wakers. Other waiter
     58      * shapes (e.g. select_input) fire synchronously inside event
     59      * notify paths and never reach here.
     60      *
     61      * If a timer source is attached, advance it at the top of each
     62      * iteration: any timer whose deadline <= now fires, enqueueing its
     63      * wakers, which the inner loop then drains. A step may insert
     64      * a fresh already-expired timer; the outer loop catches it on the
     65      * next pass. Termination: each pass either drains a non-empty
     66      * queue or exits, and advance only fires timers it then removes,
     67      * so total work is bounded. */
     68     for (;;) {
     69         if (rt->timers) xco_timers_advance(rt->timers, now);
     70         if (!rt->head) return;
     71         for (xco_waiter_t *w; (w = xco_rt_dequeue(rt));) {
     72             xco_waker_t *sw = (xco_waker_t *)w;
     73             xco_step(sw->mach, sw->resume_value);
     74         }
     75     }
     76 }
     77 
     78 /* ---- Waker ------------------------------------------------------------ */
     79 
     80 /* Exposed (with leading underscore) so the inline xco_waker_init in
     81  * xco.h can install it without dragging the body into the header. */
     82 void xco__waker_fire(xco_waiter_t *w, uintptr_t value) {
     83     xco_waker_t *sw = (xco_waker_t *)w;
     84     sw->resume_value = value;
     85     xco_rt_enqueue(sw->rt, w);
     86 }
     87 
     88 /* ====================================================================
     89  * Latch
     90  * ==================================================================== */
     91 
     92 static bool xco_latch_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) {
     93     xco_latch_t *l = (xco_latch_t *)e;
     94     if (l->set) {
     95         if (out) *out = l->value;
     96         return true;
     97     }
     98     if (!w) return false;
     99     /* One-waiter invariant: w must arrive clean. Detach paths (xco_latch_set
    100      * iterator, xco_latch_unpark, xco_select_event_deinit) all leave waiters in
    101      * this state. A trip here means a double-park bug. */
    102     assert(!w->prev && !w->next);
    103     w->next = l->waiters;
    104     if (l->waiters) l->waiters->prev = w;
    105     l->waiters = w;
    106     return false;
    107 }
    108 
    109 static void xco_latch_unpark(xco_event_t *e, xco_waiter_t *w) {
    110     xco_latch_t *l = (xco_latch_t *)e;
    111     /* Detect "not on this list" via the invariant maintained by park
    112      * and the detach paths: a parked waiter has prev set OR is the
    113      * head; a detached one has prev == NULL and is not the head. */
    114     if (!w->prev && l->waiters != w) return;
    115     if (w->prev) w->prev->next = w->next;
    116     else         l->waiters    = w->next;
    117     if (w->next) w->next->prev = w->prev;
    118     w->prev = w->next = NULL;
    119 }
    120 
    121 /* Exposed so the inline xco_latch_init in xco.h can reference it. */
    122 const xco_event_vtable_t xco__latch_vt = {
    123     .poll   = xco_latch_poll,
    124     .unpark = xco_latch_unpark,
    125 };
    126 
    127 void xco_latch_set(xco_latch_t *l, uintptr_t value) {
    128     if (l->set) return;
    129     l->set   = true;
    130     l->value = value;
    131 
    132     /* Detach the whole waitlist before firing. A waiter's fire callback
    133      * might do anything (including unpark a sibling on another event),
    134      * but it cannot mutate this list — it's already gone. */
    135     xco_waiter_t *w = l->waiters;
    136     l->waiters = NULL;
    137     while (w) {
    138         xco_waiter_t *next = w->next;       /* save before xco_waiter_fire clears */
    139         xco_waiter_fire(w, value);
    140         w = next;
    141     }
    142 }
    143 
    144 /* ====================================================================
    145  * Semaphore
    146  *
    147  * FIFO doubly-linked waitlist, same shape as the chan_q_* helpers below
    148  * but specialized to a xco_semaphore_t (so we don't have to thread the
    149  * head/tail pair through chan_q_*).
    150  * ==================================================================== */
    151 
    152 static void xco_sem_q_push(xco_semaphore_t *s, xco_waiter_t *w) {
    153     assert(!w->prev && !w->next);
    154     w->prev = s->tail;
    155     w->next = NULL;
    156     if (s->tail) s->tail->next = w;
    157     else         s->head       = w;
    158     s->tail = w;
    159 }
    160 
    161 static xco_waiter_t *xco_sem_q_pop(xco_semaphore_t *s) {
    162     xco_waiter_t *w = s->head;
    163     if (!w) return NULL;
    164     s->head = w->next;
    165     if (s->head) s->head->prev = NULL;
    166     else         s->tail       = NULL;
    167     w->prev = w->next = NULL;
    168     return w;
    169 }
    170 
    171 static void xco_sem_q_remove(xco_semaphore_t *s, xco_waiter_t *w) {
    172     if (!w->prev && s->head != w) return;
    173     if (w->prev) w->prev->next = w->next;
    174     else         s->head       = w->next;
    175     if (w->next) w->next->prev = w->prev;
    176     else         s->tail       = w->prev;
    177     w->prev = w->next = NULL;
    178 }
    179 
    180 static bool xco_semaphore_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) {
    181     xco_semaphore_t *s = (xco_semaphore_t *)e;
    182     if (s->permits > 0) {
    183         s->permits--;
    184         if (out) *out = 1;
    185         return true;
    186     }
    187     if (!w) return false;
    188     xco_sem_q_push(s, w);
    189     return false;
    190 }
    191 
    192 static void xco_semaphore_unpark(xco_event_t *e, xco_waiter_t *w) {
    193     xco_semaphore_t *s = (xco_semaphore_t *)e;
    194     xco_sem_q_remove(s, w);
    195 }
    196 
    197 const xco_event_vtable_t xco__semaphore_acquire_vt = {
    198     .poll   = xco_semaphore_poll,
    199     .unpark = xco_semaphore_unpark,
    200 };
    201 
    202 void xco_semaphore_release(xco_semaphore_t *s, size_t n) {
    203     /* Hand a permit directly to each FIFO waiter, then drop any leftover
    204      * into the count. Direct handoff prevents a fresh try from jumping
    205      * the queue: an arriving acquirer that called try_ would see permits=0
    206      * and park behind the existing waiters until everyone ahead has been
    207      * served. */
    208     while (n > 0) {
    209         xco_waiter_t *w = xco_sem_q_pop(s);
    210         if (!w) break;
    211         n--;
    212         /* Fire value is conventional 1 — "you got a permit". Step-waiter
    213          * users ignore the value; select inputs capture it as the input's
    214          * value field. */
    215         xco_waiter_fire(w, 1);
    216     }
    217     s->permits += n;
    218 }
    219 
    220 /* ====================================================================
    221  * Select / all-of
    222  * ==================================================================== */
    223 
    224 /* One fire callback serves both modes. A counter `remaining` is decremented
    225  * on each fire; done is set when it hits 0. select inits remaining=1 (any
    226  * one fire closes); allof inits remaining=n (every input must fire). The
    227  * disarm-siblings loop is a no-op for already-fired waiters, so it runs
    228  * uniformly: for select it cleans up still-parked losers, for allof it
    229  * does nothing (every sibling is already detached by its source). */
    230 static void xco_select_input_fire(xco_waiter_t *w, uintptr_t value) {
    231     xco_select_input_t *in = (xco_select_input_t *)w;
    232     xco_select_event_t *s  = in->parent;
    233 
    234     /* Defensive: guard against any straggler that escaped disarm. */
    235     if (s->done.set) return;
    236     /* Capture the input's payload before resuming anyone. Sticky
    237      * sources also keep it on themselves; transient sources (channels)
    238      * deliver only here, so this is the only durable record. */
    239     in->value = value;
    240     if (--s->remaining > 0) return;
    241 
    242     size_t i = (size_t)(in - s->inputs);
    243     /* Disarm anyone still parked so their waiters don't dangle on input
    244      * waitlists past s's lifetime. Idempotent on already-detached waiters. */
    245     for (size_t j = 0; j < s->n; j++) {
    246         if (j != i) xco_event_unpark(s->inputs[j].src, &s->inputs[j].w);
    247     }
    248     xco_latch_set(&s->done, i);
    249 }
    250 
    251 void xco_select_event_init(xco_select_event_t *s,
    252                        xco_select_input_t *inputs, size_t n,
    253                        xco_event_t *const *srcs) {
    254     xco_latch_init(&s->done);
    255     s->inputs    = inputs;
    256     s->n         = n;
    257     s->remaining = 1;       /* any one fire closes the wait */
    258 
    259     /* Fast path: an input already ready. Fire and skip parking entirely
    260      * so deinit has nothing to disarm. */
    261     for (size_t i = 0; i < n; i++) {
    262         uintptr_t v;
    263         if (xco_event_poll(srcs[i], &v, NULL)) {
    264             inputs[i].value = v;       /* captured for inputs[winner].value */
    265             xco_latch_set(&s->done, i);
    266             return;
    267         }
    268     }
    269 
    270     for (size_t i = 0; i < n; i++) {
    271         inputs[i].w.next = NULL;
    272         inputs[i].w.prev = NULL;
    273         inputs[i].w.fire = xco_select_input_fire;
    274         inputs[i].src    = srcs[i];
    275         inputs[i].parent = s;
    276         inputs[i].value  = 0;
    277         (void)xco_event_poll(srcs[i], NULL, &inputs[i].w);
    278     }
    279 }
    280 
    281 void xco_allof_event_init(xco_select_event_t *s,
    282                       xco_select_input_t *inputs, size_t n,
    283                       xco_event_t *const *srcs) {
    284     xco_latch_init(&s->done);
    285     s->inputs    = inputs;
    286     s->n         = n;
    287     s->remaining = n;       /* every input must fire to close */
    288 
    289     if (n == 0) { xco_latch_set(&s->done, 0); return; }
    290 
    291     /* Initialize each input then poll. Fused: an already-ready input is
    292      * consumed inline (value captured, remaining--, no parking); the
    293      * rest end up parked by the same call. If everyone was inline, fire
    294      * done at the end. */
    295     for (size_t i = 0; i < n; i++) {
    296         inputs[i].w.next = NULL;
    297         inputs[i].w.prev = NULL;
    298         inputs[i].w.fire = xco_select_input_fire;
    299         inputs[i].src    = srcs[i];
    300         inputs[i].parent = s;
    301         inputs[i].value  = 0;
    302 
    303         uintptr_t v;
    304         if (xco_event_poll(srcs[i], &v, &inputs[i].w)) {
    305             inputs[i].value = v;
    306             s->remaining--;
    307         }
    308     }
    309 
    310     /* All inline-ready: fire done with the last input's index, matching
    311      * the "closing index" semantics of the parked path. */
    312     if (s->remaining == 0) xco_latch_set(&s->done, n - 1);
    313 }
    314 
    315 void xco_select_event_deinit(xco_select_event_t *s) {
    316     /* done.set => the closing fire already disarmed everyone (or the
    317      * fast path skipped parking entirely). Otherwise — possible after a
    318      * partial allof — some inputs may still be parked; unpark is
    319      * idempotent for already-detached waiters. */
    320     if (s->done.set) return;
    321     for (size_t i = 0; i < s->n; i++) {
    322         xco_event_unpark(s->inputs[i].src, &s->inputs[i].w);
    323     }
    324 }
    325 
    326 /* ====================================================================
    327  * FIFO waitlist helpers (shared by queue and notify)
    328  *
    329  * Doubly-linked FIFO push/pop. Same shape as latch's list operations
    330  * but with an explicit tail so arrival order is preserved (unlike
    331  * latch, where waiter order is irrelevant). The xco_chan_q_* names are
    332  * historical — the chan code that originally used them is now folded
    333  * into queue (chan = queue at cap=0 + BLOCK).
    334  * ==================================================================== */
    335 
    336 static void xco_chan_q_push(xco_waiter_t **head, xco_waiter_t **tail, xco_waiter_t *w) {
    337     assert(!w->prev && !w->next);
    338     w->prev = *tail;
    339     w->next = NULL;
    340     if (*tail) (*tail)->next = w;
    341     else       *head         = w;
    342     *tail = w;
    343 }
    344 
    345 static xco_waiter_t *xco_chan_q_pop(xco_waiter_t **head, xco_waiter_t **tail) {
    346     xco_waiter_t *w = *head;
    347     if (!w) return NULL;
    348     *head = w->next;
    349     if (*head) (*head)->prev = NULL;
    350     else       *tail         = NULL;
    351     w->prev = w->next = NULL;
    352     return w;
    353 }
    354 
    355 static void xco_chan_q_remove(xco_waiter_t **head, xco_waiter_t **tail, xco_waiter_t *w) {
    356     /* Same not-on-list test as xco_latch_unpark: a queued waiter has prev
    357      * set OR is the head; a detached one has prev == NULL and isn't
    358      * the head. */
    359     if (!w->prev && *head != w) return;
    360     if (w->prev) w->prev->next = w->next;
    361     else         *head         = w->next;
    362     if (w->next) w->next->prev = w->prev;
    363     else         *tail         = w->prev;
    364     w->prev = w->next = NULL;
    365 }
    366 
    367 
    368 /* ====================================================================
    369  * Queue
    370  *
    371  * The FIFO list helpers (xco_chan_q_push/pop/remove) are reused for the
    372  * queue's send and recv waitlists — same shape, same invariants. The
    373  * ring buffer lives in caller-provided storage; we just track head and
    374  * len. cap == 0 leaves the buffer logic dormant: every send either
    375  * direct-hands or parks, every recv either takes from a parked sender
    376  * or parks — i.e. it degenerates to chan rendezvous.
    377  * ==================================================================== */
    378 
    379 static inline xco_queue_t *xco_queue_of_recv(xco_event_t *e) {
    380     return (xco_queue_t *)((char *)e - offsetof(xco_queue_t, recv));
    381 }
    382 
    383 static inline void xco_queue_push_buf(xco_queue_t *q, uintptr_t v) {
    384     assert(q->len < q->cap);
    385     q->buf[(q->head + q->len) % q->cap] = v;
    386     q->len++;
    387 }
    388 
    389 static inline uintptr_t xco_queue_pop_buf(xco_queue_t *q) {
    390     assert(q->len > 0);
    391     uintptr_t v = q->buf[q->head];
    392     q->head = (q->head + 1) % q->cap;
    393     q->len--;
    394     return v;
    395 }
    396 
    397 /* Pop one parked sender's value into the now-free buffer slot, firing
    398  * the sender. Caller must have ensured a free slot exists (just popped
    399  * from the buffer, or cap > len). Maintains FIFO across the buffer +
    400  * sender-waitlist boundary: oldest buffered values come out before any
    401  * sender's value (which was queued later). No-op if no sender parked. */
    402 static void xco_queue_drain_one_sender(xco_queue_t *q) {
    403     if (!q->send_head) return;
    404     xco_waiter_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail);
    405     xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w;
    406     xco_queue_push_buf(q, qsw->value);
    407     qsw->delivered = true;
    408     /* Fire after pushing so the sender sees its delivery as complete. */
    409     xco_waiter_fire(w, 0);
    410 }
    411 
    412 static bool xco_queue_recv_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) {
    413     xco_queue_t *q = xco_queue_of_recv(e);
    414     if (q->len > 0) {
    415         uintptr_t v = xco_queue_pop_buf(q);
    416         if (out) *out = v;
    417         xco_queue_drain_one_sender(q);
    418         return true;
    419     }
    420     /* Empty buffer. If a sender is parked here it can only mean cap==0
    421      * (otherwise the sender would have used the buffer). Hand directly. */
    422     if (q->send_head) {
    423         xco_waiter_t *sender = xco_chan_q_pop(&q->send_head, &q->send_tail);
    424         xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)sender;
    425         if (out) *out = qsw->value;
    426         qsw->delivered = true;
    427         xco_waiter_fire(sender, 0);
    428         return true;
    429     }
    430     /* Closed and drained: receivers learn EOF via xco_queue_recv; out is
    431      * undefined. */
    432     if (q->closed) {
    433         if (out) *out = 0;
    434         return true;
    435     }
    436     if (!w) return false;
    437     xco_chan_q_push(&q->recv_head, &q->recv_tail, w);
    438     return false;
    439 }
    440 
    441 static void xco_queue_recv_unpark(xco_event_t *e, xco_waiter_t *w) {
    442     xco_queue_t *q = xco_queue_of_recv(e);
    443     xco_chan_q_remove(&q->recv_head, &q->recv_tail, w);
    444 }
    445 
    446 const xco_event_vtable_t xco__queue_recv_vt = {
    447     .poll   = xco_queue_recv_poll,
    448     .unpark = xco_queue_recv_unpark,
    449 };
    450 
    451 xco_queue_send_status_t xco_queue_send_poll(xco_queue_t *q, uintptr_t value,
    452                                             xco_queue_send_waiter_t *qsw) {
    453     /* Closed is closed regardless of policy: caller learns the truth. */
    454     if (q->closed) return XCO_QSEND_CLOSED;
    455 
    456     /* Direct handoff first: parked receivers always win over the buffer.
    457      * This is the rendezvous case and the cap==0 case. */
    458     xco_waiter_t *w = xco_chan_q_pop(&q->recv_head, &q->recv_tail);
    459     if (w) {
    460         xco_waiter_fire(w, value);
    461         return XCO_QSEND_ACCEPTED;
    462     }
    463     if (q->len < q->cap) {
    464         xco_queue_push_buf(q, value);
    465         return XCO_QSEND_ACCEPTED;
    466     }
    467     /* Buffer full and no waiting receiver. */
    468     switch (q->policy) {
    469     case XCO_QUEUE_BLOCK:
    470         if (!qsw) return XCO_QSEND_BLOCKED;
    471         qsw->value = value;
    472         xco_chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base);
    473         return XCO_QSEND_BLOCKED;
    474     case XCO_QUEUE_DROP_NEWEST:
    475         return XCO_QSEND_ACCEPTED;
    476     case XCO_QUEUE_DROP_OLDEST:
    477         (void)xco_queue_pop_buf(q);
    478         xco_queue_push_buf(q, value);
    479         return XCO_QSEND_ACCEPTED;
    480     }
    481     __builtin_unreachable();
    482 }
    483 
    484 void xco_queue_send_unpark(xco_queue_t *q, xco_queue_send_waiter_t *qsw) {
    485     xco_chan_q_remove(&q->send_head, &q->send_tail, &qsw->sw.base);
    486 }
    487 
    488 xco_recv_status_t xco_queue_recv(xco_queue_t *q, uintptr_t *out) {
    489     if (q->len > 0) {
    490         uintptr_t v = xco_queue_pop_buf(q);
    491         if (out) *out = v;
    492         xco_queue_drain_one_sender(q);
    493         return XCO_RECV_GOT;
    494     }
    495     if (q->send_head) {
    496         xco_waiter_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail);
    497         xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w;
    498         if (out) *out = qsw->value;
    499         qsw->delivered = true;
    500         xco_waiter_fire(w, 0);
    501         return XCO_RECV_GOT;
    502     }
    503     if (q->closed) return XCO_RECV_CLOSED;
    504     return XCO_RECV_EMPTY;
    505 }
    506 
    507 void xco_queue_close(xco_queue_t *q) {
    508     if (q->closed) return;
    509     q->closed = true;
    510 
    511     /* Drain parked senders with delivered=false. Senders only park
    512      * under BLOCK, so this is no-op for DROP_* (their waitlist is
    513      * always empty). */
    514     xco_waiter_t *w;
    515     while ((w = xco_chan_q_pop(&q->send_head, &q->send_tail)) != NULL) {
    516         xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w;
    517         qsw->delivered = false;
    518         xco_waiter_fire(w, 0);
    519     }
    520     /* Wake parked receivers so they can observe closed via xco_queue_recv.
    521      * Receivers may still drain buffered values first — xco_queue_recv's
    522      * XCO_RECV_GOT path is hit before the XCO_RECV_CLOSED branch. */
    523     while ((w = xco_chan_q_pop(&q->recv_head, &q->recv_tail)) != NULL) {
    524         xco_waiter_fire(w, 0);
    525     }
    526 }
    527 
    528 /* ---- Queue send op (selectable send) ---------------------------------- */
    529 
    530 void xco__queue_send_op_fire(xco_waiter_t *w, uintptr_t value) {
    531     (void)value;
    532     xco_queue_send_op_t *op = (xco_queue_send_op_t *)w;
    533     xco_latch_set(&op->done, op->qsw.delivered ? 1 : 0);
    534 }
    535 
    536 void xco_queue_send_op_init(xco_queue_send_op_t *op, xco_queue_t *q, uintptr_t value) {
    537     xco_queue_send_waiter_init(&op->qsw, NULL, NULL);
    538     op->qsw.sw.base.fire = xco__queue_send_op_fire;
    539     op->queue = q;
    540     xco_latch_init(&op->done);
    541 
    542     switch (xco_queue_send_poll(q, value, &op->qsw)) {
    543     case XCO_QSEND_ACCEPTED:
    544         op->qsw.delivered = true;
    545         xco_latch_set(&op->done, 1);
    546         return;
    547     case XCO_QSEND_CLOSED:
    548         xco_latch_set(&op->done, 0);
    549         return;
    550     case XCO_QSEND_BLOCKED:
    551         /* Only BLOCK + full buffer; parked. */
    552         return;
    553     }
    554 }
    555 
    556 /* ====================================================================
    557  * Broadcast (slot)
    558  *
    559  * The waitlist uses the same doubly-linked LIFO shape as latch — there
    560  * is no FIFO requirement because publish wakes everyone at once. The
    561  * key difference from latch is publish vs set: publish never marks a
    562  * sticky bit on the event, so try always returns false (subscribers
    563  * wait for the *next* publish), and the waitlist is reusable across
    564  * publishes — subscribers re-park to receive subsequent values.
    565  * ==================================================================== */
    566 
    567 static bool xco_broadcast_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) {
    568     (void)out;
    569     /* Transient: never reports ready; just parks if asked. */
    570     if (!w) return false;
    571     xco_broadcast_t *b = (xco_broadcast_t *)e;
    572     assert(!w->prev && !w->next);
    573     w->next = b->waiters;
    574     if (b->waiters) b->waiters->prev = w;
    575     b->waiters = w;
    576     return false;
    577 }
    578 
    579 static void xco_broadcast_unpark(xco_event_t *e, xco_waiter_t *w) {
    580     xco_broadcast_t *b = (xco_broadcast_t *)e;
    581     if (!w->prev && b->waiters != w) return;
    582     if (w->prev) w->prev->next = w->next;
    583     else         b->waiters    = w->next;
    584     if (w->next) w->next->prev = w->prev;
    585     w->prev = w->next = NULL;
    586 }
    587 
    588 const xco_event_vtable_t xco__broadcast_vt = {
    589     .poll   = xco_broadcast_poll,
    590     .unpark = xco_broadcast_unpark,
    591 };
    592 
    593 void xco_broadcast_publish(xco_broadcast_t *b, uintptr_t value) {
    594     b->has_value = true;
    595     b->value     = value;
    596 
    597     /* Detach the waitlist before iterating — same hazard-free pattern as
    598      * xco_latch_set. A waiter's fire callback may re-park itself on us (the
    599      * common case for a re-arming subscriber); decoupling means that
    600      * re-park lands on a fresh waitlist, not on the snapshot we're
    601      * walking. */
    602     xco_waiter_t *w = b->waiters;
    603     b->waiters = NULL;
    604     while (w) {
    605         xco_waiter_t *next = w->next;       /* save before xco_waiter_fire clears */
    606         xco_waiter_fire(w, value);
    607         w = next;
    608     }
    609 }
    610 
    611 /* ====================================================================
    612  * Notify
    613  *
    614  * Doubly-linked FIFO waitlist (same shape as the chan/queue waitlists).
    615  * xco_notify_one fires the head; xco_notify_all detaches the whole list before
    616  * iterating so callbacks can re-park onto a fresh waitlist without
    617  * iterator hazards (same pattern as xco_latch_set). xco_event_poll never
    618  * reports ready: notify is purely transient.
    619  * ==================================================================== */
    620 
    621 static bool xco_notify_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) {
    622     (void)out;
    623     if (!w) return false;
    624     xco_notify_t *n = (xco_notify_t *)e;
    625     xco_chan_q_push(&n->head, &n->tail, w);
    626     return false;
    627 }
    628 
    629 static void xco_notify_unpark(xco_event_t *e, xco_waiter_t *w) {
    630     xco_notify_t *n = (xco_notify_t *)e;
    631     xco_chan_q_remove(&n->head, &n->tail, w);
    632 }
    633 
    634 const xco_event_vtable_t xco__notify_vt = {
    635     .poll   = xco_notify_poll,
    636     .unpark = xco_notify_unpark,
    637 };
    638 
    639 void xco_notify_one(xco_notify_t *n) {
    640     xco_waiter_t *w = xco_chan_q_pop(&n->head, &n->tail);
    641     if (!w) return;
    642     xco_waiter_fire(w, 0);
    643 }
    644 
    645 void xco_notify_all(xco_notify_t *n) {
    646     /* Detach before iterating: re-parking inside fire lands on a fresh
    647      * (empty) list. Walk the snapshot via saved next pointers. */
    648     xco_waiter_t *w = n->head;
    649     n->head = n->tail = NULL;
    650     while (w) {
    651         xco_waiter_t *next = w->next;
    652         xco_waiter_fire(w, 0);
    653         w = next;
    654     }
    655 }
    656 
    657 /* ====================================================================
    658  * Pairing heap
    659  *
    660  * Standard intrusive pairing heap. Each node carries three link fields:
    661  *
    662  *   child : head of children sibling list (NULL if leaf).
    663  *   prev  : parent if this node is its parent's first child;
    664  *           previous sibling otherwise; NULL only for a detached tree
    665  *           root (including h->root).
    666  *   next  : next sibling, or NULL for the last child / a detached root.
    667  *
    668  * The "prev points to either parent or a sibling" trick lets us splice
    669  * a node out in O(1) without an extra parent pointer: parent->child==n
    670  * distinguishes "first child" from "non-first sibling."
    671  *
    672  * meld picks the smaller-deadline root as winner and grafts the loser
    673  * as its new first child. Pop-min and remove both rebuild via the
    674  * classic two-pass pairwise merge of the resulting children list.
    675  * ==================================================================== */
    676 
    677 /* Merge two detached subtree roots (each with prev=next=NULL). Returns
    678  * the merged root (also detached: prev=next=NULL). */
    679 static xco_timer_t *xco_ph_meld(xco_timer_t *a, xco_timer_t *b) {
    680     if (!a) return b;
    681     if (!b) return a;
    682     xco_timer_t *small, *large;
    683     if (a->deadline <= b->deadline) { small = a; large = b; }
    684     else                            { small = b; large = a; }
    685     /* Graft `large` as the new first child of `small`. */
    686     large->next = small->child;
    687     if (small->child) small->child->prev = large;
    688     large->prev  = small;       /* parent link via prev */
    689     small->child = large;
    690     small->prev  = NULL;
    691     small->next  = NULL;
    692     return small;
    693 }
    694 
    695 /* Two-pass pairwise meld of a children sibling list. Detaches each node
    696  * before melding so meld inputs satisfy its prev=next=NULL contract.
    697  * The output has prev=next=NULL. */
    698 static xco_timer_t *xco_ph_merge_pairs(xco_timer_t *first) {
    699     /* Pass 1: walk the sibling list left-to-right, melding consecutive
    700      * pairs. Chain results via `next` (ab)use as a temporary list link. */
    701     xco_timer_t *list = NULL;
    702     while (first) {
    703         xco_timer_t *a = first;
    704         xco_timer_t *b = a->next;
    705         xco_timer_t *rest = b ? b->next : NULL;
    706         a->prev = a->next = NULL;
    707         if (b) { b->prev = b->next = NULL; }
    708         xco_timer_t *m = xco_ph_meld(a, b);
    709         m->next = list;          /* prepend to pass-1 list */
    710         list = m;
    711         first = rest;
    712     }
    713     /* Pass 2: meld the pass-1 list into a single root. */
    714     xco_timer_t *acc = NULL;
    715     while (list) {
    716         xco_timer_t *nxt = list->next;
    717         list->prev = NULL;
    718         list->next = NULL;
    719         acc = xco_ph_meld(acc, list);
    720         list = nxt;
    721     }
    722     return acc;
    723 }
    724 
    725 /* Detach n from the tree (must currently be in the heap). Returns the
    726  * (possibly new) main-heap root. n is left fully detached: child still
    727  * points to its subtree, but prev/next are NULL — caller decides what
    728  * to do with that subtree. */
    729 static xco_timer_t *xco_ph_detach(xco_pairing_heap_t *h, xco_timer_t *n) {
    730     if (h->root == n) {
    731         /* n is the main root; pop it and rebuild from its children. */
    732         xco_timer_t *new_root = xco_ph_merge_pairs(n->child);
    733         n->child = NULL;
    734         n->prev  = NULL;
    735         n->next  = NULL;
    736         return new_root;
    737     }
    738     /* n has a parent (recorded via prev — either as its parent's first
    739      * child or as some sibling's successor). Splice out of the sibling
    740      * list. */
    741     if (n->prev->child == n) {
    742         /* First child: parent's child link skips us. */
    743         n->prev->child = n->next;
    744     } else {
    745         /* Mid/last sibling: previous sibling's next skips us. */
    746         n->prev->next = n->next;
    747     }
    748     if (n->next) n->next->prev = n->prev;
    749     n->prev = NULL;
    750     n->next = NULL;
    751     /* The main-heap root is unchanged structurally; the caller of this
    752      * function decides how (or whether) to reintroduce n's subtree. */
    753     return h->root;
    754 }
    755 
    756 static void xco_ph_insert(xco_timers_t *ts, xco_timer_t *t) {
    757     xco_pairing_heap_t *h = (xco_pairing_heap_t *)ts;
    758     /* Singleton tree (prev/next/child already NULL via xco_timer_init). */
    759     h->root = xco_ph_meld(h->root, t);
    760     t->in_heap = true;
    761 }
    762 
    763 static void xco_ph_cancel(xco_timers_t *ts, xco_timer_t *t) {
    764     xco_pairing_heap_t *h = (xco_pairing_heap_t *)ts;
    765     if (!t->in_heap) return;
    766     h->root = xco_ph_detach(h, t);
    767     /* Now meld t's subtree (its children) back into the main heap. */
    768     xco_timer_t *sub = xco_ph_merge_pairs(t->child);
    769     t->child = NULL;
    770     h->root = xco_ph_meld(h->root, sub);
    771     t->in_heap = false;
    772 }
    773 
    774 static void xco_ph_advance(xco_timers_t *ts, uint64_t now) {
    775     xco_pairing_heap_t *h = (xco_pairing_heap_t *)ts;
    776     /* Pop while the min-key timer is due. Each fire may run callbacks
    777      * that insert *new* timers (with later deadlines, normally) — those
    778      * land back in the heap, and we keep checking the root. */
    779     while (h->root && h->root->deadline <= now) {
    780         xco_timer_t *t = h->root;
    781         h->root = xco_ph_merge_pairs(t->child);
    782         t->child   = NULL;
    783         t->prev    = NULL;
    784         t->next    = NULL;
    785         t->in_heap = false;
    786         /* Trigger the latch: drains the waitlist and delivers the
    787          * deadline as the fire payload. */
    788         xco_latch_set(&t->done, (uintptr_t)t->deadline);
    789     }
    790 }
    791 
    792 static uint64_t xco_ph_peek(const xco_timers_t *ts) {
    793     const xco_pairing_heap_t *h = (const xco_pairing_heap_t *)ts;
    794     return h->root ? h->root->deadline : UINT64_MAX;
    795 }
    796 
    797 const xco_timers_vtable_t xco__pairing_heap_vt = {
    798     .insert  = xco_ph_insert,
    799     .cancel  = xco_ph_cancel,
    800     .advance = xco_ph_advance,
    801     .peek    = xco_ph_peek,
    802 };
    803 
    804 /* ====================================================================
    805  * Timeout
    806  * ==================================================================== */
    807 
    808 /* Bridge waiter: parked on the timer's latch, fires the cancel when the
    809  * timer fires. Two-step indirection so cancel can already have its own
    810  * waiters (e.g. a xco_wait_or_cancel select) without the timer's waitlist
    811  * caring about cancel internals. */
    812 static void xco__timeout_bridge_fire(xco_waiter_t *w, uintptr_t value) {
    813     (void)value;
    814     xco_timeout_t *to = (xco_timeout_t *)((char *)w - offsetof(xco_timeout_t, bridge));
    815     xco_cancel_set(&to->cancel);
    816 }
    817 
    818 void xco_timeout_init(xco_timeout_t *to, xco_timers_t *ts, uint64_t deadline) {
    819     xco_timer_init(&to->timer, ts, deadline);
    820     xco_cancel_init(&to->cancel);
    821     to->bridge.next = NULL;
    822     to->bridge.prev = NULL;
    823     to->bridge.fire = xco__timeout_bridge_fire;
    824     /* Park the bridge on the timer. If the timer fires, xco_latch_set
    825      * detaches the bridge and calls our fire callback inline. The timer
    826      * was just inserted into the heap and hasn't been advanced, so poll
    827      * always parks here (return value ignored). */
    828     (void)xco_event_poll(xco_timer_event(&to->timer), NULL, &to->bridge);
    829 }
    830 
    831 /* ====================================================================
    832  * Ticker
    833  *
    834  * The ticker's event surface uses the broadcast-style LIFO doubly-linked
    835  * waitlist: subscribers are fired all-at-once on each tick, so order
    836  * doesn't matter; doubly-linked gives O(1) unpark for cancellation.
    837  * ==================================================================== */
    838 
    839 static bool xco_ticker_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) {
    840     (void)out;
    841     /* Transient — never reports ready; just parks if asked. */
    842     if (!w) return false;
    843     xco_ticker_t *t = (xco_ticker_t *)((char *)e - offsetof(xco_ticker_t, base));
    844     assert(!w->prev && !w->next);
    845     w->next = t->waiters;
    846     if (t->waiters) t->waiters->prev = w;
    847     t->waiters = w;
    848     return false;
    849 }
    850 
    851 static void xco_ticker_unpark(xco_event_t *e, xco_waiter_t *w) {
    852     xco_ticker_t *t = (xco_ticker_t *)((char *)e - offsetof(xco_ticker_t, base));
    853     if (!w->prev && t->waiters != w) return;
    854     if (w->prev) w->prev->next = w->next;
    855     else         t->waiters    = w->next;
    856     if (w->next) w->next->prev = w->prev;
    857     w->prev = w->next = NULL;
    858 }
    859 
    860 const xco_event_vtable_t xco__ticker_vt = {
    861     .poll   = xco_ticker_poll,
    862     .unpark = xco_ticker_unpark,
    863 };
    864 
    865 /* Bridge waiter: parks on the underlying timer's latch. On fire, compute
    866  * the next deadline (skip-ahead-safe), reinstall the timer, re-park the
    867  * bridge on the new timer, then fire every parked subscriber with the
    868  * just-fired deadline. The waitlist is detached before iteration so
    869  * subscribers can re-park inside their fire callbacks. */
    870 static void xco__ticker_bridge_fire(xco_waiter_t *w, uintptr_t value) {
    871     xco_ticker_t *t = (xco_ticker_t *)((char *)w - offsetof(xco_ticker_t, bridge));
    872     uint64_t  fired  = (uint64_t)value;
    873     uint64_t  next   = fired + t->period;
    874     /* Skip-ahead: in the rare overflow case (period = 0 or wraparound),
    875      * step forward enough to keep next > fired. */
    876     if (next <= fired) {
    877         next += ((fired - next) / t->period + 1) * t->period;
    878     }
    879     /* Reinstall the timer for the next tick. The latch's storage is
    880      * reused — xco_timer_init runs xco_latch_init on it. */
    881     xco_timer_init(&t->timer, t->src, next);
    882     /* Bridge waiter is fully detached (xco_waiter_fire just cleared its
    883      * links); park it on the freshly-armed timer (which has not yet been
    884      * advanced, so poll parks). */
    885     (void)xco_event_poll(xco_timer_event(&t->timer), NULL, &t->bridge);
    886 
    887     /* Fire the subscribers. Detach the waitlist first so re-park inside
    888      * fire lands on the now-empty list. */
    889     xco_waiter_t *waiters = t->waiters;
    890     t->waiters = NULL;
    891     while (waiters) {
    892         xco_waiter_t *nxt = waiters->next;
    893         xco_waiter_fire(waiters, (uintptr_t)fired);
    894         waiters = nxt;
    895     }
    896 }
    897 
    898 void xco_ticker_init(xco_ticker_t *t, xco_timers_t *ts,
    899                  uint64_t period, uint64_t first_deadline) {
    900     /* period must be positive — the skip-ahead computation in the bridge
    901      * divides by period, and a zero-period ticker would loop forever
    902      * inside xco_ph_advance. */
    903     assert(period > 0);
    904     t->base.vt   = &xco__ticker_vt;
    905     t->src       = ts;
    906     t->period    = period;
    907     t->waiters   = NULL;
    908 
    909     xco_timer_init(&t->timer, ts, first_deadline);
    910 
    911     t->bridge.next = NULL;
    912     t->bridge.prev = NULL;
    913     t->bridge.fire = xco__ticker_bridge_fire;
    914     (void)xco_event_poll(xco_timer_event(&t->timer), NULL, &t->bridge);
    915 }
    916 
    917 void xco_ticker_deinit(xco_ticker_t *t) {
    918     /* Pull the bridge off the timer (no-op if already fired) and cancel
    919      * the timer. Subscribers' waiters are the caller's storage; nothing
    920      * to free here. */
    921     xco_event_unpark(xco_timer_event(&t->timer), &t->bridge);
    922     xco_timer_deinit(&t->timer);
    923 }
    924 
    925 /* ====================================================================
    926  * Task group
    927  *
    928  * Each attach contributes one to the countdown and parks a bridge waiter
    929  * on the task's done event. Bridge fire splices the slot out of the
    930  * group's list and decrements the countdown — the join event fires when
    931  * the last attached task finishes.
    932  *
    933  * Cancellation is fan-out: walk the list, set each task's cancel, then
    934  * set the group-level cancel. Bodies cooperate by composing their work
    935  * with xco_task_cancel(self); the group-level cancel is for non-task
    936  * waiters that want to react to "the group has been told to stop."
    937  * ==================================================================== */
    938 
    939 static void xco__task_group_detach_slot(xco_task_group_t *g, xco_group_attach_t *slot) {
    940     /* Doubly-linked, head/tail tracked; same shape as other waitlists. */
    941     if (slot->prev) slot->prev->next = slot->next;
    942     else            g->head          = slot->next;
    943     if (slot->next) slot->next->prev = slot->prev;
    944     else            g->tail          = slot->prev;
    945     slot->prev = slot->next = NULL;
    946 }
    947 
    948 static void xco__task_group_bridge_fire(xco_waiter_t *w, uintptr_t value) {
    949     (void)value;
    950     xco_group_attach_t *slot = (xco_group_attach_t *)((char *)w - offsetof(xco_group_attach_t, bridge));
    951     xco_task_group_t   *g    = slot->group;
    952     xco__task_group_detach_slot(g, slot);
    953     xco_countdown_done(&g->pending);
    954 }
    955 
    956 void xco_task_group_init(xco_task_group_t *g) {
    957     /* Don't go through xco_countdown_init(0) — that fires the latch
    958      * immediately, which would make the very first attach's
    959      * xco_countdown_add UB. The group's join must remain not-fired until at
    960      * least one attached task has finished, so we open with
    961      * remaining=0 and an unset latch. The first attach lifts remaining
    962      * to 1, and matching countdown_dones bring it back to 0, firing
    963      * the latch. */
    964     xco_latch_init(&g->pending.done);
    965     g->pending.remaining = 0;
    966     xco_cancel_init(&g->cancel);
    967     g->head = g->tail = NULL;
    968 }
    969 
    970 void xco_task_group_attach(xco_task_group_t *g, xco_task_t *t, xco_group_attach_t *slot) {
    971     xco_countdown_add(&g->pending, 1);
    972 
    973     slot->task  = t;
    974     slot->group = g;
    975 
    976     /* Append to the group's list (FIFO; ordering doesn't affect cancel
    977      * fan-out semantics, but consistent with other waitlists in the
    978      * codebase). */
    979     slot->prev = g->tail;
    980     slot->next = NULL;
    981     if (g->tail) g->tail->next = slot;
    982     else         g->head       = slot;
    983     g->tail = slot;
    984 
    985     slot->bridge.next = NULL;
    986     slot->bridge.prev = NULL;
    987     slot->bridge.fire = xco__task_group_bridge_fire;
    988 
    989     /* Park on the task's done event. Re-attaching a finished task is UB
    990      * per the contract; the latch's clean-waiter assert inside poll
    991      * would catch a double-park. */
    992     (void)xco_event_poll(xco_task_done_event(t), NULL, &slot->bridge);
    993 }
    994 
    995 void xco_task_group_cancel(xco_task_group_t *g) {
    996     /* Fan-out cancel: signal each attached task. Walk the snapshot
    997      * (cancel doesn't detach the slot — only task done does — so the
    998      * list is stable across iteration). */
    999     for (xco_group_attach_t *s = g->head; s; s = s->next) {
   1000         xco_cancel_set(&s->task->cancel);
   1001     }
   1002     /* Group-level cancel for anyone awaiting "the group as a whole." */
   1003     xco_cancel_set(&g->cancel);
   1004 }
   1005 
   1006 /* ====================================================================
   1007  * xco — coroutines
   1008  * ==================================================================== */
   1009 
   1010 typedef struct xco_impl {
   1011     xco_mach_t             base;        /* must be first; aliases xco_coro_t.base */
   1012     _Alignas(XCO__CTX_ALIGN) unsigned char ctx_buf[XCO__CTX_SIZE];
   1013     xco_platform_ctx_t *resumer_ctx; /* where suspend/return goes */
   1014     xco_fn              fn;
   1015 } xco_impl_t;
   1016 
   1017 _Static_assert(sizeof(xco_impl_t)   <= sizeof(xco_coro_t),   "xco_coro_t too small");
   1018 _Static_assert(_Alignof(xco_impl_t) <= _Alignof(xco_coro_t), "xco_coro_t under-aligned");
   1019 
   1020 static inline xco_impl_t         *xco_impl_of(xco_coro_t *c)         { return (xco_impl_t *)c; }
   1021 static inline xco_platform_ctx_t *xco_ctx_of(xco_impl_t *ci) {
   1022     return (xco_platform_ctx_t *)ci->ctx_buf;
   1023 }
   1024 
   1025 /* Per-thread state. */
   1026 static _Thread_local xco_impl_t *xco_coro_current = NULL;
   1027 static _Thread_local _Alignas(XCO__CTX_ALIGN)
   1028        unsigned char xco_t_main_ctx_buf[XCO__CTX_SIZE];
   1029 static inline xco_platform_ctx_t *xco_main_ctx(void) {
   1030     return (xco_platform_ctx_t *)xco_t_main_ctx_buf;
   1031 }
   1032 
   1033 /* Trampoline: runs on the coroutine's own stack, invoked by the
   1034  * platform layer on the first switch into a fresh context. The
   1035  * argument is the value passed to that first xco_step. The coroutine
   1036  * identifies itself via xco_coro_current, set by the resumer just before
   1037  * switching. */
   1038 static void xco_trampoline(uintptr_t arg) {
   1039     xco_impl_t *self = xco_coro_current;
   1040     uintptr_t   ret  = self->fn(arg);
   1041 
   1042     self->base.status = XCO_STEP_DEAD;
   1043     (void)xco_platform_switch(xco_ctx_of(self), self->resumer_ctx, ret);
   1044     __builtin_unreachable();
   1045 }
   1046 
   1047 /* xco_step_fn entry point wired into base.step at init time. All callers
   1048  * — generic xco_step consumers and xco-aware code alike — route through
   1049  * here. */
   1050 static xco_step_result_t xco_co_step(xco_mach_t *s, uintptr_t value) {
   1051     xco_impl_t *next = (xco_impl_t *)s;
   1052     assert(next->base.status == XCO_STEP_INIT || next->base.status == XCO_STEP_SUSPENDED);
   1053 
   1054     xco_impl_t *prev  = xco_coro_current;
   1055     next->resumer_ctx = prev ? xco_ctx_of(prev) : xco_main_ctx();
   1056     next->base.status = XCO_STEP_RUNNING;
   1057     xco_coro_current         = next;
   1058 
   1059     uintptr_t back = xco_platform_switch(next->resumer_ctx,
   1060                                          xco_ctx_of(next), value);
   1061 
   1062     /* Coroutine has either suspended or returned; status is already
   1063      * set correctly by xco_suspend or by the xco_trampoline. */
   1064     xco_coro_current = prev;
   1065     return (xco_step_result_t){ .value = back, .status = next->base.status };
   1066 }
   1067 
   1068 void xco_init(xco_coro_t *c, xco_fn fn,
   1069               void *stack_base, size_t stack_len) {
   1070     xco_impl_t *ci  = xco_impl_of(c);
   1071     ci->base.step   = xco_co_step;
   1072     ci->base.status = XCO_STEP_INIT;
   1073     ci->fn          = fn;
   1074     ci->resumer_ctx = NULL;
   1075     xco_platform_init(xco_ctx_of(ci), stack_base, stack_len, xco_trampoline);
   1076 }
   1077 
   1078 uintptr_t xco_suspend(uintptr_t value) {
   1079     xco_impl_t *self = xco_coro_current;
   1080     assert(self != NULL);
   1081     self->base.status = XCO_STEP_SUSPENDED;
   1082     return xco_platform_switch(xco_ctx_of(self), self->resumer_ctx, value);
   1083 }
   1084 
   1085 xco_coro_t *xco_self(void) {
   1086     return (xco_coro_t *)xco_coro_current;
   1087 }
   1088 
   1089 /* ---- xco-backed task -------------------------------------------------- */
   1090 
   1091 /* The xco_trampoline: runs as the coroutine's xco_fn. Recovers the owning
   1092  * xco_cotask_t via container_of on the embedded co (xco_self() returns
   1093  * the running xco), dispatches the user fn, and surfaces the return
   1094  * value through xco_task_done — so a joiner waiting on xco_task_done_event
   1095  * wakes with the body's return without the body needing to know about
   1096  * the task surface at all. */
   1097 static uintptr_t xco_cotask_trampoline(uintptr_t arg) {
   1098     xco_coro_t      *self = xco_self();
   1099     xco_cotask_t *xt   = (xco_cotask_t *)((char *)self - offsetof(xco_cotask_t, co));
   1100     uintptr_t   r    = xt->fn(&xt->task, arg);
   1101     xco_task_done(&xt->task, r);
   1102     return r;
   1103 }
   1104 
   1105 void xco_cotask_init(xco_cotask_t *xt, xco_cotask_fn fn,
   1106                    void *stack_base, size_t stack_len) {
   1107     xco_task_init(&xt->task, &xt->co.base);
   1108     xt->fn = fn;
   1109     xco_init(&xt->co, xco_cotask_trampoline, stack_base, stack_len);
   1110 }
   1111 
   1112 /* ====================================================================
   1113  * xco_op — generic effect/IO layer
   1114  *
   1115  * The runtime owns a doubly-linked pending-ops list (op_head/op_tail).
   1116  * Submit appends; cancel-while-PENDING splices in O(1); take detaches
   1117  * the whole list in O(1) and lets the host iterate via the next pointers
   1118  * (which are then the host's to reuse for in-flight tracking).
   1119  *
   1120  * Resolution always goes through xco_latch_set on op->done; awaiters
   1121  * compose with the standard event API. Status is delivered as the latch
   1122  * payload.
   1123  * ==================================================================== */
   1124 
   1125 void xco_op_submit(xco_runtime_t *rt, xco_op_t *op) {
   1126     xco_latch_init(&op->done);
   1127     op->cancel_requested = false;
   1128     op->rt    = rt;
   1129     op->epoch = rt->op_epoch;       /* matches → PENDING */
   1130     op->next  = NULL;
   1131     op->prev  = rt->op_tail;
   1132     if (rt->op_tail) rt->op_tail->next = op;
   1133     else             rt->op_head       = op;
   1134     rt->op_tail = op;
   1135 }
   1136 
   1137 void xco_op_cancel(xco_op_t *op) {
   1138     if (op->done.set) return;            /* RESOLVED — no-op */
   1139     if (xco_op_is_pending(op)) {
   1140         /* PENDING: splice from rt's pending list and resolve as cancelled. */
   1141         xco_runtime_t *rt = op->rt;
   1142         if (op->prev) op->prev->next = op->next;
   1143         else          rt->op_head    = op->next;
   1144         if (op->next) op->next->prev = op->prev;
   1145         else          rt->op_tail    = op->prev;
   1146         op->prev = op->next = NULL;
   1147         xco_latch_set(&op->done, (uintptr_t)XCO_OP_CANCELLED);
   1148         return;
   1149     }
   1150     /* IN_FLIGHT: advisory. Host sees cancel_requested and decides whether
   1151      * to honor it; xco_op_complete is still the final word either way. */
   1152     op->cancel_requested = true;
   1153 }
   1154 
   1155 void xco_op_complete(xco_op_t *op, xco_op_status_t status) {
   1156     /* xco_latch_set is idempotent; second/late completion is a no-op.
   1157      * Per the contract, host calls this only after take, so op is not
   1158      * on rt's pending list. */
   1159     xco_latch_set(&op->done, (uintptr_t)status);
   1160 }
   1161 
   1162 xco_op_t *xco_rt_take_ops(xco_runtime_t *rt, xco_op_t **tail_out) {
   1163     xco_op_t *head = rt->op_head;
   1164     if (tail_out) *tail_out = rt->op_tail;
   1165     rt->op_head = rt->op_tail = NULL;
   1166     /* Bump the epoch: every op currently on the (now-detached) batch had
   1167      * its epoch field set to the old value at submit, so they all flip to
   1168      * IN_FLIGHT in O(1). Submits after this point land in the new
   1169      * generation. */
   1170     rt->op_epoch++;
   1171     return head;
   1172 }