xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit ec177b0d2ad8ed0c7ac11b3ee08836bbd0b41169
parent 3a82709a0f8258ffa4e0a224ce0a45dfc493ce46
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 09:13:16 -0700

Add semaphore, queue, broadcast, task, and ergonomic helpers

Filling out the basic communication primitives over the event substrate:
- semaphore: counting permits with FIFO direct-handoff release
- queue: bounded FIFO with BLOCK / DROP_NEWEST / DROP_OLDEST policies
- broadcast: re-armable slot for coalescing fan-out
- task: lifecycle handle (xstep + done latch + cancel) with an xco
  specialization that auto-fires done from the trampoline

Helpers that compress the standard try-park-suspend dance:
- xco_yield, xco_await, xco_await_or_cancel
- task_is_cancelled

Plumbing: waker_fire centralizes the "fire receives a fully detached
waker" contract, and rt_dequeue clears prev too so step-wakers can
re-park without re-init.

Diffstat:
Mevent.c | 263+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mevent.h | 221++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mtests/test_event.c | 652+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/test_xco.c | 184+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mxco.c | 23+++++++++++++++++++++++
Mxco.h | 91+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 1426 insertions(+), 8 deletions(-)

diff --git a/event.c b/event.c @@ -21,7 +21,13 @@ static waker_t *rt_dequeue(runtime_t *rt) { if (!w) return NULL; rt->head = w->next; if (!rt->head) rt->tail = NULL; + /* Hand the waker back fully detached. Every fire path already clears + * prev before firing (and we just walked off the ready-queue), so + * w->prev is NULL in practice — making it explicit here means + * step_waker users can re-park without re-init, regardless of which + * fire path resumed them. */ w->next = NULL; + w->prev = NULL; return w; } @@ -109,14 +115,90 @@ void latch_set(latch_t *l, uintptr_t value) { waker_t *w = l->waiters; l->waiters = NULL; while (w) { - waker_t *next = w->next; - w->next = NULL; - w->prev = NULL; - w->fire(w, value); + waker_t *next = w->next; /* save before waker_fire clears */ + waker_fire(w, value); w = next; } } +/* ---- Semaphore -------------------------------------------------------- */ + +/* FIFO doubly-linked waitlist, same shape as the chan_q_* helpers below + * but specialized to a semaphore_t (so we don't have to thread the + * head/tail pair through chan_q_*). */ + +static void sem_q_push(semaphore_t *s, waker_t *w) { + assert(!w->prev && !w->next); + w->prev = s->tail; + w->next = NULL; + if (s->tail) s->tail->next = w; + else s->head = w; + s->tail = w; +} + +static waker_t *sem_q_pop(semaphore_t *s) { + waker_t *w = s->head; + if (!w) return NULL; + s->head = w->next; + if (s->head) s->head->prev = NULL; + else s->tail = NULL; + w->prev = w->next = NULL; + return w; +} + +static void sem_q_remove(semaphore_t *s, waker_t *w) { + if (!w->prev && s->head != w) return; + if (w->prev) w->prev->next = w->next; + else s->head = w->next; + if (w->next) w->next->prev = w->prev; + else s->tail = w->prev; + w->prev = w->next = NULL; +} + +static bool semaphore_try(event_t *e, uintptr_t *out) { + semaphore_t *s = (semaphore_t *)e; + if (s->permits == 0) return false; + s->permits--; + if (out) *out = 1; + return true; +} + +static void semaphore_park(event_t *e, waker_t *w) { + semaphore_t *s = (semaphore_t *)e; + /* Single-threaded contract: caller just observed try=false (permits=0). */ + assert(s->permits == 0); + sem_q_push(s, w); +} + +static void semaphore_unpark(event_t *e, waker_t *w) { + semaphore_t *s = (semaphore_t *)e; + sem_q_remove(s, w); +} + +const event_vtable_t _semaphore_acquire_vt = { + .try_ = semaphore_try, + .park = semaphore_park, + .unpark = semaphore_unpark, +}; + +void semaphore_release(semaphore_t *s, size_t n) { + /* Hand a permit directly to each FIFO waiter, then drop any leftover + * into the count. Direct handoff prevents a fresh try from jumping + * the queue: an arriving acquirer that called try_ would see permits=0 + * and park behind the existing waiters until everyone ahead has been + * served. */ + while (n > 0) { + waker_t *w = sem_q_pop(s); + if (!w) break; + n--; + /* Fire value is conventional 1 — "you got a permit". Step-waker + * users ignore the value; select inputs capture it as the input's + * value field. */ + waker_fire(w, 1); + } + s->permits += n; +} + /* ---- Select / all-of -------------------------------------------------- */ /* One fire callback serves both modes. A counter `remaining` is decremented @@ -274,7 +356,7 @@ static bool chan_recv_try(event_t *e, uintptr_t *out) { if (out) *out = csw->value; /* Resume the sender. The fire value (delivery confirmation) is * unused — receivers learn the value, senders just learn "done". */ - w->fire(w, 0); + waker_fire(w, 0); return true; } @@ -299,7 +381,7 @@ bool chan_try_send(chan_t *c, uintptr_t value) { if (!w) return false; /* Hand the value to the recv-side waker. step_waker stashes it as * resume_value; select_input_fire stashes it in input.value. */ - w->fire(w, value); + waker_fire(w, value); return true; } @@ -348,6 +430,175 @@ void chan_send_op_deinit(chan_send_op_t *op) { chan_unpark_send(op->chan, &op->csw); } +/* ---- Queue ------------------------------------------------------------ */ + +/* The FIFO list helpers (chan_q_push/pop/remove) are reused for the + * queue's send and recv waitlists — same shape, same invariants. The + * ring buffer lives in caller-provided storage; we just track head and + * len. cap == 0 leaves the buffer logic dormant: every send either + * direct-hands or parks, every recv either takes from a parked sender + * or parks — i.e. it degenerates to chan rendezvous. */ + +static inline queue_t *queue_of_recv(event_t *e) { + return (queue_t *)((char *)e - offsetof(queue_t, recv)); +} + +static inline void queue_push_buf(queue_t *q, uintptr_t v) { + assert(q->len < q->cap); + q->buf[(q->head + q->len) % q->cap] = v; + q->len++; +} + +static inline uintptr_t queue_pop_buf(queue_t *q) { + assert(q->len > 0); + uintptr_t v = q->buf[q->head]; + q->head = (q->head + 1) % q->cap; + q->len--; + return v; +} + +/* Pop one parked sender's value into the now-free buffer slot, firing + * the sender. Caller must have ensured a free slot exists (just popped + * from the buffer, or cap > len). Maintains FIFO across the buffer + + * sender-waitlist boundary: oldest buffered values come out before any + * sender's value (which was queued later). No-op if no sender parked. */ +static void queue_drain_one_sender(queue_t *q) { + if (!q->send_head) return; + waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + queue_push_buf(q, qsw->value); + /* Fire after pushing so the sender sees its delivery as complete. */ + waker_fire(w, 0); +} + +static bool queue_recv_try(event_t *e, uintptr_t *out) { + queue_t *q = queue_of_recv(e); + if (q->len > 0) { + uintptr_t v = queue_pop_buf(q); + if (out) *out = v; + queue_drain_one_sender(q); + return true; + } + /* Empty buffer. If a sender is parked here it can only mean cap==0 + * (otherwise the sender would have used the buffer). Hand directly. */ + if (q->send_head) { + waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + if (out) *out = qsw->value; + waker_fire(w, 0); + return true; + } + return false; +} + +static void queue_recv_park(event_t *e, waker_t *w) { + queue_t *q = queue_of_recv(e); + chan_q_push(&q->recv_head, &q->recv_tail, w); +} + +static void queue_recv_unpark(event_t *e, waker_t *w) { + queue_t *q = queue_of_recv(e); + chan_q_remove(&q->recv_head, &q->recv_tail, w); +} + +const event_vtable_t _queue_recv_vt = { + .try_ = queue_recv_try, + .park = queue_recv_park, + .unpark = queue_recv_unpark, +}; + +bool queue_try_send(queue_t *q, uintptr_t value) { + /* Direct handoff first: parked receivers always win over the buffer. + * This is the rendezvous case and the cap==0 case. */ + waker_t *w = chan_q_pop(&q->recv_head, &q->recv_tail); + if (w) { + waker_fire(w, value); + return true; + } + if (q->len < q->cap) { + queue_push_buf(q, value); + return true; + } + /* Buffer full and no waiting receiver. */ + switch (q->policy) { + case QUEUE_BLOCK: + return false; + case QUEUE_DROP_NEWEST: + return true; + case QUEUE_DROP_OLDEST: + (void)queue_pop_buf(q); + queue_push_buf(q, value); + return true; + } + __builtin_unreachable(); +} + +void queue_park_send(queue_t *q, queue_send_waker_t *qsw) { + /* DROP_* never parks (try_send always returns true); only valid + * for BLOCK. */ + assert(q->policy == QUEUE_BLOCK); + chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base); +} + +void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw) { + chan_q_remove(&q->send_head, &q->send_tail, &qsw->sw.base); +} + +/* ---- Broadcast (slot) ------------------------------------------------- */ + +/* The waitlist uses the same doubly-linked LIFO shape as latch — there + * is no FIFO requirement because publish wakes everyone at once. The + * key difference from latch is publish vs set: publish never marks a + * sticky bit on the event, so try always returns false (subscribers + * wait for the *next* publish), and the waitlist is reusable across + * publishes — subscribers re-park to receive subsequent values. */ + +static bool broadcast_try(event_t *e, uintptr_t *out) { + (void)e; (void)out; + return false; +} + +static void broadcast_park(event_t *e, waker_t *w) { + broadcast_t *b = (broadcast_t *)e; + assert(!w->prev && !w->next); + w->next = b->waiters; + if (b->waiters) b->waiters->prev = w; + b->waiters = w; +} + +static void broadcast_unpark(event_t *e, waker_t *w) { + broadcast_t *b = (broadcast_t *)e; + if (!w->prev && b->waiters != w) return; + if (w->prev) w->prev->next = w->next; + else b->waiters = w->next; + if (w->next) w->next->prev = w->prev; + w->prev = w->next = NULL; +} + +const event_vtable_t _broadcast_vt = { + .try_ = broadcast_try, + .park = broadcast_park, + .unpark = broadcast_unpark, +}; + +void broadcast_publish(broadcast_t *b, uintptr_t value) { + b->has_value = true; + b->value = value; + + /* Detach the waitlist before iterating — same hazard-free pattern as + * latch_set. A waker's fire callback may re-park itself on us (the + * common case for a re-arming subscriber); decoupling means that + * re-park lands on a fresh waitlist, not on the snapshot we're + * walking. */ + waker_t *w = b->waiters; + b->waiters = NULL; + while (w) { + waker_t *next = w->next; /* save before waker_fire clears */ + waker_fire(w, value); + w = next; + } +} + /* ---- Timer ------------------------------------------------------------ */ /* The timer-as-event surface is just the embedded latch's vtable: try diff --git a/event.h b/event.h @@ -76,10 +76,25 @@ struct waker { /* Fire callback. value is the event's payload at fire time — sticky * events also store it on themselves, transient events (channels, * one-shot signals) deliver only here. Wakers that don't care - * about the value just ignore the parameter. */ + * about the value just ignore the parameter. + * + * Invoke via waker_fire (below), not directly: the helper enforces + * the "fire receives a fully detached waker" contract that makes it + * safe to re-park inside the callback. */ void (*fire)(waker_t *w, uintptr_t value); }; +/* Canonical way to invoke a waker's fire callback. Hands the callback a + * fully detached waker so the callback (or whatever the resumed step + * does) can re-park on a fresh waitlist without colliding with stale + * link state. Detachers that lead into fire (queue pops, latch drains, + * etc.) don't need to clear prev/next themselves. */ +static inline void waker_fire(waker_t *w, uintptr_t value) { + w->prev = NULL; + w->next = NULL; + w->fire(w, value); +} + /* ---- Event ------------------------------------------------------------ */ typedef struct event event_t; @@ -147,7 +162,12 @@ void rt_run(runtime_t *rt, uint64_t now); /* The canonical bridge between events and the scheduler. When fired, * stashes the value and enqueues itself onto rt; rt_run pops it and * calls xstep(step, value), so the resumed step receives the event's - * payload directly without a re-try. */ + * payload directly without a re-try. + * + * Init once, re-park freely. The runtime hands the waker back fully + * detached (next/prev cleared) before invoking the resumed step, so a + * subscriber that wants to wait on the next event can call event_park + * directly — no re-init needed unless rt or step changes. */ typedef struct { waker_t base; runtime_t *rt; @@ -191,6 +211,41 @@ static inline void latch_init(latch_t *l) { void latch_set(latch_t *l, uintptr_t value); +/* ---- Semaphore -------------------------------------------------------- */ + +/* Counting semaphore. acquire is exposed as event_t (composable with + * select / wait_or_cancel): event_try succeeds and decrements when + * permits > 0; otherwise the waker parks FIFO. semaphore_release(n) + * hands one permit to each of up to n waiting wakers (each is fired, + * which the receiver treats as "you got a permit") before adding any + * leftover to the count. + * + * One permit per acquire. Bulk acquire isn't expressible in event_t's + * shape; if you need it, call sequentially. For binary use (mutex-style + * critical section across awaits) init with permits = 1. + * + * Fairness: FIFO at the waitlist. A racing inline event_try by a fresh + * caller can jump ahead of parked waiters when permits are released + * back to count rather than directly handed off — release prefers + * parked waiters first to avoid that. */ +typedef struct semaphore { + event_t acquire; + size_t permits; + waker_t *head, *tail; +} semaphore_t; + +extern const event_vtable_t _semaphore_acquire_vt; + +static inline void semaphore_init(semaphore_t *s, size_t initial) { + s->acquire.vt = &_semaphore_acquire_vt; + s->permits = initial; + s->head = s->tail = NULL; +} + +static inline event_t *semaphore_event(semaphore_t *s) { return &s->acquire; } + +void semaphore_release(semaphore_t *s, size_t n); + /* ---- Select / all-of -------------------------------------------------- */ /* Wait over N input events. Two semantics share the same storage shape, @@ -333,6 +388,123 @@ void _chan_send_op_fire(waker_t *w, uintptr_t value); void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value); void chan_send_op_deinit(chan_send_op_t *op); +/* ---- Queue ------------------------------------------------------------ */ + +/* Bounded FIFO of uintptr_t. Caller provides the ring buffer storage. + * Recv side is exposed as event_t (composable with select). Send side is + * a typed API (carries a value), shaped after chan's send. + * + * Three full-buffer policies, fixed at init: + * QUEUE_BLOCK senders park until a receiver makes room. + * QUEUE_DROP_NEWEST queue_try_send silently discards the new value. + * QUEUE_DROP_OLDEST queue_try_send evicts the head and pushes new tail. + * + * Senders never park under DROP_* policies — queue_park_send is only + * meaningful under QUEUE_BLOCK, and only after queue_try_send returned + * false. queue_unpark_send is idempotent (cancellation-safe). + * + * Direct-handoff: queue_try_send first checks for a parked receiver and + * delivers inline if present (payload bypasses the buffer), regardless + * of policy. Symmetric to chan. + * + * cap == 0 with QUEUE_BLOCK degenerates to a rendezvous channel; chan_t + * remains the more direct expression of that case. */ + +typedef enum { + QUEUE_BLOCK, + QUEUE_DROP_NEWEST, + QUEUE_DROP_OLDEST, +} queue_policy_t; + +typedef struct queue { + event_t recv; + uintptr_t *buf; + size_t cap, head, len; + queue_policy_t policy; + waker_t *send_head, *send_tail; + waker_t *recv_head, *recv_tail; +} queue_t; + +extern const event_vtable_t _queue_recv_vt; + +static inline void queue_init(queue_t *q, uintptr_t *buf, size_t cap, + queue_policy_t policy) { + q->recv.vt = &_queue_recv_vt; + q->buf = buf; + q->cap = cap; + q->head = 0; + q->len = 0; + q->policy = policy; + q->send_head = q->send_tail = NULL; + q->recv_head = q->recv_tail = NULL; +} + +static inline event_t *queue_recv_event(queue_t *q) { return &q->recv; } + +/* Try to enqueue. Direct-delivers to a parked receiver if one is waiting. + * Returns: + * QUEUE_BLOCK true if delivered or buffered; false if full. + * QUEUE_DROP_NEWEST always true (silently drops if full). + * QUEUE_DROP_OLDEST always true (evicts head if full). */ +bool queue_try_send(queue_t *q, uintptr_t value); + +/* Sender-side waker for QUEUE_BLOCK. Same shape as chan_send_waker_t: + * a step_waker plus the value to deliver. Receivers read .value at the + * same offset so the queue's send list stays uniform. */ +typedef struct { + step_waker_t sw; + uintptr_t value; +} queue_send_waker_t; + +static inline void queue_send_waker_init(queue_send_waker_t *qsw, + runtime_t *rt, xstep_t *s, + uintptr_t value) { + step_waker_init(&qsw->sw, rt, s); + qsw->value = value; +} + +void queue_park_send (queue_t *q, queue_send_waker_t *qsw); +void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw); + +/* ---- Broadcast (slot) ------------------------------------------------- */ + +/* Re-armable signal carrying a "latest value" slot. Subscribers park on + * the event; broadcast_publish stores the new value, fires every parked + * subscriber with that value, and clears the waitlist — subscribers must + * re-park to see further publishes. Subscribers that aren't parked at + * publish time miss that publish but will see the next one. This is the + * coalescing "watch a slot" semantics, not lossless fan-out. + * + * event_try always returns false: there is no "ready now" state — a + * subscriber waits for the *next* publish. To read the latest published + * value at any time, use broadcast_value (valid once broadcast_has_value + * returns true). + * + * For lossless multi-consumer delivery, give each subscriber its own + * queue and have the producer write to all of them. */ + +typedef struct broadcast { + event_t base; + bool has_value; + uintptr_t value; + waker_t *waiters; +} broadcast_t; + +extern const event_vtable_t _broadcast_vt; + +static inline void broadcast_init(broadcast_t *b) { + b->base.vt = &_broadcast_vt; + b->has_value = false; + b->value = 0; + b->waiters = NULL; +} + +static inline event_t *broadcast_event (broadcast_t *b) { return &b->base; } +static inline bool broadcast_has_value(const broadcast_t *b) { return b->has_value; } +static inline uintptr_t broadcast_value (const broadcast_t *b) { return b->value; } + +void broadcast_publish(broadcast_t *b, uintptr_t value); + /* ---- Cancellation ----------------------------------------------------- */ /* A cancellation token is a sticky latch — these aliases exist for @@ -481,4 +653,49 @@ typedef struct timeout { void timeout_init (timeout_t *to, timers_t *ts, uint64_t deadline); void timeout_deinit(timeout_t *to); +/* ---- Task ------------------------------------------------------------- */ + +/* Lifecycle handle for a running xstep. Bundles a done latch (fires when + * the xstep returns, payload = its return value) with a cancel latch + * (the canonical signal to ask the xstep to wind down). The xstep itself + * is caller-allocated; the task holds a pointer to it. + * + * Who fires done: + * - Hand-coded state machine: call task_done(t, ret) in the same arm + * that returns XSTEP_DEAD. + * - xco-backed task (see xco_task_t in xco.h): the trampoline calls + * task_done automatically with the coroutine's return value. + * + * Cooperation: cancellation only notifies — the xstep is responsible for + * draining what it owns and reaching XSTEP_DEAD. The task's cancel is a + * normal cancel_t, so the xstep typically composes wait_or_cancel against + * it on every blocking await. + * + * Joining: callers wait on task_done_event with the standard event API + * (try / park, or compose into select / wait_or_cancel). On fire the + * latch's payload is the xstep's return value. */ + +typedef struct task { + xstep_t *step; + latch_t done; + cancel_t cancel; +} task_t; + +static inline void task_init(task_t *t, xstep_t *step) { + t->step = step; + latch_init(&t->done); + cancel_init(&t->cancel); +} + +/* Mark the task complete with its return value. Idempotent (latch_set is). */ +static inline void task_done(task_t *t, uintptr_t value) { + latch_set(&t->done, value); +} + +static inline event_t *task_done_event (task_t *t) { return &t->done.base; } +static inline cancel_t *task_cancel (task_t *t) { return &t->cancel; } +static inline bool task_finished (const task_t *t) { return t->done.set; } +static inline bool task_is_cancelled(const task_t *t) { return cancel_is_set(&t->cancel); } +static inline xstep_t *task_step (task_t *t) { return t->step; } + #endif /* EVENT_H */ diff --git a/tests/test_event.c b/tests/test_event.c @@ -1272,6 +1272,633 @@ static void test_rt_run_no_timers(void) { assert(w.got == 7); } +/* ---- Semaphore ----------------------------------------------------- */ + +/* Acquirer state machine: one-shot acquire of 1 permit. Mirrors waiter_t + * but specialized for the "permit handed at fire time" semantics — the + * resume is itself the proof of acquisition. */ +typedef struct { + xstep_t base; + semaphore_t *sem; + runtime_t *rt; + step_waker_t sw; + int phase; + bool got; +} sem_acquirer_t; + +static xstep_result_t sem_acquirer_step(xstep_t *s, uintptr_t v) { + sem_acquirer_t *a = (sem_acquirer_t *)s; + (void)v; + switch (a->phase) { + case 0: + if (event_try(semaphore_event(a->sem), NULL)) { + a->got = true; + a->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + step_waker_init(&a->sw, a->rt, &a->base); + event_park(semaphore_event(a->sem), &a->sw.base); + a->phase = 1; + return (xstep_result_t){0, XSTEP_SUSPENDED}; + case 1: + a->got = true; + a->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + __builtin_unreachable(); +} + +static void sem_acquirer_init(sem_acquirer_t *a, runtime_t *rt, semaphore_t *s) { + a->base = (xstep_t){.step = sem_acquirer_step, .status = XSTEP_INIT}; + a->sem = s; + a->rt = rt; + a->phase = 0; + a->got = false; +} + +static void test_semaphore_inline_acquire(void) { + /* permits > 0: try succeeds and decrements. */ + semaphore_t s; semaphore_init(&s, 2); + assert(event_try(semaphore_event(&s), NULL)); + assert(s.permits == 1); + assert(event_try(semaphore_event(&s), NULL)); + assert(s.permits == 0); + assert(!event_try(semaphore_event(&s), NULL)); + assert(s.permits == 0); +} + +static void test_semaphore_park_then_release(void) { + /* Empty sem: acquirer parks; release wakes it with a permit. */ + runtime_t rt; rt_init(&rt); + semaphore_t s; semaphore_init(&s, 0); + + sem_acquirer_t a; sem_acquirer_init(&a, &rt, &s); + xstep(&a.base, 0); + assert(xstep_status(&a.base) == XSTEP_SUSPENDED); + assert(s.head == &a.sw.base); + + semaphore_release(&s, 1); + /* Release prefers parked waiters: permit went straight to a, count stays 0. */ + assert(s.permits == 0); + assert(s.head == NULL); + + rt_run(&rt, 0); + assert(xstep_status(&a.base) == XSTEP_DEAD); + assert(a.got); +} + +static void test_semaphore_fifo(void) { + /* Three parked acquirers; release(2) wakes the first two in order; + * the third stays parked. release(1) again wakes it. */ + runtime_t rt; rt_init(&rt); + semaphore_t s; semaphore_init(&s, 0); + + sem_acquirer_t a[3]; + for (int i = 0; i < 3; i++) { + sem_acquirer_init(&a[i], &rt, &s); + xstep(&a[i].base, 0); + } + assert(s.head == &a[0].sw.base); + assert(s.tail == &a[2].sw.base); + + semaphore_release(&s, 2); + rt_run(&rt, 0); + assert(xstep_status(&a[0].base) == XSTEP_DEAD); + assert(xstep_status(&a[1].base) == XSTEP_DEAD); + assert(xstep_status(&a[2].base) == XSTEP_SUSPENDED); + assert(s.head == &a[2].sw.base); + + semaphore_release(&s, 1); + rt_run(&rt, 0); + assert(xstep_status(&a[2].base) == XSTEP_DEAD); + assert(s.head == NULL); + assert(s.permits == 0); +} + +static void test_semaphore_release_overflow_to_count(void) { + /* release(n) where n > waiters: hand to all waiters first, then add + * the leftover to permits. */ + runtime_t rt; rt_init(&rt); + semaphore_t s; semaphore_init(&s, 0); + + sem_acquirer_t a; sem_acquirer_init(&a, &rt, &s); + xstep(&a.base, 0); + + semaphore_release(&s, 5); + /* a got 1; 4 leftover sat as permits. */ + assert(s.permits == 4); + rt_run(&rt, 0); + assert(a.got); + assert(s.permits == 4); +} + +static void test_semaphore_select_acquire(void) { + /* Compose a sem acquire with a cancel via wait_or_cancel: cancel wins. */ + runtime_t rt; rt_init(&rt); + semaphore_t s; semaphore_init(&s, 0); + cancel_t c; cancel_init(&c); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, semaphore_event(&s), &c); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + assert(s.head != NULL); /* sem-side waker parked */ + + cancel_set(&c); + rt_run(&rt, 0); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == WAIT_CANCELLED); + assert(s.head == NULL); /* select_input_fire detached us */ + + select_event_deinit(&sel); + /* No leak: a future release does not over-grant past the count. */ + semaphore_release(&s, 1); + assert(s.permits == 1); +} + +static void test_semaphore_binary_mutex(void) { + /* Binary semaphore as mutex: init permits=1; first take succeeds inline, + * second parks, release on the way out wakes the waiter. */ + runtime_t rt; rt_init(&rt); + semaphore_t mu; semaphore_init(&mu, 1); + + /* Holder takes it inline. */ + assert(event_try(semaphore_event(&mu), NULL)); + assert(mu.permits == 0); + + /* Contender parks. */ + sem_acquirer_t b; sem_acquirer_init(&b, &rt, &mu); + xstep(&b.base, 0); + assert(xstep_status(&b.base) == XSTEP_SUSPENDED); + + /* Holder releases on exit. */ + semaphore_release(&mu, 1); + rt_run(&rt, 0); + assert(xstep_status(&b.base) == XSTEP_DEAD); + assert(b.got); +} + +/* ---- Queue --------------------------------------------------------- */ + +/* Queue sender: same shape as the chan sender_t. */ +typedef struct { + xstep_t base; + queue_t *q; + runtime_t *rt; + queue_send_waker_t qsw; + uintptr_t value; + int phase; + bool done; +} queue_sender_t; + +static xstep_result_t queue_sender_step(xstep_t *s, uintptr_t v) { + queue_sender_t *snd = (queue_sender_t *)s; + (void)v; + switch (snd->phase) { + case 0: + if (queue_try_send(snd->q, snd->value)) { + snd->done = true; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + queue_send_waker_init(&snd->qsw, snd->rt, &snd->base, snd->value); + queue_park_send(snd->q, &snd->qsw); + snd->phase = 1; + return (xstep_result_t){0, XSTEP_SUSPENDED}; + case 1: + snd->done = true; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + __builtin_unreachable(); +} + +static void queue_sender_init(queue_sender_t *s, runtime_t *rt, queue_t *q, uintptr_t value) { + s->base = (xstep_t){.step = queue_sender_step, .status = XSTEP_INIT}; + s->q = q; + s->rt = rt; + s->value = value; + s->phase = 0; + s->done = false; +} + +static void test_queue_block_buffered(void) { + /* cap=3, BLOCK: three sends fill the buffer; three recvs drain in order. */ + runtime_t rt; rt_init(&rt); + uintptr_t buf[3]; + queue_t q; queue_init(&q, buf, 3, QUEUE_BLOCK); + + assert(queue_try_send(&q, 10)); + assert(queue_try_send(&q, 20)); + assert(queue_try_send(&q, 30)); + assert(q.len == 3); + /* Buffer full: try_send fails. */ + assert(!queue_try_send(&q, 40)); + + uintptr_t v; + assert(event_try(queue_recv_event(&q), &v) && v == 10); + assert(event_try(queue_recv_event(&q), &v) && v == 20); + assert(event_try(queue_recv_event(&q), &v) && v == 30); + assert(!event_try(queue_recv_event(&q), &v)); /* empty */ + assert(q.len == 0); + (void)rt; +} + +static void test_queue_block_sender_parks(void) { + /* Buffer fills, next sender parks; a recv unblocks it. */ + runtime_t rt; rt_init(&rt); + uintptr_t buf[2]; + queue_t q; queue_init(&q, buf, 2, QUEUE_BLOCK); + + assert(queue_try_send(&q, 1)); + assert(queue_try_send(&q, 2)); + + queue_sender_t s; queue_sender_init(&s, &rt, &q, 3); + xstep(&s.base, 0); + assert(xstep_status(&s.base) == XSTEP_SUSPENDED); + assert(q.send_head == &s.qsw.sw.base); + + /* Recv pops 1; 3 should slot into the buffer and the parked sender wakes. */ + uintptr_t v; + assert(event_try(queue_recv_event(&q), &v) && v == 1); + rt_run(&rt, 0); + assert(xstep_status(&s.base) == XSTEP_DEAD); + assert(s.done); + assert(q.send_head == NULL); + + /* Buffer should now hold 2, 3. */ + assert(event_try(queue_recv_event(&q), &v) && v == 2); + assert(event_try(queue_recv_event(&q), &v) && v == 3); + assert(q.len == 0); +} + +static void test_queue_drop_newest(void) { + /* When full, try_send returns true but silently discards. */ + uintptr_t buf[2]; + queue_t q; queue_init(&q, buf, 2, QUEUE_DROP_NEWEST); + + assert(queue_try_send(&q, 1)); + assert(queue_try_send(&q, 2)); + assert(queue_try_send(&q, 3)); /* full → drop 3 */ + assert(queue_try_send(&q, 4)); /* still full → drop 4 */ + + uintptr_t v; + assert(event_try(queue_recv_event(&q), &v) && v == 1); + assert(event_try(queue_recv_event(&q), &v) && v == 2); + assert(!event_try(queue_recv_event(&q), &v)); +} + +static void test_queue_drop_oldest(void) { + /* When full, try_send returns true and evicts head, pushes new tail. */ + uintptr_t buf[2]; + queue_t q; queue_init(&q, buf, 2, QUEUE_DROP_OLDEST); + + assert(queue_try_send(&q, 1)); + assert(queue_try_send(&q, 2)); + assert(queue_try_send(&q, 3)); /* full → evict 1, buffer = [2, 3] */ + assert(queue_try_send(&q, 4)); /* full → evict 2, buffer = [3, 4] */ + + uintptr_t v; + assert(event_try(queue_recv_event(&q), &v) && v == 3); + assert(event_try(queue_recv_event(&q), &v) && v == 4); + assert(!event_try(queue_recv_event(&q), &v)); +} + +static void test_queue_direct_handoff(void) { + /* Receiver parked; try_send hands off directly, bypassing the buffer. + * Works the same regardless of policy — exercise BLOCK here. */ + runtime_t rt; rt_init(&rt); + uintptr_t buf[2]; + queue_t q; queue_init(&q, buf, 2, QUEUE_BLOCK); + + waiter_t r; waiter_init(&r, &rt, queue_recv_event(&q)); + xstep(&r.base, 0); + assert(xstep_status(&r.base) == XSTEP_SUSPENDED); + assert(q.recv_head == &r.sw.base); + + assert(queue_try_send(&q, 0xCAFE)); + assert(q.len == 0); /* didn't touch the buffer */ + assert(q.recv_head == NULL); + + rt_run(&rt, 0); + assert(xstep_status(&r.base) == XSTEP_DEAD); + assert(r.got == 0xCAFE); +} + +static void test_queue_recv_blocks_then_drains_buffered(void) { + /* Receivers wait when empty. After buffer warms up, recvs pop FIFO + * — no value-skipping when both sides race past one another. */ + runtime_t rt; rt_init(&rt); + uintptr_t buf[2]; + queue_t q; queue_init(&q, buf, 2, QUEUE_BLOCK); + + /* 2 receivers parked. */ + waiter_t r1, r2; + waiter_init(&r1, &rt, queue_recv_event(&q)); + waiter_init(&r2, &rt, queue_recv_event(&q)); + xstep(&r1.base, 0); + xstep(&r2.base, 0); + + /* Two sends → direct handoff to the two parked receivers, not buffered. */ + assert(queue_try_send(&q, 100)); + assert(queue_try_send(&q, 200)); + assert(q.len == 0); + + rt_run(&rt, 0); + assert(xstep_status(&r1.base) == XSTEP_DEAD && r1.got == 100); + assert(xstep_status(&r2.base) == XSTEP_DEAD && r2.got == 200); +} + +static void test_queue_select_recv(void) { + /* Queue recv composes into select. */ + runtime_t rt; rt_init(&rt); + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + latch_t l; latch_init(&l); + + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {queue_recv_event(&q), &l.base}; + select_event_init(&sel, inputs, 2, srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + assert(queue_try_send(&q, 0xBEEF)); + rt_run(&rt, 0); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 0); + assert(inputs[0].value == 0xBEEF); + + select_event_deinit(&sel); +} + +static void test_queue_unpark_send(void) { + /* Cancel a parked sender; remaining FIFO order intact. */ + runtime_t rt; rt_init(&rt); + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + + assert(queue_try_send(&q, 1)); /* fills buffer */ + + queue_sender_t a, b, d; + queue_sender_init(&a, &rt, &q, 2); + queue_sender_init(&b, &rt, &q, 3); + queue_sender_init(&d, &rt, &q, 4); + xstep(&a.base, 0); + xstep(&b.base, 0); + xstep(&d.base, 0); + + queue_unpark_send(&q, &b.qsw); + queue_unpark_send(&q, &b.qsw); /* idempotent */ + + /* Drain: 1, then a's 2 (slots into buffer when 1 popped), then d's 4. */ + uintptr_t v; + assert(event_try(queue_recv_event(&q), &v) && v == 1); + rt_run(&rt, 0); + assert(xstep_status(&a.base) == XSTEP_DEAD); + assert(event_try(queue_recv_event(&q), &v) && v == 2); + rt_run(&rt, 0); + assert(xstep_status(&d.base) == XSTEP_DEAD); + assert(event_try(queue_recv_event(&q), &v) && v == 4); + /* b never sent its value, stayed SUSPENDED. */ + assert(xstep_status(&b.base) == XSTEP_SUSPENDED); +} + +/* ---- Broadcast ----------------------------------------------------- */ + +/* Subscriber: parks on the broadcast, captures fire payload, re-parks + * on resume. n_seen counts how many publishes it received. */ +typedef struct { + xstep_t base; + broadcast_t *b; + runtime_t *rt; + step_waker_t sw; + uintptr_t last; + int n_seen; + int target; /* unsubscribe after this many */ +} bcast_sub_t; + +static xstep_result_t bcast_sub_step(xstep_t *s, uintptr_t v) { + bcast_sub_t *b = (bcast_sub_t *)s; + if (b->n_seen > 0) { + /* Resume from a fire — capture and decide whether to re-park. */ + b->last = v; + } + if (b->n_seen >= b->target) { + return (xstep_result_t){b->last, XSTEP_DEAD}; + } + /* Re-park: the runtime returned sw fully detached, so event_park + * works without re-init. (Init was done once in bcast_sub_init.) */ + event_park(broadcast_event(b->b), &b->sw.base); + b->n_seen++; + return (xstep_result_t){0, XSTEP_SUSPENDED}; +} + +static void bcast_sub_init(bcast_sub_t *s, runtime_t *rt, broadcast_t *b, int target) { + s->base = (xstep_t){.step = bcast_sub_step, .status = XSTEP_INIT}; + s->b = b; + s->rt = rt; + s->last = 0; + s->n_seen = 0; + s->target = target; + step_waker_init(&s->sw, rt, &s->base); +} + +static void test_broadcast_try_always_false(void) { + /* No "ready now" state; subscribers always wait for the next publish. */ + broadcast_t b; broadcast_init(&b); + assert(!event_try(broadcast_event(&b), NULL)); + broadcast_publish(&b, 7); + /* Even after a publish, try is still false — value is read via accessor. */ + assert(!event_try(broadcast_event(&b), NULL)); + assert(broadcast_has_value(&b)); + assert(broadcast_value(&b) == 7); +} + +static void test_broadcast_publish_wakes_all(void) { + /* Three subscribers parked; one publish wakes all with the value. */ + runtime_t rt; rt_init(&rt); + broadcast_t b; broadcast_init(&b); + + bcast_sub_t s[3]; + for (int i = 0; i < 3; i++) { + bcast_sub_init(&s[i], &rt, &b, 1); /* one publish then exit */ + xstep(&s[i].base, 0); + assert(xstep_status(&s[i].base) == XSTEP_SUSPENDED); + } + + broadcast_publish(&b, 0xBEEF); + assert(b.waiters == NULL); /* drained */ + + rt_run(&rt, 0); + for (int i = 0; i < 3; i++) { + assert(xstep_status(&s[i].base) == XSTEP_DEAD); + assert(s[i].last == 0xBEEF); + } +} + +static void test_broadcast_rearm(void) { + /* Subscriber parks for two publishes; receives both values in order. */ + runtime_t rt; rt_init(&rt); + broadcast_t b; broadcast_init(&b); + + bcast_sub_t s; bcast_sub_init(&s, &rt, &b, 2); + xstep(&s.base, 0); + + broadcast_publish(&b, 11); + rt_run(&rt, 0); + assert(xstep_status(&s.base) == XSTEP_SUSPENDED); /* re-armed */ + assert(s.last == 11); + + broadcast_publish(&b, 22); + rt_run(&rt, 0); + assert(xstep_status(&s.base) == XSTEP_DEAD); + assert(s.last == 22); +} + +static void test_broadcast_missed_publish(void) { + /* Subscriber not parked at publish time misses it; the next publish + * wakes them with that value (no replay, no coalescing into one). */ + runtime_t rt; rt_init(&rt); + broadcast_t b; broadcast_init(&b); + + /* Publish before anyone subscribes — value is in the slot but no one wakes. */ + broadcast_publish(&b, 1); + assert(broadcast_value(&b) == 1); + + bcast_sub_t s; bcast_sub_init(&s, &rt, &b, 1); + xstep(&s.base, 0); + assert(xstep_status(&s.base) == XSTEP_SUSPENDED); /* didn't see the prior */ + + broadcast_publish(&b, 2); + rt_run(&rt, 0); + assert(xstep_status(&s.base) == XSTEP_DEAD); + assert(s.last == 2); +} + +static void test_broadcast_unpark(void) { + /* event_unpark removes a subscriber cleanly. */ + runtime_t rt; rt_init(&rt); + broadcast_t b; broadcast_init(&b); + + step_waker_t sw1, sw2, sw3; + step_waker_init(&sw1, &rt, (xstep_t *)0x1); + step_waker_init(&sw2, &rt, (xstep_t *)0x2); + step_waker_init(&sw3, &rt, (xstep_t *)0x3); + event_park(broadcast_event(&b), &sw1.base); + event_park(broadcast_event(&b), &sw2.base); + event_park(broadcast_event(&b), &sw3.base); + + event_unpark(broadcast_event(&b), &sw2.base); + event_unpark(broadcast_event(&b), &sw2.base); /* idempotent */ + + int seen1 = 0, seen3 = 0; + for (waker_t *w = b.waiters; w; w = w->next) { + if (w == &sw1.base) seen1 = 1; + if (w == &sw3.base) seen3 = 1; + assert(w != &sw2.base); + } + assert(seen1 && seen3); + + event_unpark(broadcast_event(&b), &sw1.base); + event_unpark(broadcast_event(&b), &sw3.base); +} + +/* ---- Task (state-machine xstep) ------------------------------------ */ + +/* A countdown SM that completes after N steps, returning a final value. + * On every step it checks task->cancel via event_try and bails early + * (winding down to XSTEP_DEAD) if it's set. Demonstrates the cooperation + * pattern: cancel is a signal, the SM owns the unwind. */ +typedef struct { + xstep_t base; + task_t *task; + int remaining; + uintptr_t final; +} countdown_t; + +static xstep_result_t countdown_step(xstep_t *s, uintptr_t v) { + countdown_t *cd = (countdown_t *)s; + (void)v; + if (task_is_cancelled(cd->task)) { + task_done(cd->task, 0); /* cooperative unwind */ + return (xstep_result_t){0, XSTEP_DEAD}; + } + if (cd->remaining-- == 0) { + task_done(cd->task, cd->final); + return (xstep_result_t){cd->final, XSTEP_DEAD}; + } + return (xstep_result_t){0, XSTEP_SUSPENDED}; +} + +static void countdown_init(countdown_t *cd, task_t *t, int n, uintptr_t final) { + cd->base = (xstep_t){.step = countdown_step, .status = XSTEP_INIT}; + cd->task = t; + cd->remaining = n; + cd->final = final; +} + +static void test_task_state_machine_join(void) { + /* Drive a countdown SM as a task; join via task_done_event. */ + runtime_t rt; rt_init(&rt); + + countdown_t cd; + task_t t; + countdown_init(&cd, &t, 3, 0xAAAA); + task_init(&t, &cd.base); + + assert(!task_finished(&t)); + + /* Pump it manually. */ + while (xstep_status(&cd.base) != XSTEP_DEAD) { + xstep(&cd.base, 0); + } + assert(task_finished(&t)); + + /* A latched join: try the done event for the return value. */ + uintptr_t v; + assert(event_try(task_done_event(&t), &v)); + assert(v == 0xAAAA); +} + +static void test_task_state_machine_cancel(void) { + /* Cancel mid-flight; SM observes and unwinds. Joiner sees done with + * the cooperative-unwind sentinel value (here, 0). */ + runtime_t rt; rt_init(&rt); + + countdown_t cd; + task_t t; + countdown_init(&cd, &t, 100, 0xAAAA); + task_init(&t, &cd.base); + + /* A second xstep waiting for completion via wait_or_cancel-style + * shape — here, just a direct waiter on the done event. */ + waiter_t joiner; waiter_init(&joiner, &rt, task_done_event(&t)); + xstep(&joiner.base, 0); + assert(xstep_status(&joiner.base) == XSTEP_SUSPENDED); + + /* Step the task a few times. */ + xstep(&cd.base, 0); + xstep(&cd.base, 0); + assert(!task_finished(&t)); + + cancel_set(task_cancel(&t)); + /* Next step observes cancel and dies. */ + xstep(&cd.base, 0); + assert(xstep_status(&cd.base) == XSTEP_DEAD); + assert(task_finished(&t)); + + /* Joiner wakes with the done payload (0 from the cooperative unwind). */ + rt_run(&rt, 0); + assert(xstep_status(&joiner.base) == XSTEP_DEAD); + assert(joiner.got == 0); +} + /* ---- Runtime test -------------------------------------------------- */ static void test_runtime_drains(void) { @@ -1354,6 +1981,31 @@ int main(void) { test_runtime_drains(); + test_semaphore_inline_acquire(); + test_semaphore_park_then_release(); + test_semaphore_fifo(); + test_semaphore_release_overflow_to_count(); + test_semaphore_select_acquire(); + test_semaphore_binary_mutex(); + + test_queue_block_buffered(); + test_queue_block_sender_parks(); + test_queue_drop_newest(); + test_queue_drop_oldest(); + test_queue_direct_handoff(); + test_queue_recv_blocks_then_drains_buffered(); + test_queue_select_recv(); + test_queue_unpark_send(); + + test_broadcast_try_always_false(); + test_broadcast_publish_wakes_all(); + test_broadcast_rearm(); + test_broadcast_missed_publish(); + test_broadcast_unpark(); + + test_task_state_machine_join(); + test_task_state_machine_cancel(); + printf("ok\n"); return 0; } diff --git a/tests/test_xco.c b/tests/test_xco.c @@ -19,6 +19,12 @@ static alignas(XCO_STACK_ALIGN) unsigned char stack_a[STACK_BYTES]; static alignas(XCO_STACK_ALIGN) unsigned char stack_b[STACK_BYTES]; static alignas(XCO_STACK_ALIGN) unsigned char stack_c[STACK_BYTES]; +static alignas(XCO_STACK_ALIGN) unsigned char stack_y1[STACK_BYTES]; +static alignas(XCO_STACK_ALIGN) unsigned char stack_y2[STACK_BYTES]; +static alignas(XCO_STACK_ALIGN) unsigned char stack_t1[STACK_BYTES]; +static alignas(XCO_STACK_ALIGN) unsigned char stack_t2[STACK_BYTES]; +static alignas(XCO_STACK_ALIGN) unsigned char stack_t3[STACK_BYTES]; +static alignas(XCO_STACK_ALIGN) unsigned char stack_t4[STACK_BYTES]; /* Receives n via the first resume, then yields n+1, n+2, n+3, returns n+4. */ static uintptr_t counter(uintptr_t n) { @@ -93,6 +99,178 @@ static uintptr_t outer(uintptr_t arg) { return 999; } +/* ---- xco_yield ----------------------------------------------------- */ + +/* Each yielder appends its id to the trace on every step; ends after N + * yields. With two yielders alternately-scheduled by rt, the trace + * shows interleaving — the proof that yield gives the runtime control. */ +static int trace[64]; +static int trace_len; + +typedef struct { + int id; + int n; + runtime_t *rt; +} yielder_args_t; + +static uintptr_t yielder(uintptr_t arg) { + yielder_args_t *ya = (yielder_args_t *)arg; + for (int i = 0; i < ya->n; i++) { + trace[trace_len++] = ya->id; + xco_yield(ya->rt); + } + return (uintptr_t)ya->id; +} + +static void test_xco_yield_alternates(void) { + runtime_t rt; rt_init(&rt); + trace_len = 0; + + yielder_args_t a1 = {.id = 1, .n = 3, .rt = &rt}; + yielder_args_t a2 = {.id = 2, .n = 3, .rt = &rt}; + + xco_t c1, c2; + /* Spawn each: first step records the id, then yields back to caller + * via the runtime ready queue. After spawn, both are SUSPENDED and + * enqueued. */ + xstep_result_t r1 = xco_spawn(&c1, yielder, stack_y1, STACK_BYTES, (uintptr_t)&a1); + assert(r1.status == XSTEP_SUSPENDED); + xstep_result_t r2 = xco_spawn(&c2, yielder, stack_y2, STACK_BYTES, (uintptr_t)&a2); + assert(r2.status == XSTEP_SUSPENDED); + + /* Drain the runtime: each yielder runs another step, yields again, + * until both reach DEAD. */ + rt_run(&rt, 0); + assert(xco_status(&c1) == XSTEP_DEAD); + assert(xco_status(&c2) == XSTEP_DEAD); + + /* Trace alternates: 1 2 1 2 1 2 (FIFO ready-queue ordering). */ + assert(trace_len == 6); + int expect[6] = {1, 2, 1, 2, 1, 2}; + for (int i = 0; i < 6; i++) assert(trace[i] == expect[i]); +} + +/* ---- xco_task ------------------------------------------------------ */ + +/* Body returns immediately with arg + 1. The cleanest possible task — + * verifies trampoline wires task_done with the return value. */ +static uintptr_t task_body_simple(task_t *self, uintptr_t arg) { + (void)self; + return arg + 1; +} + +static void test_xco_task_join_inline(void) { + /* Task runs to completion synchronously inside xco_task_spawn; + * task.done is set by the trampoline. */ + xco_task_t xt; + xstep_result_t r = xco_task_spawn(&xt, task_body_simple, + stack_t1, STACK_BYTES, 41); + assert(r.status == XSTEP_DEAD); + assert(r.value == 42); + assert(task_finished(&xt.task)); + + uintptr_t v; + assert(event_try(task_done_event(&xt.task), &v)); + assert(v == 42); +} + +/* Body that suspends on a latch then returns. Uses xco_await — the + * try-park-suspend dance compresses to one call. */ +static latch_t task_gate; +static uintptr_t task_body_gated(task_t *self, uintptr_t arg) { + (void)self; + runtime_t *rt = (runtime_t *)arg; + return xco_await(rt, &task_gate.base) + 100; +} + +static void test_xco_task_join_via_runtime(void) { + /* Task suspends inside body; gate fires, body returns, trampoline + * sets task.done. Verifying via task_done_event read after rt_run. */ + runtime_t rt; rt_init(&rt); + latch_init(&task_gate); + + xco_task_t xt; + xstep_result_t r = xco_task_spawn(&xt, task_body_gated, + stack_t2, STACK_BYTES, (uintptr_t)&rt); + assert(r.status == XSTEP_SUSPENDED); + assert(!task_finished(&xt.task)); + + latch_set(&task_gate, 7); + rt_run(&rt, 0); + assert(task_finished(&xt.task)); + + uintptr_t v; + assert(event_try(task_done_event(&xt.task), &v)); + assert(v == 107); +} + +/* Joiner is itself a task — its body awaits the target's done event, + * captures the return value as its own return. The cleanest expression + * of "wait for another task to finish, get its result." */ +typedef struct { + runtime_t *rt; + xco_task_t *target; +} joiner_args_t; + +static uintptr_t task_joiner(task_t *self, uintptr_t arg) { + (void)self; + joiner_args_t *ja = (joiner_args_t *)arg; + return xco_await(ja->rt, task_done_event(&ja->target->task)); +} + +static void test_xco_task_joined_by_other_task(void) { + runtime_t rt; rt_init(&rt); + latch_init(&task_gate); + + xco_task_t target; + xco_task_spawn(&target, task_body_gated, + stack_t2, STACK_BYTES, (uintptr_t)&rt); + assert(!task_finished(&target.task)); + + joiner_args_t ja = {.rt = &rt, .target = &target}; + xco_task_t joiner; + xco_task_spawn(&joiner, task_joiner, + stack_t4, STACK_BYTES, (uintptr_t)&ja); + assert(!task_finished(&joiner.task)); + + latch_set(&task_gate, 7); + rt_run(&rt, 0); + assert(task_finished(&target.task)); + assert(task_finished(&joiner.task)); + + /* Joiner's return is the target's return propagated through. */ + uintptr_t v; + assert(event_try(task_done_event(&joiner.task), &v)); + assert(v == 107); +} + +/* Cancellable body: awaits a never-firing latch under the task's cancel. + * Returns 1 on event, 0 on cancel — surfacing the outcome in done. */ +static uintptr_t task_body_cancellable(task_t *self, uintptr_t arg) { + runtime_t *rt = (runtime_t *)arg; + latch_t never; latch_init(&never); + uintptr_t v; + return xco_await_or_cancel(rt, &never.base, task_cancel(self), &v) ? 1 : 0; +} + +static void test_xco_task_cancel(void) { + runtime_t rt; rt_init(&rt); + + xco_task_t xt; + xstep_result_t r = xco_task_spawn(&xt, task_body_cancellable, + stack_t3, STACK_BYTES, (uintptr_t)&rt); + assert(r.status == XSTEP_SUSPENDED); + assert(!task_finished(&xt.task)); + + cancel_set(task_cancel(&xt.task)); + rt_run(&rt, 0); + assert(task_finished(&xt.task)); + + uintptr_t v; + assert(event_try(task_done_event(&xt.task), &v)); + assert(v == 0); /* cancelled */ +} + int main(void) { assert(xco_self() == NULL); @@ -122,6 +300,12 @@ int main(void) { counter_sm_init(&sm); drive_counter(&sm.base); + test_xco_yield_alternates(); + test_xco_task_join_inline(); + test_xco_task_join_via_runtime(); + test_xco_task_joined_by_other_task(); + test_xco_task_cancel(); + printf("ok\n"); return 0; } diff --git a/xco.c b/xco.c @@ -103,3 +103,26 @@ uintptr_t xco_suspend(uintptr_t value) { xco_t *xco_self(void) { return (xco_t *)t_current; } + +/* ---- xco-backed task -------------------------------------------------- */ + +/* The trampoline: runs as the coroutine's xco_fn. Recovers the owning + * xco_task_t via container_of on the embedded co (xco_self() returns + * the running xco), dispatches the user fn, and surfaces the return + * value through task_done — so a joiner waiting on task_done_event + * wakes with the body's return without the body needing to know about + * the task surface at all. */ +static uintptr_t xco_task_trampoline(uintptr_t arg) { + xco_t *self = xco_self(); + xco_task_t *xt = (xco_task_t *)((char *)self - offsetof(xco_task_t, co)); + uintptr_t r = xt->fn(&xt->task, arg); + task_done(&xt->task, r); + return r; +} + +void xco_task_init(xco_task_t *xt, xco_task_fn fn, + void *stack_base, size_t stack_len) { + task_init(&xt->task, &xt->co.base); + xt->fn = fn; + xco_init(&xt->co, xco_task_trampoline, stack_base, stack_len); +} diff --git a/xco.h b/xco.h @@ -27,6 +27,7 @@ #include <stdint.h> #include "xstep.h" +#include "event.h" /* Provides XCO_SIZE, XCO_ALIGN, XCO_STACK_ALIGN; resolved by the * build to the arch-specific copy via the include path. */ #include "xco_arch.h" @@ -83,4 +84,94 @@ static inline xstep_result_t xco_spawn(xco_t *c, xco_fn fn, return xco_resume(c, arg); } +/* Cooperative yield. Enqueues self on rt's ready queue and suspends; + * the next rt_run pass resumes us. Useful for fairness when a coroutine + * wants to give other ready work a turn between long-running steps. + * Must be called from inside a coroutine driven by rt. */ +static inline void xco_yield(runtime_t *rt) { + step_waker_t sw; + step_waker_init(&sw, rt, &xco_self()->base); + rt_enqueue(rt, &sw.base); + xco_suspend(0); +} + +/* Await an event from inside a coroutine. The standard try-park-suspend + * dance, in one call. Returns the event's value (delivered by fire on + * the slow path, by event_try on the fast path). Must be called from + * inside a coroutine driven by rt. */ +static inline uintptr_t xco_await(runtime_t *rt, event_t *e) { + uintptr_t v; + if (event_try(e, &v)) return v; + step_waker_t sw; + step_waker_init(&sw, rt, &xco_self()->base); + event_park(e, &sw.base); + xco_suspend(0); + (void)event_try(e, &v); + return v; +} + +/* Await ev or be cancelled by c. Returns true if ev fired (its payload + * is written to *out, which may be NULL); false if cancelled. The + * internal select_event is always cleaned up before return. + * + * The canonical shape for cooperative work inside a task body — pair + * with task_cancel(self) on every blocking await. */ +static inline bool xco_await_or_cancel(runtime_t *rt, event_t *ev, + cancel_t *c, uintptr_t *out) { + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, ev, c); + uintptr_t winner = xco_await(rt, &sel.done.base); + bool ok = (winner == WAIT_OK); + if (ok && out) *out = inputs[WAIT_OK].value; + select_event_deinit(&sel); + return ok; +} + +/* ---- xco-backed task ------------------------------------------------- */ + +/* xco specialization of task_t (event.h). Bundles the user-visible task + * handle with the xco that runs it; the trampoline calls fn(&xt->task, arg) + * and then task_done with its return value, so wait_or_cancel-style + * teardown works without the user wiring anything. + * + * The trampoline recovers xt from xco_self() at first entry (container_of + * on the embedded co), so the first-resume uintptr_t is preserved as + * fn's arg under normal xco semantics. Subsequent resumes pass values + * to the coroutine in the usual way. + * + * Storage shape: caller allocates xco_task_t and a stack. The task_t + * inside is the handle to wait/cancel on; cancel via cancel_set on + * &xt->task.cancel and the body is expected to observe it (typically + * by composing wait_or_cancel against task_cancel(&xt->task)). */ + +typedef uintptr_t (*xco_task_fn)(task_t *t, uintptr_t arg); + +typedef struct xco_task { + task_t task; + xco_t co; + xco_task_fn fn; +} xco_task_t; + +/* Initialize xt to run fn on the given stack. After this the embedded + * xco is XSTEP_INIT; drive it with xco_task_resume / xco_task_spawn. */ +void xco_task_init(xco_task_t *xt, xco_task_fn fn, + void *stack_base, size_t stack_len); + +/* Resume the task. value is delivered as fn's arg on the first call and + * as the result of the pending xco_suspend on subsequent calls — same + * semantics as xco_resume on a bare xco. */ +static inline xstep_result_t xco_task_resume(xco_task_t *xt, uintptr_t value) { + return xco_resume(&xt->co, value); +} + +/* Convenience: init then first-resume in one call. arg is delivered as + * fn's argument. */ +static inline xstep_result_t xco_task_spawn(xco_task_t *xt, xco_task_fn fn, + void *stack_base, size_t stack_len, + uintptr_t arg) { + xco_task_init(xt, fn, stack_base, stack_len); + return xco_task_resume(xt, arg); +} + #endif /* XCO_H */