xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit c8fcdddc76d080bee2555c69513e555134ae6768
parent a5b1bc4c164eec894ea9b6ca5f9de807da6b7988
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 01:47:59 -0700

Add channel: rendezvous primitive on the event substrate

Unbuffered uintptr_t channel. Sender and receiver wait on each other;
whichever arrives first parks until its peer shows up. The pending
value lives in the parked sender's chan_send_waker_t — no per-channel
buffer.

Two-tier API on the send side:
  chan_try_send / chan_park_send / chan_unpark_send
      Lightweight direct send. Sender parks until a receiver consumes
      the value, then resumes via the runtime.

  chan_send_op_t
      Per-call op object exposing a done latch as event_t. Embeds
      chan_send_waker_t (so the chan's send list stays uniform — recv
      reads .value at one offset for both flavors) and overrides the
      embedded waker's fire to set the op's done latch instead of
      resuming an xstep. This is what makes select-over-send work:
      compose &op->done.base in a select like any other event.

Recv side is exposed as event_t (chan_t.recv) and composes with select
directly. To make transient sources usable through select — where the
fire-time value would otherwise be dropped by select_input_fire —
select_input_t now captures the value at fire time, and the fast path
in select_event_init stashes it from the inline try. Consumers read
inputs[winner].value uniformly across sticky and transient sources.

Both waitlists are doubly-linked FIFOs; in single-threaded operation
only one side is ever non-empty (try-then-park is atomic), so the
queues never need cross-side matching beyond pop-and-fire. Tests
cover both arrival orders, sender and receiver FIFO accumulation,
mid-list unpark, select-on-recv (wake and fast paths), and the four
send-op cases (inline, parked, select-wins, select-loses).

Diffstat:
Mevent.c | 135+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mevent.h | 97+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Mtests/test_event.c | 360+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 585 insertions(+), 7 deletions(-)

