xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit ac11fcddc0cf1f7eaf1610374e5e975b3168c16d
parent 192570edb8035ef5a3233906c46490b64b1a013d
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 14:46:30 -0700

Rename types and fuse try/park into poll

Three changes that reshape the public API:

- Type renames separate the state-machine concept from the scheduler
  bridge: xco_step_t -> xco_mach_t, xco_t -> xco_coro_t, the waitlist
  node xco_waker_t -> xco_waiter_t, and the scheduler bridge
  xco_step_waker_t -> xco_waker_t. Send-side waker types follow suit.

- Fuse xco_event_try + xco_event_park into xco_event_poll(e, out, w).
  Passing w=NULL is a pure try; passing out=NULL parks unconditionally.
  Same fusion for chan/queue send: xco_chan_send_poll / xco_queue_send_poll
  return DELIVERED/ACCEPTED, BLOCKED, or CLOSED; closed queues now
  report CLOSED under DROP_* policies (was: silent accept).

- Collapse xco_chan_t into xco_queue_t at cap=0+BLOCK, with thin alias
  wrappers preserving the rendezvous-naming at call sites.

Also: xco_timers_peek returns UINT64_MAX sentinel instead of bool+out;
timer source tracks now with monotonic assert and exposes xco_timers_now.

Diffstat:
Mplatform/arm64/xco_platform.h | 2+-
Mtests/test_event.c | 751++++++++++++++++++++++++++++++++++++++++---------------------------------------
Mtests/test_xco.c | 46+++++++++++++++++++++++-----------------------
Mxco.c | 585++++++++++++++++++++++++++-----------------------------------------------------
Mxco.h | 668+++++++++++++++++++++++++++++++++++++++++--------------------------------------
5 files changed, 941 insertions(+), 1111 deletions(-)

