xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit 4f8d1cf83170dc3899d1147c5113da8b5e495b72
parent c8fcdddc76d080bee2555c69513e555134ae6768
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 02:07:38 -0700

Add allof: all-of waitset sharing select's storage

Folds an all-of combinator into select_event_t/select_input_t — caller
switches between any-of and all-of by changing only the init call. A
shared remaining counter (select inits to 1, allof to n) drives one
fire callback; done's payload is the closing input's index in both modes.

Diffstat:
Mevent.c | 69++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
Mevent.h | 44+++++++++++++++++++++++++++++++-------------
Mtests/test_event.c | 163+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 250 insertions(+), 26 deletions(-)

diff --git a/event.c b/event.c @@ -105,35 +105,42 @@ void latch_set(latch_t *l, uintptr_t value) { } } -/* ---- Select ----------------------------------------------------------- */ - +/* ---- Select / all-of -------------------------------------------------- */ + +/* One fire callback serves both modes. A counter `remaining` is decremented + * on each fire; done is set when it hits 0. select inits remaining=1 (any + * one fire closes); allof inits remaining=n (every input must fire). The + * disarm-siblings loop is a no-op for already-fired wakers, so it runs + * uniformly: for select it cleans up still-parked losers, for allof it + * does nothing (every sibling is already detached by its source). */ static void select_input_fire(waker_t *w, uintptr_t value) { select_input_t *in = (select_input_t *)w; select_event_t *s = in->parent; - size_t i = (size_t)(in - s->inputs); - /* Sticky: only the first input to fire records its index. Later - * stragglers (if any escape unparking) hit the idempotent guard. */ + /* Defensive: guard against any straggler that escaped disarm. */ if (s->done.set) return; /* Capture the input's payload before resuming anyone. Sticky * sources also keep it on themselves; transient sources (channels) * deliver only here, so this is the only durable record. */ in->value = value; - latch_set(&s->done, i); + if (--s->remaining > 0) return; - /* Disarm everyone else so their wakers don't dangle on input - * waitlists past the select's lifetime. */ + size_t i = (size_t)(in - s->inputs); + /* Disarm anyone still parked so their wakers don't dangle on input + * waitlists past s's lifetime. Idempotent on already-detached wakers. */ for (size_t j = 0; j < s->n; j++) { if (j != i) event_unpark(s->inputs[j].src, &s->inputs[j].w); } + latch_set(&s->done, i); } void select_event_init(select_event_t *s, select_input_t *inputs, size_t n, event_t *const *srcs) { latch_init(&s->done); - s->inputs = inputs; - s->n = n; + s->inputs = inputs; + s->n = n; + s->remaining = 1; /* any one fire closes the wait */ /* Fast path: an input already ready. Fire and skip parking entirely * so deinit has nothing to disarm. */ @@ -157,10 +164,46 @@ void select_event_init(select_event_t *s, } } +void allof_event_init(select_event_t *s, + select_input_t *inputs, size_t n, + event_t *const *srcs) { + latch_init(&s->done); + s->inputs = inputs; + s->n = n; + s->remaining = n; /* every input must fire to close */ + + if (n == 0) { latch_set(&s->done, 0); return; } + + /* Initialize each input then try-or-park. An already-ready input is + * consumed inline (value captured, remaining--, no parking); the + * rest park. If everyone was inline, fire done at the end. */ + for (size_t i = 0; i < n; i++) { + inputs[i].w.next = NULL; + inputs[i].w.prev = NULL; + inputs[i].w.fire = select_input_fire; + inputs[i].src = srcs[i]; + inputs[i].parent = s; + inputs[i].value = 0; + + uintptr_t v; + if (event_try(srcs[i], &v)) { + inputs[i].value = v; + s->remaining--; + } else { + event_park(srcs[i], &inputs[i].w); + } + } + + /* All inline-ready: fire done with the last input's index, matching + * the "closing index" semantics of the parked path. */ + if (s->remaining == 0) latch_set(&s->done, n - 1); +} + void select_event_deinit(select_event_t *s) { - /* If the select fired, survivors were already disarmed by - * select_input_fire. If the fast path fired, nothing was ever - * parked. Either way, set => nothing to do. */ + /* done.set => the closing fire already disarmed everyone (or the + * fast path skipped parking entirely). Otherwise — possible after a + * partial allof — some inputs may still be parked; unpark is + * idempotent for already-detached wakers. */ if (s->done.set) return; for (size_t i = 0; i < s->n; i++) { event_unpark(s->inputs[i].src, &s->inputs[i].w); diff --git a/event.h b/event.h @@ -173,13 +173,20 @@ static inline void latch_init(latch_t *l) { void latch_set(latch_t *l, uintptr_t value); -/* ---- Select ----------------------------------------------------------- */ +/* ---- Select / all-of -------------------------------------------------- */ -/* Fires when any of N input events fires. The winning index is the - * payload of the underlying latch; the winning input's payload is - * captured in inputs[winner].value (works uniformly for sticky and - * transient sources, where re-trying the input would either succeed - * or fail). Composes: a select_event is itself an event. */ +/* Wait over N input events. Two semantics share the same storage shape, + * so a caller can switch between them by changing only the init call: + * + * select_event_init fires when ANY input fires (any-of) + * allof_event_init fires when ALL inputs fire (all-of) + * + * In both cases done's payload is the index of the input whose firing + * closed the wait — the winner for select, the last-to-fire for allof — + * and inputs[i].value carries each fired input's payload (works + * uniformly for sticky and transient sources, where re-trying the input + * would either succeed or fail). Composes: a select_event is itself an + * event. */ typedef struct select_event select_event_t; @@ -194,21 +201,32 @@ typedef struct { } select_input_t; struct select_event { - latch_t done; /* fired with the winner's index */ + latch_t done; /* fires with the closing input's index */ select_input_t *inputs; size_t n; + size_t remaining; /* counts down; done fires at 0 + * (select: starts at 1, allof: at n) */ }; -/* Initialize. inputs[] is caller-provided storage for n nodes; srcs[] - * is the array of n input event pointers (read once during init). - * If any input is already ready, the select fires immediately and no - * wakers are parked. Use &s->done.base as the resulting event_t. */ +/* Initialize as a select (any-of). inputs[] is caller-provided storage + * for n nodes; srcs[] is the array of n input event pointers (read + * once during init). If any input is already ready, the select fires + * immediately and no wakers are parked. Use &s->done.base as the + * resulting event_t. */ void select_event_init(select_event_t *s, select_input_t *inputs, size_t n, event_t *const *srcs); -/* Disarm any inputs still parked. Safe to call after fire (no-op). - * Required before s leaves scope if it has not yet fired. */ +/* Initialize as an allof (all-of). Inputs already ready at init are + * consumed inline (no parking, value captured); if every input is + * ready, done fires immediately. n == 0 fires done with payload 0. */ +void allof_event_init(select_event_t *s, + select_input_t *inputs, size_t n, + event_t *const *srcs); + +/* Disarm any inputs still parked. Safe to call after fire (no-op) and + * after partial completion (allof). Required before s leaves scope if + * it has not yet fired. */ void select_event_deinit(select_event_t *s); /* ---- Channel ---------------------------------------------------------- */ diff --git a/tests/test_event.c b/tests/test_event.c @@ -275,6 +275,162 @@ static void test_select_compose(void) { select_event_deinit(&inner); } +/* ---- All-of -------------------------------------------------------- */ + +static void test_allof_basic(void) { + /* Three latches: done fires only after all three set. Storage is the + * same select_event_t / select_input_t — only the init call differs. */ + runtime_t rt; rt_init(&rt); + latch_t a, b, c; + latch_init(&a); latch_init(&b); latch_init(&c); + + select_event_t s; + select_input_t inputs[3]; + event_t *srcs[3] = {&a.base, &b.base, &c.base}; + allof_event_init(&s, inputs, 3, srcs); + assert(!s.done.set); + assert(a.waiters && b.waiters && c.waiters); + + waiter_t w; waiter_init(&w, &rt, &s.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + latch_set(&a, 0xAAA); + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); /* still waiting */ + assert(!s.done.set); + + latch_set(&b, 0xBBB); + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + assert(!s.done.set); + + latch_set(&c, 0xCCC); + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 2); /* closing index = c */ + + assert(inputs[0].value == 0xAAA); + assert(inputs[1].value == 0xBBB); + assert(inputs[2].value == 0xCCC); + + select_event_deinit(&s); +} + +static void test_allof_fast_path_partial(void) { + /* Some ready at init: those are captured inline, no parking. */ + latch_t a, b; + latch_init(&a); latch_init(&b); + latch_set(&a, 0xAAA); + + select_event_t s; + select_input_t inputs[2]; + event_t *srcs[2] = {&a.base, &b.base}; + allof_event_init(&s, inputs, 2, srcs); + + assert(!s.done.set); /* still need b */ + assert(a.waiters == NULL); /* a was inline */ + assert(b.waiters == &inputs[1].w); /* b is parked */ + assert(inputs[0].value == 0xAAA); + + latch_set(&b, 0xBBB); + assert(s.done.set); + assert(inputs[1].value == 0xBBB); + uintptr_t v; + assert(event_try(&s.done.base, &v)); + assert(v == 1); /* b closed the wait */ + + select_event_deinit(&s); +} + +static void test_allof_fast_path_all(void) { + /* All ready at init: done fires immediately, no waker is ever parked. */ + latch_t a, b; + latch_init(&a); latch_init(&b); + latch_set(&a, 0xAAA); + latch_set(&b, 0xBBB); + + select_event_t s; + select_input_t inputs[2]; + event_t *srcs[2] = {&a.base, &b.base}; + allof_event_init(&s, inputs, 2, srcs); + + assert(s.done.set); + uintptr_t v; + assert(event_try(&s.done.base, &v)); + assert(v == 1); /* last input's index */ + assert(inputs[0].value == 0xAAA); + assert(inputs[1].value == 0xBBB); + + select_event_deinit(&s); +} + +static void test_allof_empty(void) { + /* n=0: fires immediately with payload 0. */ + select_event_t s; + allof_event_init(&s, NULL, 0, NULL); + assert(s.done.set); + uintptr_t v; + assert(event_try(&s.done.base, &v)); + assert(v == 0); + select_event_deinit(&s); +} + +static void test_allof_deinit_partial(void) { + /* One input fired, one still parked: deinit must release the parked + * waker without disturbing the fired one. */ + latch_t a, b; + latch_init(&a); latch_init(&b); + + select_event_t s; + select_input_t inputs[2]; + event_t *srcs[2] = {&a.base, &b.base}; + allof_event_init(&s, inputs, 2, srcs); + + latch_set(&a, 0xAAA); + assert(!s.done.set); + assert(a.waiters == NULL); /* a's waker detached on fire */ + assert(b.waiters == &inputs[1].w); /* b still parked */ + + select_event_deinit(&s); + assert(b.waiters == NULL); /* released */ +} + +static void test_allof_compose_with_select(void) { + /* An allof-mode event is still a select_event_t — feed it into a + * select-mode wait. */ + runtime_t rt; rt_init(&rt); + latch_t a, b, c; + latch_init(&a); latch_init(&b); latch_init(&c); + + select_event_t all; + select_input_t all_inputs[2]; + event_t *all_srcs[2] = {&a.base, &b.base}; + allof_event_init(&all, all_inputs, 2, all_srcs); + + select_event_t sel; + select_input_t sel_inputs[2]; + event_t *sel_srcs[2] = {&all.done.base, &c.base}; + select_event_init(&sel, sel_inputs, 2, sel_srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + /* Fire both halves of the allof; the select then sees its first input ready. */ + latch_set(&a, 1); + latch_set(&b, 2); + rt_run(&rt); + + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 0); /* allof side won the select */ + assert(all.done.set); + assert(c.waiters == NULL); /* select disarmed the loser */ + + select_event_deinit(&sel); + select_event_deinit(&all); +} + /* ---- Channel ------------------------------------------------------- */ /* Sender state machine: try inline; if blocked, park with value. */ @@ -657,6 +813,13 @@ int main(void) { test_select_deinit_unparks(); test_select_compose(); + test_allof_basic(); + test_allof_fast_path_partial(); + test_allof_fast_path_all(); + test_allof_empty(); + test_allof_deinit_partial(); + test_allof_compose_with_select(); + test_chan_send_blocks_until_recv(); test_chan_recv_blocks_until_send(); test_chan_try_send_no_recv();