xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit 76cdbc983cbfd158c6114e33a7252c576ed6b63c
parent 4f8d1cf83170dc3899d1147c5113da8b5e495b72
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 02:18:48 -0700

Add cancellation: cancel_t alias plus wait_or_cancel helper

A cancel token is just a sticky latch — these are vocabulary aliases
plus a 2-input select builder so call sites can express "await X, or
be cancelled" without open-coding the composition. Cancel notifies;
in-flight values are released by the caller's deinit chain, not
dropped by the runtime.

Diffstat:
Mevent.h | 44++++++++++++++++++++++++++++++++++++++++++++
Mtests/test_event.c | 171+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 215 insertions(+), 0 deletions(-)

diff --git a/event.h b/event.h @@ -315,4 +315,48 @@ void _chan_send_op_fire(waker_t *w, uintptr_t value); void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value); void chan_send_op_deinit(chan_send_op_t *op); +/* ---- Cancellation ----------------------------------------------------- */ + +/* A cancellation token is a sticky latch — these aliases exist for + * vocabulary at call sites. cancel_set fires every parked waiter; the + * idempotency of latch_set means racing cancellers are fine. + * + * Pair a cancel_t with any blocking event via wait_or_cancel to get + * "await X, or be cancelled." + * + * Discipline: cancellation notifies; it never drops in-flight values. + * A cancelled await returns control to its caller, which is responsible + * for draining whatever it owns — deinit a pending chan_send_op so its + * value goes back to the sender, deinit a select_event so input wakers + * detach, drive a cancellable coroutine to XSTEP_DEAD before freeing + * its stack. The xco layer does no unwinding; the coroutine cooperates. */ + +typedef latch_t cancel_t; + +static inline void cancel_init(cancel_t *c) { latch_init(c); } +static inline void cancel_set(cancel_t *c) { latch_set(c, 0); } +static inline bool cancel_is_set(const cancel_t *c) { return c->set; } +static inline event_t *cancel_event(cancel_t *c) { return &c->base; } + +/* Outcome indices for wait_or_cancel — match the inputs[] order so the + * value the resumer receives from the latched select reads as one of + * these directly. */ +enum { + WAIT_OK = 0, /* ev fired; inputs[0].value holds its payload */ + WAIT_CANCELLED = 1, /* cancel fired before ev */ +}; + +/* Build a select over (ev, cancel) using caller-provided storage. If + * either is already ready at init the select fast-paths and never parks + * anyone (ev is checked first, so an event that has already resolved + * wins over a concurrent cancel). Treat &out->done.base as the event + * to wait on. Always pair with select_event_deinit before storage + * leaves scope (no-op once fired). */ +static inline void wait_or_cancel(select_event_t *out, + select_input_t inputs[2], + event_t *ev, cancel_t *c) { + event_t *srcs[2] = {ev, cancel_event(c)}; + select_event_init(out, inputs, 2, srcs); +} + #endif /* EVENT_H */ diff --git a/tests/test_event.c b/tests/test_event.c @@ -778,6 +778,169 @@ static void test_chan_select_recv_fast_path(void) { select_event_deinit(&sel); } +/* ---- Cancellation -------------------------------------------------- */ + +static void test_cancel_basic(void) { + /* The alias is a thin rename over latch: not-set, set, idempotent set, + * and event_try yields the latch's payload of 0. */ + cancel_t c; cancel_init(&c); + assert(!cancel_is_set(&c)); + cancel_set(&c); + assert(cancel_is_set(&c)); + cancel_set(&c); /* idempotent */ + assert(cancel_is_set(&c)); + + uintptr_t v = 0xBADD; + assert(event_try(cancel_event(&c), &v)); + assert(v == 0); +} + +static void test_wait_or_cancel_ev_wins(void) { + /* Event resolves first: waiter sees WAIT_OK and inputs[0].value + * carries the event's payload. The cancel side is disarmed. */ + runtime_t rt; rt_init(&rt); + latch_t l; latch_init(&l); + cancel_t c; cancel_init(&c); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &l.base, &c); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + assert(l.waiters && c.waiters); /* both armed */ + + latch_set(&l, 0xF00D); + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == WAIT_OK); + assert(inputs[WAIT_OK].value == 0xF00D); + assert(c.waiters == NULL); /* cancel disarmed */ + + select_event_deinit(&sel); +} + +static void test_wait_or_cancel_cancel_wins(void) { + /* Cancel fires while parked: waiter sees WAIT_CANCELLED, ev disarmed. */ + runtime_t rt; rt_init(&rt); + latch_t l; latch_init(&l); + cancel_t c; cancel_init(&c); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &l.base, &c); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + cancel_set(&c); + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == WAIT_CANCELLED); + assert(l.waiters == NULL); /* ev disarmed */ + + select_event_deinit(&sel); +} + +static void test_wait_or_cancel_already_cancelled(void) { + /* Pre-set cancel: select fast-path fires WAIT_CANCELLED at init time; + * ev is never parked, so deinit has nothing to disarm. */ + latch_t l; latch_init(&l); + cancel_t c; cancel_init(&c); + cancel_set(&c); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &l.base, &c); + + uintptr_t v; + assert(event_try(&sel.done.base, &v)); + assert(v == WAIT_CANCELLED); + assert(l.waiters == NULL); /* never parked */ + + select_event_deinit(&sel); +} + +static void test_wait_or_cancel_ev_precedes_cancel(void) { + /* Both ready at init: ev wins (it's checked first in the fast path). + * This is the right semantic — if the work has already resolved, + * a concurrent cancel doesn't retroactively undo it. */ + latch_t l; latch_init(&l); + cancel_t c; cancel_init(&c); + latch_set(&l, 0x600D); + cancel_set(&c); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &l.base, &c); + + uintptr_t v; + assert(event_try(&sel.done.base, &v)); + assert(v == WAIT_OK); + assert(inputs[WAIT_OK].value == 0x600D); + + select_event_deinit(&sel); +} + +static void test_wait_or_cancel_chan_recv(void) { + /* Waiter blocks on chan_recv via wait_or_cancel; cancel fires while + * parked. After resume + deinit, the chan's recv list must be empty + * so a future send doesn't deliver to a freed waker. */ + runtime_t rt; rt_init(&rt); + chan_t ch; chan_init(&ch); + cancel_t c; cancel_init(&c); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &ch.recv, &c); + assert(ch.recv_head == &inputs[0].w); /* select's input parked */ + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + cancel_set(&c); + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == WAIT_CANCELLED); + assert(ch.recv_head == NULL); /* select_input_fire disarmed */ + + select_event_deinit(&sel); + /* No stale waiter lingering: try_send fails, doesn't fire anything. */ + assert(!chan_try_send(&ch, 1)); +} + +static void test_wait_or_cancel_send_op(void) { + /* Selectable send under cancel: cancel wins, the send_op is still + * parked on the chan — chan_send_op_deinit must release it so the + * sender's value is not silently dropped from the chan's list. */ + chan_t ch; chan_init(&ch); + cancel_t c; cancel_init(&c); + + chan_send_op_t op; + chan_send_op_init(&op, &ch, 0xABBA); + assert(!op.done.set); + assert(ch.send_head == &op.csw.sw.base); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &op.done.base, &c); + + cancel_set(&c); /* fires sel synchronously */ + uintptr_t v; + assert(event_try(&sel.done.base, &v)); + assert(v == WAIT_CANCELLED); + + /* Send didn't happen — op still parked on the chan. The select + * winning doesn't drain the chan's send list; deinit does. */ + assert(ch.send_head == &op.csw.sw.base); + select_event_deinit(&sel); + chan_send_op_deinit(&op); + assert(ch.send_head == NULL); +} + /* ---- Runtime test -------------------------------------------------- */ static void test_runtime_drains(void) { @@ -833,6 +996,14 @@ int main(void) { test_chan_select_send(); test_chan_select_send_loses(); + test_cancel_basic(); + test_wait_or_cancel_ev_wins(); + test_wait_or_cancel_cancel_wins(); + test_wait_or_cancel_already_cancelled(); + test_wait_or_cancel_ev_precedes_cancel(); + test_wait_or_cancel_chan_recv(); + test_wait_or_cancel_send_op(); + test_runtime_drains(); printf("ok\n");