diff --git a/platform/arm64/xco_platform.h b/platform/arm64/xco_platform.h @@ -18,7 +18,7 @@ #define XCO__CTX_SIZE 176 /* sizeof(struct xco_platform_ctx) */ #define XCO__CTX_ALIGN 16 /* _Alignof(struct xco_platform_ctx) */ -/* Size and alignment of xco_t.priv. Must hold an xco_impl_t +/* Size and alignment of xco_coro_t.priv. Must hold an xco_impl_t * (defined in xco.c): a platform context + ~3 words of bookkeeping. * xco.c verifies sufficiency with _Static_assert. */ #define XCO_SIZE (XCO__CTX_SIZE + 48) diff --git a/tests/test_event.c b/tests/test_event.c @@ -14,26 +14,26 @@ /* ---- Waiter: blocks on one event, captures its value -------------- */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_event_t *e; xco_runtime_t *rt; - xco_step_waker_t sw; + xco_waker_t sw; int phase; uintptr_t got; } waiter_t; -static xco_step_result_t waiter_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t waiter_step(xco_mach_t *s, uintptr_t v) { waiter_t *w = (waiter_t *)s; switch (w->phase) { case 0: { uintptr_t out; - if (xco_event_try(w->e, &out)) { + if (xco_event_poll(w->e, &out, NULL)) { w->got = out; w->phase = 2; return (xco_step_result_t){out, XCO_STEP_DEAD}; } - xco_step_waker_init(&w->sw, w->rt, &w->base); - xco_event_park(w->e, &w->sw.base); + xco_waker_init(&w->sw, w->rt, &w->base); + xco_event_poll(w->e, NULL, &w->sw.base); w->phase = 1; return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } @@ -48,7 +48,7 @@ static xco_step_result_t waiter_step(xco_step_t *s, uintptr_t v) { } static void waiter_init(waiter_t *w, xco_runtime_t *rt, xco_event_t *e) { - w->base = (xco_step_t){.step = waiter_step, .status = XCO_STEP_INIT}; + w->base = (xco_mach_t){.step = waiter_step, .status = XCO_STEP_INIT}; w->e = e; w->rt = rt; w->phase = 0; @@ -73,7 +73,7 @@ static void test_latch_wake(void) { assert(rt.head != NULL); /* fire enqueued the step */ xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 42); assert(rt.head == NULL); } @@ -106,16 +106,16 @@ static void test_latch_multi_waiter(void) { xco_step(&a.base, 0); xco_step(&b.base, 0); xco_step(&c.base, 0); - assert(xco_step_status(&a.base) == XCO_STEP_SUSPENDED); - assert(xco_step_status(&b.base) == XCO_STEP_SUSPENDED); - assert(xco_step_status(&c.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&c.base) == XCO_STEP_SUSPENDED); xco_latch_set(&l, 99); xco_rt_run(&rt, 0); - assert(xco_step_status(&a.base) == XCO_STEP_DEAD && a.got == 99); - assert(xco_step_status(&b.base) == XCO_STEP_DEAD && b.got == 99); - assert(xco_step_status(&c.base) == XCO_STEP_DEAD && c.got == 99); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD && a.got == 99); + assert(xco_mach_status(&b.base) == XCO_STEP_DEAD && b.got == 99); + assert(xco_mach_status(&c.base) == XCO_STEP_DEAD && c.got == 99); } static void test_latch_set_idempotent(void) { @@ -124,41 +124,41 @@ static void test_latch_set_idempotent(void) { xco_latch_set(&l, 2); /* ignored */ uintptr_t v = 0; - assert(xco_event_try(&l.base, &v)); + assert(xco_event_poll(&l.base, &v, NULL)); assert(v == 1); } static void test_latch_unpark(void) { - /* Manually park then unpark — verify the waker is removed cleanly. */ + /* Manually park then unpark — verify the waiter is removed cleanly. */ xco_runtime_t rt; xco_rt_init(&rt); xco_latch_t l; xco_latch_init(&l); - xco_step_waker_t sw1, sw2, sw3; + xco_waker_t sw1, sw2, sw3; /* Step pointer is just a sentinel here; we never run them. */ - xco_step_waker_init(&sw1, &rt, (xco_step_t *)0x1); - xco_step_waker_init(&sw2, &rt, (xco_step_t *)0x2); - xco_step_waker_init(&sw3, &rt, (xco_step_t *)0x3); - xco_event_park(&l.base, &sw1.base); - xco_event_park(&l.base, &sw2.base); - xco_event_park(&l.base, &sw3.base); + xco_waker_init(&sw1, &rt, (xco_mach_t *)0x1); + xco_waker_init(&sw2, &rt, (xco_mach_t *)0x2); + xco_waker_init(&sw3, &rt, (xco_mach_t *)0x3); + xco_event_poll(&l.base, NULL, &sw1.base); + xco_event_poll(&l.base, NULL, &sw2.base); + xco_event_poll(&l.base, NULL, &sw3.base); /* Remove the middle one. */ xco_event_unpark(&l.base, &sw2.base); assert(sw2.base.next == NULL); - /* Unpark of an already-removed waker is a no-op. */ + /* Unpark of an already-removed waiter is a no-op. */ xco_event_unpark(&l.base, &sw2.base); /* sw1 and sw3 still on the list. */ int seen1 = 0, seen3 = 0; - for (xco_waker_t *w = l.waiters; w; w = w->next) { + for (xco_waiter_t *w = l.waiters; w; w = w->next) { if (w == &sw1.base) seen1 = 1; if (w == &sw3.base) seen3 = 1; assert(w != &sw2.base); } assert(seen1 && seen3); - /* Drain so we don't leave dangling armed wakers. */ + /* Drain so we don't leave dangling armed waiters. */ xco_event_unpark(&l.base, &sw1.base); xco_event_unpark(&l.base, &sw3.base); assert(l.waiters == NULL); @@ -178,13 +178,13 @@ static void test_select_winner(void) { waiter_t w; waiter_init(&w, &rt, &s.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); assert(a.waiters && b.waiters && c.waiters); /* all armed */ xco_latch_set(&b, 0xBBB); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 1); /* b's index */ /* Losers were disarmed by xco_select_input_fire. */ @@ -193,7 +193,7 @@ static void test_select_winner(void) { /* Re-trying the winning input yields its actual payload. */ uintptr_t v; - assert(xco_event_try(&b.base, &v)); + assert(xco_event_poll(&b.base, &v, NULL)); assert(v == 0xBBB); xco_select_event_deinit(&s); @@ -211,7 +211,7 @@ static void test_select_fast_path(void) { xco_select_event_init(&s, inputs, 2, srcs); uintptr_t v; - assert(xco_event_try(&s.done.base, &v)); + assert(xco_event_poll(&s.done.base, &v, NULL)); assert(v == 0); /* a wins */ assert(b.waiters == NULL); /* nothing parked on the loser */ @@ -219,7 +219,7 @@ static void test_select_fast_path(void) { } static void test_select_deinit_unparks(void) { - /* Selecting then dropping (no input fires) must release input wakers. */ + /* Selecting then dropping (no input fires) must release input waiters. */ xco_latch_t a, b; xco_latch_init(&a); xco_latch_init(&b); @@ -253,16 +253,16 @@ static void test_select_compose(void) { waiter_t w; waiter_init(&w, &rt, &outer.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_latch_set(&a, 0); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); /* outer winner: inner (index 0) */ uintptr_t v; - assert(xco_event_try(&inner.done.base, &v)); + assert(xco_event_poll(&inner.done.base, &v, NULL)); assert(v == 0); /* inner winner: a (index 0) */ /* The unrelated source c was disarmed when outer fired. */ @@ -292,21 +292,21 @@ static void test_allof_basic(void) { waiter_t w; waiter_init(&w, &rt, &s.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_latch_set(&a, 0xAAA); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); /* still waiting */ + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); /* still waiting */ assert(!s.done.set); xco_latch_set(&b, 0xBBB); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); assert(!s.done.set); xco_latch_set(&c, 0xCCC); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 2); /* closing index = c */ assert(inputs[0].value == 0xAAA); @@ -336,14 +336,14 @@ static void test_allof_fast_path_partial(void) { assert(s.done.set); assert(inputs[1].value == 0xBBB); uintptr_t v; - assert(xco_event_try(&s.done.base, &v)); + assert(xco_event_poll(&s.done.base, &v, NULL)); assert(v == 1); /* b closed the wait */ xco_select_event_deinit(&s); } static void test_allof_fast_path_all(void) { - /* All ready at init: done fires immediately, no waker is ever parked. */ + /* All ready at init: done fires immediately, no waiter is ever parked. */ xco_latch_t a, b; xco_latch_init(&a); xco_latch_init(&b); xco_latch_set(&a, 0xAAA); @@ -356,7 +356,7 @@ static void test_allof_fast_path_all(void) { assert(s.done.set); uintptr_t v; - assert(xco_event_try(&s.done.base, &v)); + assert(xco_event_poll(&s.done.base, &v, NULL)); assert(v == 1); /* last input's index */ assert(inputs[0].value == 0xAAA); assert(inputs[1].value == 0xBBB); @@ -370,14 +370,14 @@ static void test_allof_empty(void) { xco_allof_event_init(&s, NULL, 0, NULL); assert(s.done.set); uintptr_t v; - assert(xco_event_try(&s.done.base, &v)); + assert(xco_event_poll(&s.done.base, &v, NULL)); assert(v == 0); xco_select_event_deinit(&s); } static void test_allof_deinit_partial(void) { /* One input fired, one still parked: deinit must release the parked - * waker without disturbing the fired one. */ + * waiter without disturbing the fired one. */ xco_latch_t a, b; xco_latch_init(&a); xco_latch_init(&b); @@ -388,7 +388,7 @@ static void test_allof_deinit_partial(void) { xco_latch_set(&a, 0xAAA); assert(!s.done.set); - assert(a.waiters == NULL); /* a's waker detached on fire */ + assert(a.waiters == NULL); /* a's waiter detached on fire */ assert(b.waiters == &inputs[1].w); /* b still parked */ xco_select_event_deinit(&s); @@ -414,14 +414,14 @@ static void test_allof_compose_with_select(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); /* Fire both halves of the allof; the select then sees its first input ready. */ xco_latch_set(&a, 1); xco_latch_set(&b, 2); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); /* allof side won the select */ assert(all.done.set); assert(c.waiters == NULL); /* select disarmed the loser */ @@ -432,31 +432,34 @@ static void test_allof_compose_with_select(void) { /* ---- Channel ------------------------------------------------------- */ -/* Sender state machine: try inline; if blocked, park with value. */ +/* Sender state machine: poll; if blocked, the poll already parked us. */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_chan_t *c; xco_runtime_t *rt; - xco_chan_send_waker_t csw; + xco_chan_send_waiter_t csw; uintptr_t value; int phase; bool done; } sender_t; -static xco_step_result_t sender_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t sender_step(xco_mach_t *s, uintptr_t v) { sender_t *snd = (sender_t *)s; (void)v; switch (snd->phase) { case 0: - if (xco_chan_try_send(snd->c, snd->value)) { + xco_chan_send_waiter_init(&snd->csw, snd->rt, &snd->base); + switch (xco_chan_send_poll(snd->c, snd->value, &snd->csw)) { + case XCO_SEND_DELIVERED: + case XCO_SEND_CLOSED: snd->done = true; snd->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; + case XCO_SEND_BLOCKED: + snd->phase = 1; + return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } - xco_chan_send_waker_init(&snd->csw, snd->rt, &snd->base, snd->value); - xco_chan_park_send(snd->c, &snd->csw); - snd->phase = 1; - return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; + __builtin_unreachable(); case 1: snd->done = true; snd->phase = 2; @@ -466,7 +469,7 @@ static xco_step_result_t sender_step(xco_step_t *s, uintptr_t v) { } static void sender_init(sender_t *snd, xco_runtime_t *rt, xco_chan_t *c, uintptr_t value) { - snd->base = (xco_step_t){.step = sender_step, .status = XCO_STEP_INIT}; + snd->base = (xco_mach_t){.step = sender_step, .status = XCO_STEP_INIT}; snd->c = c; snd->rt = rt; snd->value = value; @@ -481,7 +484,7 @@ static void test_chan_send_blocks_until_recv(void) { sender_t snd; sender_init(&snd, &rt, &c, 0xDEADBEEF); xco_step(&snd.base, 0); - assert(xco_step_status(&snd.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&snd.base) == XCO_STEP_SUSPENDED); assert(c.send_head == &snd.csw.sw.base); assert(c.send_tail == &snd.csw.sw.base); @@ -494,7 +497,7 @@ static void test_chan_send_blocks_until_recv(void) { /* Sender's resumption is queued by the recv-side fire. */ xco_rt_run(&rt, 0); - assert(xco_step_status(&snd.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&snd.base) == XCO_STEP_DEAD); assert(snd.done); } @@ -505,22 +508,21 @@ static void test_chan_recv_blocks_until_send(void) { waiter_t r; waiter_init(&r, &rt, &c.recv); xco_step(&r.base, 0); - assert(xco_step_status(&r.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); assert(c.recv_head == &r.sw.base); - bool delivered = xco_chan_try_send(&c, 0xCAFE); - assert(delivered); + assert(xco_chan_send_poll(&c, 0xCAFE, NULL) == XCO_SEND_DELIVERED); assert(c.recv_head == NULL); xco_rt_run(&rt, 0); - assert(xco_step_status(&r.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); assert(r.got == 0xCAFE); } -static void test_chan_try_send_no_recv(void) { - /* No receiver parked: try_send fails without modifying state. */ +static void test_chan_send_try_no_recv(void) { + /* No receiver parked: pure-try poll reports BLOCKED without parking. */ xco_chan_t c; xco_chan_init(&c); - assert(!xco_chan_try_send(&c, 1)); + assert(xco_chan_send_poll(&c, 1, NULL) == XCO_SEND_BLOCKED); assert(c.send_head == NULL && c.recv_head == NULL); } @@ -533,26 +535,26 @@ static void test_chan_fifo(void) { for (int i = 0; i < 3; i++) { sender_init(&s[i], &rt, &c, (uintptr_t)(100 + i)); xco_step(&s[i].base, 0); - assert(xco_step_status(&s[i].base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&s[i].base) == XCO_STEP_SUSPENDED); } assert(c.send_head == &s[0].csw.sw.base); assert(c.send_tail == &s[2].csw.sw.base); for (int i = 0; i < 3; i++) { uintptr_t v; - assert(xco_event_try(&c.recv, &v)); + assert(xco_event_poll(&c.recv, &v, NULL)); assert(v == (uintptr_t)(100 + i)); } assert(c.send_head == NULL); xco_rt_run(&rt, 0); for (int i = 0; i < 3; i++) { - assert(xco_step_status(&s[i].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); assert(s[i].done); } } -static void test_chan_unpark_send(void) { +static void test_chan_send_unpark(void) { /* Cancel a parked sender — list is repaired; other waiters intact. */ xco_runtime_t rt; xco_rt_init(&rt); xco_chan_t c; xco_chan_init(&c); @@ -565,22 +567,22 @@ static void test_chan_unpark_send(void) { assert(c.send_head == &a.csw.sw.base); assert(c.send_tail == &d.csw.sw.base); - xco_chan_unpark_send(&c, &b.csw); /* middle */ + xco_chan_send_unpark(&c, &b.csw); /* middle */ /* Idempotent: removing again is a no-op. */ - xco_chan_unpark_send(&c, &b.csw); + xco_chan_send_unpark(&c, &b.csw); /* Order preserved: a, then d. */ uintptr_t v; - assert(xco_event_try(&c.recv, &v) && v == 1); - assert(xco_event_try(&c.recv, &v) && v == 3); + assert(xco_event_poll(&c.recv, &v, NULL) && v == 1); + assert(xco_event_poll(&c.recv, &v, NULL) && v == 3); assert(c.send_head == NULL); - /* a and d resume; b stays SUSPENDED (its waker was unparked + /* a and d resume; b stays SUSPENDED (its waiter was unparked * without firing). Drain so it doesn't dangle. */ xco_rt_run(&rt, 0); - assert(xco_step_status(&a.base) == XCO_STEP_DEAD); - assert(xco_step_status(&d.base) == XCO_STEP_DEAD); - assert(xco_step_status(&b.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&d.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); } static void test_chan_select_recv(void) { @@ -597,14 +599,13 @@ static void test_chan_select_recv(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); - assert(c.recv_head != NULL); /* select's input waker parked here */ + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); + assert(c.recv_head != NULL); /* select's input waiter parked here */ - bool delivered = xco_chan_try_send(&c, 0xABCDEF); - assert(delivered); + assert(xco_chan_send_poll(&c, 0xABCDEF, NULL) == XCO_SEND_DELIVERED); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); /* xco_chan_recv won (index 0) */ assert(inputs[0].value == 0xABCDEF); /* captured value */ @@ -625,21 +626,20 @@ static void test_chan_recv_fifo(void) { for (int i = 0; i < 3; i++) { waiter_init(&r[i], &rt, &c.recv); xco_step(&r[i].base, 0); - assert(xco_step_status(&r[i].base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&r[i].base) == XCO_STEP_SUSPENDED); } assert(c.recv_head == &r[0].sw.base); assert(c.recv_tail == &r[2].sw.base); assert(c.send_head == NULL); /* never both sides */ for (int i = 0; i < 3; i++) { - bool delivered = xco_chan_try_send(&c, (uintptr_t)(200 + i)); - assert(delivered); + assert(xco_chan_send_poll(&c, (uintptr_t)(200 + i), NULL) == XCO_SEND_DELIVERED); } assert(c.recv_head == NULL); xco_rt_run(&rt, 0); for (int i = 0; i < 3; i++) { - assert(xco_step_status(&r[i].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&r[i].base) == XCO_STEP_DEAD); assert(r[i].got == (uintptr_t)(200 + i)); } } @@ -653,7 +653,7 @@ static void test_chan_send_op_inline(void) { waiter_t r; waiter_init(&r, &rt, &c.recv); xco_step(&r.base, 0); - assert(xco_step_status(&r.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); xco_chan_send_op_t op; xco_chan_send_op_init(&op, &c, 0xFEED); @@ -661,7 +661,7 @@ static void test_chan_send_op_inline(void) { assert(c.send_head == NULL); /* nothing parked */ xco_rt_run(&rt, 0); - assert(xco_step_status(&r.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); assert(r.got == 0xFEED); xco_chan_send_op_deinit(&op); @@ -674,11 +674,11 @@ static void test_chan_send_op_blocks(void) { xco_chan_send_op_t op; xco_chan_send_op_init(&op, &c, 0xBEAD); assert(!op.done.set); - assert(c.send_head == &op.csw.sw.base); + assert(c.send_head == &op.qsw.sw.base); - /* Recv pulls value and fires op's waker → sets op.done. */ + /* Recv pulls value and fires op's waiter → sets op.done. */ uintptr_t v; - assert(xco_event_try(&c.recv, &v)); + assert(xco_event_poll(&c.recv, &v, NULL)); assert(v == 0xBEAD); assert(op.done.set); assert(c.send_head == NULL); @@ -703,15 +703,15 @@ static void test_chan_select_send(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); /* A receiver arrives and pulls. op.done fires → select fires. */ uintptr_t v; - assert(xco_event_try(&c.recv, &v)); + assert(xco_event_poll(&c.recv, &v, NULL)); assert(v == 0x5EED); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); /* send op won */ assert(timeout.waiters == NULL); /* loser disarmed */ @@ -721,14 +721,14 @@ static void test_chan_select_send(void) { static void test_chan_select_send_loses(void) { /* select(send_op, latch). Latch fires first; send is canceled - * cleanly via op_deinit (chan side disarmed, no dangling waker). */ + * cleanly via op_deinit (chan side disarmed, no dangling waiter). */ xco_runtime_t rt; xco_rt_init(&rt); xco_chan_t c; xco_chan_init(&c); xco_latch_t l; xco_latch_init(&l); xco_chan_send_op_t op; xco_chan_send_op_init(&op, &c, 0xABBA); - assert(c.send_head == &op.csw.sw.base); + assert(c.send_head == &op.qsw.sw.base); xco_select_event_t sel; xco_select_input_t inputs[2]; @@ -740,12 +740,12 @@ static void test_chan_select_send_loses(void) { xco_latch_set(&l, 0xDEAD); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 1); /* latch (index 1) won */ assert(inputs[1].value == 0xDEAD); /* send didn't happen — op still parked on chan. deinit unparks. */ - assert(c.send_head == &op.csw.sw.base); + assert(c.send_head == &op.qsw.sw.base); xco_select_event_deinit(&sel); xco_chan_send_op_deinit(&op); assert(c.send_head == NULL); @@ -758,7 +758,7 @@ static void test_chan_select_recv_fast_path(void) { sender_t snd; sender_init(&snd, &rt, &c, 0x12345); xco_step(&snd.base, 0); - assert(xco_step_status(&snd.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&snd.base) == XCO_STEP_SUSPENDED); xco_latch_t l; xco_latch_init(&l); xco_select_event_t sel; @@ -768,12 +768,12 @@ static void test_chan_select_recv_fast_path(void) { /* Fast path fired immediately. */ uintptr_t winner; - assert(xco_event_try(&sel.done.base, &winner)); + assert(xco_event_poll(&sel.done.base, &winner, NULL)); assert(winner == 0); assert(inputs[0].value == 0x12345); xco_rt_run(&rt, 0); - assert(xco_step_status(&snd.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&snd.base) == XCO_STEP_DEAD); xco_select_event_deinit(&sel); } @@ -781,7 +781,7 @@ static void test_chan_select_recv_fast_path(void) { static void test_cancel_basic(void) { /* The alias is a thin rename over latch: not-set, set, idempotent set, - * and xco_event_try yields the latch's payload of 0. */ + * and xco_event_poll yields the latch's payload of 0. */ xco_cancel_t c; xco_cancel_init(&c); assert(!xco_cancel_is_set(&c)); xco_cancel_set(&c); @@ -790,7 +790,7 @@ static void test_cancel_basic(void) { assert(xco_cancel_is_set(&c)); uintptr_t v = 0xBADD; - assert(xco_event_try(xco_cancel_event(&c), &v)); + assert(xco_event_poll(xco_cancel_event(&c), &v, NULL)); assert(v == 0); } @@ -807,12 +807,12 @@ static void test_wait_or_cancel_ev_wins(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); assert(l.waiters && c.waiters); /* both armed */ xco_latch_set(&l, 0xF00D); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == XCO_WAIT_OK); assert(inputs[XCO_WAIT_OK].value == 0xF00D); assert(c.waiters == NULL); /* cancel disarmed */ @@ -832,11 +832,11 @@ static void test_wait_or_cancel_cancel_wins(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_cancel_set(&c); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == XCO_WAIT_CANCELLED); assert(l.waiters == NULL); /* ev disarmed */ @@ -855,7 +855,7 @@ static void test_wait_or_cancel_already_cancelled(void) { xco_wait_or_cancel(&sel, inputs, &l.base, &c); uintptr_t v; - assert(xco_event_try(&sel.done.base, &v)); + assert(xco_event_poll(&sel.done.base, &v, NULL)); assert(v == XCO_WAIT_CANCELLED); assert(l.waiters == NULL); /* never parked */ @@ -876,7 +876,7 @@ static void test_wait_or_cancel_ev_precedes_cancel(void) { xco_wait_or_cancel(&sel, inputs, &l.base, &c); uintptr_t v; - assert(xco_event_try(&sel.done.base, &v)); + assert(xco_event_poll(&sel.done.base, &v, NULL)); assert(v == XCO_WAIT_OK); assert(inputs[XCO_WAIT_OK].value == 0x600D); @@ -886,7 +886,7 @@ static void test_wait_or_cancel_ev_precedes_cancel(void) { static void test_wait_or_cancel_chan_recv(void) { /* Waiter blocks on xco_chan_recv via xco_wait_or_cancel; cancel fires while * parked. After resume + deinit, the chan's recv list must be empty - * so a future send doesn't deliver to a freed waker. */ + * so a future send doesn't deliver to a freed waiter. */ xco_runtime_t rt; xco_rt_init(&rt); xco_chan_t ch; xco_chan_init(&ch); xco_cancel_t c; xco_cancel_init(&c); @@ -898,17 +898,17 @@ static void test_wait_or_cancel_chan_recv(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_cancel_set(&c); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == XCO_WAIT_CANCELLED); assert(ch.recv_head == NULL); /* xco_select_input_fire disarmed */ xco_select_event_deinit(&sel); - /* No stale waiter lingering: try_send fails, doesn't fire anything. */ - assert(!xco_chan_try_send(&ch, 1)); + /* No stale waiter lingering: pure-try poll reports BLOCKED, fires nothing. */ + assert(xco_chan_send_poll(&ch, 1, NULL) == XCO_SEND_BLOCKED); } static void test_wait_or_cancel_send_op(void) { @@ -921,7 +921,7 @@ static void test_wait_or_cancel_send_op(void) { xco_chan_send_op_t op; xco_chan_send_op_init(&op, &ch, 0xABBA); assert(!op.done.set); - assert(ch.send_head == &op.csw.sw.base); + assert(ch.send_head == &op.qsw.sw.base); xco_select_event_t sel; xco_select_input_t inputs[2]; @@ -929,12 +929,12 @@ static void test_wait_or_cancel_send_op(void) { xco_cancel_set(&c); /* fires sel synchronously */ uintptr_t v; - assert(xco_event_try(&sel.done.base, &v)); + assert(xco_event_poll(&sel.done.base, &v, NULL)); assert(v == XCO_WAIT_CANCELLED); /* Send didn't happen — op still parked on the chan. The select * winning doesn't drain the chan's send list; deinit does. */ - assert(ch.send_head == &op.csw.sw.base); + assert(ch.send_head == &op.qsw.sw.base); xco_select_event_deinit(&sel); xco_chan_send_op_deinit(&op); assert(ch.send_head == NULL); @@ -961,7 +961,7 @@ static void test_timer_basic(void) { assert(!t.in_heap); uintptr_t v; - assert(xco_event_try(xco_timer_event(&t), &v)); + assert(xco_event_poll(xco_timer_event(&t), &v, NULL)); assert(v == 100); /* Idempotent deinit (already fired). */ @@ -970,25 +970,21 @@ static void test_timer_basic(void) { static void test_timer_peek(void) { xco_pairing_heap_t h; xco_pairing_heap_init(&h); - uint64_t out = 0xDEAD; - assert(!xco_timers_peek(&h.base, &out)); - assert(out == 0xDEAD); /* untouched on empty */ + assert(xco_timers_peek(&h.base) == UINT64_MAX); xco_timer_t a, b, c; xco_timer_init(&a, &h.base, 300); xco_timer_init(&b, &h.base, 100); xco_timer_init(&c, &h.base, 200); - assert(xco_timers_peek(&h.base, &out)); - assert(out == 100); /* earliest deadline */ + assert(xco_timers_peek(&h.base) == 100); /* earliest deadline */ xco_timers_advance(&h.base, 250); /* b and c fired; a remains. */ assert(xco_timer_fired(&b)); assert(xco_timer_fired(&c)); assert(!xco_timer_fired(&a)); - assert(xco_timers_peek(&h.base, &out)); - assert(out == 300); + assert(xco_timers_peek(&h.base) == 300); xco_timer_deinit(&a); xco_timer_deinit(&b); @@ -1025,14 +1021,14 @@ static void test_timer_park_wake(void) { waiter_t w; waiter_init(&w, &rt, xco_timer_event(&t)); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); assert(t.done.waiters == &w.sw.base); xco_timers_advance(&h.base, 500); /* fire enqueued the step. */ assert(rt.head != NULL); xco_rt_run(&rt, 500); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 500); /* deadline as payload */ xco_timer_deinit(&t); @@ -1052,11 +1048,11 @@ static void test_timer_select(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_timers_advance(&h.base, 200); xco_rt_run(&rt, 200); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); /* timer (input 0) won */ assert(inputs[0].value == 200); /* deadline captured */ assert(l.waiters == NULL); /* loser disarmed */ @@ -1186,13 +1182,13 @@ static void test_timeout_with_wait_or_cancel(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); /* xco_rt_run advances the timer source itself: now=50 fires the timer, * which fires the bridge, which sets cancel, which fires sel, * which enqueues the waiter — all in one xco_rt_run call. */ xco_rt_run(&rt, 50); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == XCO_WAIT_CANCELLED); xco_select_event_deinit(&sel); @@ -1216,11 +1212,11 @@ static void test_timeout_ev_wins(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_latch_set(&ev, 0xF00D); xco_rt_run(&rt, 50); /* now < deadline; timer untouched */ - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == XCO_WAIT_OK); assert(inputs[XCO_WAIT_OK].value == 0xF00D); assert(!xco_cancel_is_set(&to.cancel)); @@ -1236,7 +1232,7 @@ static void test_timeout_ev_wins(void) { static void test_rt_run_advances_timers(void) { /* A waiter parked on a timer; xco_rt_run with now=deadline advances the - * source, fires the timer, drains the waker — all in one call. */ + * source, fires the timer, drains the waiter — all in one call. */ xco_runtime_t rt; xco_rt_init(&rt); xco_pairing_heap_t h; xco_pairing_heap_init(&h); xco_rt_attach_timers(&rt, &h.base); @@ -1244,11 +1240,11 @@ static void test_rt_run_advances_timers(void) { xco_timer_t t; xco_timer_init(&t, &h.base, 42); waiter_t w; waiter_init(&w, &rt, xco_timer_event(&t)); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); assert(rt.head == NULL); /* parked, not queued */ xco_rt_run(&rt, 42); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 42); assert(rt.head == NULL); @@ -1263,11 +1259,11 @@ static void test_rt_run_no_timers(void) { waiter_t w; waiter_init(&w, &rt, &l.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_latch_set(&l, 7); xco_rt_run(&rt, 99999); /* now ignored */ - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 7); } @@ -1277,26 +1273,26 @@ static void test_rt_run_no_timers(void) { * but specialized for the "permit handed at fire time" semantics — the * resume is itself the proof of acquisition. */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_semaphore_t *sem; xco_runtime_t *rt; - xco_step_waker_t sw; + xco_waker_t sw; int phase; bool got; } sem_acquirer_t; -static xco_step_result_t sem_acquirer_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t sem_acquirer_step(xco_mach_t *s, uintptr_t v) { sem_acquirer_t *a = (sem_acquirer_t *)s; (void)v; switch (a->phase) { case 0: - if (xco_event_try(xco_semaphore_event(a->sem), NULL)) { + if (xco_event_poll(xco_semaphore_event(a->sem), NULL, NULL)) { a->got = true; a->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; } - xco_step_waker_init(&a->sw, a->rt, &a->base); - xco_event_park(xco_semaphore_event(a->sem), &a->sw.base); + xco_waker_init(&a->sw, a->rt, &a->base); + xco_event_poll(xco_semaphore_event(a->sem), NULL, &a->sw.base); a->phase = 1; return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; case 1: @@ -1308,7 +1304,7 @@ static xco_step_result_t sem_acquirer_step(xco_step_t *s, uintptr_t v) { } static void sem_acquirer_init(sem_acquirer_t *a, xco_runtime_t *rt, xco_semaphore_t *s) { - a->base = (xco_step_t){.step = sem_acquirer_step, .status = XCO_STEP_INIT}; + a->base = (xco_mach_t){.step = sem_acquirer_step, .status = XCO_STEP_INIT}; a->sem = s; a->rt = rt; a->phase = 0; @@ -1318,11 +1314,11 @@ static void sem_acquirer_init(sem_acquirer_t *a, xco_runtime_t *rt, xco_semaphor static void test_semaphore_inline_acquire(void) { /* permits > 0: try succeeds and decrements. */ xco_semaphore_t s; xco_semaphore_init(&s, 2); - assert(xco_event_try(xco_semaphore_event(&s), NULL)); + assert(xco_event_poll(xco_semaphore_event(&s), NULL, NULL)); assert(s.permits == 1); - assert(xco_event_try(xco_semaphore_event(&s), NULL)); + assert(xco_event_poll(xco_semaphore_event(&s), NULL, NULL)); assert(s.permits == 0); - assert(!xco_event_try(xco_semaphore_event(&s), NULL)); + assert(!xco_event_poll(xco_semaphore_event(&s), NULL, NULL)); assert(s.permits == 0); } @@ -1333,7 +1329,7 @@ static void test_semaphore_park_then_release(void) { sem_acquirer_t a; sem_acquirer_init(&a, &rt, &s); xco_step(&a.base, 0); - assert(xco_step_status(&a.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); assert(s.head == &a.sw.base); xco_semaphore_release(&s, 1); @@ -1342,7 +1338,7 @@ static void test_semaphore_park_then_release(void) { assert(s.head == NULL); xco_rt_run(&rt, 0); - assert(xco_step_status(&a.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); assert(a.got); } @@ -1362,14 +1358,14 @@ static void test_semaphore_fifo(void) { xco_semaphore_release(&s, 2); xco_rt_run(&rt, 0); - assert(xco_step_status(&a[0].base) == XCO_STEP_DEAD); - assert(xco_step_status(&a[1].base) == XCO_STEP_DEAD); - assert(xco_step_status(&a[2].base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&a[0].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&a[1].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&a[2].base) == XCO_STEP_SUSPENDED); assert(s.head == &a[2].sw.base); xco_semaphore_release(&s, 1); xco_rt_run(&rt, 0); - assert(xco_step_status(&a[2].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&a[2].base) == XCO_STEP_DEAD); assert(s.head == NULL); assert(s.permits == 0); } @@ -1403,12 +1399,12 @@ static void test_semaphore_select_acquire(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); - assert(s.head != NULL); /* sem-side waker parked */ + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); + assert(s.head != NULL); /* sem-side waiter parked */ xco_cancel_set(&c); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == XCO_WAIT_CANCELLED); assert(s.head == NULL); /* xco_select_input_fire detached us */ @@ -1425,18 +1421,18 @@ static void test_semaphore_binary_mutex(void) { xco_semaphore_t mu; xco_semaphore_init(&mu, 1); /* Holder takes it inline. */ - assert(xco_event_try(xco_semaphore_event(&mu), NULL)); + assert(xco_event_poll(xco_semaphore_event(&mu), NULL, NULL)); assert(mu.permits == 0); /* Contender parks. */ sem_acquirer_t b; sem_acquirer_init(&b, &rt, &mu); xco_step(&b.base, 0); - assert(xco_step_status(&b.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); /* Holder releases on exit. */ xco_semaphore_release(&mu, 1); xco_rt_run(&rt, 0); - assert(xco_step_status(&b.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); assert(b.got); } @@ -1444,29 +1440,32 @@ static void test_semaphore_binary_mutex(void) { /* Queue sender: same shape as the chan sender_t. */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_queue_t *q; xco_runtime_t *rt; - xco_queue_send_waker_t qsw; + xco_queue_send_waiter_t qsw; uintptr_t value; int phase; bool done; } queue_sender_t; -static xco_step_result_t queue_sender_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t queue_sender_step(xco_mach_t *s, uintptr_t v) { queue_sender_t *snd = (queue_sender_t *)s; (void)v; switch (snd->phase) { case 0: - if (xco_queue_try_send(snd->q, snd->value)) { + xco_queue_send_waiter_init(&snd->qsw, snd->rt, &snd->base); + switch (xco_queue_send_poll(snd->q, snd->value, &snd->qsw)) { + case XCO_QSEND_ACCEPTED: + case XCO_QSEND_CLOSED: snd->done = true; snd->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; + case XCO_QSEND_BLOCKED: + snd->phase = 1; + return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } - xco_queue_send_waker_init(&snd->qsw, snd->rt, &snd->base, snd->value); - xco_queue_park_send(snd->q, &snd->qsw); - snd->phase = 1; - return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; + __builtin_unreachable(); case 1: snd->done = true; snd->phase = 2; @@ -1476,7 +1475,7 @@ static xco_step_result_t queue_sender_step(xco_step_t *s, uintptr_t v) { } static void queue_sender_init(queue_sender_t *s, xco_runtime_t *rt, xco_queue_t *q, uintptr_t value) { - s->base = (xco_step_t){.step = queue_sender_step, .status = XCO_STEP_INIT}; + s->base = (xco_mach_t){.step = queue_sender_step, .status = XCO_STEP_INIT}; s->q = q; s->rt = rt; s->value = value; @@ -1490,18 +1489,18 @@ static void test_queue_block_buffered(void) { uintptr_t buf[3]; xco_queue_t q; xco_queue_init(&q, buf, 3, XCO_QUEUE_BLOCK); - assert(xco_queue_try_send(&q, 10)); - assert(xco_queue_try_send(&q, 20)); - assert(xco_queue_try_send(&q, 30)); + assert(xco_queue_send_poll(&q, 10, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 20, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 30, NULL) == XCO_QSEND_ACCEPTED); assert(q.len == 3); - /* Buffer full: try_send fails. */ - assert(!xco_queue_try_send(&q, 40)); + /* Buffer full under BLOCK: pure-try poll reports BLOCKED. */ + assert(xco_queue_send_poll(&q, 40, NULL) == XCO_QSEND_BLOCKED); uintptr_t v; - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 10); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 20); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 30); - assert(!xco_event_try(xco_queue_recv_event(&q), &v)); /* empty */ + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 10); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 20); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 30); + assert(!xco_event_poll(xco_queue_recv_event(&q), &v, NULL)); /* empty */ assert(q.len == 0); (void)rt; } @@ -1512,62 +1511,62 @@ static void test_queue_block_sender_parks(void) { uintptr_t buf[2]; xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_BLOCK); - assert(xco_queue_try_send(&q, 1)); - assert(xco_queue_try_send(&q, 2)); + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); queue_sender_t s; queue_sender_init(&s, &rt, &q, 3); xco_step(&s.base, 0); - assert(xco_step_status(&s.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); assert(q.send_head == &s.qsw.sw.base); /* Recv pops 1; 3 should slot into the buffer and the parked sender wakes. */ uintptr_t v; - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 1); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); xco_rt_run(&rt, 0); - assert(xco_step_status(&s.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); assert(s.done); assert(q.send_head == NULL); /* Buffer should now hold 2, 3. */ - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 2); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 3); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 3); assert(q.len == 0); } static void test_queue_drop_newest(void) { - /* When full, try_send returns true but silently discards. */ + /* When full, send_try returns true but silently discards. */ uintptr_t buf[2]; xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_DROP_NEWEST); - assert(xco_queue_try_send(&q, 1)); - assert(xco_queue_try_send(&q, 2)); - assert(xco_queue_try_send(&q, 3)); /* full → drop 3 */ - assert(xco_queue_try_send(&q, 4)); /* still full → drop 4 */ + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 3, NULL) == XCO_QSEND_ACCEPTED); /* full → drop 3 */ + assert(xco_queue_send_poll(&q, 4, NULL) == XCO_QSEND_ACCEPTED); /* still full → drop 4 */ uintptr_t v; - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 1); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 2); - assert(!xco_event_try(xco_queue_recv_event(&q), &v)); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); + assert(!xco_event_poll(xco_queue_recv_event(&q), &v, NULL)); } static void test_queue_drop_oldest(void) { - /* When full, try_send returns true and evicts head, pushes new tail. */ + /* When full, send_try returns true and evicts head, pushes new tail. */ uintptr_t buf[2]; xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_DROP_OLDEST); - assert(xco_queue_try_send(&q, 1)); - assert(xco_queue_try_send(&q, 2)); - assert(xco_queue_try_send(&q, 3)); /* full → evict 1, buffer = [2, 3] */ - assert(xco_queue_try_send(&q, 4)); /* full → evict 2, buffer = [3, 4] */ + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 3, NULL) == XCO_QSEND_ACCEPTED); /* full → evict 1, buffer = [2, 3] */ + assert(xco_queue_send_poll(&q, 4, NULL) == XCO_QSEND_ACCEPTED); /* full → evict 2, buffer = [3, 4] */ uintptr_t v; - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 3); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 4); - assert(!xco_event_try(xco_queue_recv_event(&q), &v)); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 3); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 4); + assert(!xco_event_poll(xco_queue_recv_event(&q), &v, NULL)); } static void test_queue_direct_handoff(void) { - /* Receiver parked; try_send hands off directly, bypassing the buffer. + /* Receiver parked; send_try hands off directly, bypassing the buffer. * Works the same regardless of policy — exercise BLOCK here. */ xco_runtime_t rt; xco_rt_init(&rt); uintptr_t buf[2]; @@ -1575,15 +1574,15 @@ static void test_queue_direct_handoff(void) { waiter_t r; waiter_init(&r, &rt, xco_queue_recv_event(&q)); xco_step(&r.base, 0); - assert(xco_step_status(&r.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); assert(q.recv_head == &r.sw.base); - assert(xco_queue_try_send(&q, 0xCAFE)); + assert(xco_queue_send_poll(&q, 0xCAFE, NULL) == XCO_QSEND_ACCEPTED); assert(q.len == 0); /* didn't touch the buffer */ assert(q.recv_head == NULL); xco_rt_run(&rt, 0); - assert(xco_step_status(&r.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); assert(r.got == 0xCAFE); } @@ -1602,13 +1601,13 @@ static void test_queue_recv_blocks_then_drains_buffered(void) { xco_step(&r2.base, 0); /* Two sends → direct handoff to the two parked receivers, not buffered. */ - assert(xco_queue_try_send(&q, 100)); - assert(xco_queue_try_send(&q, 200)); + assert(xco_queue_send_poll(&q, 100, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 200, NULL) == XCO_QSEND_ACCEPTED); assert(q.len == 0); xco_rt_run(&rt, 0); - assert(xco_step_status(&r1.base) == XCO_STEP_DEAD && r1.got == 100); - assert(xco_step_status(&r2.base) == XCO_STEP_DEAD && r2.got == 200); + assert(xco_mach_status(&r1.base) == XCO_STEP_DEAD && r1.got == 100); + assert(xco_mach_status(&r2.base) == XCO_STEP_DEAD && r2.got == 200); } static void test_queue_select_recv(void) { @@ -1625,24 +1624,24 @@ static void test_queue_select_recv(void) { waiter_t w; waiter_init(&w, &rt, &sel.done.base); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); - assert(xco_queue_try_send(&q, 0xBEEF)); + assert(xco_queue_send_poll(&q, 0xBEEF, NULL) == XCO_QSEND_ACCEPTED); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); assert(inputs[0].value == 0xBEEF); xco_select_event_deinit(&sel); } -static void test_queue_unpark_send(void) { +static void test_queue_send_unpark(void) { /* Cancel a parked sender; remaining FIFO order intact. */ xco_runtime_t rt; xco_rt_init(&rt); uintptr_t buf[1]; xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); - assert(xco_queue_try_send(&q, 1)); /* fills buffer */ + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); /* fills buffer */ queue_sender_t a, b, d; queue_sender_init(&a, &rt, &q, 2); @@ -1652,20 +1651,20 @@ static void test_queue_unpark_send(void) { xco_step(&b.base, 0); xco_step(&d.base, 0); - xco_queue_unpark_send(&q, &b.qsw); - xco_queue_unpark_send(&q, &b.qsw); /* idempotent */ + xco_queue_send_unpark(&q, &b.qsw); + xco_queue_send_unpark(&q, &b.qsw); /* idempotent */ /* Drain: 1, then a's 2 (slots into buffer when 1 popped), then d's 4. */ uintptr_t v; - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 1); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); xco_rt_run(&rt, 0); - assert(xco_step_status(&a.base) == XCO_STEP_DEAD); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 2); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); xco_rt_run(&rt, 0); - assert(xco_step_status(&d.base) == XCO_STEP_DEAD); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 4); + assert(xco_mach_status(&d.base) == XCO_STEP_DEAD); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 4); /* b never sent its value, stayed SUSPENDED. */ - assert(xco_step_status(&b.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); } /* ---- Broadcast ----------------------------------------------------- */ @@ -1673,16 +1672,16 @@ static void test_queue_unpark_send(void) { /* Subscriber: parks on the broadcast, captures fire payload, re-parks * on resume. n_seen counts how many publishes it received. */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_broadcast_t *b; xco_runtime_t *rt; - xco_step_waker_t sw; + xco_waker_t sw; uintptr_t last; int n_seen; int target; /* unsubscribe after this many */ } bcast_sub_t; -static xco_step_result_t bcast_sub_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t bcast_sub_step(xco_mach_t *s, uintptr_t v) { bcast_sub_t *b = (bcast_sub_t *)s; if (b->n_seen > 0) { /* Resume from a fire — capture and decide whether to re-park. */ @@ -1691,30 +1690,30 @@ static xco_step_result_t bcast_sub_step(xco_step_t *s, uintptr_t v) { if (b->n_seen >= b->target) { return (xco_step_result_t){b->last, XCO_STEP_DEAD}; } - /* Re-park: the runtime returned sw fully detached, so xco_event_park + /* Re-park: the runtime returned sw fully detached, so xco_event_poll * works without re-init. (Init was done once in bcast_sub_init.) */ - xco_event_park(xco_broadcast_event(b->b), &b->sw.base); + xco_event_poll(xco_broadcast_event(b->b), NULL, &b->sw.base); b->n_seen++; return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } static void bcast_sub_init(bcast_sub_t *s, xco_runtime_t *rt, xco_broadcast_t *b, int target) { - s->base = (xco_step_t){.step = bcast_sub_step, .status = XCO_STEP_INIT}; + s->base = (xco_mach_t){.step = bcast_sub_step, .status = XCO_STEP_INIT}; s->b = b; s->rt = rt; s->last = 0; s->n_seen = 0; s->target = target; - xco_step_waker_init(&s->sw, rt, &s->base); + xco_waker_init(&s->sw, rt, &s->base); } static void test_broadcast_try_always_false(void) { /* No "ready now" state; subscribers always wait for the next publish. */ xco_broadcast_t b; xco_broadcast_init(&b); - assert(!xco_event_try(xco_broadcast_event(&b), NULL)); + assert(!xco_event_poll(xco_broadcast_event(&b), NULL, NULL)); xco_broadcast_publish(&b, 7); /* Even after a publish, try is still false — value is read via accessor. */ - assert(!xco_event_try(xco_broadcast_event(&b), NULL)); + assert(!xco_event_poll(xco_broadcast_event(&b), NULL, NULL)); assert(xco_broadcast_has_value(&b)); assert(xco_broadcast_value(&b) == 7); } @@ -1728,7 +1727,7 @@ static void test_broadcast_publish_wakes_all(void) { for (int i = 0; i < 3; i++) { bcast_sub_init(&s[i], &rt, &b, 1); /* one publish then exit */ xco_step(&s[i].base, 0); - assert(xco_step_status(&s[i].base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&s[i].base) == XCO_STEP_SUSPENDED); } xco_broadcast_publish(&b, 0xBEEF); @@ -1736,7 +1735,7 @@ static void test_broadcast_publish_wakes_all(void) { xco_rt_run(&rt, 0); for (int i = 0; i < 3; i++) { - assert(xco_step_status(&s[i].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); assert(s[i].last == 0xBEEF); } } @@ -1751,12 +1750,12 @@ static void test_broadcast_rearm(void) { xco_broadcast_publish(&b, 11); xco_rt_run(&rt, 0); - assert(xco_step_status(&s.base) == XCO_STEP_SUSPENDED); /* re-armed */ + assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); /* re-armed */ assert(s.last == 11); xco_broadcast_publish(&b, 22); xco_rt_run(&rt, 0); - assert(xco_step_status(&s.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); assert(s.last == 22); } @@ -1772,11 +1771,11 @@ static void test_broadcast_missed_publish(void) { bcast_sub_t s; bcast_sub_init(&s, &rt, &b, 1); xco_step(&s.base, 0); - assert(xco_step_status(&s.base) == XCO_STEP_SUSPENDED); /* didn't see the prior */ + assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); /* didn't see the prior */ xco_broadcast_publish(&b, 2); xco_rt_run(&rt, 0); - assert(xco_step_status(&s.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); assert(s.last == 2); } @@ -1785,19 +1784,19 @@ static void test_broadcast_unpark(void) { xco_runtime_t rt; xco_rt_init(&rt); xco_broadcast_t b; xco_broadcast_init(&b); - xco_step_waker_t sw1, sw2, sw3; - xco_step_waker_init(&sw1, &rt, (xco_step_t *)0x1); - xco_step_waker_init(&sw2, &rt, (xco_step_t *)0x2); - xco_step_waker_init(&sw3, &rt, (xco_step_t *)0x3); - xco_event_park(xco_broadcast_event(&b), &sw1.base); - xco_event_park(xco_broadcast_event(&b), &sw2.base); - xco_event_park(xco_broadcast_event(&b), &sw3.base); + xco_waker_t sw1, sw2, sw3; + xco_waker_init(&sw1, &rt, (xco_mach_t *)0x1); + xco_waker_init(&sw2, &rt, (xco_mach_t *)0x2); + xco_waker_init(&sw3, &rt, (xco_mach_t *)0x3); + xco_event_poll(xco_broadcast_event(&b), NULL, &sw1.base); + xco_event_poll(xco_broadcast_event(&b), NULL, &sw2.base); + xco_event_poll(xco_broadcast_event(&b), NULL, &sw3.base); xco_event_unpark(xco_broadcast_event(&b), &sw2.base); xco_event_unpark(xco_broadcast_event(&b), &sw2.base); /* idempotent */ int seen1 = 0, seen3 = 0; - for (xco_waker_t *w = b.waiters; w; w = w->next) { + for (xco_waiter_t *w = b.waiters; w; w = w->next) { if (w == &sw1.base) seen1 = 1; if (w == &sw3.base) seen3 = 1; assert(w != &sw2.base); @@ -1811,17 +1810,17 @@ static void test_broadcast_unpark(void) { /* ---- Task (state-machine xco_step) ------------------------------------ */ /* A countdown SM that completes after N steps, returning a final value. - * On every step it checks task->cancel via xco_event_try and bails early + * On every step it checks task->cancel via xco_event_poll and bails early * (winding down to XCO_STEP_DEAD) if it's set. Demonstrates the cooperation * pattern: cancel is a signal, the SM owns the unwind. */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_task_t *task; int remaining; uintptr_t final; } taskbody_t; -static xco_step_result_t taskbody_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t taskbody_step(xco_mach_t *s, uintptr_t v) { taskbody_t *cd = (taskbody_t *)s; (void)v; if (xco_task_is_cancelled(cd->task)) { @@ -1836,7 +1835,7 @@ static xco_step_result_t taskbody_step(xco_step_t *s, uintptr_t v) { } static void taskbody_init(taskbody_t *cd, xco_task_t *t, int n, uintptr_t final) { - cd->base = (xco_step_t){.step = taskbody_step, .status = XCO_STEP_INIT}; + cd->base = (xco_mach_t){.step = taskbody_step, .status = XCO_STEP_INIT}; cd->task = t; cd->remaining = n; cd->final = final; @@ -1854,14 +1853,14 @@ static void test_task_state_machine_join(void) { assert(!xco_task_finished(&t)); /* Pump it manually. */ - while (xco_step_status(&cd.base) != XCO_STEP_DEAD) { + while (xco_mach_status(&cd.base) != XCO_STEP_DEAD) { xco_step(&cd.base, 0); } assert(xco_task_finished(&t)); /* A latched join: try the done event for the return value. */ uintptr_t v; - assert(xco_event_try(xco_task_done_event(&t), &v)); + assert(xco_event_poll(xco_task_done_event(&t), &v, NULL)); assert(v == 0xAAAA); } @@ -1879,7 +1878,7 @@ static void test_task_state_machine_cancel(void) { * shape — here, just a direct waiter on the done event. */ waiter_t joiner; waiter_init(&joiner, &rt, xco_task_done_event(&t)); xco_step(&joiner.base, 0); - assert(xco_step_status(&joiner.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&joiner.base) == XCO_STEP_SUSPENDED); /* Step the task a few times. */ xco_step(&cd.base, 0); @@ -1889,12 +1888,12 @@ static void test_task_state_machine_cancel(void) { xco_cancel_set(xco_task_cancel(&t)); /* Next step observes cancel and dies. */ xco_step(&cd.base, 0); - assert(xco_step_status(&cd.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&cd.base) == XCO_STEP_DEAD); assert(xco_task_finished(&t)); /* Joiner wakes with the done payload (0 from the cooperative unwind). */ xco_rt_run(&rt, 0); - assert(xco_step_status(&joiner.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&joiner.base) == XCO_STEP_DEAD); assert(joiner.got == 0); } @@ -1910,7 +1909,7 @@ static void test_countdown_basic(void) { assert(xco_countdown_fired(&cd)); uintptr_t v = 0xBAD; - assert(xco_event_try(xco_countdown_event(&cd), &v)); + assert(xco_event_poll(xco_countdown_event(&cd), &v, NULL)); assert(v == 0); } @@ -1936,13 +1935,13 @@ static void test_countdown_park_wake(void) { waiter_t w; waiter_init(&w, &rt, xco_countdown_event(&cd)); xco_step(&w.base, 0); - assert(xco_step_status(&w.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); xco_countdown_done(&cd); assert(rt.head == NULL); xco_countdown_done(&cd); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); } @@ -1950,10 +1949,10 @@ static void test_countdown_park_wake(void) { static void test_notify_try_always_false(void) { xco_notify_t n; xco_notify_init(&n); - assert(!xco_event_try(xco_notify_event(&n), NULL)); + assert(!xco_event_poll(xco_notify_event(&n), NULL, NULL)); xco_notify_one(&n); /* empty: no-op */ xco_notify_all(&n); /* empty: no-op */ - assert(!xco_event_try(xco_notify_event(&n), NULL)); + assert(!xco_event_poll(xco_notify_event(&n), NULL, NULL)); } static void test_notify_one_fifo(void) { @@ -1968,17 +1967,17 @@ static void test_notify_one_fifo(void) { xco_notify_one(&n); xco_rt_run(&rt, 0); - assert(xco_step_status(&a.base) == XCO_STEP_DEAD); - assert(xco_step_status(&b.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); xco_notify_one(&n); xco_rt_run(&rt, 0); - assert(xco_step_status(&b.base) == XCO_STEP_DEAD); - assert(xco_step_status(&c.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&c.base) == XCO_STEP_SUSPENDED); xco_notify_one(&n); xco_rt_run(&rt, 0); - assert(xco_step_status(&c.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&c.base) == XCO_STEP_DEAD); assert(n.head == NULL); xco_notify_one(&n); /* empty: no-op */ @@ -1998,9 +1997,9 @@ static void test_notify_all(void) { xco_notify_all(&n); assert(n.head == NULL); xco_rt_run(&rt, 0); - assert(xco_step_status(&a.base) == XCO_STEP_DEAD); - assert(xco_step_status(&b.base) == XCO_STEP_DEAD); - assert(xco_step_status(&c.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&c.base) == XCO_STEP_DEAD); } static void test_notify_select(void) { @@ -2018,7 +2017,7 @@ static void test_notify_select(void) { xco_notify_one(&n); xco_rt_run(&rt, 0); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == 0); assert(l.waiters == NULL); @@ -2029,13 +2028,13 @@ static void test_notify_unpark(void) { xco_runtime_t rt; xco_rt_init(&rt); xco_notify_t n; xco_notify_init(&n); - xco_step_waker_t s1, s2, s3; - xco_step_waker_init(&s1, &rt, (xco_step_t *)0x1); - xco_step_waker_init(&s2, &rt, (xco_step_t *)0x2); - xco_step_waker_init(&s3, &rt, (xco_step_t *)0x3); - xco_event_park(xco_notify_event(&n), &s1.base); - xco_event_park(xco_notify_event(&n), &s2.base); - xco_event_park(xco_notify_event(&n), &s3.base); + xco_waker_t s1, s2, s3; + xco_waker_init(&s1, &rt, (xco_mach_t *)0x1); + xco_waker_init(&s2, &rt, (xco_mach_t *)0x2); + xco_waker_init(&s3, &rt, (xco_mach_t *)0x3); + xco_event_poll(xco_notify_event(&n), NULL, &s1.base); + xco_event_poll(xco_notify_event(&n), NULL, &s2.base); + xco_event_poll(xco_notify_event(&n), NULL, &s3.base); xco_event_unpark(xco_notify_event(&n), &s2.base); xco_event_unpark(xco_notify_event(&n), &s2.base); /* idempotent */ @@ -2053,16 +2052,16 @@ static void test_mutex_basic(void) { xco_runtime_t rt; xco_rt_init(&rt); xco_mutex_t mu; xco_mutex_init(&mu); - assert(xco_event_try(xco_mutex_event(&mu), NULL)); - assert(!xco_event_try(xco_mutex_event(&mu), NULL)); + assert(xco_event_poll(xco_mutex_event(&mu), NULL, NULL)); + assert(!xco_event_poll(xco_mutex_event(&mu), NULL, NULL)); sem_acquirer_t b; sem_acquirer_init(&b, &rt, &mu); xco_step(&b.base, 0); - assert(xco_step_status(&b.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); xco_mutex_release(&mu); xco_rt_run(&rt, 0); - assert(xco_step_status(&b.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); assert(b.got); } @@ -2077,7 +2076,7 @@ static void test_queue_send_op_inline(void) { xco_queue_send_op_init(&op, &q, 0xAA); assert(op.done.set); uintptr_t v; - assert(xco_event_try(&op.done.base, &v)); + assert(xco_event_poll(&op.done.base, &v, NULL)); assert(v == 1); assert(q.len == 1); @@ -2088,7 +2087,7 @@ static void test_queue_send_op_blocks(void) { /* BLOCK full: parks. Recv frees a slot, sender drains, op fires. */ uintptr_t buf[1]; xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); - assert(xco_queue_try_send(&q, 1)); + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); xco_queue_send_op_t op; xco_queue_send_op_init(&op, &q, 2); @@ -2096,12 +2095,12 @@ static void test_queue_send_op_blocks(void) { assert(q.send_head == &op.qsw.sw.base); uintptr_t v; - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 1); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); assert(op.done.set); uintptr_t pv; - assert(xco_event_try(&op.done.base, &pv)); + assert(xco_event_poll(&op.done.base, &pv, NULL)); assert(pv == 1); - assert(xco_event_try(xco_queue_recv_event(&q), &v) && v == 2); + assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); xco_queue_send_op_deinit(&op); } @@ -2110,13 +2109,13 @@ static void test_queue_send_op_drop_inline(void) { /* DROP_NEWEST: op resolves inline regardless of fullness. */ uintptr_t buf[1]; xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_DROP_NEWEST); - assert(xco_queue_try_send(&q, 1)); + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); xco_queue_send_op_t op; xco_queue_send_op_init(&op, &q, 2); /* dropped */ assert(op.done.set); uintptr_t v; - assert(xco_event_try(&op.done.base, &v)); + assert(xco_event_poll(&op.done.base, &v, NULL)); assert(v == 1); xco_queue_send_op_deinit(&op); @@ -2127,7 +2126,7 @@ static void test_queue_send_op_select_loses(void) { uintptr_t buf[1]; xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); xco_latch_t l; xco_latch_init(&l); - assert(xco_queue_try_send(&q, 1)); + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); xco_queue_send_op_t op; xco_queue_send_op_init(&op, &q, 2); @@ -2154,16 +2153,16 @@ static void test_queue_send_op_select_loses(void) { /* ---- Chan close ---------------------------------------------------- */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_chan_t *c; xco_runtime_t *rt; - xco_step_waker_t sw; + xco_waker_t sw; int phase; xco_recv_status_t status; uintptr_t value; } chan_rx_t; -static xco_step_result_t chan_rx_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t chan_rx_step(xco_mach_t *s, uintptr_t v) { chan_rx_t *r = (chan_rx_t *)s; (void)v; switch (r->phase) { @@ -2174,8 +2173,8 @@ static xco_step_result_t chan_rx_step(xco_step_t *s, uintptr_t v) { r->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; } - xco_step_waker_init(&r->sw, r->rt, &r->base); - xco_event_park(&r->c->recv, &r->sw.base); + xco_waker_init(&r->sw, r->rt, &r->base); + xco_event_poll(&r->c->recv, NULL, &r->sw.base); r->phase = 1; return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } @@ -2188,7 +2187,7 @@ static xco_step_result_t chan_rx_step(xco_step_t *s, uintptr_t v) { } static void chan_rx_init(chan_rx_t *r, xco_runtime_t *rt, xco_chan_t *c) { - r->base = (xco_step_t){.step = chan_rx_step, .status = XCO_STEP_INIT}; + r->base = (xco_mach_t){.step = chan_rx_step, .status = XCO_STEP_INIT}; r->c = c; r->rt = rt; r->phase = 0; @@ -2213,48 +2212,49 @@ static void test_chan_close_wakes_receiver(void) { chan_rx_t r; chan_rx_init(&r, &rt, &c); xco_step(&r.base, 0); - assert(xco_step_status(&r.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); assert(c.recv_head == &r.sw.base); xco_chan_close(&c); assert(c.recv_head == NULL); xco_rt_run(&rt, 0); - assert(xco_step_status(&r.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); assert(r.status == XCO_RECV_CLOSED); } typedef struct { - xco_step_t base; + xco_mach_t base; xco_chan_t *c; xco_runtime_t *rt; - xco_chan_send_waker_t csw; + xco_chan_send_waiter_t csw; uintptr_t value; int phase; bool done; bool delivered; } chan_tx_t; -static xco_step_result_t chan_tx_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t chan_tx_step(xco_mach_t *s, uintptr_t v) { chan_tx_t *snd = (chan_tx_t *)s; (void)v; switch (snd->phase) { case 0: - if (xco_chan_try_send(snd->c, snd->value)) { + xco_chan_send_waiter_init(&snd->csw, snd->rt, &snd->base); + switch (xco_chan_send_poll(snd->c, snd->value, &snd->csw)) { + case XCO_SEND_DELIVERED: snd->done = true; snd->delivered = true; snd->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; - } - if (xco_chan_is_closed(snd->c)) { + case XCO_SEND_CLOSED: snd->done = true; snd->delivered = false; snd->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; + case XCO_SEND_BLOCKED: + snd->phase = 1; + return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } - xco_chan_send_waker_init(&snd->csw, snd->rt, &snd->base, snd->value); - xco_chan_park_send(snd->c, &snd->csw); - snd->phase = 1; - return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; + __builtin_unreachable(); case 1: snd->done = true; snd->delivered = snd->csw.delivered; @@ -2265,7 +2265,7 @@ static xco_step_result_t chan_tx_step(xco_step_t *s, uintptr_t v) { } static void chan_tx_init(chan_tx_t *snd, xco_runtime_t *rt, xco_chan_t *c, uintptr_t value) { - snd->base = (xco_step_t){.step = chan_tx_step, .status = XCO_STEP_INIT}; + snd->base = (xco_mach_t){.step = chan_tx_step, .status = XCO_STEP_INIT}; snd->c = c; snd->rt = rt; snd->value = value; @@ -2289,16 +2289,16 @@ static void test_chan_close_drains_senders(void) { assert(c.send_head == NULL); xco_rt_run(&rt, 0); for (int i = 0; i < 3; i++) { - assert(xco_step_status(&s[i].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); assert(s[i].done); assert(!s[i].delivered); } } -static void test_chan_try_send_after_close(void) { +static void test_chan_send_try_after_close(void) { xco_chan_t c; xco_chan_init(&c); xco_chan_close(&c); - assert(!xco_chan_try_send(&c, 1)); + assert(xco_chan_send_poll(&c, 1, NULL) == XCO_SEND_CLOSED); } static void test_chan_send_op_close_inline(void) { @@ -2309,7 +2309,7 @@ static void test_chan_send_op_close_inline(void) { xco_chan_send_op_init(&op, &c, 0xABBA); assert(op.done.set); uintptr_t v; - assert(xco_event_try(&op.done.base, &v)); + assert(xco_event_poll(&op.done.base, &v, NULL)); assert(v == 0); /* delivered=false */ xco_chan_send_op_deinit(&op); } @@ -2324,7 +2324,7 @@ static void test_chan_send_op_close_drain(void) { xco_chan_close(&c); assert(op.done.set); uintptr_t v; - assert(xco_event_try(&op.done.base, &v)); + assert(xco_event_poll(&op.done.base, &v, NULL)); assert(v == 0); xco_chan_send_op_deinit(&op); } @@ -2341,7 +2341,7 @@ static void test_chan_send_op_delivered_payload(void) { assert(v == 0xFEED); assert(op.done.set); uintptr_t pv; - assert(xco_event_try(&op.done.base, &pv)); + assert(xco_event_poll(&op.done.base, &pv, NULL)); assert(pv == 1); xco_chan_send_op_deinit(&op); @@ -2350,16 +2350,16 @@ static void test_chan_send_op_delivered_payload(void) { /* ---- Queue close --------------------------------------------------- */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_queue_t *q; xco_runtime_t *rt; - xco_step_waker_t sw; + xco_waker_t sw; int phase; xco_recv_status_t status; uintptr_t value; } queue_rx_t; -static xco_step_result_t queue_rx_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t queue_rx_step(xco_mach_t *s, uintptr_t v) { queue_rx_t *r = (queue_rx_t *)s; (void)v; switch (r->phase) { @@ -2370,8 +2370,8 @@ static xco_step_result_t queue_rx_step(xco_step_t *s, uintptr_t v) { r->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; } - xco_step_waker_init(&r->sw, r->rt, &r->base); - xco_event_park(xco_queue_recv_event(r->q), &r->sw.base); + xco_waker_init(&r->sw, r->rt, &r->base); + xco_event_poll(xco_queue_recv_event(r->q), NULL, &r->sw.base); r->phase = 1; return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } @@ -2384,7 +2384,7 @@ static xco_step_result_t queue_rx_step(xco_step_t *s, uintptr_t v) { } static void queue_rx_init(queue_rx_t *r, xco_runtime_t *rt, xco_queue_t *q) { - r->base = (xco_step_t){.step = queue_rx_step, .status = XCO_STEP_INIT}; + r->base = (xco_mach_t){.step = queue_rx_step, .status = XCO_STEP_INIT}; r->q = q; r->rt = rt; r->phase = 0; @@ -2395,8 +2395,8 @@ static void queue_rx_init(queue_rx_t *r, xco_runtime_t *rt, xco_queue_t *q) { static void test_queue_close_drains_buffered_then_eof(void) { uintptr_t buf[3]; xco_queue_t q; xco_queue_init(&q, buf, 3, XCO_QUEUE_BLOCK); - assert(xco_queue_try_send(&q, 1)); - assert(xco_queue_try_send(&q, 2)); + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); + assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); xco_queue_close(&q); assert(xco_queue_is_closed(&q)); @@ -2416,46 +2416,47 @@ static void test_queue_close_wakes_receiver(void) { queue_rx_t r; queue_rx_init(&r, &rt, &q); xco_step(&r.base, 0); - assert(xco_step_status(&r.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); xco_queue_close(&q); xco_rt_run(&rt, 0); - assert(xco_step_status(&r.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); assert(r.status == XCO_RECV_CLOSED); } typedef struct { - xco_step_t base; + xco_mach_t base; xco_queue_t *q; xco_runtime_t *rt; - xco_queue_send_waker_t qsw; + xco_queue_send_waiter_t qsw; uintptr_t value; int phase; bool done; bool delivered; } queue_tx_t; -static xco_step_result_t queue_tx_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t queue_tx_step(xco_mach_t *s, uintptr_t v) { queue_tx_t *snd = (queue_tx_t *)s; (void)v; switch (snd->phase) { case 0: - if (xco_queue_try_send(snd->q, snd->value)) { + xco_queue_send_waiter_init(&snd->qsw, snd->rt, &snd->base); + switch (xco_queue_send_poll(snd->q, snd->value, &snd->qsw)) { + case XCO_QSEND_ACCEPTED: snd->done = true; snd->delivered = true; snd->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; - } - if (xco_queue_is_closed(snd->q)) { + case XCO_QSEND_CLOSED: snd->done = true; snd->delivered = false; snd->phase = 2; return (xco_step_result_t){0, XCO_STEP_DEAD}; + case XCO_QSEND_BLOCKED: + snd->phase = 1; + return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } - xco_queue_send_waker_init(&snd->qsw, snd->rt, &snd->base, snd->value); - xco_queue_park_send(snd->q, &snd->qsw); - snd->phase = 1; - return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; + __builtin_unreachable(); case 1: snd->done = true; snd->delivered = snd->qsw.delivered; @@ -2466,7 +2467,7 @@ static xco_step_result_t queue_tx_step(xco_step_t *s, uintptr_t v) { } static void queue_tx_init(queue_tx_t *s, xco_runtime_t *rt, xco_queue_t *q, uintptr_t v) { - s->base = (xco_step_t){.step = queue_tx_step, .status = XCO_STEP_INIT}; + s->base = (xco_mach_t){.step = queue_tx_step, .status = XCO_STEP_INIT}; s->q = q; s->rt = rt; s->value = v; @@ -2479,46 +2480,49 @@ static void test_queue_close_drains_senders(void) { xco_runtime_t rt; xco_rt_init(&rt); uintptr_t buf[1]; xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); - assert(xco_queue_try_send(&q, 1)); + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); queue_tx_t s[2]; for (int i = 0; i < 2; i++) { queue_tx_init(&s[i], &rt, &q, (uintptr_t)(100 + i)); xco_step(&s[i].base, 0); - assert(xco_step_status(&s[i].base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&s[i].base) == XCO_STEP_SUSPENDED); } xco_queue_close(&q); xco_rt_run(&rt, 0); for (int i = 0; i < 2; i++) { - assert(xco_step_status(&s[i].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); assert(!s[i].delivered); } } -static void test_queue_try_send_after_close_block(void) { +static void test_queue_send_try_after_close_block(void) { uintptr_t buf[1]; xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); xco_queue_close(&q); - assert(!xco_queue_try_send(&q, 42)); + assert(xco_queue_send_poll(&q, 42, NULL) == XCO_QSEND_CLOSED); } -static void test_queue_try_send_after_close_drop(void) { +static void test_queue_send_try_after_close_drop(void) { + /* Closed is closed regardless of policy: DROP_* also reports CLOSED + * (behavior change from the old try API, which returned "accepted" + * after close under DROP_*). */ uintptr_t buf[1]; xco_queue_t q1; xco_queue_init(&q1, buf, 1, XCO_QUEUE_DROP_NEWEST); xco_queue_close(&q1); - assert(xco_queue_try_send(&q1, 42)); + assert(xco_queue_send_poll(&q1, 42, NULL) == XCO_QSEND_CLOSED); uintptr_t buf2[1]; xco_queue_t q2; xco_queue_init(&q2, buf2, 1, XCO_QUEUE_DROP_OLDEST); xco_queue_close(&q2); - assert(xco_queue_try_send(&q2, 42)); + assert(xco_queue_send_poll(&q2, 42, NULL) == XCO_QSEND_CLOSED); } static void test_queue_send_op_close_drain(void) { uintptr_t buf[1]; xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); - assert(xco_queue_try_send(&q, 1)); + assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); xco_queue_send_op_t op; xco_queue_send_op_init(&op, &q, 2); @@ -2527,7 +2531,7 @@ static void test_queue_send_op_close_drain(void) { xco_queue_close(&q); assert(op.done.set); uintptr_t v; - assert(xco_event_try(&op.done.base, &v)); + assert(xco_event_poll(&op.done.base, &v, NULL)); assert(v == 0); xco_queue_send_op_deinit(&op); } @@ -2535,32 +2539,32 @@ static void test_queue_send_op_close_drain(void) { /* ---- Ticker -------------------------------------------------------- */ typedef struct { - xco_step_t base; + xco_mach_t base; xco_ticker_t *t; xco_runtime_t *rt; - xco_step_waker_t sw; + xco_waker_t sw; int n_seen; int target; uint64_t last; } ticker_sub_t; -static xco_step_result_t ticker_sub_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t ticker_sub_step(xco_mach_t *s, uintptr_t v) { ticker_sub_t *sub = (ticker_sub_t *)s; if (sub->n_seen > 0) sub->last = (uint64_t)v; if (sub->n_seen >= sub->target) return (xco_step_result_t){v, XCO_STEP_DEAD}; - xco_event_park(xco_ticker_event(sub->t), &sub->sw.base); + xco_event_poll(xco_ticker_event(sub->t), NULL, &sub->sw.base); sub->n_seen++; return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; } static void ticker_sub_init(ticker_sub_t *s, xco_runtime_t *rt, xco_ticker_t *t, int target) { - s->base = (xco_step_t){.step = ticker_sub_step, .status = XCO_STEP_INIT}; + s->base = (xco_mach_t){.step = ticker_sub_step, .status = XCO_STEP_INIT}; s->t = t; s->rt = rt; s->n_seen = 0; s->target = target; s->last = 0; - xco_step_waker_init(&s->sw, rt, &s->base); + xco_waker_init(&s->sw, rt, &s->base); } static void test_ticker_single_tick(void) { @@ -2573,10 +2577,10 @@ static void test_ticker_single_tick(void) { ticker_sub_t s; ticker_sub_init(&s, &rt, &ti, 1); xco_step(&s.base, 0); - assert(xco_step_status(&s.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); xco_rt_run(&rt, 100); - assert(xco_step_status(&s.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); assert(s.last == 100); xco_ticker_deinit(&ti); @@ -2594,11 +2598,11 @@ static void test_ticker_multiple_ticks(void) { xco_step(&s.base, 0); xco_rt_run(&rt, 50); - assert(xco_step_status(&s.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); assert(s.last == 50); xco_rt_run(&rt, 100); - assert(xco_step_status(&s.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); assert(s.last == 100); /* fired = 50 + period */ xco_ticker_deinit(&ti); @@ -2616,8 +2620,7 @@ static void test_ticker_deinit_before_fire(void) { xco_ticker_deinit(&ti); assert(!ti.timer.in_heap); - uint64_t out; - assert(!xco_timers_peek(&h.base, &out)); + assert(xco_timers_peek(&h.base) == UINT64_MAX); xco_rt_run(&rt, 10000); } @@ -2637,7 +2640,7 @@ static void test_ticker_select(void) { xco_step(&w.base, 0); xco_rt_run(&rt, 100); - assert(xco_step_status(&w.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); assert(w.got == XCO_WAIT_OK); assert(inputs[XCO_WAIT_OK].value == 100); @@ -2648,7 +2651,7 @@ static void test_ticker_select(void) { static void test_ticker_try_always_false(void) { xco_pairing_heap_t h; xco_pairing_heap_init(&h); xco_ticker_t ti; xco_ticker_init(&ti, &h.base, 100, 100); - assert(!xco_event_try(xco_ticker_event(&ti), NULL)); + assert(!xco_event_poll(xco_ticker_event(&ti), NULL, NULL)); xco_ticker_deinit(&ti); } @@ -2658,7 +2661,7 @@ static void test_task_group_empty_join(void) { /* An empty (no-attach) group's join is not fired — fan-in semantics * require at least one attach. */ xco_task_group_t g; xco_task_group_init(&g); - assert(!xco_event_try(xco_task_group_join_event(&g), NULL)); + assert(!xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); } static void test_task_group_join_after_all_finish(void) { @@ -2675,16 +2678,16 @@ static void test_task_group_join_after_all_finish(void) { xco_group_attach_t sa, sb; xco_task_group_attach(&g, &a, &sa); xco_task_group_attach(&g, &b, &sb); - assert(!xco_event_try(xco_task_group_join_event(&g), NULL)); + assert(!xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); - while (xco_step_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); + while (xco_mach_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); assert(xco_task_finished(&a)); - assert(!xco_event_try(xco_task_group_join_event(&g), NULL)); + assert(!xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); - while (xco_step_status(&b_body.base) != XCO_STEP_DEAD) xco_step(&b_body.base, 0); + while (xco_mach_status(&b_body.base) != XCO_STEP_DEAD) xco_step(&b_body.base, 0); assert(xco_task_finished(&b)); uintptr_t v; - assert(xco_event_try(xco_task_group_join_event(&g), &v)); + assert(xco_event_poll(xco_task_group_join_event(&g), &v, NULL)); assert(v == 0); } @@ -2708,10 +2711,10 @@ static void test_task_group_cancel_fanout(void) { assert(xco_task_is_cancelled(&b)); assert(xco_cancel_is_set(xco_task_group_cancel_handle(&g))); - while (xco_step_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); - while (xco_step_status(&b_body.base) != XCO_STEP_DEAD) xco_step(&b_body.base, 0); + while (xco_mach_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); + while (xco_mach_status(&b_body.base) != XCO_STEP_DEAD) xco_step(&b_body.base, 0); - assert(xco_event_try(xco_task_group_join_event(&g), NULL)); + assert(xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); } static void test_task_group_finished_detaches(void) { @@ -2728,7 +2731,7 @@ static void test_task_group_finished_detaches(void) { xco_task_group_attach(&g, &b, &sb); assert(g.head == &sa && g.tail == &sb); - while (xco_step_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); + while (xco_mach_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); /* a's bridge has fired → sa is detached. */ assert(g.head == &sb && g.tail == &sb); assert(sa.next == NULL && sa.prev == NULL); @@ -2756,7 +2759,7 @@ static void test_runtime_drains(void) { assert(rt.head == NULL); for (int i = 0; i < 3; i++) { - assert(xco_step_status(&w[i].base) == XCO_STEP_DEAD); + assert(xco_mach_status(&w[i].base) == XCO_STEP_DEAD); assert(w[i].got == 0xC0DE); } } @@ -2782,10 +2785,10 @@ int main(void) { test_chan_send_blocks_until_recv(); test_chan_recv_blocks_until_send(); - test_chan_try_send_no_recv(); + test_chan_send_try_no_recv(); test_chan_fifo(); test_chan_recv_fifo(); - test_chan_unpark_send(); + test_chan_send_unpark(); test_chan_select_recv(); test_chan_select_recv_fast_path(); test_chan_send_op_inline(); @@ -2834,7 +2837,7 @@ int main(void) { test_queue_direct_handoff(); test_queue_recv_blocks_then_drains_buffered(); test_queue_select_recv(); - test_queue_unpark_send(); + test_queue_send_unpark(); test_broadcast_try_always_false(); test_broadcast_publish_wakes_all(); @@ -2866,7 +2869,7 @@ int main(void) { test_chan_close_empty(); test_chan_close_wakes_receiver(); test_chan_close_drains_senders(); - test_chan_try_send_after_close(); + test_chan_send_try_after_close(); test_chan_send_op_close_inline(); test_chan_send_op_close_drain(); test_chan_send_op_delivered_payload(); @@ -2874,8 +2877,8 @@ int main(void) { test_queue_close_drains_buffered_then_eof(); test_queue_close_wakes_receiver(); test_queue_close_drains_senders(); - test_queue_try_send_after_close_block(); - test_queue_try_send_after_close_drop(); + test_queue_send_try_after_close_block(); + test_queue_send_try_after_close_drop(); test_queue_send_op_close_drain(); test_ticker_single_tick(); diff --git a/tests/test_xco.c b/tests/test_xco.c @@ -3,7 +3,7 @@ * * Exercises: status transitions, value channel in both directions, * multiple suspends, nested resumes, xco_self. Also drives a hand- - * coded xco_step_t state machine through the same generic interface as + * coded xco_mach_t state machine through the same generic interface as * a coroutine, to verify the unification. */ @@ -42,12 +42,12 @@ static uintptr_t counter(uintptr_t n) { * first input n, then yield n+1, n+2, n+3, return n+4. No stack * switch — suspension is just returning XCO_STEP_SUSPENDED. */ typedef struct { - xco_step_t base; + xco_mach_t base; int phase; uintptr_t n; } counter_sm_t; -static xco_step_result_t counter_sm_step(xco_step_t *s, uintptr_t v) { +static xco_step_result_t counter_sm_step(xco_mach_t *s, uintptr_t v) { counter_sm_t *p = (counter_sm_t *)s; switch (p->phase++) { case 0: p->n = v; return (xco_step_result_t){v + 1, XCO_STEP_SUSPENDED}; @@ -59,15 +59,15 @@ static xco_step_result_t counter_sm_step(xco_step_t *s, uintptr_t v) { } static void counter_sm_init(counter_sm_t *p) { - p->base = (xco_step_t){.step = counter_sm_step, .status = XCO_STEP_INIT}; + p->base = (xco_mach_t){.step = counter_sm_step, .status = XCO_STEP_INIT}; p->phase = 0; p->n = 0; } -/* Generic driver: takes any xco_step_t implementing the counter contract +/* Generic driver: takes any xco_mach_t implementing the counter contract * and walks it to completion. Knows nothing about coroutines. */ -static void drive_counter(xco_step_t *s) { - assert(xco_step_status(s) == XCO_STEP_INIT); +static void drive_counter(xco_mach_t *s) { + assert(xco_mach_status(s) == XCO_STEP_INIT); xco_step_result_t r = xco_step(s, 10); assert(r.status == XCO_STEP_SUSPENDED && r.value == 11); r = xco_step(s, 100); @@ -76,10 +76,10 @@ static void drive_counter(xco_step_t *s) { assert(r.status == XCO_STEP_SUSPENDED && r.value == 13); r = xco_step(s, 300); assert(r.status == XCO_STEP_DEAD && r.value == 14); - assert(xco_step_status(s) == XCO_STEP_DEAD); + assert(xco_mach_status(s) == XCO_STEP_DEAD); } -static xco_t inner; +static xco_coro_t inner; /* Spawns `inner` and pumps it once, demonstrating nested resumes. */ static uintptr_t outer(uintptr_t arg) { @@ -130,7 +130,7 @@ static void test_xco_yield_alternates(void) { yielder_args_t a1 = {.id = 1, .n = 3, .rt = &rt}; yielder_args_t a2 = {.id = 2, .n = 3, .rt = &rt}; - xco_t c1, c2; + xco_coro_t c1, c2; /* Spawn each: first step records the id, then yields back to caller * via the runtime ready queue. After spawn, both are SUSPENDED and * enqueued. */ @@ -142,8 +142,8 @@ static void test_xco_yield_alternates(void) { /* Drain the runtime: each yielder runs another step, yields again, * until both reach DEAD. */ xco_rt_run(&rt, 0); - assert(xco_step_status(&c1.base) == XCO_STEP_DEAD); - assert(xco_step_status(&c2.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&c1.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&c2.base) == XCO_STEP_DEAD); /* Trace alternates: 1 2 1 2 1 2 (FIFO ready-queue ordering). */ assert(trace_len == 6); @@ -171,7 +171,7 @@ static void test_xco_task_join_inline(void) { assert(xco_task_finished(&xt.task)); uintptr_t v; - assert(xco_event_try(xco_task_done_event(&xt.task), &v)); + assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); assert(v == 42); } @@ -201,7 +201,7 @@ static void test_xco_task_join_via_runtime(void) { assert(xco_task_finished(&xt.task)); uintptr_t v; - assert(xco_event_try(xco_task_done_event(&xt.task), &v)); + assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); assert(v == 107); } @@ -241,7 +241,7 @@ static void test_xco_task_joined_by_other_task(void) { /* Joiner's return is the target's return propagated through. */ uintptr_t v; - assert(xco_event_try(xco_task_done_event(&joiner.task), &v)); + assert(xco_event_poll(xco_task_done_event(&joiner.task), &v, NULL)); assert(v == 107); } @@ -268,7 +268,7 @@ static void test_xco_task_cancel(void) { assert(xco_task_finished(&xt.task)); uintptr_t v; - assert(xco_event_try(xco_task_done_event(&xt.task), &v)); + assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); assert(v == 0); /* cancelled */ } @@ -279,7 +279,7 @@ static void test_xco_task_cancel(void) { static void test_xco_task_via_xstep(void) { xco_cotask_t xt; xco_cotask_init(&xt, task_body_simple, stack_t1, STACK_BYTES); - assert(xco_step_status(&xt.co.base) == XCO_STEP_INIT); + assert(xco_mach_status(&xt.co.base) == XCO_STEP_INIT); xco_step_result_t r = xco_step(&xt.co.base, 41); assert(r.status == XCO_STEP_DEAD); @@ -287,32 +287,32 @@ static void test_xco_task_via_xstep(void) { assert(xco_task_finished(&xt.task)); uintptr_t v; - assert(xco_event_try(xco_task_done_event(&xt.task), &v)); + assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); assert(v == 42); } int main(void) { assert(xco_self() == NULL); - xco_t c; + xco_coro_t c; xco_init(&c, outer, stack_a, STACK_BYTES); - assert(xco_step_status(&c.base) == XCO_STEP_INIT); + assert(xco_mach_status(&c.base) == XCO_STEP_INIT); xco_step_result_t r = xco_step(&c.base, 0); assert(xco_self() == NULL); assert(r.status == XCO_STEP_SUSPENDED); assert(r.value == 11); - assert(xco_step_status(&c.base) == XCO_STEP_SUSPENDED); + assert(xco_mach_status(&c.base) == XCO_STEP_SUSPENDED); r = xco_step(&c.base, 42); assert(r.status == XCO_STEP_DEAD); assert(r.value == 999); - assert(xco_step_status(&c.base) == XCO_STEP_DEAD); + assert(xco_mach_status(&c.base) == XCO_STEP_DEAD); /* Unification: the same generic driver pumps a coroutine and a * hand-coded state machine, both reaching XCO_STEP_DEAD with the * expected value sequence. */ - xco_t co; + xco_coro_t co; xco_init(&co, counter, stack_c, STACK_BYTES); drive_counter(&co.base); diff --git a/xco.c b/xco.c @@ -2,12 +2,13 @@ * xco.c — implementation for xco.h. * * Two parts: - * - Event substrate (runtime, latch, semaphore, select/allof, channel, - * queue, broadcast, notify, timer, pairing heap, timeout, ticker, - * task group). Each event type is a small struct with a static vtable. + * - Event substrate (runtime, latch, semaphore, select/allof, queue + * (chan is a typedef alias for the cap=0+BLOCK case), broadcast, + * notify, timer, pairing heap, timeout, ticker, task group). Each + * event type is a small struct with a static vtable. * Waitlists are intrusive linked lists; fire-all detaches the whole * list before iterating so callbacks can do anything, including - * re-park or unpark sibling wakers, without iterator hazards. + * re-park or unpark sibling waiters, without iterator hazards. * * - Stack-switching coroutines (xco). The platform layer * (arch/<name>/xco_arch.c) supplies the register save/restore @@ -37,15 +38,15 @@ /* xco_rt_init and xco_rt_enqueue are defined inline in xco.h. */ -static xco_waker_t *xco_rt_dequeue(xco_runtime_t *rt) { - xco_waker_t *w = rt->head; +static xco_waiter_t *xco_rt_dequeue(xco_runtime_t *rt) { + xco_waiter_t *w = rt->head; if (!w) return NULL; rt->head = w->next; if (!rt->head) rt->tail = NULL; - /* Hand the waker back fully detached. Every fire path already clears + /* Hand the waiter back fully detached. Every fire path already clears * prev before firing (and we just walked off the ready-queue), so * w->prev is NULL in practice — making it explicit here means - * step_waker users can re-park without re-init, regardless of which + * waker users can re-park without re-init, regardless of which * fire path resumed them. */ w->next = NULL; w->prev = NULL; @@ -53,13 +54,13 @@ static xco_waker_t *xco_rt_dequeue(xco_runtime_t *rt) { } void xco_rt_run(xco_runtime_t *rt, uint64_t now) { - /* The runtime ready queue holds only step-wakers. Other waker + /* The runtime ready queue holds only wakers. Other waiter * shapes (e.g. select_input) fire synchronously inside event * notify paths and never reach here. * * If a timer source is attached, advance it at the top of each * iteration: any timer whose deadline <= now fires, enqueueing its - * step-wakers, which the inner loop then drains. A step may insert + * wakers, which the inner loop then drains. A step may insert * a fresh already-expired timer; the outer loop catches it on the * next pass. Termination: each pass either drains a non-empty * queue or exits, and advance only fires timers it then removes, @@ -67,19 +68,19 @@ void xco_rt_run(xco_runtime_t *rt, uint64_t now) { for (;;) { if (rt->timers) xco_timers_advance(rt->timers, now); if (!rt->head) return; - for (xco_waker_t *w; (w = xco_rt_dequeue(rt));) { - xco_step_waker_t *sw = (xco_step_waker_t *)w; - xco_step(sw->step, sw->resume_value); + for (xco_waiter_t *w; (w = xco_rt_dequeue(rt));) { + xco_waker_t *sw = (xco_waker_t *)w; + xco_step(sw->mach, sw->resume_value); } } } -/* ---- Step waker ------------------------------------------------------- */ +/* ---- Waker ------------------------------------------------------------ */ -/* Exposed (with leading underscore) so the inline xco_step_waker_init in +/* Exposed (with leading underscore) so the inline xco_waker_init in * xco.h can install it without dragging the body into the header. */ -void xco__step_waker_fire(xco_waker_t *w, uintptr_t value) { - xco_step_waker_t *sw = (xco_step_waker_t *)w; +void xco__waker_fire(xco_waiter_t *w, uintptr_t value) { + xco_waker_t *sw = (xco_waker_t *)w; sw->resume_value = value; xco_rt_enqueue(sw->rt, w); } @@ -88,30 +89,27 @@ void xco__step_waker_fire(xco_waker_t *w, uintptr_t value) { * Latch * ==================================================================== */ -static bool xco_latch_try(xco_event_t *e, uintptr_t *out) { +static bool xco_latch_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { xco_latch_t *l = (xco_latch_t *)e; - if (!l->set) return false; - if (out) *out = l->value; - return true; -} - -static void xco_latch_park(xco_event_t *e, xco_waker_t *w) { - xco_latch_t *l = (xco_latch_t *)e; - /* Single-threaded contract: caller must have just observed try=false. */ - assert(!l->set); - /* One-waker invariant: w must arrive clean. Detach paths (xco_latch_set - * iterator, xco_latch_unpark, xco_select_event_deinit) all leave wakers in + if (l->set) { + if (out) *out = l->value; + return true; + } + if (!w) return false; + /* One-waiter invariant: w must arrive clean. Detach paths (xco_latch_set + * iterator, xco_latch_unpark, xco_select_event_deinit) all leave waiters in * this state. A trip here means a double-park bug. */ assert(!w->prev && !w->next); w->next = l->waiters; if (l->waiters) l->waiters->prev = w; l->waiters = w; + return false; } -static void xco_latch_unpark(xco_event_t *e, xco_waker_t *w) { +static void xco_latch_unpark(xco_event_t *e, xco_waiter_t *w) { xco_latch_t *l = (xco_latch_t *)e; /* Detect "not on this list" via the invariant maintained by park - * and the detach paths: a parked waker has prev set OR is the + * and the detach paths: a parked waiter has prev set OR is the * head; a detached one has prev == NULL and is not the head. */ if (!w->prev && l->waiters != w) return; if (w->prev) w->prev->next = w->next; @@ -122,8 +120,7 @@ static void xco_latch_unpark(xco_event_t *e, xco_waker_t *w) { /* Exposed so the inline xco_latch_init in xco.h can reference it. */ const xco_event_vtable_t xco__latch_vt = { - .try_ = xco_latch_try, - .park = xco_latch_park, + .poll = xco_latch_poll, .unpark = xco_latch_unpark, }; @@ -132,14 +129,14 @@ void xco_latch_set(xco_latch_t *l, uintptr_t value) { l->set = true; l->value = value; - /* Detach the whole waitlist before firing. A waker's fire callback + /* Detach the whole waitlist before firing. A waiter's fire callback * might do anything (including unpark a sibling on another event), * but it cannot mutate this list — it's already gone. */ - xco_waker_t *w = l->waiters; + xco_waiter_t *w = l->waiters; l->waiters = NULL; while (w) { - xco_waker_t *next = w->next; /* save before xco_waker_fire clears */ - xco_waker_fire(w, value); + xco_waiter_t *next = w->next; /* save before xco_waiter_fire clears */ + xco_waiter_fire(w, value); w = next; } } @@ -152,7 +149,7 @@ void xco_latch_set(xco_latch_t *l, uintptr_t value) { * head/tail pair through chan_q_*). * ==================================================================== */ -static void xco_sem_q_push(xco_semaphore_t *s, xco_waker_t *w) { +static void xco_sem_q_push(xco_semaphore_t *s, xco_waiter_t *w) { assert(!w->prev && !w->next); w->prev = s->tail; w->next = NULL; @@ -161,8 +158,8 @@ static void xco_sem_q_push(xco_semaphore_t *s, xco_waker_t *w) { s->tail = w; } -static xco_waker_t *xco_sem_q_pop(xco_semaphore_t *s) { - xco_waker_t *w = s->head; +static xco_waiter_t *xco_sem_q_pop(xco_semaphore_t *s) { + xco_waiter_t *w = s->head; if (!w) return NULL; s->head = w->next; if (s->head) s->head->prev = NULL; @@ -171,7 +168,7 @@ static xco_waker_t *xco_sem_q_pop(xco_semaphore_t *s) { return w; } -static void xco_sem_q_remove(xco_semaphore_t *s, xco_waker_t *w) { +static void xco_sem_q_remove(xco_semaphore_t *s, xco_waiter_t *w) { if (!w->prev && s->head != w) return; if (w->prev) w->prev->next = w->next; else s->head = w->next; @@ -180,29 +177,25 @@ static void xco_sem_q_remove(xco_semaphore_t *s, xco_waker_t *w) { w->prev = w->next = NULL; } -static bool xco_semaphore_try(xco_event_t *e, uintptr_t *out) { +static bool xco_semaphore_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { xco_semaphore_t *s = (xco_semaphore_t *)e; - if (s->permits == 0) return false; - s->permits--; - if (out) *out = 1; - return true; -} - -static void xco_semaphore_park(xco_event_t *e, xco_waker_t *w) { - xco_semaphore_t *s = (xco_semaphore_t *)e; - /* Single-threaded contract: caller just observed try=false (permits=0). */ - assert(s->permits == 0); + if (s->permits > 0) { + s->permits--; + if (out) *out = 1; + return true; + } + if (!w) return false; xco_sem_q_push(s, w); + return false; } -static void xco_semaphore_unpark(xco_event_t *e, xco_waker_t *w) { +static void xco_semaphore_unpark(xco_event_t *e, xco_waiter_t *w) { xco_semaphore_t *s = (xco_semaphore_t *)e; xco_sem_q_remove(s, w); } const xco_event_vtable_t xco__semaphore_acquire_vt = { - .try_ = xco_semaphore_try, - .park = xco_semaphore_park, + .poll = xco_semaphore_poll, .unpark = xco_semaphore_unpark, }; @@ -213,13 +206,13 @@ void xco_semaphore_release(xco_semaphore_t *s, size_t n) { * and park behind the existing waiters until everyone ahead has been * served. */ while (n > 0) { - xco_waker_t *w = xco_sem_q_pop(s); + xco_waiter_t *w = xco_sem_q_pop(s); if (!w) break; n--; - /* Fire value is conventional 1 — "you got a permit". Step-waker + /* Fire value is conventional 1 — "you got a permit". Step-waiter * users ignore the value; select inputs capture it as the input's * value field. */ - xco_waker_fire(w, 1); + xco_waiter_fire(w, 1); } s->permits += n; } @@ -231,10 +224,10 @@ void xco_semaphore_release(xco_semaphore_t *s, size_t n) { /* One fire callback serves both modes. A counter `remaining` is decremented * on each fire; done is set when it hits 0. select inits remaining=1 (any * one fire closes); allof inits remaining=n (every input must fire). The - * disarm-siblings loop is a no-op for already-fired wakers, so it runs + * disarm-siblings loop is a no-op for already-fired waiters, so it runs * uniformly: for select it cleans up still-parked losers, for allof it * does nothing (every sibling is already detached by its source). */ -static void xco_select_input_fire(xco_waker_t *w, uintptr_t value) { +static void xco_select_input_fire(xco_waiter_t *w, uintptr_t value) { xco_select_input_t *in = (xco_select_input_t *)w; xco_select_event_t *s = in->parent; @@ -247,8 +240,8 @@ static void xco_select_input_fire(xco_waker_t *w, uintptr_t value) { if (--s->remaining > 0) return; size_t i = (size_t)(in - s->inputs); - /* Disarm anyone still parked so their wakers don't dangle on input - * waitlists past s's lifetime. Idempotent on already-detached wakers. */ + /* Disarm anyone still parked so their waiters don't dangle on input + * waitlists past s's lifetime. Idempotent on already-detached waiters. */ for (size_t j = 0; j < s->n; j++) { if (j != i) xco_event_unpark(s->inputs[j].src, &s->inputs[j].w); } @@ -267,7 +260,7 @@ void xco_select_event_init(xco_select_event_t *s, * so deinit has nothing to disarm. */ for (size_t i = 0; i < n; i++) { uintptr_t v; - if (xco_event_try(srcs[i], &v)) { + if (xco_event_poll(srcs[i], &v, NULL)) { inputs[i].value = v; /* captured for inputs[winner].value */ xco_latch_set(&s->done, i); return; @@ -281,7 +274,7 @@ void xco_select_event_init(xco_select_event_t *s, inputs[i].src = srcs[i]; inputs[i].parent = s; inputs[i].value = 0; - xco_event_park(srcs[i], &inputs[i].w); + (void)xco_event_poll(srcs[i], NULL, &inputs[i].w); } } @@ -295,9 +288,10 @@ void xco_allof_event_init(xco_select_event_t *s, if (n == 0) { xco_latch_set(&s->done, 0); return; } - /* Initialize each input then try-or-park. An already-ready input is + /* Initialize each input then poll. Fused: an already-ready input is * consumed inline (value captured, remaining--, no parking); the - * rest park. If everyone was inline, fire done at the end. */ + * rest end up parked by the same call. If everyone was inline, fire + * done at the end. */ for (size_t i = 0; i < n; i++) { inputs[i].w.next = NULL; inputs[i].w.prev = NULL; @@ -307,11 +301,9 @@ void xco_allof_event_init(xco_select_event_t *s, inputs[i].value = 0; uintptr_t v; - if (xco_event_try(srcs[i], &v)) { + if (xco_event_poll(srcs[i], &v, &inputs[i].w)) { inputs[i].value = v; s->remaining--; - } else { - xco_event_park(srcs[i], &inputs[i].w); } } @@ -324,7 +316,7 @@ void xco_select_event_deinit(xco_select_event_t *s) { /* done.set => the closing fire already disarmed everyone (or the * fast path skipped parking entirely). Otherwise — possible after a * partial allof — some inputs may still be parked; unpark is - * idempotent for already-detached wakers. */ + * idempotent for already-detached waiters. */ if (s->done.set) return; for (size_t i = 0; i < s->n; i++) { xco_event_unpark(s->inputs[i].src, &s->inputs[i].w); @@ -332,14 +324,16 @@ void xco_select_event_deinit(xco_select_event_t *s) { } /* ==================================================================== - * Channel + * FIFO waitlist helpers (shared by queue and notify) * - * Doubly-linked FIFO push/pop on a channel waitlist. Same shape as - * latch's list operations but with an explicit tail so arrival order - * is preserved (unlike latch, where waiter order is irrelevant). + * Doubly-linked FIFO push/pop. Same shape as latch's list operations + * but with an explicit tail so arrival order is preserved (unlike + * latch, where waiter order is irrelevant). The xco_chan_q_* names are + * historical — the chan code that originally used them is now folded + * into queue (chan = queue at cap=0 + BLOCK). * ==================================================================== */ -static void xco_chan_q_push(xco_waker_t **head, xco_waker_t **tail, xco_waker_t *w) { +static void xco_chan_q_push(xco_waiter_t **head, xco_waiter_t **tail, xco_waiter_t *w) { assert(!w->prev && !w->next); w->prev = *tail; w->next = NULL; @@ -348,8 +342,8 @@ static void xco_chan_q_push(xco_waker_t **head, xco_waker_t **tail, xco_waker_t *tail = w; } -static xco_waker_t *xco_chan_q_pop(xco_waker_t **head, xco_waker_t **tail) { - xco_waker_t *w = *head; +static xco_waiter_t *xco_chan_q_pop(xco_waiter_t **head, xco_waiter_t **tail) { + xco_waiter_t *w = *head; if (!w) return NULL; *head = w->next; if (*head) (*head)->prev = NULL; @@ -358,8 +352,8 @@ static xco_waker_t *xco_chan_q_pop(xco_waker_t **head, xco_waker_t **tail) { return w; } -static void xco_chan_q_remove(xco_waker_t **head, xco_waker_t **tail, xco_waker_t *w) { - /* Same not-on-list test as xco_latch_unpark: a queued waker has prev +static void xco_chan_q_remove(xco_waiter_t **head, xco_waiter_t **tail, xco_waiter_t *w) { + /* Same not-on-list test as xco_latch_unpark: a queued waiter has prev * set OR is the head; a detached one has prev == NULL and isn't * the head. */ if (!w->prev && *head != w) return; @@ -370,149 +364,6 @@ static void xco_chan_q_remove(xco_waker_t **head, xco_waker_t **tail, xco_waker_ w->prev = w->next = NULL; } -/* Recover the xco_chan_t from its embedded recv event. */ -static inline xco_chan_t *xco_chan_of_recv(xco_event_t *e) { - return (xco_chan_t *)((char *)e - offsetof(xco_chan_t, recv)); -} - -static bool xco_chan_recv_try(xco_event_t *e, uintptr_t *out) { - xco_chan_t *c = xco_chan_of_recv(e); - xco_waker_t *w = xco_chan_q_pop(&c->send_head, &c->send_tail); - if (w) { - /* w is &csw->sw.base; sw is the first field of xco_chan_send_waker_t, - * and base is the first field of xco_step_waker_t, so addresses align. */ - xco_chan_send_waker_t *csw = (xco_chan_send_waker_t *)w; - if (out) *out = csw->value; - csw->delivered = true; - /* Resume the sender. The fire value is unused for step-wakers; - * for op senders, xco__chan_send_op_fire reads csw->delivered. */ - xco_waker_fire(w, 0); - return true; - } - /* Close makes the recv event "ready" with no value: the receiver is - * expected to call xco_chan_recv to learn it's XCO_RECV_CLOSED. *out is - * undefined in that case (set to 0 here for determinism). */ - if (c->closed) { - if (out) *out = 0; - return true; - } - return false; -} - -static void xco_chan_recv_park(xco_event_t *e, xco_waker_t *w) { - xco_chan_t *c = xco_chan_of_recv(e); - xco_chan_q_push(&c->recv_head, &c->recv_tail, w); -} - -static void xco_chan_recv_unpark(xco_event_t *e, xco_waker_t *w) { - xco_chan_t *c = xco_chan_of_recv(e); - xco_chan_q_remove(&c->recv_head, &c->recv_tail, w); -} - -const xco_event_vtable_t xco__chan_recv_vt = { - .try_ = xco_chan_recv_try, - .park = xco_chan_recv_park, - .unpark = xco_chan_recv_unpark, -}; - -bool xco_chan_try_send(xco_chan_t *c, uintptr_t value) { - if (c->closed) return false; - xco_waker_t *w = xco_chan_q_pop(&c->recv_head, &c->recv_tail); - if (!w) return false; - /* Hand the value to the recv-side waker. step_waker stashes it as - * resume_value; xco_select_input_fire stashes it in input.value. */ - xco_waker_fire(w, value); - return true; -} - -void xco_chan_park_send(xco_chan_t *c, xco_chan_send_waker_t *csw) { - /* park_send after close is UB — caller must check xco_chan_is_closed - * (typically via xco_chan_try_send returning false plus xco_chan_is_closed). */ - assert(!c->closed); - xco_chan_q_push(&c->send_head, &c->send_tail, &csw->sw.base); -} - -void xco_chan_unpark_send(xco_chan_t *c, xco_chan_send_waker_t *csw) { - xco_chan_q_remove(&c->send_head, &c->send_tail, &csw->sw.base); -} - -xco_recv_status_t xco_chan_recv(xco_chan_t *c, uintptr_t *out) { - xco_waker_t *w = xco_chan_q_pop(&c->send_head, &c->send_tail); - if (w) { - xco_chan_send_waker_t *csw = (xco_chan_send_waker_t *)w; - if (out) *out = csw->value; - csw->delivered = true; - xco_waker_fire(w, 0); - return XCO_RECV_GOT; - } - if (c->closed) return XCO_RECV_CLOSED; - return XCO_RECV_EMPTY; -} - -void xco_chan_close(xco_chan_t *c) { - if (c->closed) return; - c->closed = true; - - /* Drain parked senders with delivered=false. xco_waker_fire detaches - * before invoking the callback, so re-park inside fire (e.g. to - * land on the runtime ready queue) is safe. */ - xco_waker_t *w; - while ((w = xco_chan_q_pop(&c->send_head, &c->send_tail)) != NULL) { - xco_chan_send_waker_t *csw = (xco_chan_send_waker_t *)w; - csw->delivered = false; - xco_waker_fire(w, 0); - } - /* Wake parked receivers so they observe XCO_RECV_CLOSED via xco_chan_recv. - * Fire value is irrelevant — the recv xco_event_try will return true - * because c->closed is set, but receivers should use xco_chan_recv. */ - while ((w = xco_chan_q_pop(&c->recv_head, &c->recv_tail)) != NULL) { - xco_waker_fire(w, 0); - } -} - -/* ---- Send op (selectable send) ---------------------------------------- */ - -void xco__chan_send_op_fire(xco_waker_t *w, uintptr_t value) { - /* Receiver hands no payload to the sender on delivery — the sender - * learns whether its value reached someone via op->csw.delivered, - * set by xco_chan_recv* (true) or xco_chan_close (false). */ - (void)value; - /* csw is the first field of xco_chan_send_op_t; sw is first of - * xco_chan_send_waker_t; base is first of xco_step_waker_t. All offsets - * coincide, so w aliases op. */ - xco_chan_send_op_t *op = (xco_chan_send_op_t *)w; - xco_latch_set(&op->done, op->csw.delivered ? 1 : 0); -} - -void xco_chan_send_op_init(xco_chan_send_op_t *op, xco_chan_t *c, uintptr_t value) { - /* The embedded chan_send_waker carries the value and provides the - * waker layout the chan's send list expects. rt/step are unused — - * fire goes straight to the latch, no scheduler hop. */ - xco_chan_send_waker_init(&op->csw, NULL, NULL, value); - op->csw.sw.base.fire = xco__chan_send_op_fire; - op->chan = c; - xco_latch_init(&op->done); - - if (c->closed) { - /* Closed channel: no delivery possible, resolve immediately. */ - xco_latch_set(&op->done, 0); - return; - } - if (xco_chan_try_send(c, value)) { - /* Inline delivery: no parking, done set immediately. */ - op->csw.delivered = true; - xco_latch_set(&op->done, 1); - return; - } - xco_chan_park_send(c, &op->csw); -} - -void xco_chan_send_op_deinit(xco_chan_send_op_t *op) { - /* If delivered, the waker is already off the send list. If not - * (e.g., select cancellation), pull it off so it doesn't dangle. */ - if (op->done.set) return; - xco_chan_unpark_send(op->chan, &op->csw); -} /* ==================================================================== * Queue @@ -550,15 +401,15 @@ static inline uintptr_t xco_queue_pop_buf(xco_queue_t *q) { * sender's value (which was queued later). No-op if no sender parked. */ static void xco_queue_drain_one_sender(xco_queue_t *q) { if (!q->send_head) return; - xco_waker_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail); - xco_queue_send_waker_t *qsw = (xco_queue_send_waker_t *)w; + xco_waiter_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail); + xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w; xco_queue_push_buf(q, qsw->value); qsw->delivered = true; /* Fire after pushing so the sender sees its delivery as complete. */ - xco_waker_fire(w, 0); + xco_waiter_fire(w, 0); } -static bool xco_queue_recv_try(xco_event_t *e, uintptr_t *out) { +static bool xco_queue_recv_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { xco_queue_t *q = xco_queue_of_recv(e); if (q->len > 0) { uintptr_t v = xco_queue_pop_buf(q); @@ -569,11 +420,11 @@ static bool xco_queue_recv_try(xco_event_t *e, uintptr_t *out) { /* Empty buffer. If a sender is parked here it can only mean cap==0 * (otherwise the sender would have used the buffer). Hand directly. */ if (q->send_head) { - xco_waker_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail); - xco_queue_send_waker_t *qsw = (xco_queue_send_waker_t *)w; + xco_waiter_t *sender = xco_chan_q_pop(&q->send_head, &q->send_tail); + xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)sender; if (out) *out = qsw->value; qsw->delivered = true; - xco_waker_fire(w, 0); + xco_waiter_fire(sender, 0); return true; } /* Closed and drained: receivers learn EOF via xco_queue_recv; out is @@ -582,66 +433,55 @@ static bool xco_queue_recv_try(xco_event_t *e, uintptr_t *out) { if (out) *out = 0; return true; } - return false; -} - -static void xco_queue_recv_park(xco_event_t *e, xco_waker_t *w) { - xco_queue_t *q = xco_queue_of_recv(e); + if (!w) return false; xco_chan_q_push(&q->recv_head, &q->recv_tail, w); + return false; } -static void xco_queue_recv_unpark(xco_event_t *e, xco_waker_t *w) { +static void xco_queue_recv_unpark(xco_event_t *e, xco_waiter_t *w) { xco_queue_t *q = xco_queue_of_recv(e); xco_chan_q_remove(&q->recv_head, &q->recv_tail, w); } const xco_event_vtable_t xco__queue_recv_vt = { - .try_ = xco_queue_recv_try, - .park = xco_queue_recv_park, + .poll = xco_queue_recv_poll, .unpark = xco_queue_recv_unpark, }; -bool xco_queue_try_send(xco_queue_t *q, uintptr_t value) { - if (q->closed) { - /* Send-after-close. BLOCK: no delivery, signal failure. DROP_*: - * silently drop (queue policy already says "may be lost"). */ - if (q->policy == XCO_QUEUE_BLOCK) return false; - return true; - } +xco_queue_send_status_t xco_queue_send_poll(xco_queue_t *q, uintptr_t value, + xco_queue_send_waiter_t *qsw) { + /* Closed is closed regardless of policy: caller learns the truth. */ + if (q->closed) return XCO_QSEND_CLOSED; + /* Direct handoff first: parked receivers always win over the buffer. * This is the rendezvous case and the cap==0 case. */ - xco_waker_t *w = xco_chan_q_pop(&q->recv_head, &q->recv_tail); + xco_waiter_t *w = xco_chan_q_pop(&q->recv_head, &q->recv_tail); if (w) { - xco_waker_fire(w, value); - return true; + xco_waiter_fire(w, value); + return XCO_QSEND_ACCEPTED; } if (q->len < q->cap) { xco_queue_push_buf(q, value); - return true; + return XCO_QSEND_ACCEPTED; } /* Buffer full and no waiting receiver. */ switch (q->policy) { case XCO_QUEUE_BLOCK: - return false; + if (!qsw) return XCO_QSEND_BLOCKED; + qsw->value = value; + xco_chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base); + return XCO_QSEND_BLOCKED; case XCO_QUEUE_DROP_NEWEST: - return true; + return XCO_QSEND_ACCEPTED; case XCO_QUEUE_DROP_OLDEST: (void)xco_queue_pop_buf(q); xco_queue_push_buf(q, value); - return true; + return XCO_QSEND_ACCEPTED; } __builtin_unreachable(); } -void xco_queue_park_send(xco_queue_t *q, xco_queue_send_waker_t *qsw) { - /* DROP_* never parks (try_send always returns true); only valid - * for BLOCK. park_send after close is UB. */ - assert(q->policy == XCO_QUEUE_BLOCK); - assert(!q->closed); - xco_chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base); -} - -void xco_queue_unpark_send(xco_queue_t *q, xco_queue_send_waker_t *qsw) { +void xco_queue_send_unpark(xco_queue_t *q, xco_queue_send_waiter_t *qsw) { xco_chan_q_remove(&q->send_head, &q->send_tail, &qsw->sw.base); } @@ -653,11 +493,11 @@ xco_recv_status_t xco_queue_recv(xco_queue_t *q, uintptr_t *out) { return XCO_RECV_GOT; } if (q->send_head) { - xco_waker_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail); - xco_queue_send_waker_t *qsw = (xco_queue_send_waker_t *)w; + xco_waiter_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail); + xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w; if (out) *out = qsw->value; qsw->delivered = true; - xco_waker_fire(w, 0); + xco_waiter_fire(w, 0); return XCO_RECV_GOT; } if (q->closed) return XCO_RECV_CLOSED; @@ -671,54 +511,46 @@ void xco_queue_close(xco_queue_t *q) { /* Drain parked senders with delivered=false. Senders only park * under BLOCK, so this is no-op for DROP_* (their waitlist is * always empty). */ - xco_waker_t *w; + xco_waiter_t *w; while ((w = xco_chan_q_pop(&q->send_head, &q->send_tail)) != NULL) { - xco_queue_send_waker_t *qsw = (xco_queue_send_waker_t *)w; + xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w; qsw->delivered = false; - xco_waker_fire(w, 0); + xco_waiter_fire(w, 0); } /* Wake parked receivers so they can observe closed via xco_queue_recv. * Receivers may still drain buffered values first — xco_queue_recv's * XCO_RECV_GOT path is hit before the XCO_RECV_CLOSED branch. */ while ((w = xco_chan_q_pop(&q->recv_head, &q->recv_tail)) != NULL) { - xco_waker_fire(w, 0); + xco_waiter_fire(w, 0); } } /* ---- Queue send op (selectable send) ---------------------------------- */ -void xco__queue_send_op_fire(xco_waker_t *w, uintptr_t value) { +void xco__queue_send_op_fire(xco_waiter_t *w, uintptr_t value) { (void)value; xco_queue_send_op_t *op = (xco_queue_send_op_t *)w; xco_latch_set(&op->done, op->qsw.delivered ? 1 : 0); } void xco_queue_send_op_init(xco_queue_send_op_t *op, xco_queue_t *q, uintptr_t value) { - xco_queue_send_waker_init(&op->qsw, NULL, NULL, value); + xco_queue_send_waiter_init(&op->qsw, NULL, NULL); op->qsw.sw.base.fire = xco__queue_send_op_fire; op->queue = q; xco_latch_init(&op->done); - if (q->closed) { - /* BLOCK: no delivery; DROP_*: dropped per policy. Either way the - * value did not reach a receiver, so delivered=false. */ - xco_latch_set(&op->done, 0); - return; - } - if (xco_queue_try_send(q, value)) { - /* Inline accept: handoff to receiver, buffered, or DROP_* policy - * accepted it. */ + switch (xco_queue_send_poll(q, value, &op->qsw)) { + case XCO_QSEND_ACCEPTED: op->qsw.delivered = true; xco_latch_set(&op->done, 1); return; + case XCO_QSEND_CLOSED: + xco_latch_set(&op->done, 0); + return; + case XCO_QSEND_BLOCKED: + /* Only BLOCK + full buffer; parked. */ + return; } - /* Only BLOCK policy with full buffer reaches here. */ - xco_queue_park_send(q, &op->qsw); -} - -void xco_queue_send_op_deinit(xco_queue_send_op_t *op) { - if (op->done.set) return; - xco_queue_unpark_send(op->queue, &op->qsw); } /* ==================================================================== @@ -732,20 +564,19 @@ void xco_queue_send_op_deinit(xco_queue_send_op_t *op) { * publishes — subscribers re-park to receive subsequent values. * ==================================================================== */ -static bool xco_broadcast_try(xco_event_t *e, uintptr_t *out) { - (void)e; (void)out; - return false; -} - -static void xco_broadcast_park(xco_event_t *e, xco_waker_t *w) { +static bool xco_broadcast_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { + (void)out; + /* Transient: never reports ready; just parks if asked. */ + if (!w) return false; xco_broadcast_t *b = (xco_broadcast_t *)e; assert(!w->prev && !w->next); w->next = b->waiters; if (b->waiters) b->waiters->prev = w; b->waiters = w; + return false; } -static void xco_broadcast_unpark(xco_event_t *e, xco_waker_t *w) { +static void xco_broadcast_unpark(xco_event_t *e, xco_waiter_t *w) { xco_broadcast_t *b = (xco_broadcast_t *)e; if (!w->prev && b->waiters != w) return; if (w->prev) w->prev->next = w->next; @@ -755,8 +586,7 @@ static void xco_broadcast_unpark(xco_event_t *e, xco_waker_t *w) { } const xco_event_vtable_t xco__broadcast_vt = { - .try_ = xco_broadcast_try, - .park = xco_broadcast_park, + .poll = xco_broadcast_poll, .unpark = xco_broadcast_unpark, }; @@ -765,15 +595,15 @@ void xco_broadcast_publish(xco_broadcast_t *b, uintptr_t value) { b->value = value; /* Detach the waitlist before iterating — same hazard-free pattern as - * xco_latch_set. A waker's fire callback may re-park itself on us (the + * xco_latch_set. A waiter's fire callback may re-park itself on us (the * common case for a re-arming subscriber); decoupling means that * re-park lands on a fresh waitlist, not on the snapshot we're * walking. */ - xco_waker_t *w = b->waiters; + xco_waiter_t *w = b->waiters; b->waiters = NULL; while (w) { - xco_waker_t *next = w->next; /* save before xco_waker_fire clears */ - xco_waker_fire(w, value); + xco_waiter_t *next = w->next; /* save before xco_waiter_fire clears */ + xco_waiter_fire(w, value); w = next; } } @@ -784,75 +614,47 @@ void xco_broadcast_publish(xco_broadcast_t *b, uintptr_t value) { * Doubly-linked FIFO waitlist (same shape as the chan/queue waitlists). * xco_notify_one fires the head; xco_notify_all detaches the whole list before * iterating so callbacks can re-park onto a fresh waitlist without - * iterator hazards (same pattern as xco_latch_set). xco_event_try is always - * false: notify is purely transient. + * iterator hazards (same pattern as xco_latch_set). xco_event_poll never + * reports ready: notify is purely transient. * ==================================================================== */ -static bool xco_notify_try(xco_event_t *e, uintptr_t *out) { - (void)e; (void)out; - return false; -} - -static void xco_notify_park(xco_event_t *e, xco_waker_t *w) { +static bool xco_notify_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { + (void)out; + if (!w) return false; xco_notify_t *n = (xco_notify_t *)e; xco_chan_q_push(&n->head, &n->tail, w); + return false; } -static void xco_notify_unpark(xco_event_t *e, xco_waker_t *w) { +static void xco_notify_unpark(xco_event_t *e, xco_waiter_t *w) { xco_notify_t *n = (xco_notify_t *)e; xco_chan_q_remove(&n->head, &n->tail, w); } const xco_event_vtable_t xco__notify_vt = { - .try_ = xco_notify_try, - .park = xco_notify_park, + .poll = xco_notify_poll, .unpark = xco_notify_unpark, }; void xco_notify_one(xco_notify_t *n) { - xco_waker_t *w = xco_chan_q_pop(&n->head, &n->tail); + xco_waiter_t *w = xco_chan_q_pop(&n->head, &n->tail); if (!w) return; - xco_waker_fire(w, 0); + xco_waiter_fire(w, 0); } void xco_notify_all(xco_notify_t *n) { /* Detach before iterating: re-parking inside fire lands on a fresh * (empty) list. Walk the snapshot via saved next pointers. */ - xco_waker_t *w = n->head; + xco_waiter_t *w = n->head; n->head = n->tail = NULL; while (w) { - xco_waker_t *next = w->next; - xco_waker_fire(w, 0); + xco_waiter_t *next = w->next; + xco_waiter_fire(w, 0); w = next; } } /* ==================================================================== - * Timer - * - * The timer-as-event surface is just the embedded latch's vtable: try - * checks done.set, park/unpark mutate done.waiters. The timer source - * triggers the latch from advance(). No bespoke timer vtable needed. - * ==================================================================== */ - -void xco_timer_init(xco_timer_t *t, xco_timers_t *ts, uint64_t deadline) { - xco_latch_init(&t->done); - t->deadline = deadline; - t->src = ts; - t->in_heap = false; - t->child = NULL; - t->prev = NULL; - t->next = NULL; - xco_timers_insert(ts, t); /* sets in_heap = true */ -} - -void xco_timer_deinit(xco_timer_t *t) { - /* Idempotent. After fire, in_heap is false (advance popped us); - * after explicit cancel, also false. Otherwise we're still queued. */ - if (t->in_heap) xco_timers_cancel(t->src, t); -} - -/* ==================================================================== * Pairing heap * * Standard intrusive pairing heap. Each node carries three link fields: @@ -987,11 +789,9 @@ static void xco_ph_advance(xco_timers_t *ts, uint64_t now) { } } -static bool xco_ph_peek(const xco_timers_t *ts, uint64_t *out) { +static uint64_t xco_ph_peek(const xco_timers_t *ts) { const xco_pairing_heap_t *h = (const xco_pairing_heap_t *)ts; - if (!h->root) return false; - if (out) *out = h->root->deadline; - return true; + return h->root ? h->root->deadline : UINT64_MAX; } const xco_timers_vtable_t xco__pairing_heap_vt = { @@ -1005,11 +805,11 @@ const xco_timers_vtable_t xco__pairing_heap_vt = { * Timeout * ==================================================================== */ -/* Bridge waker: parked on the timer's latch, fires the cancel when the +/* Bridge waiter: parked on the timer's latch, fires the cancel when the * timer fires. Two-step indirection so cancel can already have its own * waiters (e.g. a xco_wait_or_cancel select) without the timer's waitlist * caring about cancel internals. */ -static void xco__timeout_bridge_fire(xco_waker_t *w, uintptr_t value) { +static void xco__timeout_bridge_fire(xco_waiter_t *w, uintptr_t value) { (void)value; xco_timeout_t *to = (xco_timeout_t *)((char *)w - offsetof(xco_timeout_t, bridge)); xco_cancel_set(&to->cancel); @@ -1022,15 +822,10 @@ void xco_timeout_init(xco_timeout_t *to, xco_timers_t *ts, uint64_t deadline) { to->bridge.prev = NULL; to->bridge.fire = xco__timeout_bridge_fire; /* Park the bridge on the timer. If the timer fires, xco_latch_set - * detaches the bridge and calls our fire callback inline. */ - xco_event_park(xco_timer_event(&to->timer), &to->bridge); -} - -void xco_timeout_deinit(xco_timeout_t *to) { - /* unpark is idempotent (no-op if the bridge already fired or was - * never parked). xco_timer_deinit is idempotent on the in_heap flag. */ - xco_event_unpark(xco_timer_event(&to->timer), &to->bridge); - xco_timer_deinit(&to->timer); + * detaches the bridge and calls our fire callback inline. The timer + * was just inserted into the heap and hasn't been advanced, so poll + * always parks here (return value ignored). */ + (void)xco_event_poll(xco_timer_event(&to->timer), NULL, &to->bridge); } /* ==================================================================== @@ -1041,20 +836,19 @@ void xco_timeout_deinit(xco_timeout_t *to) { * doesn't matter; doubly-linked gives O(1) unpark for cancellation. * ==================================================================== */ -static bool xco_ticker_try(xco_event_t *e, uintptr_t *out) { - (void)e; (void)out; - return false; /* transient — wait for the next tick */ -} - -static void xco_ticker_park(xco_event_t *e, xco_waker_t *w) { +static bool xco_ticker_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { + (void)out; + /* Transient — never reports ready; just parks if asked. */ + if (!w) return false; xco_ticker_t *t = (xco_ticker_t *)((char *)e - offsetof(xco_ticker_t, base)); assert(!w->prev && !w->next); w->next = t->waiters; if (t->waiters) t->waiters->prev = w; t->waiters = w; + return false; } -static void xco_ticker_unpark(xco_event_t *e, xco_waker_t *w) { +static void xco_ticker_unpark(xco_event_t *e, xco_waiter_t *w) { xco_ticker_t *t = (xco_ticker_t *)((char *)e - offsetof(xco_ticker_t, base)); if (!w->prev && t->waiters != w) return; if (w->prev) w->prev->next = w->next; @@ -1064,17 +858,16 @@ static void xco_ticker_unpark(xco_event_t *e, xco_waker_t *w) { } const xco_event_vtable_t xco__ticker_vt = { - .try_ = xco_ticker_try, - .park = xco_ticker_park, + .poll = xco_ticker_poll, .unpark = xco_ticker_unpark, }; -/* Bridge waker: parks on the underlying timer's latch. On fire, compute +/* Bridge waiter: parks on the underlying timer's latch. On fire, compute * the next deadline (skip-ahead-safe), reinstall the timer, re-park the * bridge on the new timer, then fire every parked subscriber with the * just-fired deadline. The waitlist is detached before iteration so * subscribers can re-park inside their fire callbacks. */ -static void xco__ticker_bridge_fire(xco_waker_t *w, uintptr_t value) { +static void xco__ticker_bridge_fire(xco_waiter_t *w, uintptr_t value) { xco_ticker_t *t = (xco_ticker_t *)((char *)w - offsetof(xco_ticker_t, bridge)); uint64_t fired = (uint64_t)value; uint64_t next = fired + t->period; @@ -1086,17 +879,18 @@ static void xco__ticker_bridge_fire(xco_waker_t *w, uintptr_t value) { /* Reinstall the timer for the next tick. The latch's storage is * reused — xco_timer_init runs xco_latch_init on it. */ xco_timer_init(&t->timer, t->src, next); - /* Bridge waker is fully detached (xco_waker_fire just cleared its - * links); park it on the freshly-armed timer. */ - xco_event_park(xco_timer_event(&t->timer), &t->bridge); + /* Bridge waiter is fully detached (xco_waiter_fire just cleared its + * links); park it on the freshly-armed timer (which has not yet been + * advanced, so poll parks). */ + (void)xco_event_poll(xco_timer_event(&t->timer), NULL, &t->bridge); /* Fire the subscribers. Detach the waitlist first so re-park inside * fire lands on the now-empty list. */ - xco_waker_t *waiters = t->waiters; + xco_waiter_t *waiters = t->waiters; t->waiters = NULL; while (waiters) { - xco_waker_t *nxt = waiters->next; - xco_waker_fire(waiters, (uintptr_t)fired); + xco_waiter_t *nxt = waiters->next; + xco_waiter_fire(waiters, (uintptr_t)fired); waiters = nxt; } } @@ -1117,12 +911,12 @@ void xco_ticker_init(xco_ticker_t *t, xco_timers_t *ts, t->bridge.next = NULL; t->bridge.prev = NULL; t->bridge.fire = xco__ticker_bridge_fire; - xco_event_park(xco_timer_event(&t->timer), &t->bridge); + (void)xco_event_poll(xco_timer_event(&t->timer), NULL, &t->bridge); } void xco_ticker_deinit(xco_ticker_t *t) { /* Pull the bridge off the timer (no-op if already fired) and cancel - * the timer. Subscribers' wakers are the caller's storage; nothing + * the timer. Subscribers' waiters are the caller's storage; nothing * to free here. */ xco_event_unpark(xco_timer_event(&t->timer), &t->bridge); xco_timer_deinit(&t->timer); @@ -1131,7 +925,7 @@ void xco_ticker_deinit(xco_ticker_t *t) { /* ==================================================================== * Task group * - * Each attach contributes one to the countdown and parks a bridge waker + * Each attach contributes one to the countdown and parks a bridge waiter * on the task's done event. Bridge fire splices the slot out of the * group's list and decrements the countdown — the join event fires when * the last attached task finishes. @@ -1151,7 +945,7 @@ static void xco__task_group_detach_slot(xco_task_group_t *g, xco_group_attach_t slot->prev = slot->next = NULL; } -static void xco__task_group_bridge_fire(xco_waker_t *w, uintptr_t value) { +static void xco__task_group_bridge_fire(xco_waiter_t *w, uintptr_t value) { (void)value; xco_group_attach_t *slot = (xco_group_attach_t *)((char *)w - offsetof(xco_group_attach_t, bridge)); xco_task_group_t *g = slot->group; @@ -1192,11 +986,10 @@ void xco_task_group_attach(xco_task_group_t *g, xco_task_t *t, xco_group_attach_ slot->bridge.prev = NULL; slot->bridge.fire = xco__task_group_bridge_fire; - /* Park on the task's done event. If the task has already finished - * (re-attaching is UB per the contract, but if the task fired - * before attach finished initializing — e.g. an inline-spawned - * synchronous task), the xco_latch_park assert would catch it. */ - xco_event_park(xco_task_done_event(t), &slot->bridge); + /* Park on the task's done event. Re-attaching a finished task is UB + * per the contract; the latch's clean-waiter assert inside poll + * would catch a double-park. */ + (void)xco_event_poll(xco_task_done_event(t), NULL, &slot->bridge); } void xco_task_group_cancel(xco_task_group_t *g) { @@ -1215,22 +1008,22 @@ void xco_task_group_cancel(xco_task_group_t *g) { * ==================================================================== */ typedef struct xco_impl { - xco_step_t base; /* must be first; aliases xco_t.base */ + xco_mach_t base; /* must be first; aliases xco_coro_t.base */ _Alignas(XCO__CTX_ALIGN) unsigned char ctx_buf[XCO__CTX_SIZE]; xco_platform_ctx_t *resumer_ctx; /* where suspend/return goes */ xco_fn fn; } xco_impl_t; -_Static_assert(sizeof(xco_impl_t) <= sizeof(xco_t), "xco_t too small"); -_Static_assert(_Alignof(xco_impl_t) <= _Alignof(xco_t), "xco_t under-aligned"); +_Static_assert(sizeof(xco_impl_t) <= sizeof(xco_coro_t), "xco_coro_t too small"); +_Static_assert(_Alignof(xco_impl_t) <= _Alignof(xco_coro_t), "xco_coro_t under-aligned"); -static inline xco_impl_t *xco_impl_of(xco_t *c) { return (xco_impl_t *)c; } +static inline xco_impl_t *xco_impl_of(xco_coro_t *c) { return (xco_impl_t *)c; } static inline xco_platform_ctx_t *xco_ctx_of(xco_impl_t *ci) { return (xco_platform_ctx_t *)ci->ctx_buf; } /* Per-thread state. */ -static _Thread_local xco_impl_t *xco_t_current = NULL; +static _Thread_local xco_impl_t *xco_coro_current = NULL; static _Thread_local _Alignas(XCO__CTX_ALIGN) unsigned char xco_t_main_ctx_buf[XCO__CTX_SIZE]; static inline xco_platform_ctx_t *xco_main_ctx(void) { @@ -1240,10 +1033,10 @@ static inline xco_platform_ctx_t *xco_main_ctx(void) { /* Trampoline: runs on the coroutine's own stack, invoked by the * platform layer on the first switch into a fresh context. The * argument is the value passed to that first xco_step. The coroutine - * identifies itself via xco_t_current, set by the resumer just before + * identifies itself via xco_coro_current, set by the resumer just before * switching. */ static void xco_trampoline(uintptr_t arg) { - xco_impl_t *self = xco_t_current; + xco_impl_t *self = xco_coro_current; uintptr_t ret = self->fn(arg); self->base.status = XCO_STEP_DEAD; @@ -1254,25 +1047,25 @@ static void xco_trampoline(uintptr_t arg) { /* xco_step_fn entry point wired into base.step at init time. All callers * — generic xco_step consumers and xco-aware code alike — route through * here. */ -static xco_step_result_t xco_co_step(xco_step_t *s, uintptr_t value) { +static xco_step_result_t xco_co_step(xco_mach_t *s, uintptr_t value) { xco_impl_t *next = (xco_impl_t *)s; assert(next->base.status == XCO_STEP_INIT || next->base.status == XCO_STEP_SUSPENDED); - xco_impl_t *prev = xco_t_current; + xco_impl_t *prev = xco_coro_current; next->resumer_ctx = prev ? xco_ctx_of(prev) : xco_main_ctx(); next->base.status = XCO_STEP_RUNNING; - xco_t_current = next; + xco_coro_current = next; uintptr_t back = xco_platform_switch(next->resumer_ctx, xco_ctx_of(next), value); /* Coroutine has either suspended or returned; status is already * set correctly by xco_suspend or by the xco_trampoline. */ - xco_t_current = prev; + xco_coro_current = prev; return (xco_step_result_t){ .value = back, .status = next->base.status }; } -void xco_init(xco_t *c, xco_fn fn, +void xco_init(xco_coro_t *c, xco_fn fn, void *stack_base, size_t stack_len) { xco_impl_t *ci = xco_impl_of(c); ci->base.step = xco_co_step; @@ -1283,14 +1076,14 @@ void xco_init(xco_t *c, xco_fn fn, } uintptr_t xco_suspend(uintptr_t value) { - xco_impl_t *self = xco_t_current; + xco_impl_t *self = xco_coro_current; assert(self != NULL); self->base.status = XCO_STEP_SUSPENDED; return xco_platform_switch(xco_ctx_of(self), self->resumer_ctx, value); } -xco_t *xco_self(void) { - return (xco_t *)xco_t_current; +xco_coro_t *xco_self(void) { + return (xco_coro_t *)xco_coro_current; } /* ---- xco-backed task -------------------------------------------------- */ @@ -1302,7 +1095,7 @@ xco_t *xco_self(void) { * wakes with the body's return without the body needing to know about * the task surface at all. */ static uintptr_t xco_cotask_trampoline(uintptr_t arg) { - xco_t *self = xco_self(); + xco_coro_t *self = xco_self(); xco_cotask_t *xt = (xco_cotask_t *)((char *)self - offsetof(xco_cotask_t, co)); uintptr_t r = xt->fn(&xt->task, arg); xco_task_done(&xt->task, r); diff --git a/xco.h b/xco.h @@ -1,50 +1,59 @@ /* - * xco.h — minimal asymmetric coroutines + event substrate. C11. + * xco.h — minimal C11 concurrency library. * * Four layers in this header, bottom-up: * - * xco_step_t Generic resumable function. A value that can be + * xco_mach_t Generic resumable function. A value that can be * driven forward one step at a time; each step takes a * uintptr_t in, returns one out, and reports whether * the function suspended or finished. Substrate shared * by stack-switching coroutines (xco) and hand-coded * state machines. * - * waker / event / runtime - * Pollable event substrate (try / park / unpark) with a + * waiter / event / runtime + * Pollable event substrate (poll / unpark) with a * single-threaded FIFO ready queue. Concrete events: - * latch, countdown, notify, semaphore/mutex, select & + * latch, countdown, notify, semaphore/mutex, select, * allof, channel, queue, broadcast, cancel, timer, - * pairing-heap timer source, timeout, ticker, task, - * task_group. All storage caller-provided, no allocation, - * no atomics. + * pairing-heap timer source, timeout, ticker. All + * storage caller-provided, no allocation, no atomics. * - * xco_t Stack-switching coroutine. xco_t embeds xco_step_t as + * xco_task_t Lifecycle handle for a running xco_step. Bundles a + * done latch (fires with the step's return value) and + * a cancel latch (the cooperative wind-down signal). + * xco_task_group_t fans these in/out across a dynamic + * set of tasks. Storage caller-provided; the step + * itself lives wherever the caller put it. + * + * xco_coro_t Stack-switching coroutine. xco_coro_t embeds xco_mach_t as * its first member, so a coroutine is one concrete * kind of resumable function: generic code holding an - * xco_step_t * works on coroutines and hand-coded state + * xco_mach_t* works on coroutines and hand-coded state * machines uniformly. Values pass between caller and * coroutine through a single uintptr_t channel; pack * richer data behind a pointer. * Asymmetric: xco_suspend always returns to the most * recent resumer. Resumes nest like function calls. - * Thread-affine: a coroutine must be resumed on the - * thread that initialized it. + * Thread portability: the platform switch saves only + * callee-saved regs (no TLS register, no signal mask), + * so a fully-suspended coroutine could in principle be + * resumed on another thread. In practice don't: the + * runtime/event/waiter substrate is single-threaded + * (no atomics), so a coroutine parked on any event is + * tethered to that runtime's thread; and user code + * silently rebinds errno / _Thread_local / thread-affine + * OS handles to whichever thread resumed it. * - * xco_cotask_t xco specialization of xco_task_t. The xco_trampoline calls + * xco_cotask_t xco specialization of xco_task_t. The xco_trampoline calls * fn(&xt->task, arg) and then xco_task_done with the * return value, so xco_wait_or_cancel-style teardown works * without the user wiring anything. * - * Single-threaded only — matches xco's thread affinity. The contract - * "try first, park only if try failed" is race-free because nothing - * else runs between the two calls. - * - * One-waker invariant. An xco_step, while suspended, is parked on at most + * One-waiter invariant. An xco_step, while suspended, is parked on at most * one event. Multi-wait is composed in the event graph: build a * select_event (or any future combinator — all-of, timeout, ...) and * park on that. The xco_step never sees more than one event directly. - * This is what lets a single xco_step_waker_t live inline in the xco_step + * This is what lets a single xco_waker_t live inline in the xco_step * and a single next/prev pair serve both event waitlists and the * runtime ready queue (the two list memberships are disjoint in time). */ @@ -52,6 +61,7 @@ #ifndef XCO_H #define XCO_H +#include <assert.h> #include <stdbool.h> #include <stddef.h> #include <stdint.h> @@ -64,17 +74,17 @@ /* ==================================================================== * xco_step — generic resumable function interface. * - * The first-member convention: embed xco_step_t as the first field of + * The first-member convention: embed xco_mach_t as the first field of * your concrete type so a pointer to your type can be passed wherever - * an xco_step_t * is expected. + * an xco_mach_t * is expected. * * typedef struct { - * xco_step_t base; + * xco_mach_t base; * int phase; * ... * } parser_t; * - * static xco_step_result_t parser_step(xco_step_t *s, uintptr_t v) { + * static xco_step_result_t parser_step(xco_mach_t *s, uintptr_t v) { * parser_t *p = (parser_t *)s; * switch (p->phase) { * case 0: p->phase = 1; return (xco_step_result_t){v + 1, XCO_STEP_SUSPENDED}; @@ -92,81 +102,92 @@ typedef enum { XCO_STEP_RUNNING, /* inside step(), or in an active resume chain */ XCO_STEP_SUSPENDED, /* yielded; resumable */ XCO_STEP_DEAD, /* function returned */ -} xco_step_status_t; +} xco_mach_status_t; typedef struct { uintptr_t value; - xco_step_status_t status; + xco_mach_status_t status; } xco_step_result_t; -typedef struct xco_step xco_step_t; -typedef xco_step_result_t (*xco_step_fn)(xco_step_t *s, uintptr_t value); +typedef struct xco_mach xco_mach_t; +typedef xco_step_result_t (*xco_step_fn)(xco_mach_t *s, uintptr_t value); -struct xco_step { +struct xco_mach { xco_step_fn step; - xco_step_status_t status; /* cached; xco_step() syncs from each result */ + xco_mach_status_t status; /* cached; xco_step() syncs from each result */ }; /* Drive one step. The wrapper updates s->status from the returned * result so step implementations only need to populate the result. */ -static inline xco_step_result_t xco_step(xco_step_t *s, uintptr_t value) { +static inline xco_step_result_t xco_step(xco_mach_t *s, uintptr_t value) { xco_step_result_t r = s->step(s, value); s->status = r.status; return r; } -static inline xco_step_status_t xco_step_status(const xco_step_t *s) { +static inline xco_mach_status_t xco_mach_status(const xco_mach_t *s) { return s->status; } /* ==================================================================== * Event substrate. * + * Fused try-or-park: xco_event_poll(e, &out, w) returns true if the + * event is ready (writes the value to *out, w is NOT parked) and false + * otherwise (parks w on the waitlist iff w is non-NULL). The two + * degenerate forms: + * + * xco_event_poll(e, &v, NULL) — pure try (peek; never parks). + * xco_event_poll(e, NULL, w) — park-if-not-ready (out discarded). + * + * Fusing closes the try/park TOCTOU window an MT impl would otherwise + * have to internally re-check, and lets each event arm a waiter + * atomically with its readiness check. + * * Standard usage from a state machine: * * uintptr_t v; - * if (xco_event_try(e, &v)) { ... use v ... } - * else { xco_event_park(e, &my_waker.base); return SUSPENDED; } + * if (xco_event_poll(e, &v, &my_waiter.base)) { ... use v ... } + * else { return SUSPENDED; } * * Standard usage from an xco coroutine wrapper: * * uintptr_t v; - * if (!xco_event_try(e, &v)) { - * xco_step_waker_t sw; - * xco_step_waker_init(&sw, rt, &xco_self()->base); - * xco_event_park(e, &sw.base); + * xco_waker_t sw; + * xco_waker_init(&sw, rt, &xco_self()->base); + * if (!xco_event_poll(e, &v, &sw.base)) { * xco_suspend(0); - * (void)xco_event_try(e, &v); // now ready + * (void)xco_event_poll(e, &v, NULL); // sticky: now ready * } * ==================================================================== */ -/* ---- Waker ------------------------------------------------------------ */ +/* ---- Waiter ------------------------------------------------------------ */ -typedef struct xco_waker xco_waker_t; -struct xco_waker { +typedef struct xco_waiter xco_waiter_t; +struct xco_waiter { /* Doubly-linked while parked on an event waitlist, so unpark is * O(1). Reused as the singly-linked next pointer while on the * runtime ready queue (FIFO, no removal from middle); prev is * undefined in that state and reset on the next park. */ - xco_waker_t *next; - xco_waker_t *prev; + xco_waiter_t *next; + xco_waiter_t *prev; /* Fire callback. value is the event's payload at fire time — sticky * events also store it on themselves, transient events (channels, - * one-shot signals) deliver only here. Wakers that don't care + * one-shot signals) deliver only here. Waiters that don't care * about the value just ignore the parameter. * - * Invoke via xco_waker_fire (below), not directly: the helper enforces - * the "fire receives a fully detached waker" contract that makes it + * Invoke via xco_waiter_fire (below), not directly: the helper enforces + * the "fire receives a fully detached waiter" contract that makes it * safe to re-park inside the callback. */ - void (*fire)(xco_waker_t *w, uintptr_t value); + void (*fire)(xco_waiter_t *w, uintptr_t value); }; -/* Canonical way to invoke a waker's fire callback. Hands the callback a - * fully detached waker so the callback (or whatever the resumed step +/* Canonical way to invoke a waiter's fire callback. Hands the callback a + * fully detached waiter so the callback (or whatever the resumed step * does) can re-park on a fresh waitlist without colliding with stale * link state. Detachers that lead into fire (queue pops, latch drains, * etc.) don't need to clear prev/next themselves. */ -static inline void xco_waker_fire(xco_waker_t *w, uintptr_t value) { +static inline void xco_waiter_fire(xco_waiter_t *w, uintptr_t value) { w->prev = NULL; w->next = NULL; w->fire(w, value); @@ -177,24 +198,22 @@ static inline void xco_waker_fire(xco_waker_t *w, uintptr_t value) { typedef struct xco_event xco_event_t; typedef struct { - /* Inline-succeed if possible. *out receives the event's value when - * it has one (latch payload, select winner index, channel data). - * out may be NULL if the caller doesn't need the value. */ - bool (*try_)(xco_event_t *e, uintptr_t *out); - /* Arm w on the event's waitlist. Caller's contract: try_ returned - * false. Single-threaded runtime guarantees no transition between. */ - void (*park)(xco_event_t *e, xco_waker_t *w); + /* Fused try + park. If the event is ready, write its value to *out + * (when out != NULL), do NOT park w, and return true. Otherwise, if + * w != NULL park it on the waitlist and return false; if w == NULL + * just return false without parking. *out is left untouched on the + * not-ready path. */ + bool (*poll)(xco_event_t *e, uintptr_t *out, xco_waiter_t *w); /* Remove w from the waitlist. Idempotent: no-op if not parked. */ - void (*unpark)(xco_event_t *e, xco_waker_t *w); + void (*unpark)(xco_event_t *e, xco_waiter_t *w); } xco_event_vtable_t; struct xco_event { const xco_event_vtable_t *vt; }; -static inline bool xco_event_try(xco_event_t *e, uintptr_t *out) { - return e->vt->try_(e, out); +static inline bool xco_event_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { + return e->vt->poll(e, out, w); } -static inline void xco_event_park(xco_event_t *e, xco_waker_t *w) { e->vt->park(e, w); } -static inline void xco_event_unpark(xco_event_t *e, xco_waker_t *w) { e->vt->unpark(e, w); } +static inline void xco_event_unpark(xco_event_t *e, xco_waiter_t *w) { e->vt->unpark(e, w); } /* ---- Runtime ---------------------------------------------------------- */ @@ -203,7 +222,7 @@ static inline void xco_event_unpark(xco_event_t *e, xco_waker_t *w) { e->vt->unp typedef struct xco_timers xco_timers_t; typedef struct xco_runtime { - xco_waker_t *head, *tail; + xco_waiter_t *head, *tail; xco_timers_t *timers; /* optional; advanced inside xco_rt_run */ } xco_runtime_t; @@ -214,21 +233,21 @@ static inline void xco_rt_init(xco_runtime_t *rt) { /* Attach (or detach with NULL) a timer source. While attached, xco_rt_run * advances it each pass with the now value the caller supplied; firing - * timers may enqueue more wakers, which the same xco_rt_run call then drains. */ + * timers may enqueue more waiters, which the same xco_rt_run call then drains. */ static inline void xco_rt_attach_timers(xco_runtime_t *rt, xco_timers_t *ts) { rt->timers = ts; } -/* Append w to the ready queue. Used by xco__step_waker_fire and by anyone - * else that wants a waker resumed by the scheduler. */ -static inline void xco_rt_enqueue(xco_runtime_t *rt, xco_waker_t *w) { +/* Append w to the ready queue. Used by xco__waker_fire and by anyone + * else that wants a waiter resumed by the scheduler. */ +static inline void xco_rt_enqueue(xco_runtime_t *rt, xco_waiter_t *w) { w->next = NULL; if (rt->tail) rt->tail->next = w; else rt->head = w; rt->tail = w; } -/* Drain the ready queue, resuming each step-waker's xco_step, until empty. +/* Drain the ready queue, resuming each waker's xco_step, until empty. * Steps may re-arm on events (and thus leave the queue) or enqueue * other steps; xco_rt_run keeps going until quiescent. now is forwarded to * any attached timer source's advance(); pass 0 (or anything) when no @@ -238,29 +257,29 @@ void xco_rt_run(xco_runtime_t *rt, uint64_t now); /* The canonical bridge between events and the scheduler. When fired, * stashes the value and enqueues itself onto rt; xco_rt_run pops it and - * calls xco_step(step, value), so the resumed step receives the event's + * calls xco_step(mach, value), so the resumed step receives the event's * payload directly without a re-try. * - * Init once, re-park freely. The runtime hands the waker back fully + * Init once, re-park freely. The runtime hands the waiter back fully * detached (next/prev cleared) before invoking the resumed step, so a - * subscriber that wants to wait on the next event can call xco_event_park + * subscriber that wants to wait on the next event can call xco_event_poll * directly — no re-init needed unless rt or step changes. */ typedef struct { - xco_waker_t base; + xco_waiter_t base; xco_runtime_t *rt; - xco_step_t *step; + xco_mach_t *mach; uintptr_t resume_value; /* set by fire, consumed by xco_rt_run */ -} xco_step_waker_t; +} xco_waker_t; -/* Defined in xco.c; declared here so xco_step_waker_init can install it. */ -void xco__step_waker_fire(xco_waker_t *w, uintptr_t value); +/* Defined in xco.c; declared here so xco_waker_init can install it. */ +void xco__waker_fire(xco_waiter_t *w, uintptr_t value); -static inline void xco_step_waker_init(xco_step_waker_t *sw, xco_runtime_t *rt, xco_step_t *s) { +static inline void xco_waker_init(xco_waker_t *sw, xco_runtime_t *rt, xco_mach_t *m) { sw->base.next = NULL; sw->base.prev = NULL; - sw->base.fire = xco__step_waker_fire; + sw->base.fire = xco__waker_fire; sw->rt = rt; - sw->step = s; + sw->mach = m; sw->resume_value = 0; } @@ -273,7 +292,7 @@ typedef struct { xco_event_t base; bool set; uintptr_t value; - xco_waker_t *waiters; + xco_waiter_t *waiters; } xco_latch_t; /* Defined in xco.c; referenced by xco_latch_init. */ @@ -307,12 +326,12 @@ static inline void xco_countdown_init(xco_countdown_t *c, size_t n) { } static inline void xco_countdown_add(xco_countdown_t *c, size_t n) { - /* UB after fire — caller's contract. */ + assert(!c->done.set); c->remaining += n; } static inline void xco_countdown_done(xco_countdown_t *c) { - /* UB at 0 — caller's contract. */ + assert(c->remaining > 0); if (--c->remaining == 0) xco_latch_set(&c->done, 0); } @@ -322,15 +341,15 @@ static inline bool xco_countdown_fired(const xco_countdown_t *c) { return c- /* ---- Notify (wake-one / wake-all) ------------------------------------- */ /* Transient signal with no sticky state. xco_notify_one fires (and detaches) - * the head of a FIFO waitlist; xco_notify_all fires every parked waker. Both + * the head of a FIFO waitlist; xco_notify_all fires every parked waiter. Both * are no-ops when the waitlist is empty. Subscribers must re-park to see * subsequent notifications. * - * xco_event_try always returns false: there is no "ready now" state — a + * xco_event_poll never reports ready: there is no "ready now" state — a * subscriber waits for the *next* notify. */ typedef struct xco_notify { xco_event_t base; - xco_waker_t *head, *tail; + xco_waiter_t *head, *tail; } xco_notify_t; extern const xco_event_vtable_t xco__notify_vt; @@ -348,9 +367,9 @@ void xco_notify_all(xco_notify_t *n); /* ---- Semaphore -------------------------------------------------------- */ /* Counting semaphore. acquire is exposed as xco_event_t (composable with - * select / xco_wait_or_cancel): xco_event_try succeeds and decrements when - * permits > 0; otherwise the waker parks FIFO. xco_semaphore_release(n) - * hands one permit to each of up to n waiting wakers (each is fired, + * select / xco_wait_or_cancel): xco_event_poll succeeds and decrements when + * permits > 0; otherwise the waiter parks FIFO. xco_semaphore_release(n) + * hands one permit to each of up to n waiting waiters (each is fired, * which the receiver treats as "you got a permit") before adding any * leftover to the count. * @@ -358,14 +377,14 @@ void xco_notify_all(xco_notify_t *n); * shape; if you need it, call sequentially. For binary use (mutex-style * critical section across awaits) init with permits = 1. * - * Fairness: FIFO at the waitlist. A racing inline xco_event_try by a fresh + * Fairness: FIFO at the waitlist. A racing inline xco_event_poll by a fresh * caller can jump ahead of parked waiters when permits are released * back to count rather than directly handed off — release prefers * parked waiters first to avoid that. */ typedef struct xco_semaphore { xco_event_t acquire; size_t permits; - xco_waker_t *head, *tail; + xco_waiter_t *head, *tail; } xco_semaphore_t; extern const xco_event_vtable_t xco__semaphore_acquire_vt; @@ -403,8 +422,9 @@ static inline void xco_mutex_release(xco_mutex_t *m) { xco_semaphore_release * closed the wait — the winner for select, the last-to-fire for allof — * and inputs[i].value carries each fired input's payload (works * uniformly for sticky and transient sources, where re-trying the input - * would either succeed or fail). Composes: a select_event is itself an - * event. */ + * would either succeed or fail). + * + * Composes: a select_event is itself an event. */ typedef struct xco_select_event xco_select_event_t; @@ -412,7 +432,7 @@ typedef struct xco_select_event xco_select_event_t; * the select_event. After fire, .value holds whatever the input * delivered; other fields are internal. */ typedef struct { - xco_waker_t w; + xco_waiter_t w; xco_event_t *src; xco_select_event_t *parent; uintptr_t value; /* captured at fire time */ @@ -429,7 +449,7 @@ struct xco_select_event { /* Initialize as a select (any-of). inputs[] is caller-provided storage * for n nodes; srcs[] is the array of n input event pointers (read * once during init). If any input is already ready, the select fires - * immediately and no wakers are parked. Use &s->done.base as the + * immediately and no waiters are parked. Use &s->done.base as the * resulting xco_event_t. */ void xco_select_event_init(xco_select_event_t *s, xco_select_input_t *inputs, size_t n, @@ -447,154 +467,52 @@ void xco_allof_event_init(xco_select_event_t *s, * it has not yet fired. */ void xco_select_event_deinit(xco_select_event_t *s); -/* ---- Channel ---------------------------------------------------------- */ -/* Unbuffered rendezvous channel carrying uintptr_t. Senders and receivers - * wait on each other; whichever arrives first parks until its peer - * shows up. The pending value lives in the sender's xco_chan_send_waker_t - * for the duration of any wait — no per-channel buffer. +/* ---- Queue ------------------------------------------------------------ */ + +/* Bounded FIFO of uintptr_t. Caller provides the ring buffer storage. + * Recv side is exposed as xco_event_t (composable with select). Send side + * is a typed API (carries a value), shaped after the event-poll fusion: + * fused try + park, NULL-waiter degenerates to pure-try. * - * Recv side is exposed as xco_event_t (composable with select). Send side is - * a typed API because send carries a value that doesn't fit xco_event_t's - * try(out) signature. Selecting on send is therefore not supported in - * this layer; if needed, wrap a send in a per-call op event. + * Three full-buffer policies, fixed at init: + * XCO_QUEUE_BLOCK senders park until a receiver makes room. + * XCO_QUEUE_DROP_NEWEST xco_queue_send_poll silently discards the new value. + * XCO_QUEUE_DROP_OLDEST xco_queue_send_poll evicts the head and pushes new tail. + * + * Senders never park under DROP_* policies — passing a non-NULL qsw is + * only meaningful under XCO_QUEUE_BLOCK. xco_queue_send_unpark is + * idempotent (cancellation-safe). * - * Rendezvous matrix: + * Direct-handoff: xco_queue_send_poll first checks for a parked receiver and + * delivers inline if present (payload bypasses the buffer), regardless + * of policy. + * + * Rendezvous matrix (BLOCK + cap=0; the xco_chan_* aliases at the bottom + * of this section name this configuration explicitly): * send + parked recv fire recv with value, sender continues inline. - * send + no recv sender parks (xco_chan_park_send); peer pulls later. + * send + no recv sender parks (xco_queue_send_poll); peer pulls later. * recv + parked sender read sender's value, fire sender (delivery * confirmation), receiver continues inline. - * recv + no sender receiver parks (xco_event_park on recv); peer + * recv + no sender receiver parks (xco_event_poll on recv); peer * delivers later. * * FIFO order on both waitlists. * - * Close: optional EOF semantics. After xco_chan_close, try_send fails (no - * delivery), parked senders are drained with delivered=false, and parked - * receivers are woken so they can observe XCO_RECV_CLOSED via xco_chan_recv. The - * recv event is "ready" iff a value is available OR the channel is - * closed — receivers must call xco_chan_recv to disambiguate value vs EOF. - * xco_chan_park_send after close is UB. */ - -/* Result of a typed receive on a channel or queue. */ + * Close: optional EOF semantics. After xco_queue_close, xco_queue_send_poll + * returns XCO_QSEND_CLOSED regardless of policy (no delivery), parked + * senders are drained with delivered=false, and parked receivers are + * woken so they can observe XCO_RECV_CLOSED via xco_queue_recv. The recv + * event is "ready" iff a value is available OR the queue is closed — + * receivers must call xco_queue_recv to disambiguate value vs EOF. */ + +/* Result of a typed receive on a queue (or chan, which is just a queue). */ typedef enum { XCO_RECV_GOT, /* *out holds the delivered value */ XCO_RECV_EMPTY, /* nothing available right now; caller may park */ XCO_RECV_CLOSED, /* peer closed and no values remain */ } xco_recv_status_t; -typedef struct xco_chan { - xco_event_t recv; /* the recv-side event */ - xco_waker_t *send_head, *send_tail; /* parked xco_chan_send_waker_t bases */ - xco_waker_t *recv_head, *recv_tail; /* parked recv-side wakers */ - bool closed; -} xco_chan_t; - -extern const xco_event_vtable_t xco__chan_recv_vt; - -static inline void xco_chan_init(xco_chan_t *c) { - c->recv.vt = &xco__chan_recv_vt; - c->send_head = c->send_tail = NULL; - c->recv_head = c->recv_tail = NULL; - c->closed = false; -} - -/* Sender-side waker. Embeds a step_waker (the actual scheduler bridge) - * and the value to deliver. The receiver reads .value, then fires the - * step_waker — the sender resumes via the runtime, no payload passed. - * - * `delivered` is set by the closing side before fire: true on a normal - * recv handoff, false when xco_chan_close drains the parked-sender list. - * Senders read it after resume to know whether their value reached a - * receiver. */ -typedef struct { - xco_step_waker_t sw; - uintptr_t value; - bool delivered; -} xco_chan_send_waker_t; - -static inline void xco_chan_send_waker_init(xco_chan_send_waker_t *csw, - xco_runtime_t *rt, xco_step_t *s, - uintptr_t value) { - xco_step_waker_init(&csw->sw, rt, s); - csw->value = value; - csw->delivered = false; -} - -/* Try to deliver value inline. Returns true iff a receiver was parked - * AND the channel is open; its waker fires with the value, the receiver - * becomes ready, and the sender continues without parking. Returns false - * after close (no delivery). */ -bool xco_chan_try_send(xco_chan_t *c, uintptr_t value); - -/* Park a sender. csw->value must already be set (use xco_chan_send_waker_init). - * The sender's xco_step is resumed when a receiver consumes the value, or - * when xco_chan_close drains the list (sender resumes with csw->delivered - * false). Calling xco_chan_park_send on a closed channel is UB. */ -void xco_chan_park_send(xco_chan_t *c, xco_chan_send_waker_t *csw); - -/* Remove a parked sender (cancellation). No-op if not parked. */ -void xco_chan_unpark_send(xco_chan_t *c, xco_chan_send_waker_t *csw); - -/* Typed receive. Disambiguates value vs EOF where xco_event_try cannot. */ -xco_recv_status_t xco_chan_recv(xco_chan_t *c, uintptr_t *out); - -/* Close the channel. Idempotent. Drains parked senders (each resumes - * with csw->delivered = false) and wakes parked receivers (which then - * observe XCO_RECV_CLOSED via xco_chan_recv). Subsequent xco_chan_try_send returns - * false; xco_chan_park_send is UB. */ -void xco_chan_close(xco_chan_t *c); -static inline bool xco_chan_is_closed(const xco_chan_t *c) { return c->closed; } - -/* Selectable send op. A per-call object that holds the value, parks - * on the channel, and exposes &op->done.base as the event that fires - * when delivery resolves. Compose with select like any other event. - * - * The op embeds a xco_chan_send_waker_t (so the chan's send list stays - * uniform — receivers read .value at the same offset for both direct - * and op senders) but rewires its fire callback: instead of resuming - * an xco_step, fire sets op->done. Polymorphism via the function pointer. - * - * The done latch's payload is 1 on a successful delivery and 0 on - * close-drain (sender's value did not reach a receiver). The init path - * resolves to 0 inline if the channel is already closed. - * - * Lifecycle: init → wait on &op->done.base → deinit. Always deinit; - * it's a no-op after resolution and unparks the chan-side waker if not. */ -typedef struct { - xco_chan_send_waker_t csw; /* parked on chan; fire overridden */ - xco_chan_t *chan; - xco_latch_t done; -} xco_chan_send_op_t; - -/* Implementation detail: exposed so xco_chan_send_op_init can install it. */ -void xco__chan_send_op_fire(xco_waker_t *w, uintptr_t value); - -void xco_chan_send_op_init(xco_chan_send_op_t *op, xco_chan_t *c, uintptr_t value); -void xco_chan_send_op_deinit(xco_chan_send_op_t *op); - -/* ---- Queue ------------------------------------------------------------ */ - -/* Bounded FIFO of uintptr_t. Caller provides the ring buffer storage. - * Recv side is exposed as xco_event_t (composable with select). Send side is - * a typed API (carries a value), shaped after chan's send. - * - * Three full-buffer policies, fixed at init: - * XCO_QUEUE_BLOCK senders park until a receiver makes room. - * XCO_QUEUE_DROP_NEWEST xco_queue_try_send silently discards the new value. - * XCO_QUEUE_DROP_OLDEST xco_queue_try_send evicts the head and pushes new tail. - * - * Senders never park under DROP_* policies — xco_queue_park_send is only - * meaningful under XCO_QUEUE_BLOCK, and only after xco_queue_try_send returned - * false. xco_queue_unpark_send is idempotent (cancellation-safe). - * - * Direct-handoff: xco_queue_try_send first checks for a parked receiver and - * delivers inline if present (payload bypasses the buffer), regardless - * of policy. Symmetric to chan. - * - * cap == 0 with XCO_QUEUE_BLOCK degenerates to a rendezvous channel; xco_chan_t - * remains the more direct expression of that case. */ - typedef enum { XCO_QUEUE_BLOCK, XCO_QUEUE_DROP_NEWEST, @@ -606,8 +524,8 @@ typedef struct xco_queue { uintptr_t *buf; size_t cap, head, len; xco_queue_policy_t policy; - xco_waker_t *send_head, *send_tail; - xco_waker_t *recv_head, *recv_tail; + xco_waiter_t *send_head, *send_tail; + xco_waiter_t *recv_head, *recv_tail; bool closed; } xco_queue_t; @@ -628,66 +546,155 @@ static inline void xco_queue_init(xco_queue_t *q, uintptr_t *buf, size_t cap, static inline xco_event_t *xco_queue_recv_event(xco_queue_t *q) { return &q->recv; } -/* Try to enqueue. Direct-delivers to a parked receiver if one is waiting. - * Returns: - * XCO_QUEUE_BLOCK true if delivered or buffered; false if full. - * XCO_QUEUE_DROP_NEWEST always true (silently drops if full). - * XCO_QUEUE_DROP_OLDEST always true (evicts head if full). */ -bool xco_queue_try_send(xco_queue_t *q, uintptr_t value); - -/* Sender-side waker for XCO_QUEUE_BLOCK. Same shape as xco_chan_send_waker_t: - * a step_waker plus the value to deliver. Receivers read .value at the - * same offset so the queue's send list stays uniform. `delivered` is - * set by the closing side: true on a normal handoff, false on a close - * drain. */ +/* Sender-side waiter for XCO_QUEUE_BLOCK. Same shape as xco_chan_send_waiter_t: + * a waker plus a value slot the receiver / close-drain reads back on the + * park path. `delivered` is set by the closing side: true on a normal + * handoff, false on a close drain. */ typedef struct { - xco_step_waker_t sw; - uintptr_t value; + xco_waker_t sw; + uintptr_t value; /* set by xco_queue_send_poll on the park path */ bool delivered; -} xco_queue_send_waker_t; +} xco_queue_send_waiter_t; -static inline void xco_queue_send_waker_init(xco_queue_send_waker_t *qsw, - xco_runtime_t *rt, xco_step_t *s, - uintptr_t value) { - xco_step_waker_init(&qsw->sw, rt, s); - qsw->value = value; +static inline void xco_queue_send_waiter_init(xco_queue_send_waiter_t *qsw, + xco_runtime_t *rt, xco_mach_t *m) { + xco_waker_init(&qsw->sw, rt, m); + qsw->value = 0; qsw->delivered = false; } -void xco_queue_park_send (xco_queue_t *q, xco_queue_send_waker_t *qsw); -void xco_queue_unpark_send(xco_queue_t *q, xco_queue_send_waker_t *qsw); - -/* Typed receive. Mirrors xco_chan_recv: returns XCO_RECV_GOT (value popped from - * the buffer or directly from a parked sender), XCO_RECV_CLOSED (closed and - * drained), or XCO_RECV_EMPTY (caller may park). */ +/* Result of xco_queue_send_poll. */ +typedef enum { + XCO_QSEND_ACCEPTED, /* delivered to a parked receiver, buffered, or + accepted-by-policy (silently dropped under + DROP_NEWEST, evicted-and-pushed under DROP_OLDEST) */ + XCO_QSEND_BLOCKED, /* BLOCK + full; parked iff qsw != NULL */ + XCO_QSEND_CLOSED, /* queue closed; never parks. Returned regardless + of policy — closed is closed. */ +} xco_queue_send_status_t; + +/* Fused try + park for a sender. Direct-delivers to a parked receiver if + * one is waiting (returns XCO_QSEND_ACCEPTED, qsw not parked). Otherwise: + * + * XCO_QUEUE_BLOCK + room buffered → XCO_QSEND_ACCEPTED. + * XCO_QUEUE_BLOCK + full XCO_QSEND_BLOCKED; if qsw != NULL the + * sender's value is stashed in qsw->value + * and qsw is parked. + * XCO_QUEUE_DROP_NEWEST + full silently drops → XCO_QSEND_ACCEPTED. + * XCO_QUEUE_DROP_OLDEST + full evicts head, pushes new tail → ACCEPTED. + * closed (any policy) XCO_QSEND_CLOSED; never parks. + * + * The two degenerate forms mirror xco_event_poll: + * xco_queue_send_poll(q, v, NULL) — pure try (peek; never parks). + * xco_queue_send_poll(q, v, qsw) — fused try-or-park (BLOCK only). */ +xco_queue_send_status_t xco_queue_send_poll(xco_queue_t *q, uintptr_t value, + xco_queue_send_waiter_t *qsw); + +void xco_queue_send_unpark(xco_queue_t *q, xco_queue_send_waiter_t *qsw); + +/* Typed receive. Disambiguates value vs EOF where xco_event_poll cannot: + * returns XCO_RECV_GOT (value popped from the buffer or directly from a + * parked sender), XCO_RECV_CLOSED (closed and drained), or + * XCO_RECV_EMPTY (caller may park). */ xco_recv_status_t xco_queue_recv(xco_queue_t *q, uintptr_t *out); /* Close the queue. Idempotent. Drains parked senders (delivered=false) - * and wakes parked receivers. After close, xco_queue_try_send under - * XCO_QUEUE_BLOCK returns false; under QUEUE_DROP_* the value is silently - * dropped (returns true). xco_queue_park_send after close is UB. */ + * and wakes parked receivers. After close, xco_queue_send_poll returns + * XCO_QSEND_CLOSED regardless of policy. */ void xco_queue_close(xco_queue_t *q); static inline bool xco_queue_is_closed(const xco_queue_t *q) { return q->closed; } -/* Selectable send op. Mirrors xco_chan_send_op_t: a per-call object that - * holds the value, parks on the queue (only meaningful under XCO_QUEUE_BLOCK), - * and exposes &op->done.base as the event that fires when the send - * resolves. The done latch's payload is 1 on a successful delivery - * (handed to a receiver, buffered, or accepted under DROP_*) and 0 on - * close-drain. +/* Selectable send op. A per-call object that holds the value, parks on + * the queue (only meaningful under XCO_QUEUE_BLOCK), and exposes + * &op->done.base as the event that fires when the send resolves. * - * Under DROP_* policies the send always resolves inline at init: the - * try_send path returns true and op->done fires immediately. */ + * The op embeds a xco_queue_send_waiter_t (so the queue's send list stays + * uniform — receivers read .value at the same offset for both direct + * and op senders) but rewires its fire callback: instead of resuming an + * xco_step, fire sets op->done. Polymorphism via the function pointer. + * + * The done latch's payload is 1 on XCO_QSEND_ACCEPTED (handed to a + * receiver, buffered, or accepted under DROP_*) and 0 on + * XCO_QSEND_CLOSED / close-drain. + * + * Under DROP_* policies the send always resolves inline at init (the + * poll returns ACCEPTED and op->done fires immediately). + * + * Lifecycle: init → wait on &op->done.base → deinit. Always deinit; + * it's a no-op after resolution and unparks the queue-side waiter if not. */ typedef struct { - xco_queue_send_waker_t qsw; /* parked on queue; fire overridden */ + xco_queue_send_waiter_t qsw; /* parked on queue; fire overridden */ xco_queue_t *queue; xco_latch_t done; } xco_queue_send_op_t; -void xco__queue_send_op_fire(xco_waker_t *w, uintptr_t value); +void xco__queue_send_op_fire(xco_waiter_t *w, uintptr_t value); + +void xco_queue_send_op_init(xco_queue_send_op_t *op, xco_queue_t *q, uintptr_t value); +static inline void xco_queue_send_op_deinit(xco_queue_send_op_t *op) { + if (op->done.set) return; + xco_queue_send_unpark(op->queue, &op->qsw); +} + +/* ---- Channel (alias) -------------------------------------------------- */ + +/* Unbuffered rendezvous channel: a queue at cap=0 with XCO_QUEUE_BLOCK + * policy. Senders and receivers wait on each other; whichever arrives + * first parks until its peer shows up. The pending value lives in the + * sender's xco_chan_send_waiter_t for the duration of any wait — no + * per-channel buffer storage. + * + * The xco_chan_* names are thin aliases over the queue API: a chan IS a + * queue. They exist so call sites can name "rendezvous" explicitly + * rather than spelling out cap=0+BLOCK. The queue's recv event, send + * poll, close, recv, and selectable send op all carry over unchanged. + * + * Storage: an xco_chan_t carries the queue's buffer/cap/policy fields + * even though they're inert at cap=0 (~40 bytes of overhead vs a + * dedicated rendezvous struct). Below the noise floor for typical use. */ + +typedef xco_queue_t xco_chan_t; +typedef xco_queue_send_waiter_t xco_chan_send_waiter_t; +typedef xco_queue_send_status_t xco_chan_send_status_t; +typedef xco_queue_send_op_t xco_chan_send_op_t; + +/* Status aliases. Same enumerators, vocabulary at call sites: a chan + * "delivers" rather than "accepts" — but the underlying constants are + * the queue's. */ +#define XCO_SEND_DELIVERED XCO_QSEND_ACCEPTED +#define XCO_SEND_BLOCKED XCO_QSEND_BLOCKED +#define XCO_SEND_CLOSED XCO_QSEND_CLOSED -void xco_queue_send_op_init (xco_queue_send_op_t *op, xco_queue_t *q, uintptr_t value); -void xco_queue_send_op_deinit(xco_queue_send_op_t *op); +static inline void xco_chan_init(xco_chan_t *c) { + xco_queue_init(c, NULL, 0, XCO_QUEUE_BLOCK); +} +static inline xco_event_t *xco_chan_recv_event(xco_chan_t *c) { + return xco_queue_recv_event(c); +} +static inline void xco_chan_send_waiter_init(xco_chan_send_waiter_t *csw, + xco_runtime_t *rt, xco_mach_t *m) { + xco_queue_send_waiter_init(csw, rt, m); +} +static inline xco_chan_send_status_t xco_chan_send_poll(xco_chan_t *c, uintptr_t value, + xco_chan_send_waiter_t *csw) { + return xco_queue_send_poll(c, value, csw); +} +static inline void xco_chan_send_unpark(xco_chan_t *c, xco_chan_send_waiter_t *csw) { + xco_queue_send_unpark(c, csw); +} +static inline xco_recv_status_t xco_chan_recv(xco_chan_t *c, uintptr_t *out) { + return xco_queue_recv(c, out); +} +static inline void xco_chan_close(xco_chan_t *c) { xco_queue_close(c); } +static inline bool xco_chan_is_closed(const xco_chan_t *c) { + return xco_queue_is_closed(c); +} +static inline void xco_chan_send_op_init(xco_chan_send_op_t *op, xco_chan_t *c, uintptr_t value) { + xco_queue_send_op_init(op, c, value); +} +static inline void xco_chan_send_op_deinit(xco_chan_send_op_t *op) { + xco_queue_send_op_deinit(op); +} /* ---- Broadcast (slot) ------------------------------------------------- */ @@ -698,7 +705,7 @@ void xco_queue_send_op_deinit(xco_queue_send_op_t *op); * publish time miss that publish but will see the next one. This is the * coalescing "watch a slot" semantics, not lossless fan-out. * - * xco_event_try always returns false: there is no "ready now" state — a + * xco_event_poll never reports ready: there is no "ready now" state — a * subscriber waits for the *next* publish. To read the latest published * value at any time, use xco_broadcast_value (valid once xco_broadcast_has_value * returns true). @@ -710,7 +717,7 @@ typedef struct xco_broadcast { xco_event_t base; bool has_value; uintptr_t value; - xco_waker_t *waiters; + xco_waiter_t *waiters; } xco_broadcast_t; extern const xco_event_vtable_t xco__broadcast_vt; @@ -740,7 +747,7 @@ void xco_broadcast_publish(xco_broadcast_t *b, uintptr_t value); * Discipline: cancellation notifies; it never drops in-flight values. * A cancelled await returns control to its caller, which is responsible * for draining whatever it owns — deinit a pending chan_send_op so its - * value goes back to the sender, deinit a select_event so input wakers + * value goes back to the sender, deinit a select_event so input waiters * detach, drive a cancellable coroutine to XCO_STEP_DEAD before freeing * its stack. The xco layer does no unwinding; the coroutine cooperates. */ @@ -804,17 +811,25 @@ typedef struct { /* Fire every timer whose deadline <= now, in deadline order, popping * each from the source. Each fire drains the timer's waiter list. */ void (*advance)(xco_timers_t *ts, uint64_t now); - /* If any timer is queued, write its deadline to *out and return - * true. *out untouched on false. */ - bool (*peek) (const xco_timers_t *ts, uint64_t *out); + /* Return the earliest queued deadline, or UINT64_MAX if no timer + * is queued. */ + uint64_t (*peek)(const xco_timers_t *ts); } xco_timers_vtable_t; -struct xco_timers { const xco_timers_vtable_t *vt; }; +struct xco_timers { + const xco_timers_vtable_t *vt; + uint64_t now; /* most recent advance() input; monotonic */ +}; -static inline void xco_timers_insert (xco_timers_t *ts, xco_timer_t *t) { ts->vt->insert (ts, t); } -static inline void xco_timers_cancel (xco_timers_t *ts, xco_timer_t *t) { ts->vt->cancel (ts, t); } -static inline void xco_timers_advance(xco_timers_t *ts, uint64_t now) { ts->vt->advance(ts, now); } -static inline bool xco_timers_peek (const xco_timers_t *ts, uint64_t *o) { return ts->vt->peek(ts, o); } +static inline void xco_timers_insert (xco_timers_t *ts, xco_timer_t *t) { ts->vt->insert (ts, t); } +static inline void xco_timers_cancel (xco_timers_t *ts, xco_timer_t *t) { ts->vt->cancel (ts, t); } +static inline void xco_timers_advance(xco_timers_t *ts, uint64_t now) { + assert(now >= ts->now); + ts->now = now; + ts->vt->advance(ts, now); +} +static inline uint64_t xco_timers_peek (const xco_timers_t *ts) { return ts->vt->peek(ts); } +static inline uint64_t xco_timers_now (const xco_timers_t *ts) { return ts->now; } /* Concrete timer. Embeds a latch so try/park/unpark and the fire-all * waitlist handling come for free; the timer source manipulates only @@ -833,8 +848,20 @@ struct xco_timer { static inline xco_event_t *xco_timer_event(xco_timer_t *t) { return &t->done.base; } static inline bool xco_timer_fired(const xco_timer_t *t) { return t->done.set; } -void xco_timer_init (xco_timer_t *t, xco_timers_t *ts, uint64_t deadline); -void xco_timer_deinit(xco_timer_t *t); +static inline void xco_timer_init(xco_timer_t *t, xco_timers_t *ts, uint64_t deadline) { + xco_latch_init(&t->done); + t->deadline = deadline; + t->src = ts; + t->in_heap = false; + t->child = NULL; + t->prev = NULL; + t->next = NULL; + xco_timers_insert(ts, t); +} + +static inline void xco_timer_deinit(xco_timer_t *t) { + if (t->in_heap) xco_timers_cancel(t->src, t); +} /* In-tree timer source: intrusive pairing heap. O(1) amortized insert * and meld; O(log n) amortized advance and cancel. No per-source @@ -848,8 +875,9 @@ typedef struct { extern const xco_timers_vtable_t xco__pairing_heap_vt; static inline void xco_pairing_heap_init(xco_pairing_heap_t *h) { - h->base.vt = &xco__pairing_heap_vt; - h->root = NULL; + h->base.vt = &xco__pairing_heap_vt; + h->base.now = 0; + h->root = NULL; } /* ---- Timeout ---------------------------------------------------------- */ @@ -865,16 +893,19 @@ static inline void xco_pairing_heap_init(xco_pairing_heap_t *h) { * xco_select_event_deinit(&sel); * xco_timeout_deinit(&to); // safe whether the timer fired or not * - * The bridge waker is parked on the timer; when it fires it sets the + * The bridge waiter is parked on the timer; when it fires it sets the * cancel. Bridge fire is idempotent vs xco_cancel_set (a sticky latch). */ typedef struct xco_timeout { xco_timer_t timer; xco_cancel_t cancel; - xco_waker_t bridge; + xco_waiter_t bridge; } xco_timeout_t; -void xco_timeout_init (xco_timeout_t *to, xco_timers_t *ts, uint64_t deadline); -void xco_timeout_deinit(xco_timeout_t *to); +void xco_timeout_init(xco_timeout_t *to, xco_timers_t *ts, uint64_t deadline); +static inline void xco_timeout_deinit(xco_timeout_t *to) { + xco_event_unpark(xco_timer_event(&to->timer), &to->bridge); + xco_timer_deinit(&to->timer); +} /* ---- Ticker ----------------------------------------------------------- */ @@ -889,14 +920,14 @@ void xco_timeout_deinit(xco_timeout_t *to); * ... wait on xco_ticker_event(&t), re-park to see further ticks ... * xco_ticker_deinit(&t); // cancels the in-flight timer * - * xco_event_try always returns false; subscribers wait for the *next* tick. */ + * xco_event_poll never reports ready; subscribers wait for the *next* tick. */ typedef struct xco_ticker { xco_timer_t timer; xco_timers_t *src; uint64_t period; xco_event_t base; - xco_waker_t *waiters; - xco_waker_t bridge; /* internal: parks on xco_timer_event */ + xco_waiter_t *waiters; + xco_waiter_t bridge; /* internal: parks on xco_timer_event */ } xco_ticker_t; extern const xco_event_vtable_t xco__ticker_vt; @@ -929,13 +960,13 @@ static inline xco_event_t *xco_ticker_event(xco_ticker_t *t) { return &t->base; * latch's payload is the xco_step's return value. */ typedef struct xco_task { - xco_step_t *step; + xco_mach_t *mach; xco_latch_t done; xco_cancel_t cancel; } xco_task_t; -static inline void xco_task_init(xco_task_t *t, xco_step_t *step) { - t->step = step; +static inline void xco_task_init(xco_task_t *t, xco_mach_t *mach) { + t->mach = mach; xco_latch_init(&t->done); xco_cancel_init(&t->cancel); } @@ -949,7 +980,7 @@ static inline xco_event_t *xco_task_done_event (xco_task_t *t) { return static inline xco_cancel_t *xco_task_cancel (xco_task_t *t) { return &t->cancel; } static inline bool xco_task_finished (const xco_task_t *t) { return t->done.set; } static inline bool xco_task_is_cancelled(const xco_task_t *t) { return xco_cancel_is_set(&t->cancel); } -static inline xco_step_t *xco_task_step (xco_task_t *t) { return t->step; } +static inline xco_mach_t *xco_task_mach (xco_task_t *t) { return t->mach; } /* ---- Task group ------------------------------------------------------- */ @@ -958,7 +989,7 @@ static inline xco_step_t *xco_task_step (xco_task_t *t) { return t * the group itself does no allocation. * * xco_task_group_attach(g, t, slot): - * xco_countdown_add(g->pending, 1); slot's bridge waker parks on + * xco_countdown_add(g->pending, 1); slot's bridge waiter parks on * xco_task_done_event(t); slot is appended to g's list. When the task's * done fires, the bridge fires: it splices the slot out of g's list * and calls xco_countdown_done(&g->pending). Re-attaching a finished @@ -982,7 +1013,7 @@ typedef struct xco_task_group { } xco_task_group_t; struct xco_group_attach { - xco_waker_t bridge; /* parked on xco_task_done_event(task) */ + xco_waiter_t bridge; /* parked on xco_task_done_event(task) */ xco_task_t *task; xco_task_group_t *group; xco_group_attach_t *next, *prev; @@ -1013,20 +1044,20 @@ static inline xco_cancel_t *xco_task_group_cancel_handle(xco_task_group_t *g) { typedef uintptr_t (*xco_fn)(uintptr_t arg); /* Coroutine control block. Allocate anywhere — on a stack, in a - * struct, on the heap. xco_step_t base is first so xco_t * can be passed - * directly to xco_step() or any generic xco_step_t * consumer. The trailing + * struct, on the heap. xco_mach_t base is first so xco_coro_t * can be passed + * directly to xco_step() or any generic xco_mach_t * consumer. The trailing * priv storage holds the saved register context and bookkeeping; * its contents are private to the implementation. */ -typedef struct xco { - xco_step_t base; +typedef struct xco_coro { + xco_mach_t base; _Alignas(XCO_ALIGN) unsigned char priv[XCO_SIZE]; -} xco_t; +} xco_coro_t; /* Initialize *c to run fn on [stack_base, stack_base + stack_len). * stack_base must be XCO_STACK_ALIGN-aligned; the runtime picks the * starting SP based on the architecture's stack growth direction. * Status after init is XCO_STEP_INIT. */ -void xco_init(xco_t *c, xco_fn fn, +void xco_init(xco_coro_t *c, xco_fn fn, void *stack_base, size_t stack_len); /* Suspend the currently running coroutine, returning value to its @@ -1036,17 +1067,17 @@ uintptr_t xco_suspend(uintptr_t value); /* The currently running coroutine, or NULL if the caller is not in * one. The runtime maintains this for xco_suspend. */ -xco_t *xco_self(void); +xco_coro_t *xco_self(void); /* Resuming a coroutine is just driving its xco_step: callers use * xco_step(&c->base, v) directly. Reading status without resuming is - * xco_step_status(&c->base). The xco layer adds no separate vocabulary + * xco_mach_status(&c->base). The xco layer adds no separate vocabulary * for these — that's the unification with hand-coded state machines. * Resuming a coroutine that is not XCO_STEP_INIT or XCO_STEP_SUSPENDED is * undefined. */ /* Convenience: init then first-step in one call. */ -static inline xco_step_result_t xco_spawn(xco_t *c, xco_fn fn, +static inline xco_step_result_t xco_spawn(xco_coro_t *c, xco_fn fn, void *stack_base, size_t stack_len, uintptr_t arg) { xco_init(c, fn, stack_base, stack_len); @@ -1058,24 +1089,27 @@ static inline xco_step_result_t xco_spawn(xco_t *c, xco_fn fn, * wants to give other ready work a turn between long-running steps. * Must be called from inside a coroutine driven by rt. */ static inline void xco_yield(xco_runtime_t *rt) { - xco_step_waker_t sw; - xco_step_waker_init(&sw, rt, &xco_self()->base); + xco_coro_t *self = xco_self(); + assert(self != NULL); + xco_waker_t sw; + xco_waker_init(&sw, rt, &self->base); xco_rt_enqueue(rt, &sw.base); xco_suspend(0); } -/* Await an event from inside a coroutine. The standard try-park-suspend +/* Await an event from inside a coroutine. The standard poll-suspend * dance, in one call. Returns the event's value (delivered by fire on - * the slow path, by xco_event_try on the fast path). Must be called from - * inside a coroutine driven by rt. */ + * the slow path, by the inline poll on the fast path). Must be called + * from inside a coroutine driven by rt. */ static inline uintptr_t xco_await(xco_runtime_t *rt, xco_event_t *e) { + xco_coro_t *self = xco_self(); + assert(self != NULL); uintptr_t v; - if (xco_event_try(e, &v)) return v; - xco_step_waker_t sw; - xco_step_waker_init(&sw, rt, &xco_self()->base); - xco_event_park(e, &sw.base); + xco_waker_t sw; + xco_waker_init(&sw, rt, &self->base); + if (xco_event_poll(e, &v, &sw.base)) return v; xco_suspend(0); - (void)xco_event_try(e, &v); + (void)xco_event_poll(e, &v, NULL); return v; } @@ -1118,7 +1152,7 @@ typedef uintptr_t (*xco_cotask_fn)(xco_task_t *t, uintptr_t arg); typedef struct xco_cotask { xco_task_t task; - xco_t co; + xco_coro_t co; xco_cotask_fn fn; } xco_cotask_t;