diff --git a/event.c b/event.c @@ -108,9 +108,6 @@ void latch_set(latch_t *l, uintptr_t value) { /* ---- Select ----------------------------------------------------------- */ static void select_input_fire(waker_t *w, uintptr_t value) { - /* The input's own value is irrelevant here — select delivers the - * winning index. Consumers re-try the input event for its payload. */ - (void)value; select_input_t *in = (select_input_t *)w; select_event_t *s = in->parent; size_t i = (size_t)(in - s->inputs); @@ -118,6 +115,10 @@ static void select_input_fire(waker_t *w, uintptr_t value) { /* Sticky: only the first input to fire records its index. Later * stragglers (if any escape unparking) hit the idempotent guard. */ if (s->done.set) return; + /* Capture the input's payload before resuming anyone. Sticky + * sources also keep it on themselves; transient sources (channels) + * deliver only here, so this is the only durable record. */ + in->value = value; latch_set(&s->done, i); /* Disarm everyone else so their wakers don't dangle on input @@ -139,6 +140,7 @@ void select_event_init(select_event_t *s, for (size_t i = 0; i < n; i++) { uintptr_t v; if (event_try(srcs[i], &v)) { + inputs[i].value = v; /* captured for inputs[winner].value */ latch_set(&s->done, i); return; } @@ -150,6 +152,7 @@ void select_event_init(select_event_t *s, inputs[i].w.fire = select_input_fire; inputs[i].src = srcs[i]; inputs[i].parent = s; + inputs[i].value = 0; event_park(srcs[i], &inputs[i].w); } } @@ -163,3 +166,129 @@ void select_event_deinit(select_event_t *s) { event_unpark(s->inputs[i].src, &s->inputs[i].w); } } + +/* ---- Channel ---------------------------------------------------------- */ + +/* Doubly-linked FIFO push/pop on a channel waitlist. Same shape as + * latch's list operations but with an explicit tail so arrival order + * is preserved (unlike latch, where waiter order is irrelevant). */ + +static void chan_q_push(waker_t **head, waker_t **tail, waker_t *w) { + assert(!w->prev && !w->next); + w->prev = *tail; + w->next = NULL; + if (*tail) (*tail)->next = w; + else *head = w; + *tail = w; +} + +static waker_t *chan_q_pop(waker_t **head, waker_t **tail) { + waker_t *w = *head; + if (!w) return NULL; + *head = w->next; + if (*head) (*head)->prev = NULL; + else *tail = NULL; + w->prev = w->next = NULL; + return w; +} + +static void chan_q_remove(waker_t **head, waker_t **tail, waker_t *w) { + /* Same not-on-list test as latch_unpark: a queued waker has prev + * set OR is the head; a detached one has prev == NULL and isn't + * the head. */ + if (!w->prev && *head != w) return; + if (w->prev) w->prev->next = w->next; + else *head = w->next; + if (w->next) w->next->prev = w->prev; + else *tail = w->prev; + w->prev = w->next = NULL; +} + +/* Recover the chan_t from its embedded recv event. */ +static inline chan_t *chan_of_recv(event_t *e) { + return (chan_t *)((char *)e - offsetof(chan_t, recv)); +} + +static bool chan_recv_try(event_t *e, uintptr_t *out) { + chan_t *c = chan_of_recv(e); + waker_t *w = chan_q_pop(&c->send_head, &c->send_tail); + if (!w) return false; + /* w is &csw->sw.base; sw is the first field of chan_send_waker_t, + * and base is the first field of step_waker_t, so addresses align. */ + chan_send_waker_t *csw = (chan_send_waker_t *)w; + if (out) *out = csw->value; + /* Resume the sender. The fire value (delivery confirmation) is + * unused — receivers learn the value, senders just learn "done". */ + w->fire(w, 0); + return true; +} + +static void chan_recv_park(event_t *e, waker_t *w) { + chan_t *c = chan_of_recv(e); + chan_q_push(&c->recv_head, &c->recv_tail, w); +} + +static void chan_recv_unpark(event_t *e, waker_t *w) { + chan_t *c = chan_of_recv(e); + chan_q_remove(&c->recv_head, &c->recv_tail, w); +} + +const event_vtable_t _chan_recv_vt = { + .try_ = chan_recv_try, + .park = chan_recv_park, + .unpark = chan_recv_unpark, +}; + +bool chan_try_send(chan_t *c, uintptr_t value) { + waker_t *w = chan_q_pop(&c->recv_head, &c->recv_tail); + if (!w) return false; + /* Hand the value to the recv-side waker. step_waker stashes it as + * resume_value; select_input_fire stashes it in input.value. */ + w->fire(w, value); + return true; +} + +void chan_park_send(chan_t *c, chan_send_waker_t *csw) { + chan_q_push(&c->send_head, &c->send_tail, &csw->sw.base); +} + +void chan_unpark_send(chan_t *c, chan_send_waker_t *csw) { + chan_q_remove(&c->send_head, &c->send_tail, &csw->sw.base); +} + +/* ---- Send op (selectable send) ---------------------------------------- */ + +void _chan_send_op_fire(waker_t *w, uintptr_t value) { + /* Receiver hands no payload to the sender on delivery — the sender + * just learns "delivered." The value is what flowed the other way. */ + (void)value; + /* csw is the first field of chan_send_op_t; sw is first of + * chan_send_waker_t; base is first of step_waker_t. All offsets + * coincide, so w aliases op. */ + chan_send_op_t *op = (chan_send_op_t *)w; + latch_set(&op->done, 0); +} + +void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value) { + /* The embedded chan_send_waker carries the value and provides the + * waker layout the chan's send list expects. rt/step are unused — + * fire goes straight to the latch, no scheduler hop. */ + chan_send_waker_init(&op->csw, NULL, NULL, value); + op->csw.sw.base.fire = _chan_send_op_fire; + op->chan = c; + latch_init(&op->done); + + if (chan_try_send(c, value)) { + /* Inline delivery: no parking, done set immediately. */ + latch_set(&op->done, 0); + return; + } + chan_park_send(c, &op->csw); +} + +void chan_send_op_deinit(chan_send_op_t *op) { + /* If delivered, the waker is already off the send list. If not + * (e.g., select cancellation), pull it off so it doesn't dangle. */ + if (op->done.set) return; + chan_unpark_send(op->chan, &op->csw); +} diff --git a/event.h b/event.h @@ -176,18 +176,21 @@ void latch_set(latch_t *l, uintptr_t value); /* ---- Select ----------------------------------------------------------- */ /* Fires when any of N input events fires. The winning index is the - * payload of the underlying latch; consumers re-try that input to pull - * its actual value. Composes: a select_event is itself an event. */ + * payload of the underlying latch; the winning input's payload is + * captured in inputs[winner].value (works uniformly for sticky and + * transient sources, where re-trying the input would either succeed + * or fail). Composes: a select_event is itself an event. */ typedef struct select_event select_event_t; /* Per-input arming record. Caller-allocated as an array of n alongside - * the select_event. Internal layout — fields are touched only by - * select_event_init / select_input_fire. */ + * the select_event. After fire, .value holds whatever the input + * delivered; other fields are internal. */ typedef struct { waker_t w; event_t *src; select_event_t *parent; + uintptr_t value; /* captured at fire time */ } select_input_t; struct select_event { @@ -208,4 +211,90 @@ void select_event_init(select_event_t *s, * Required before s leaves scope if it has not yet fired. */ void select_event_deinit(select_event_t *s); +/* ---- Channel ---------------------------------------------------------- */ + +/* Unbuffered rendezvous channel carrying uintptr_t. Senders and receivers + * wait on each other; whichever arrives first parks until its peer + * shows up. The pending value lives in the sender's chan_send_waker_t + * for the duration of any wait — no per-channel buffer. + * + * Recv side is exposed as event_t (composable with select). Send side is + * a typed API because send carries a value that doesn't fit event_t's + * try(out) signature. Selecting on send is therefore not supported in + * this layer; if needed, wrap a send in a per-call op event. + * + * Rendezvous matrix: + * send + parked recv fire recv with value, sender continues inline. + * send + no recv sender parks (chan_park_send); peer pulls later. + * recv + parked sender read sender's value, fire sender (delivery + * confirmation), receiver continues inline. + * recv + no sender receiver parks (event_park on recv); peer + * delivers later. + * + * FIFO order on both waitlists. */ + +typedef struct chan { + event_t recv; /* the recv-side event */ + waker_t *send_head, *send_tail; /* parked chan_send_waker_t bases */ + waker_t *recv_head, *recv_tail; /* parked recv-side wakers */ +} chan_t; + +extern const event_vtable_t _chan_recv_vt; + +static inline void chan_init(chan_t *c) { + c->recv.vt = &_chan_recv_vt; + c->send_head = c->send_tail = NULL; + c->recv_head = c->recv_tail = NULL; +} + +/* Sender-side waker. Embeds a step_waker (the actual scheduler bridge) + * and the value to deliver. The receiver reads .value, then fires the + * step_waker — the sender resumes via the runtime, no payload passed. */ +typedef struct { + step_waker_t sw; + uintptr_t value; +} chan_send_waker_t; + +static inline void chan_send_waker_init(chan_send_waker_t *csw, + runtime_t *rt, xstep_t *s, + uintptr_t value) { + step_waker_init(&csw->sw, rt, s); + csw->value = value; +} + +/* Try to deliver value inline. Returns true iff a receiver was parked; + * its waker fires with the value, the receiver becomes ready, and the + * sender continues without parking. */ +bool chan_try_send(chan_t *c, uintptr_t value); + +/* Park a sender. csw->value must already be set (use chan_send_waker_init). + * The sender's xstep is resumed when a receiver consumes the value. */ +void chan_park_send(chan_t *c, chan_send_waker_t *csw); + +/* Remove a parked sender (cancellation). No-op if not parked. */ +void chan_unpark_send(chan_t *c, chan_send_waker_t *csw); + +/* Selectable send op. A per-call object that holds the value, parks + * on the channel, and exposes &op->done.base as the event that fires + * when delivery completes. Compose with select like any other event. + * + * The op embeds a chan_send_waker_t (so the chan's send list stays + * uniform — receivers read .value at the same offset for both direct + * and op senders) but rewires its fire callback: instead of resuming + * an xstep, fire sets op->done. Polymorphism via the function pointer. + * + * Lifecycle: init → wait on &op->done.base → deinit. Always deinit; + * it's a no-op after delivery and unparks the chan-side waker if not. */ +typedef struct { + chan_send_waker_t csw; /* parked on chan; fire overridden */ + chan_t *chan; + latch_t done; +} chan_send_op_t; + +/* Implementation detail: exposed so chan_send_op_init can install it. */ +void _chan_send_op_fire(waker_t *w, uintptr_t value); + +void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value); +void chan_send_op_deinit(chan_send_op_t *op); + #endif /* EVENT_H */ diff --git a/tests/test_event.c b/tests/test_event.c @@ -275,6 +275,353 @@ static void test_select_compose(void) { select_event_deinit(&inner); } +/* ---- Channel ------------------------------------------------------- */ + +/* Sender state machine: try inline; if blocked, park with value. */ +typedef struct { + xstep_t base; + chan_t *c; + runtime_t *rt; + chan_send_waker_t csw; + uintptr_t value; + int phase; + bool done; +} sender_t; + +static xstep_result_t sender_step(xstep_t *s, uintptr_t v) { + sender_t *snd = (sender_t *)s; + (void)v; + switch (snd->phase) { + case 0: + if (chan_try_send(snd->c, snd->value)) { + snd->done = true; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + chan_send_waker_init(&snd->csw, snd->rt, &snd->base, snd->value); + chan_park_send(snd->c, &snd->csw); + snd->phase = 1; + return (xstep_result_t){0, XSTEP_SUSPENDED}; + case 1: + snd->done = true; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + __builtin_unreachable(); +} + +static void sender_init(sender_t *snd, runtime_t *rt, chan_t *c, uintptr_t value) { + snd->base = (xstep_t){.step = sender_step, .status = XSTEP_INIT}; + snd->c = c; + snd->rt = rt; + snd->value = value; + snd->phase = 0; + snd->done = false; +} + +static void test_chan_send_blocks_until_recv(void) { + /* Sender arrives first, parks; receiver pulls and both finish. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + sender_t snd; sender_init(&snd, &rt, &c, 0xDEADBEEF); + xstep(&snd.base, 0); + assert(xstep_status(&snd.base) == XSTEP_SUSPENDED); + assert(c.send_head == &snd.csw.sw.base); + assert(c.send_tail == &snd.csw.sw.base); + + waiter_t r; waiter_init(&r, &rt, &c.recv); + xstep_result_t rr = xstep(&r.base, 0); + assert(rr.status == XSTEP_DEAD); + assert(rr.value == 0xDEADBEEF); + assert(r.got == 0xDEADBEEF); + assert(c.send_head == NULL); + + /* Sender's resumption is queued by the recv-side fire. */ + rt_run(&rt); + assert(xstep_status(&snd.base) == XSTEP_DEAD); + assert(snd.done); +} + +static void test_chan_recv_blocks_until_send(void) { + /* Receiver arrives first, parks; sender delivers inline. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + waiter_t r; waiter_init(&r, &rt, &c.recv); + xstep(&r.base, 0); + assert(xstep_status(&r.base) == XSTEP_SUSPENDED); + assert(c.recv_head == &r.sw.base); + + bool delivered = chan_try_send(&c, 0xCAFE); + assert(delivered); + assert(c.recv_head == NULL); + + rt_run(&rt); + assert(xstep_status(&r.base) == XSTEP_DEAD); + assert(r.got == 0xCAFE); +} + +static void test_chan_try_send_no_recv(void) { + /* No receiver parked: try_send fails without modifying state. */ + chan_t c; chan_init(&c); + assert(!chan_try_send(&c, 1)); + assert(c.send_head == NULL && c.recv_head == NULL); +} + +static void test_chan_fifo(void) { + /* Three senders park; three receives pull values in arrival order. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + sender_t s[3]; + for (int i = 0; i < 3; i++) { + sender_init(&s[i], &rt, &c, (uintptr_t)(100 + i)); + xstep(&s[i].base, 0); + assert(xstep_status(&s[i].base) == XSTEP_SUSPENDED); + } + assert(c.send_head == &s[0].csw.sw.base); + assert(c.send_tail == &s[2].csw.sw.base); + + for (int i = 0; i < 3; i++) { + uintptr_t v; + assert(event_try(&c.recv, &v)); + assert(v == (uintptr_t)(100 + i)); + } + assert(c.send_head == NULL); + + rt_run(&rt); + for (int i = 0; i < 3; i++) { + assert(xstep_status(&s[i].base) == XSTEP_DEAD); + assert(s[i].done); + } +} + +static void test_chan_unpark_send(void) { + /* Cancel a parked sender — list is repaired; other waiters intact. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + sender_t a, b, d; + sender_init(&a, &rt, &c, 1); + sender_init(&b, &rt, &c, 2); + sender_init(&d, &rt, &c, 3); + xstep(&a.base, 0); xstep(&b.base, 0); xstep(&d.base, 0); + assert(c.send_head == &a.csw.sw.base); + assert(c.send_tail == &d.csw.sw.base); + + chan_unpark_send(&c, &b.csw); /* middle */ + /* Idempotent: removing again is a no-op. */ + chan_unpark_send(&c, &b.csw); + + /* Order preserved: a, then d. */ + uintptr_t v; + assert(event_try(&c.recv, &v) && v == 1); + assert(event_try(&c.recv, &v) && v == 3); + assert(c.send_head == NULL); + + /* a and d resume; b stays SUSPENDED (its waker was unparked + * without firing). Drain so it doesn't dangle. */ + rt_run(&rt); + assert(xstep_status(&a.base) == XSTEP_DEAD); + assert(xstep_status(&d.base) == XSTEP_DEAD); + assert(xstep_status(&b.base) == XSTEP_SUSPENDED); +} + +static void test_chan_select_recv(void) { + /* select(chan_recv, latch). Sender delivers; select fires with + * chan_recv's index, and the value is captured in inputs[winner].value. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + latch_t l; latch_init(&l); + + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {&c.recv, &l.base}; + select_event_init(&sel, inputs, 2, srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + assert(c.recv_head != NULL); /* select's input waker parked here */ + + bool delivered = chan_try_send(&c, 0xABCDEF); + assert(delivered); + + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 0); /* chan_recv won (index 0) */ + assert(inputs[0].value == 0xABCDEF); /* captured value */ + + /* Loser was disarmed; deinit is safe (no-op since fired). */ + assert(l.waiters == NULL); + select_event_deinit(&sel); +} + +static void test_chan_recv_fifo(void) { + /* Three receivers park; three sends deliver in arrival order. + * The two waitlists are mutually exclusive (try-then-park is + * atomic in single-threaded use), so the channel state at any + * moment has at most one side queued. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + waiter_t r[3]; + for (int i = 0; i < 3; i++) { + waiter_init(&r[i], &rt, &c.recv); + xstep(&r[i].base, 0); + assert(xstep_status(&r[i].base) == XSTEP_SUSPENDED); + } + assert(c.recv_head == &r[0].sw.base); + assert(c.recv_tail == &r[2].sw.base); + assert(c.send_head == NULL); /* never both sides */ + + for (int i = 0; i < 3; i++) { + bool delivered = chan_try_send(&c, (uintptr_t)(200 + i)); + assert(delivered); + } + assert(c.recv_head == NULL); + + rt_run(&rt); + for (int i = 0; i < 3; i++) { + assert(xstep_status(&r[i].base) == XSTEP_DEAD); + assert(r[i].got == (uintptr_t)(200 + i)); + } +} + +/* ---- Send op (selectable send) ------------------------------------- */ + +static void test_chan_send_op_inline(void) { + /* Receiver already parked: op_init delivers immediately, done set. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + waiter_t r; waiter_init(&r, &rt, &c.recv); + xstep(&r.base, 0); + assert(xstep_status(&r.base) == XSTEP_SUSPENDED); + + chan_send_op_t op; + chan_send_op_init(&op, &c, 0xFEED); + assert(op.done.set); /* delivered inline */ + assert(c.send_head == NULL); /* nothing parked */ + + rt_run(&rt); + assert(xstep_status(&r.base) == XSTEP_DEAD); + assert(r.got == 0xFEED); + + chan_send_op_deinit(&op); +} + +static void test_chan_send_op_blocks(void) { + /* No receiver: op parks. Receiver arrives later, op.done fires. */ + chan_t c; chan_init(&c); + + chan_send_op_t op; + chan_send_op_init(&op, &c, 0xBEAD); + assert(!op.done.set); + assert(c.send_head == &op.csw.sw.base); + + /* Recv pulls value and fires op's waker → sets op.done. */ + uintptr_t v; + assert(event_try(&c.recv, &v)); + assert(v == 0xBEAD); + assert(op.done.set); + assert(c.send_head == NULL); + + chan_send_op_deinit(&op); +} + +static void test_chan_select_send(void) { + /* select(send_op, latch). Receiver pulls; send wins. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + latch_t timeout; latch_init(&timeout); + + chan_send_op_t op; + chan_send_op_init(&op, &c, 0x5EED); + assert(!op.done.set); /* parked, no recv */ + + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {&op.done.base, &timeout.base}; + select_event_init(&sel, inputs, 2, srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + /* A receiver arrives and pulls. op.done fires → select fires. */ + uintptr_t v; + assert(event_try(&c.recv, &v)); + assert(v == 0x5EED); + + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 0); /* send op won */ + assert(timeout.waiters == NULL); /* loser disarmed */ + + select_event_deinit(&sel); + chan_send_op_deinit(&op); +} + +static void test_chan_select_send_loses(void) { + /* select(send_op, latch). Latch fires first; send is canceled + * cleanly via op_deinit (chan side disarmed, no dangling waker). */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + latch_t l; latch_init(&l); + + chan_send_op_t op; + chan_send_op_init(&op, &c, 0xABBA); + assert(c.send_head == &op.csw.sw.base); + + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {&op.done.base, &l.base}; + select_event_init(&sel, inputs, 2, srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + + latch_set(&l, 0xDEAD); + rt_run(&rt); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 1); /* latch (index 1) won */ + assert(inputs[1].value == 0xDEAD); + + /* send didn't happen — op still parked on chan. deinit unparks. */ + assert(c.send_head == &op.csw.sw.base); + select_event_deinit(&sel); + chan_send_op_deinit(&op); + assert(c.send_head == NULL); +} + +static void test_chan_select_recv_fast_path(void) { + /* Sender already parked when select arms: fast path captures value. */ + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + sender_t snd; sender_init(&snd, &rt, &c, 0x12345); + xstep(&snd.base, 0); + assert(xstep_status(&snd.base) == XSTEP_SUSPENDED); + + latch_t l; latch_init(&l); + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {&c.recv, &l.base}; + select_event_init(&sel, inputs, 2, srcs); + + /* Fast path fired immediately. */ + uintptr_t winner; + assert(event_try(&sel.done.base, &winner)); + assert(winner == 0); + assert(inputs[0].value == 0x12345); + + rt_run(&rt); + assert(xstep_status(&snd.base) == XSTEP_DEAD); + select_event_deinit(&sel); +} + /* ---- Runtime test -------------------------------------------------- */ static void test_runtime_drains(void) { @@ -310,6 +657,19 @@ int main(void) { test_select_deinit_unparks(); test_select_compose(); + test_chan_send_blocks_until_recv(); + test_chan_recv_blocks_until_send(); + test_chan_try_send_no_recv(); + test_chan_fifo(); + test_chan_recv_fifo(); + test_chan_unpark_send(); + test_chan_select_recv(); + test_chan_select_recv_fast_path(); + test_chan_send_op_inline(); + test_chan_send_op_blocks(); + test_chan_select_send(); + test_chan_select_send_loses(); + test_runtime_drains(); printf("ok\n");