xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit 4d2f6297ebb294ab4f31bdaae93404c817e770df
parent 7cc24269a31cc36717b7d32f3dfcb3218c9ddca2
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 10:31:43 -0700

Add countdown, notify, mutex, ticker, task_group, send-op, and chan/queue close

Seven new primitives building on the existing event substrate:

- countdown_t: latch-backed fan-in counter (add/done/event).
- notify_t: transient wake-one/wake-all signal, FIFO waitlist.
- mutex_t: semaphore_t alias with binary-semaphore wrappers.
- queue_send_op_t: selectable send mirroring chan_send_op_t.
- ticker_t: re-armable timer-driven signal with skip-ahead.
- task_group_t: fan-in join + fan-out cancel over caller-allocated slots.
- chan/queue close: EOF semantics, delivered-bit on send-wakers, typed
  chan_recv / queue_recv returning RECV_GOT/EMPTY/CLOSED, idempotent
  close that drains parked senders and wakes parked receivers.

Diffstat:
Mevent.c | 392++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mevent.h | 246+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Mtests/test_event.c | 902++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 1507 insertions(+), 33 deletions(-)

diff --git a/event.c b/event.c @@ -349,15 +349,25 @@ static inline chan_t *chan_of_recv(event_t *e) { static bool chan_recv_try(event_t *e, uintptr_t *out) { chan_t *c = chan_of_recv(e); waker_t *w = chan_q_pop(&c->send_head, &c->send_tail); - if (!w) return false; - /* w is &csw->sw.base; sw is the first field of chan_send_waker_t, - * and base is the first field of step_waker_t, so addresses align. */ - chan_send_waker_t *csw = (chan_send_waker_t *)w; - if (out) *out = csw->value; - /* Resume the sender. The fire value (delivery confirmation) is - * unused — receivers learn the value, senders just learn "done". */ - waker_fire(w, 0); - return true; + if (w) { + /* w is &csw->sw.base; sw is the first field of chan_send_waker_t, + * and base is the first field of step_waker_t, so addresses align. */ + chan_send_waker_t *csw = (chan_send_waker_t *)w; + if (out) *out = csw->value; + csw->delivered = true; + /* Resume the sender. The fire value is unused for step-wakers; + * for op senders, _chan_send_op_fire reads csw->delivered. */ + waker_fire(w, 0); + return true; + } + /* Close makes the recv event "ready" with no value: the receiver is + * expected to call chan_recv to learn it's RECV_CLOSED. *out is + * undefined in that case (set to 0 here for determinism). */ + if (c->closed) { + if (out) *out = 0; + return true; + } + return false; } static void chan_recv_park(event_t *e, waker_t *w) { @@ -377,6 +387,7 @@ const event_vtable_t _chan_recv_vt = { }; bool chan_try_send(chan_t *c, uintptr_t value) { + if (c->closed) return false; waker_t *w = chan_q_pop(&c->recv_head, &c->recv_tail); if (!w) return false; /* Hand the value to the recv-side waker. step_waker stashes it as @@ -386,6 +397,9 @@ bool chan_try_send(chan_t *c, uintptr_t value) { } void chan_park_send(chan_t *c, chan_send_waker_t *csw) { + /* park_send after close is UB — caller must check chan_is_closed + * (typically via chan_try_send returning false plus chan_is_closed). */ + assert(!c->closed); chan_q_push(&c->send_head, &c->send_tail, &csw->sw.base); } @@ -393,17 +407,52 @@ void chan_unpark_send(chan_t *c, chan_send_waker_t *csw) { chan_q_remove(&c->send_head, &c->send_tail, &csw->sw.base); } +recv_status_t chan_recv(chan_t *c, uintptr_t *out) { + waker_t *w = chan_q_pop(&c->send_head, &c->send_tail); + if (w) { + chan_send_waker_t *csw = (chan_send_waker_t *)w; + if (out) *out = csw->value; + csw->delivered = true; + waker_fire(w, 0); + return RECV_GOT; + } + if (c->closed) return RECV_CLOSED; + return RECV_EMPTY; +} + +void chan_close(chan_t *c) { + if (c->closed) return; + c->closed = true; + + /* Drain parked senders with delivered=false. waker_fire detaches + * before invoking the callback, so re-park inside fire (e.g. to + * land on the runtime ready queue) is safe. */ + waker_t *w; + while ((w = chan_q_pop(&c->send_head, &c->send_tail)) != NULL) { + chan_send_waker_t *csw = (chan_send_waker_t *)w; + csw->delivered = false; + waker_fire(w, 0); + } + /* Wake parked receivers so they observe RECV_CLOSED via chan_recv. + * Fire value is irrelevant — the recv event_try will return true + * because c->closed is set, but receivers should use chan_recv. */ + while ((w = chan_q_pop(&c->recv_head, &c->recv_tail)) != NULL) { + waker_fire(w, 0); + } +} + /* ---- Send op (selectable send) ---------------------------------------- */ void _chan_send_op_fire(waker_t *w, uintptr_t value) { /* Receiver hands no payload to the sender on delivery — the sender - * just learns "delivered." The value is what flowed the other way. */ + * learns whether its value reached someone via op->csw.delivered, + * set by chan_recv* (true) or chan_close (false). */ (void)value; /* csw is the first field of chan_send_op_t; sw is first of * chan_send_waker_t; base is first of step_waker_t. All offsets * coincide, so w aliases op. */ chan_send_op_t *op = (chan_send_op_t *)w; - latch_set(&op->done, 0); + latch_set(&op->done, op->csw.delivered ? 1 : 0); } void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value) { @@ -415,9 +464,15 @@ void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value) { op->chan = c; latch_init(&op->done); + if (c->closed) { + /* Closed channel: no delivery possible, resolve immediately. */ + latch_set(&op->done, 0); + return; + } if (chan_try_send(c, value)) { /* Inline delivery: no parking, done set immediately. */ - latch_set(&op->done, 0); + op->csw.delivered = true; + latch_set(&op->done, 1); return; } chan_park_send(c, &op->csw); @@ -467,6 +522,7 @@ static void queue_drain_one_sender(queue_t *q) { waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); queue_send_waker_t *qsw = (queue_send_waker_t *)w; queue_push_buf(q, qsw->value); + qsw->delivered = true; /* Fire after pushing so the sender sees its delivery as complete. */ waker_fire(w, 0); } @@ -485,9 +541,16 @@ static bool queue_recv_try(event_t *e, uintptr_t *out) { waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); queue_send_waker_t *qsw = (queue_send_waker_t *)w; if (out) *out = qsw->value; + qsw->delivered = true; waker_fire(w, 0); return true; } + /* Closed and drained: receivers learn EOF via queue_recv; out is + * undefined. */ + if (q->closed) { + if (out) *out = 0; + return true; + } return false; } @@ -508,6 +571,12 @@ const event_vtable_t _queue_recv_vt = { }; bool queue_try_send(queue_t *q, uintptr_t value) { + if (q->closed) { + /* Send-after-close. BLOCK: no delivery, signal failure. DROP_*: + * silently drop (queue policy already says "may be lost"). */ + if (q->policy == QUEUE_BLOCK) return false; + return true; + } /* Direct handoff first: parked receivers always win over the buffer. * This is the rendezvous case and the cap==0 case. */ waker_t *w = chan_q_pop(&q->recv_head, &q->recv_tail); @@ -535,8 +604,9 @@ bool queue_try_send(queue_t *q, uintptr_t value) { void queue_park_send(queue_t *q, queue_send_waker_t *qsw) { /* DROP_* never parks (try_send always returns true); only valid - * for BLOCK. */ + * for BLOCK. park_send after close is UB. */ assert(q->policy == QUEUE_BLOCK); + assert(!q->closed); chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base); } @@ -544,6 +614,82 @@ void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw) { chan_q_remove(&q->send_head, &q->send_tail, &qsw->sw.base); } +recv_status_t queue_recv(queue_t *q, uintptr_t *out) { + if (q->len > 0) { + uintptr_t v = queue_pop_buf(q); + if (out) *out = v; + queue_drain_one_sender(q); + return RECV_GOT; + } + if (q->send_head) { + waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + if (out) *out = qsw->value; + qsw->delivered = true; + waker_fire(w, 0); + return RECV_GOT; + } + if (q->closed) return RECV_CLOSED; + return RECV_EMPTY; +} + +void queue_close(queue_t *q) { + if (q->closed) return; + q->closed = true; + + /* Drain parked senders with delivered=false. Senders only park + * under BLOCK, so this is no-op for DROP_* (their waitlist is + * always empty). */ + waker_t *w; + while ((w = chan_q_pop(&q->send_head, &q->send_tail)) != NULL) { + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + qsw->delivered = false; + waker_fire(w, 0); + } + /* Wake parked receivers so they can observe closed via queue_recv. + * Receivers may still drain buffered values first — queue_recv's + * RECV_GOT path is hit before the RECV_CLOSED branch. */ + while ((w = chan_q_pop(&q->recv_head, &q->recv_tail)) != NULL) { + waker_fire(w, 0); + } +} + +/* ---- Queue send op (selectable send) ---------------------------------- */ + +void _queue_send_op_fire(waker_t *w, uintptr_t value) { + (void)value; + queue_send_op_t *op = (queue_send_op_t *)w; + latch_set(&op->done, op->qsw.delivered ? 1 : 0); +} + +void queue_send_op_init(queue_send_op_t *op, queue_t *q, uintptr_t value) { + queue_send_waker_init(&op->qsw, NULL, NULL, value); + op->qsw.sw.base.fire = _queue_send_op_fire; + op->queue = q; + latch_init(&op->done); + + if (q->closed) { + /* BLOCK: no delivery; DROP_*: dropped per policy. Either way the + * value did not reach a receiver, so delivered=false. */ + latch_set(&op->done, 0); + return; + } + if (queue_try_send(q, value)) { + /* Inline accept: handoff to receiver, buffered, or DROP_* policy + * accepted it. */ + op->qsw.delivered = true; + latch_set(&op->done, 1); + return; + } + /* Only BLOCK policy with full buffer reaches here. */ + queue_park_send(q, &op->qsw); +} + +void queue_send_op_deinit(queue_send_op_t *op) { + if (op->done.set) return; + queue_unpark_send(op->queue, &op->qsw); +} + /* ---- Broadcast (slot) ------------------------------------------------- */ /* The waitlist uses the same doubly-linked LIFO shape as latch — there @@ -599,6 +745,53 @@ void broadcast_publish(broadcast_t *b, uintptr_t value) { } } +/* ---- Notify ----------------------------------------------------------- */ + +/* Doubly-linked FIFO waitlist (same shape as the chan/queue waitlists). + * notify_one fires the head; notify_all detaches the whole list before + * iterating so callbacks can re-park onto a fresh waitlist without + * iterator hazards (same pattern as latch_set). event_try is always + * false: notify is purely transient. */ + +static bool notify_try(event_t *e, uintptr_t *out) { + (void)e; (void)out; + return false; +} + +static void notify_park(event_t *e, waker_t *w) { + notify_t *n = (notify_t *)e; + chan_q_push(&n->head, &n->tail, w); +} + +static void notify_unpark(event_t *e, waker_t *w) { + notify_t *n = (notify_t *)e; + chan_q_remove(&n->head, &n->tail, w); +} + +const event_vtable_t _notify_vt = { + .try_ = notify_try, + .park = notify_park, + .unpark = notify_unpark, +}; + +void notify_one(notify_t *n) { + waker_t *w = chan_q_pop(&n->head, &n->tail); + if (!w) return; + waker_fire(w, 0); +} + +void notify_all(notify_t *n) { + /* Detach before iterating: re-parking inside fire lands on a fresh + * (empty) list. Walk the snapshot via saved next pointers. */ + waker_t *w = n->head; + n->head = n->tail = NULL; + while (w) { + waker_t *next = w->next; + waker_fire(w, 0); + w = next; + } +} + /* ---- Timer ------------------------------------------------------------ */ /* The timer-as-event surface is just the embedded latch's vtable: try @@ -798,3 +991,176 @@ void timeout_deinit(timeout_t *to) { event_unpark(timer_event(&to->timer), &to->bridge); timer_deinit(&to->timer); } + +/* ---- Ticker ----------------------------------------------------------- */ + +/* The ticker's event surface uses the broadcast-style LIFO doubly-linked + * waitlist: subscribers are fired all-at-once on each tick, so order + * doesn't matter; doubly-linked gives O(1) unpark for cancellation. */ + +static bool ticker_try(event_t *e, uintptr_t *out) { + (void)e; (void)out; + return false; /* transient — wait for the next tick */ +} + +static void ticker_park(event_t *e, waker_t *w) { + ticker_t *t = (ticker_t *)((char *)e - offsetof(ticker_t, base)); + assert(!w->prev && !w->next); + w->next = t->waiters; + if (t->waiters) t->waiters->prev = w; + t->waiters = w; +} + +static void ticker_unpark(event_t *e, waker_t *w) { + ticker_t *t = (ticker_t *)((char *)e - offsetof(ticker_t, base)); + if (!w->prev && t->waiters != w) return; + if (w->prev) w->prev->next = w->next; + else t->waiters = w->next; + if (w->next) w->next->prev = w->prev; + w->prev = w->next = NULL; +} + +const event_vtable_t _ticker_vt = { + .try_ = ticker_try, + .park = ticker_park, + .unpark = ticker_unpark, +}; + +/* Bridge waker: parks on the underlying timer's latch. On fire, compute + * the next deadline (skip-ahead-safe), reinstall the timer, re-park the + * bridge on the new timer, then fire every parked subscriber with the + * just-fired deadline. The waitlist is detached before iteration so + * subscribers can re-park inside their fire callbacks. */ +static void _ticker_bridge_fire(waker_t *w, uintptr_t value) { + ticker_t *t = (ticker_t *)((char *)w - offsetof(ticker_t, bridge)); + uint64_t fired = (uint64_t)value; + uint64_t next = fired + t->period; + /* Skip-ahead: in the rare overflow case (period = 0 or wraparound), + * step forward enough to keep next > fired. */ + if (next <= fired) { + next += ((fired - next) / t->period + 1) * t->period; + } + /* Reinstall the timer for the next tick. The latch's storage is + * reused — timer_init runs latch_init on it. */ + timer_init(&t->timer, t->src, next); + /* Bridge waker is fully detached (waker_fire just cleared its + * links); park it on the freshly-armed timer. */ + event_park(timer_event(&t->timer), &t->bridge); + + /* Fire the subscribers. Detach the waitlist first so re-park inside + * fire lands on the now-empty list. */ + waker_t *waiters = t->waiters; + t->waiters = NULL; + while (waiters) { + waker_t *nxt = waiters->next; + waker_fire(waiters, (uintptr_t)fired); + waiters = nxt; + } +} + +void ticker_init(ticker_t *t, timers_t *ts, + uint64_t period, uint64_t first_deadline) { + /* period must be positive — the skip-ahead computation in the bridge + * divides by period, and a zero-period ticker would loop forever + * inside ph_advance. */ + assert(period > 0); + t->base.vt = &_ticker_vt; + t->src = ts; + t->period = period; + t->waiters = NULL; + + timer_init(&t->timer, ts, first_deadline); + + t->bridge.next = NULL; + t->bridge.prev = NULL; + t->bridge.fire = _ticker_bridge_fire; + event_park(timer_event(&t->timer), &t->bridge); +} + +void ticker_deinit(ticker_t *t) { + /* Pull the bridge off the timer (no-op if already fired) and cancel + * the timer. Subscribers' wakers are the caller's storage; nothing + * to free here. */ + event_unpark(timer_event(&t->timer), &t->bridge); + timer_deinit(&t->timer); +} + +/* ---- Task group ------------------------------------------------------- */ + +/* Each attach contributes one to the countdown and parks a bridge waker + * on the task's done event. Bridge fire splices the slot out of the + * group's list and decrements the countdown — the join event fires when + * the last attached task finishes. + * + * Cancellation is fan-out: walk the list, set each task's cancel, then + * set the group-level cancel. Bodies cooperate by composing their work + * with task_cancel(self); the group-level cancel is for non-task + * waiters that want to react to "the group has been told to stop." */ + +static void _task_group_detach_slot(task_group_t *g, group_attach_t *slot) { + /* Doubly-linked, head/tail tracked; same shape as other waitlists. */ + if (slot->prev) slot->prev->next = slot->next; + else g->head = slot->next; + if (slot->next) slot->next->prev = slot->prev; + else g->tail = slot->prev; + slot->prev = slot->next = NULL; +} + +static void _task_group_bridge_fire(waker_t *w, uintptr_t value) { + (void)value; + group_attach_t *slot = (group_attach_t *)((char *)w - offsetof(group_attach_t, bridge)); + task_group_t *g = slot->group; + _task_group_detach_slot(g, slot); + countdown_done(&g->pending); +} + +void task_group_init(task_group_t *g) { + /* Don't go through countdown_init(0) — that fires the latch + * immediately, which would make the very first attach's + * countdown_add UB. The group's join must remain not-fired until at + * least one attached task has finished, so we open with + * remaining=0 and an unset latch. The first attach lifts remaining + * to 1, and matching countdown_dones bring it back to 0, firing + * the latch. */ + latch_init(&g->pending.done); + g->pending.remaining = 0; + cancel_init(&g->cancel); + g->head = g->tail = NULL; +} + +void task_group_attach(task_group_t *g, task_t *t, group_attach_t *slot) { + countdown_add(&g->pending, 1); + + slot->task = t; + slot->group = g; + + /* Append to the group's list (FIFO; ordering doesn't affect cancel + * fan-out semantics, but consistent with other waitlists in the + * codebase). */ + slot->prev = g->tail; + slot->next = NULL; + if (g->tail) g->tail->next = slot; + else g->head = slot; + g->tail = slot; + + slot->bridge.next = NULL; + slot->bridge.prev = NULL; + slot->bridge.fire = _task_group_bridge_fire; + + /* Park on the task's done event. If the task has already finished + * (re-attaching is UB per the contract, but if the task fired + * before attach finished initializing — e.g. an inline-spawned + * synchronous task), the latch_park assert would catch it. */ + event_park(task_done_event(t), &slot->bridge); +} + +void task_group_cancel(task_group_t *g) { + /* Fan-out cancel: signal each attached task. Walk the snapshot + * (cancel doesn't detach the slot — only task done does — so the + * list is stable across iteration). */ + for (group_attach_t *s = g->head; s; s = s->next) { + cancel_set(&s->task->cancel); + } + /* Group-level cancel for anyone awaiting "the group as a whole." */ + cancel_set(&g->cancel); +} diff --git a/event.h b/event.h @@ -211,6 +211,63 @@ static inline void latch_init(latch_t *l) { void latch_set(latch_t *l, uintptr_t value); +/* ---- Countdown -------------------------------------------------------- */ + +/* One-shot fan-in counter. Fires its embedded latch (payload 0) when + * remaining hits 0. countdown_add(n) is legal while remaining > 0; + * countdown_done decrements; both are UB once the latch has fired. + * + * Compose with the standard event API via countdown_event(). */ +typedef struct countdown { + latch_t done; + size_t remaining; +} countdown_t; + +static inline void countdown_init(countdown_t *c, size_t n) { + latch_init(&c->done); + c->remaining = n; + if (n == 0) latch_set(&c->done, 0); +} + +static inline void countdown_add(countdown_t *c, size_t n) { + /* UB after fire — caller's contract. */ + c->remaining += n; +} + +static inline void countdown_done(countdown_t *c) { + /* UB at 0 — caller's contract. */ + if (--c->remaining == 0) latch_set(&c->done, 0); +} + +static inline event_t *countdown_event(countdown_t *c) { return &c->done.base; } +static inline bool countdown_fired(const countdown_t *c) { return c->done.set; } + +/* ---- Notify (wake-one / wake-all) ------------------------------------- */ + +/* Transient signal with no sticky state. notify_one fires (and detaches) + * the head of a FIFO waitlist; notify_all fires every parked waker. Both + * are no-ops when the waitlist is empty. Subscribers must re-park to see + * subsequent notifications. + * + * event_try always returns false: there is no "ready now" state — a + * subscriber waits for the *next* notify. */ +typedef struct notify { + event_t base; + waker_t *head, *tail; +} notify_t; + +extern const event_vtable_t _notify_vt; + +static inline void notify_init(notify_t *n) { + n->base.vt = &_notify_vt; + n->head = n->tail = NULL; +} + +static inline event_t *notify_event(notify_t *n) { return &n->base; } + +void notify_one(notify_t *n); +void notify_all(notify_t *n); + /* ---- Semaphore -------------------------------------------------------- */ /* Counting semaphore. acquire is exposed as event_t (composable with @@ -246,6 +303,17 @@ static inline event_t *semaphore_event(semaphore_t *s) { return &s->acquire; } void semaphore_release(semaphore_t *s, size_t n); +/* ---- Mutex ------------------------------------------------------------ */ + +/* Binary semaphore wrapper for vocabulary at call sites. mutex_init is + * semaphore_init(s, 1); the event_t fires once per release; mutex_release + * hands the permit to the next waiter (or returns it to the count). */ +typedef semaphore_t mutex_t; + +static inline void mutex_init (mutex_t *m) { semaphore_init(m, 1); } +static inline event_t *mutex_event (mutex_t *m) { return semaphore_event(m); } +static inline void mutex_release(mutex_t *m) { semaphore_release(m, 1); } + /* ---- Select / all-of -------------------------------------------------- */ /* Wait over N input events. Two semantics share the same storage shape, @@ -322,12 +390,27 @@ void select_event_deinit(select_event_t *s); * recv + no sender receiver parks (event_park on recv); peer * delivers later. * - * FIFO order on both waitlists. */ + * FIFO order on both waitlists. + * + * Close: optional EOF semantics. After chan_close, try_send fails (no + * delivery), parked senders are drained with delivered=false, and parked + * receivers are woken so they can observe RECV_CLOSED via chan_recv. The + * recv event is "ready" iff a value is available OR the channel is + * closed — receivers must call chan_recv to disambiguate value vs EOF. + * chan_park_send after close is UB. */ + +/* Result of a typed receive on a channel or queue. */ +typedef enum { + RECV_GOT, /* *out holds the delivered value */ + RECV_EMPTY, /* nothing available right now; caller may park */ + RECV_CLOSED, /* peer closed and no values remain */ +} recv_status_t; typedef struct chan { event_t recv; /* the recv-side event */ waker_t *send_head, *send_tail; /* parked chan_send_waker_t bases */ waker_t *recv_head, *recv_tail; /* parked recv-side wakers */ + bool closed; } chan_t; extern const event_vtable_t _chan_recv_vt; @@ -336,46 +419,71 @@ static inline void chan_init(chan_t *c) { c->recv.vt = &_chan_recv_vt; c->send_head = c->send_tail = NULL; c->recv_head = c->recv_tail = NULL; + c->closed = false; } /* Sender-side waker. Embeds a step_waker (the actual scheduler bridge) * and the value to deliver. The receiver reads .value, then fires the - * step_waker — the sender resumes via the runtime, no payload passed. */ + * step_waker — the sender resumes via the runtime, no payload passed. + * + * `delivered` is set by the closing side before fire: true on a normal + * recv handoff, false when chan_close drains the parked-sender list. + * Senders read it after resume to know whether their value reached a + * receiver. */ typedef struct { step_waker_t sw; uintptr_t value; + bool delivered; } chan_send_waker_t; static inline void chan_send_waker_init(chan_send_waker_t *csw, runtime_t *rt, xstep_t *s, uintptr_t value) { step_waker_init(&csw->sw, rt, s); - csw->value = value; + csw->value = value; + csw->delivered = false; } -/* Try to deliver value inline. Returns true iff a receiver was parked; - * its waker fires with the value, the receiver becomes ready, and the - * sender continues without parking. */ +/* Try to deliver value inline. Returns true iff a receiver was parked + * AND the channel is open; its waker fires with the value, the receiver + * becomes ready, and the sender continues without parking. Returns false + * after close (no delivery). */ bool chan_try_send(chan_t *c, uintptr_t value); /* Park a sender. csw->value must already be set (use chan_send_waker_init). - * The sender's xstep is resumed when a receiver consumes the value. */ + * The sender's xstep is resumed when a receiver consumes the value, or + * when chan_close drains the list (sender resumes with csw->delivered + * false). Calling chan_park_send on a closed channel is UB. */ void chan_park_send(chan_t *c, chan_send_waker_t *csw); /* Remove a parked sender (cancellation). No-op if not parked. */ void chan_unpark_send(chan_t *c, chan_send_waker_t *csw); +/* Typed receive. Disambiguates value vs EOF where event_try cannot. */ +recv_status_t chan_recv(chan_t *c, uintptr_t *out); + +/* Close the channel. Idempotent. Drains parked senders (each resumes + * with csw->delivered = false) and wakes parked receivers (which then + * observe RECV_CLOSED via chan_recv). Subsequent chan_try_send returns + * false; chan_park_send is UB. */ +void chan_close(chan_t *c); +static inline bool chan_is_closed(const chan_t *c) { return c->closed; } + /* Selectable send op. A per-call object that holds the value, parks * on the channel, and exposes &op->done.base as the event that fires - * when delivery completes. Compose with select like any other event. + * when delivery resolves. Compose with select like any other event. * * The op embeds a chan_send_waker_t (so the chan's send list stays * uniform — receivers read .value at the same offset for both direct * and op senders) but rewires its fire callback: instead of resuming * an xstep, fire sets op->done. Polymorphism via the function pointer. * + * The done latch's payload is 1 on a successful delivery and 0 on + * close-drain (sender's value did not reach a receiver). The init path + * resolves to 0 inline if the channel is already closed. + * * Lifecycle: init → wait on &op->done.base → deinit. Always deinit; - * it's a no-op after delivery and unparks the chan-side waker if not. */ + * it's a no-op after resolution and unparks the chan-side waker if not. */ typedef struct { chan_send_waker_t csw; /* parked on chan; fire overridden */ chan_t *chan; @@ -423,6 +531,7 @@ typedef struct queue { queue_policy_t policy; waker_t *send_head, *send_tail; waker_t *recv_head, *recv_tail; + bool closed; } queue_t; extern const event_vtable_t _queue_recv_vt; @@ -437,6 +546,7 @@ static inline void queue_init(queue_t *q, uintptr_t *buf, size_t cap, q->policy = policy; q->send_head = q->send_tail = NULL; q->recv_head = q->recv_tail = NULL; + q->closed = false; } static inline event_t *queue_recv_event(queue_t *q) { return &q->recv; } @@ -450,22 +560,58 @@ bool queue_try_send(queue_t *q, uintptr_t value); /* Sender-side waker for QUEUE_BLOCK. Same shape as chan_send_waker_t: * a step_waker plus the value to deliver. Receivers read .value at the - * same offset so the queue's send list stays uniform. */ + * same offset so the queue's send list stays uniform. `delivered` is + * set by the closing side: true on a normal handoff, false on a close + * drain. */ typedef struct { step_waker_t sw; uintptr_t value; + bool delivered; } queue_send_waker_t; static inline void queue_send_waker_init(queue_send_waker_t *qsw, runtime_t *rt, xstep_t *s, uintptr_t value) { step_waker_init(&qsw->sw, rt, s); - qsw->value = value; + qsw->value = value; + qsw->delivered = false; } void queue_park_send (queue_t *q, queue_send_waker_t *qsw); void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw); +/* Typed receive. Mirrors chan_recv: returns RECV_GOT (value popped from + * the buffer or directly from a parked sender), RECV_CLOSED (closed and + * drained), or RECV_EMPTY (caller may park). */ +recv_status_t queue_recv(queue_t *q, uintptr_t *out); + +/* Close the queue. Idempotent. Drains parked senders (delivered=false) + * and wakes parked receivers. After close, queue_try_send under + * QUEUE_BLOCK returns false; under QUEUE_DROP_* the value is silently + * dropped (returns true). queue_park_send after close is UB. */ +void queue_close(queue_t *q); +static inline bool queue_is_closed(const queue_t *q) { return q->closed; } + +/* Selectable send op. Mirrors chan_send_op_t: a per-call object that + * holds the value, parks on the queue (only meaningful under QUEUE_BLOCK), + * and exposes &op->done.base as the event that fires when the send + * resolves. The done latch's payload is 1 on a successful delivery + * (handed to a receiver, buffered, or accepted under DROP_*) and 0 on + * close-drain. + * + * Under DROP_* policies the send always resolves inline at init: the + * try_send path returns true and op->done fires immediately. */ +typedef struct { + queue_send_waker_t qsw; /* parked on queue; fire overridden */ + queue_t *queue; + latch_t done; +} queue_send_op_t; + +void _queue_send_op_fire(waker_t *w, uintptr_t value); + +void queue_send_op_init (queue_send_op_t *op, queue_t *q, uintptr_t value); +void queue_send_op_deinit(queue_send_op_t *op); + /* ---- Broadcast (slot) ------------------------------------------------- */ /* Re-armable signal carrying a "latest value" slot. Subscribers park on @@ -653,6 +799,36 @@ typedef struct timeout { void timeout_init (timeout_t *to, timers_t *ts, uint64_t deadline); void timeout_deinit(timeout_t *to); +/* ---- Ticker ----------------------------------------------------------- */ + +/* Re-armable transient signal driven by a timer source. Each time the + * underlying timer fires, the ticker computes the next deadline (period + * past the just-fired one, with skip-ahead for catch-up after overflow), + * reinstalls the timer, and fires every parked subscriber with the + * just-fired deadline as the payload. Subscribers that aren't parked at + * a fire miss it (transient — same coalescing semantics as broadcast). + * + * ticker_init(&t, ts, period, first_deadline); + * ... wait on ticker_event(&t), re-park to see further ticks ... + * ticker_deinit(&t); // cancels the in-flight timer + * + * event_try always returns false; subscribers wait for the *next* tick. */ +typedef struct ticker { + timer_t timer; + timers_t *src; + uint64_t period; + event_t base; + waker_t *waiters; + waker_t bridge; /* internal: parks on timer_event */ +} ticker_t; + +extern const event_vtable_t _ticker_vt; + +void ticker_init (ticker_t *t, timers_t *ts, + uint64_t period, uint64_t first_deadline); +void ticker_deinit(ticker_t *t); +static inline event_t *ticker_event(ticker_t *t) { return &t->base; } + /* ---- Task ------------------------------------------------------------- */ /* Lifecycle handle for a running xstep. Bundles a done latch (fires when @@ -698,4 +874,52 @@ static inline bool task_finished (const task_t *t) { return t->done.set; static inline bool task_is_cancelled(const task_t *t) { return cancel_is_set(&t->cancel); } static inline xstep_t *task_step (task_t *t) { return t->step; } +/* ---- Task group ------------------------------------------------------- */ + +/* Fan-in join + fan-out cancel for a dynamic set of tasks. Caller + * provides storage for each per-attachment record (group_attach_t), so + * the group itself does no allocation. + * + * task_group_attach(g, t, slot): + * countdown_add(g->pending, 1); slot's bridge waker parks on + * task_done_event(t); slot is appended to g's list. When the task's + * done fires, the bridge fires: it splices the slot out of g's list + * and calls countdown_done(&g->pending). Re-attaching a finished + * task is UB. + * + * task_group_cancel(g): walks the attachment list and cancel_set's + * each &slot->task->cancel, then cancel_set's g->cancel. Bodies that + * compose wait_or_cancel against task_cancel(t) wind down cooperatively; + * meanwhile, anything waiting on g->cancel observes the group-level + * signal directly. + * + * task_group_join_event(g): fires when every attached task has reached + * task_done. Compose with select / wait_or_cancel like any event. */ + +typedef struct group_attach group_attach_t; + +typedef struct task_group { + countdown_t pending; + cancel_t cancel; + group_attach_t *head, *tail; +} task_group_t; + +struct group_attach { + waker_t bridge; /* parked on task_done_event(task) */ + task_t *task; + task_group_t *group; + group_attach_t *next, *prev; +}; + +void task_group_init (task_group_t *g); +void task_group_attach (task_group_t *g, task_t *t, + group_attach_t *slot); +void task_group_cancel (task_group_t *g); +static inline event_t *task_group_join_event (task_group_t *g) { + return countdown_event(&g->pending); +} +static inline cancel_t *task_group_cancel_handle(task_group_t *g) { + return &g->cancel; +} + #endif /* EVENT_H */ diff --git a/tests/test_event.c b/tests/test_event.c @@ -1820,10 +1820,10 @@ typedef struct { task_t *task; int remaining; uintptr_t final; -} countdown_t; +} taskbody_t; -static xstep_result_t countdown_step(xstep_t *s, uintptr_t v) { - countdown_t *cd = (countdown_t *)s; +static xstep_result_t taskbody_step(xstep_t *s, uintptr_t v) { + taskbody_t *cd = (taskbody_t *)s; (void)v; if (task_is_cancelled(cd->task)) { task_done(cd->task, 0); /* cooperative unwind */ @@ -1836,8 +1836,8 @@ static xstep_result_t countdown_step(xstep_t *s, uintptr_t v) { return (xstep_result_t){0, XSTEP_SUSPENDED}; } -static void countdown_init(countdown_t *cd, task_t *t, int n, uintptr_t final) { - cd->base = (xstep_t){.step = countdown_step, .status = XSTEP_INIT}; +static void taskbody_init(taskbody_t *cd, task_t *t, int n, uintptr_t final) { + cd->base = (xstep_t){.step = taskbody_step, .status = XSTEP_INIT}; cd->task = t; cd->remaining = n; cd->final = final; @@ -1847,9 +1847,9 @@ static void test_task_state_machine_join(void) { /* Drive a countdown SM as a task; join via task_done_event. */ runtime_t rt; rt_init(&rt); - countdown_t cd; + taskbody_t cd; task_t t; - countdown_init(&cd, &t, 3, 0xAAAA); + taskbody_init(&cd, &t, 3, 0xAAAA); task_init(&t, &cd.base); assert(!task_finished(&t)); @@ -1871,9 +1871,9 @@ static void test_task_state_machine_cancel(void) { * the cooperative-unwind sentinel value (here, 0). */ runtime_t rt; rt_init(&rt); - countdown_t cd; + taskbody_t cd; task_t t; - countdown_init(&cd, &t, 100, 0xAAAA); + taskbody_init(&cd, &t, 100, 0xAAAA); task_init(&t, &cd.base); /* A second xstep waiting for completion via wait_or_cancel-style @@ -1899,6 +1899,846 @@ static void test_task_state_machine_cancel(void) { assert(joiner.got == 0); } +/* ---- Countdown ----------------------------------------------------- */ + +static void test_countdown_basic(void) { + countdown_t cd; countdown_init(&cd, 3); + assert(!countdown_fired(&cd)); + countdown_done(&cd); + countdown_done(&cd); + assert(!countdown_fired(&cd)); + countdown_done(&cd); + assert(countdown_fired(&cd)); + + uintptr_t v = 0xBAD; + assert(event_try(countdown_event(&cd), &v)); + assert(v == 0); +} + +static void test_countdown_zero_init(void) { + /* n=0: latch fires immediately. */ + countdown_t cd; countdown_init(&cd, 0); + assert(countdown_fired(&cd)); +} + +static void test_countdown_add(void) { + countdown_t cd; countdown_init(&cd, 1); + countdown_add(&cd, 2); + countdown_done(&cd); + countdown_done(&cd); + assert(!countdown_fired(&cd)); + countdown_done(&cd); + assert(countdown_fired(&cd)); +} + +static void test_countdown_park_wake(void) { + runtime_t rt; rt_init(&rt); + countdown_t cd; countdown_init(&cd, 2); + + waiter_t w; waiter_init(&w, &rt, countdown_event(&cd)); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + countdown_done(&cd); + assert(rt.head == NULL); + countdown_done(&cd); + rt_run(&rt, 0); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 0); +} + +/* ---- Notify -------------------------------------------------------- */ + +static void test_notify_try_always_false(void) { + notify_t n; notify_init(&n); + assert(!event_try(notify_event(&n), NULL)); + notify_one(&n); /* empty: no-op */ + notify_all(&n); /* empty: no-op */ + assert(!event_try(notify_event(&n), NULL)); +} + +static void test_notify_one_fifo(void) { + runtime_t rt; rt_init(&rt); + notify_t n; notify_init(&n); + + waiter_t a, b, c; + waiter_init(&a, &rt, notify_event(&n)); + waiter_init(&b, &rt, notify_event(&n)); + waiter_init(&c, &rt, notify_event(&n)); + xstep(&a.base, 0); xstep(&b.base, 0); xstep(&c.base, 0); + + notify_one(&n); + rt_run(&rt, 0); + assert(xstep_status(&a.base) == XSTEP_DEAD); + assert(xstep_status(&b.base) == XSTEP_SUSPENDED); + + notify_one(&n); + rt_run(&rt, 0); + assert(xstep_status(&b.base) == XSTEP_DEAD); + assert(xstep_status(&c.base) == XSTEP_SUSPENDED); + + notify_one(&n); + rt_run(&rt, 0); + assert(xstep_status(&c.base) == XSTEP_DEAD); + assert(n.head == NULL); + + notify_one(&n); /* empty: no-op */ + assert(rt.head == NULL); +} + +static void test_notify_all(void) { + runtime_t rt; rt_init(&rt); + notify_t n; notify_init(&n); + + waiter_t a, b, c; + waiter_init(&a, &rt, notify_event(&n)); + waiter_init(&b, &rt, notify_event(&n)); + waiter_init(&c, &rt, notify_event(&n)); + xstep(&a.base, 0); xstep(&b.base, 0); xstep(&c.base, 0); + + notify_all(&n); + assert(n.head == NULL); + rt_run(&rt, 0); + assert(xstep_status(&a.base) == XSTEP_DEAD); + assert(xstep_status(&b.base) == XSTEP_DEAD); + assert(xstep_status(&c.base) == XSTEP_DEAD); +} + +static void test_notify_select(void) { + runtime_t rt; rt_init(&rt); + notify_t n; notify_init(&n); + latch_t l; latch_init(&l); + + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {notify_event(&n), &l.base}; + select_event_init(&sel, inputs, 2, srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + + notify_one(&n); + rt_run(&rt, 0); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 0); + assert(l.waiters == NULL); + + select_event_deinit(&sel); +} + +static void test_notify_unpark(void) { + runtime_t rt; rt_init(&rt); + notify_t n; notify_init(&n); + + step_waker_t s1, s2, s3; + step_waker_init(&s1, &rt, (xstep_t *)0x1); + step_waker_init(&s2, &rt, (xstep_t *)0x2); + step_waker_init(&s3, &rt, (xstep_t *)0x3); + event_park(notify_event(&n), &s1.base); + event_park(notify_event(&n), &s2.base); + event_park(notify_event(&n), &s3.base); + + event_unpark(notify_event(&n), &s2.base); + event_unpark(notify_event(&n), &s2.base); /* idempotent */ + + /* FIFO head is s1; pop s1, then s3 remains. */ + notify_one(&n); + assert(n.head == &s3.base); + event_unpark(notify_event(&n), &s3.base); + assert(n.head == NULL); +} + +/* ---- Mutex --------------------------------------------------------- */ + +static void test_mutex_basic(void) { + runtime_t rt; rt_init(&rt); + mutex_t mu; mutex_init(&mu); + + assert(event_try(mutex_event(&mu), NULL)); + assert(!event_try(mutex_event(&mu), NULL)); + + sem_acquirer_t b; sem_acquirer_init(&b, &rt, &mu); + xstep(&b.base, 0); + assert(xstep_status(&b.base) == XSTEP_SUSPENDED); + + mutex_release(&mu); + rt_run(&rt, 0); + assert(xstep_status(&b.base) == XSTEP_DEAD); + assert(b.got); +} + +/* ---- Queue send op ------------------------------------------------- */ + +static void test_queue_send_op_inline(void) { + /* BLOCK with room: init delivers immediately, payload 1. */ + uintptr_t buf[2]; + queue_t q; queue_init(&q, buf, 2, QUEUE_BLOCK); + + queue_send_op_t op; + queue_send_op_init(&op, &q, 0xAA); + assert(op.done.set); + uintptr_t v; + assert(event_try(&op.done.base, &v)); + assert(v == 1); + assert(q.len == 1); + + queue_send_op_deinit(&op); +} + +static void test_queue_send_op_blocks(void) { + /* BLOCK full: parks. Recv frees a slot, sender drains, op fires. */ + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + assert(queue_try_send(&q, 1)); + + queue_send_op_t op; + queue_send_op_init(&op, &q, 2); + assert(!op.done.set); + assert(q.send_head == &op.qsw.sw.base); + + uintptr_t v; + assert(event_try(queue_recv_event(&q), &v) && v == 1); + assert(op.done.set); + uintptr_t pv; + assert(event_try(&op.done.base, &pv)); + assert(pv == 1); + assert(event_try(queue_recv_event(&q), &v) && v == 2); + + queue_send_op_deinit(&op); +} + +static void test_queue_send_op_drop_inline(void) { + /* DROP_NEWEST: op resolves inline regardless of fullness. */ + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_DROP_NEWEST); + assert(queue_try_send(&q, 1)); + + queue_send_op_t op; + queue_send_op_init(&op, &q, 2); /* dropped */ + assert(op.done.set); + uintptr_t v; + assert(event_try(&op.done.base, &v)); + assert(v == 1); + + queue_send_op_deinit(&op); +} + +static void test_queue_send_op_select_loses(void) { + runtime_t rt; rt_init(&rt); + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + latch_t l; latch_init(&l); + assert(queue_try_send(&q, 1)); + + queue_send_op_t op; + queue_send_op_init(&op, &q, 2); + assert(q.send_head == &op.qsw.sw.base); + + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {&op.done.base, &l.base}; + select_event_init(&sel, inputs, 2, srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + + latch_set(&l, 0xCAFE); + rt_run(&rt, 0); + assert(w.got == 1); + assert(q.send_head == &op.qsw.sw.base); + + select_event_deinit(&sel); + queue_send_op_deinit(&op); + assert(q.send_head == NULL); +} + +/* ---- Chan close ---------------------------------------------------- */ + +typedef struct { + xstep_t base; + chan_t *c; + runtime_t *rt; + step_waker_t sw; + int phase; + recv_status_t status; + uintptr_t value; +} chan_rx_t; + +static xstep_result_t chan_rx_step(xstep_t *s, uintptr_t v) { + chan_rx_t *r = (chan_rx_t *)s; + (void)v; + switch (r->phase) { + case 0: { + recv_status_t st = chan_recv(r->c, &r->value); + if (st != RECV_EMPTY) { + r->status = st; + r->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + step_waker_init(&r->sw, r->rt, &r->base); + event_park(&r->c->recv, &r->sw.base); + r->phase = 1; + return (xstep_result_t){0, XSTEP_SUSPENDED}; + } + case 1: + r->status = chan_recv(r->c, &r->value); + r->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + __builtin_unreachable(); +} + +static void chan_rx_init(chan_rx_t *r, runtime_t *rt, chan_t *c) { + r->base = (xstep_t){.step = chan_rx_step, .status = XSTEP_INIT}; + r->c = c; + r->rt = rt; + r->phase = 0; + r->status = RECV_EMPTY; + r->value = 0; +} + +static void test_chan_close_empty(void) { + chan_t c; chan_init(&c); + assert(!chan_is_closed(&c)); + chan_close(&c); + assert(chan_is_closed(&c)); + chan_close(&c); /* idempotent */ + + uintptr_t v; + assert(chan_recv(&c, &v) == RECV_CLOSED); +} + +static void test_chan_close_wakes_receiver(void) { + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + chan_rx_t r; chan_rx_init(&r, &rt, &c); + xstep(&r.base, 0); + assert(xstep_status(&r.base) == XSTEP_SUSPENDED); + assert(c.recv_head == &r.sw.base); + + chan_close(&c); + assert(c.recv_head == NULL); + rt_run(&rt, 0); + assert(xstep_status(&r.base) == XSTEP_DEAD); + assert(r.status == RECV_CLOSED); +} + +typedef struct { + xstep_t base; + chan_t *c; + runtime_t *rt; + chan_send_waker_t csw; + uintptr_t value; + int phase; + bool done; + bool delivered; +} chan_tx_t; + +static xstep_result_t chan_tx_step(xstep_t *s, uintptr_t v) { + chan_tx_t *snd = (chan_tx_t *)s; + (void)v; + switch (snd->phase) { + case 0: + if (chan_try_send(snd->c, snd->value)) { + snd->done = true; + snd->delivered = true; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + if (chan_is_closed(snd->c)) { + snd->done = true; + snd->delivered = false; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + chan_send_waker_init(&snd->csw, snd->rt, &snd->base, snd->value); + chan_park_send(snd->c, &snd->csw); + snd->phase = 1; + return (xstep_result_t){0, XSTEP_SUSPENDED}; + case 1: + snd->done = true; + snd->delivered = snd->csw.delivered; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + __builtin_unreachable(); +} + +static void chan_tx_init(chan_tx_t *snd, runtime_t *rt, chan_t *c, uintptr_t value) { + snd->base = (xstep_t){.step = chan_tx_step, .status = XSTEP_INIT}; + snd->c = c; + snd->rt = rt; + snd->value = value; + snd->phase = 0; + snd->done = false; + snd->delivered = false; +} + +static void test_chan_close_drains_senders(void) { + runtime_t rt; rt_init(&rt); + chan_t c; chan_init(&c); + + chan_tx_t s[3]; + for (int i = 0; i < 3; i++) { + chan_tx_init(&s[i], &rt, &c, (uintptr_t)(100 + i)); + xstep(&s[i].base, 0); + } + assert(c.send_head == &s[0].csw.sw.base); + + chan_close(&c); + assert(c.send_head == NULL); + rt_run(&rt, 0); + for (int i = 0; i < 3; i++) { + assert(xstep_status(&s[i].base) == XSTEP_DEAD); + assert(s[i].done); + assert(!s[i].delivered); + } +} + +static void test_chan_try_send_after_close(void) { + chan_t c; chan_init(&c); + chan_close(&c); + assert(!chan_try_send(&c, 1)); +} + +static void test_chan_send_op_close_inline(void) { + chan_t c; chan_init(&c); + chan_close(&c); + + chan_send_op_t op; + chan_send_op_init(&op, &c, 0xABBA); + assert(op.done.set); + uintptr_t v; + assert(event_try(&op.done.base, &v)); + assert(v == 0); /* delivered=false */ + chan_send_op_deinit(&op); +} + +static void test_chan_send_op_close_drain(void) { + chan_t c; chan_init(&c); + + chan_send_op_t op; + chan_send_op_init(&op, &c, 0xABBA); + assert(!op.done.set); + + chan_close(&c); + assert(op.done.set); + uintptr_t v; + assert(event_try(&op.done.base, &v)); + assert(v == 0); + chan_send_op_deinit(&op); +} + +static void test_chan_send_op_delivered_payload(void) { + chan_t c; chan_init(&c); + + chan_send_op_t op; + chan_send_op_init(&op, &c, 0xFEED); + assert(!op.done.set); + + uintptr_t v; + assert(chan_recv(&c, &v) == RECV_GOT); + assert(v == 0xFEED); + assert(op.done.set); + uintptr_t pv; + assert(event_try(&op.done.base, &pv)); + assert(pv == 1); + + chan_send_op_deinit(&op); +} + +/* ---- Queue close --------------------------------------------------- */ + +typedef struct { + xstep_t base; + queue_t *q; + runtime_t *rt; + step_waker_t sw; + int phase; + recv_status_t status; + uintptr_t value; +} queue_rx_t; + +static xstep_result_t queue_rx_step(xstep_t *s, uintptr_t v) { + queue_rx_t *r = (queue_rx_t *)s; + (void)v; + switch (r->phase) { + case 0: { + recv_status_t st = queue_recv(r->q, &r->value); + if (st != RECV_EMPTY) { + r->status = st; + r->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + step_waker_init(&r->sw, r->rt, &r->base); + event_park(queue_recv_event(r->q), &r->sw.base); + r->phase = 1; + return (xstep_result_t){0, XSTEP_SUSPENDED}; + } + case 1: + r->status = queue_recv(r->q, &r->value); + r->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + __builtin_unreachable(); +} + +static void queue_rx_init(queue_rx_t *r, runtime_t *rt, queue_t *q) { + r->base = (xstep_t){.step = queue_rx_step, .status = XSTEP_INIT}; + r->q = q; + r->rt = rt; + r->phase = 0; + r->status = RECV_EMPTY; + r->value = 0; +} + +static void test_queue_close_drains_buffered_then_eof(void) { + uintptr_t buf[3]; + queue_t q; queue_init(&q, buf, 3, QUEUE_BLOCK); + assert(queue_try_send(&q, 1)); + assert(queue_try_send(&q, 2)); + + queue_close(&q); + assert(queue_is_closed(&q)); + + uintptr_t v; + assert(queue_recv(&q, &v) == RECV_GOT && v == 1); + assert(queue_recv(&q, &v) == RECV_GOT && v == 2); + assert(queue_recv(&q, &v) == RECV_CLOSED); + queue_close(&q); /* idempotent */ + assert(queue_recv(&q, &v) == RECV_CLOSED); +} + +static void test_queue_close_wakes_receiver(void) { + runtime_t rt; rt_init(&rt); + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + + queue_rx_t r; queue_rx_init(&r, &rt, &q); + xstep(&r.base, 0); + assert(xstep_status(&r.base) == XSTEP_SUSPENDED); + + queue_close(&q); + rt_run(&rt, 0); + assert(xstep_status(&r.base) == XSTEP_DEAD); + assert(r.status == RECV_CLOSED); +} + +typedef struct { + xstep_t base; + queue_t *q; + runtime_t *rt; + queue_send_waker_t qsw; + uintptr_t value; + int phase; + bool done; + bool delivered; +} queue_tx_t; + +static xstep_result_t queue_tx_step(xstep_t *s, uintptr_t v) { + queue_tx_t *snd = (queue_tx_t *)s; + (void)v; + switch (snd->phase) { + case 0: + if (queue_try_send(snd->q, snd->value)) { + snd->done = true; + snd->delivered = true; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + if (queue_is_closed(snd->q)) { + snd->done = true; + snd->delivered = false; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + queue_send_waker_init(&snd->qsw, snd->rt, &snd->base, snd->value); + queue_park_send(snd->q, &snd->qsw); + snd->phase = 1; + return (xstep_result_t){0, XSTEP_SUSPENDED}; + case 1: + snd->done = true; + snd->delivered = snd->qsw.delivered; + snd->phase = 2; + return (xstep_result_t){0, XSTEP_DEAD}; + } + __builtin_unreachable(); +} + +static void queue_tx_init(queue_tx_t *s, runtime_t *rt, queue_t *q, uintptr_t v) { + s->base = (xstep_t){.step = queue_tx_step, .status = XSTEP_INIT}; + s->q = q; + s->rt = rt; + s->value = v; + s->phase = 0; + s->done = false; + s->delivered = false; +} + +static void test_queue_close_drains_senders(void) { + runtime_t rt; rt_init(&rt); + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + assert(queue_try_send(&q, 1)); + + queue_tx_t s[2]; + for (int i = 0; i < 2; i++) { + queue_tx_init(&s[i], &rt, &q, (uintptr_t)(100 + i)); + xstep(&s[i].base, 0); + assert(xstep_status(&s[i].base) == XSTEP_SUSPENDED); + } + + queue_close(&q); + rt_run(&rt, 0); + for (int i = 0; i < 2; i++) { + assert(xstep_status(&s[i].base) == XSTEP_DEAD); + assert(!s[i].delivered); + } +} + +static void test_queue_try_send_after_close_block(void) { + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + queue_close(&q); + assert(!queue_try_send(&q, 42)); +} + +static void test_queue_try_send_after_close_drop(void) { + uintptr_t buf[1]; + queue_t q1; queue_init(&q1, buf, 1, QUEUE_DROP_NEWEST); + queue_close(&q1); + assert(queue_try_send(&q1, 42)); + + uintptr_t buf2[1]; + queue_t q2; queue_init(&q2, buf2, 1, QUEUE_DROP_OLDEST); + queue_close(&q2); + assert(queue_try_send(&q2, 42)); +} + +static void test_queue_send_op_close_drain(void) { + uintptr_t buf[1]; + queue_t q; queue_init(&q, buf, 1, QUEUE_BLOCK); + assert(queue_try_send(&q, 1)); + + queue_send_op_t op; + queue_send_op_init(&op, &q, 2); + assert(!op.done.set); + + queue_close(&q); + assert(op.done.set); + uintptr_t v; + assert(event_try(&op.done.base, &v)); + assert(v == 0); + queue_send_op_deinit(&op); +} + +/* ---- Ticker -------------------------------------------------------- */ + +typedef struct { + xstep_t base; + ticker_t *t; + runtime_t *rt; + step_waker_t sw; + int n_seen; + int target; + uint64_t last; +} ticker_sub_t; + +static xstep_result_t ticker_sub_step(xstep_t *s, uintptr_t v) { + ticker_sub_t *sub = (ticker_sub_t *)s; + if (sub->n_seen > 0) sub->last = (uint64_t)v; + if (sub->n_seen >= sub->target) return (xstep_result_t){v, XSTEP_DEAD}; + event_park(ticker_event(sub->t), &sub->sw.base); + sub->n_seen++; + return (xstep_result_t){0, XSTEP_SUSPENDED}; +} + +static void ticker_sub_init(ticker_sub_t *s, runtime_t *rt, ticker_t *t, int target) { + s->base = (xstep_t){.step = ticker_sub_step, .status = XSTEP_INIT}; + s->t = t; + s->rt = rt; + s->n_seen = 0; + s->target = target; + s->last = 0; + step_waker_init(&s->sw, rt, &s->base); +} + +static void test_ticker_single_tick(void) { + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + rt_attach_timers(&rt, &h.base); + + ticker_t ti; + ticker_init(&ti, &h.base, 100, 100); + + ticker_sub_t s; ticker_sub_init(&s, &rt, &ti, 1); + xstep(&s.base, 0); + assert(xstep_status(&s.base) == XSTEP_SUSPENDED); + + rt_run(&rt, 100); + assert(xstep_status(&s.base) == XSTEP_DEAD); + assert(s.last == 100); + + ticker_deinit(&ti); +} + +static void test_ticker_multiple_ticks(void) { + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + rt_attach_timers(&rt, &h.base); + + ticker_t ti; + ticker_init(&ti, &h.base, 50, 50); + + ticker_sub_t s; ticker_sub_init(&s, &rt, &ti, 2); + xstep(&s.base, 0); + + rt_run(&rt, 50); + assert(xstep_status(&s.base) == XSTEP_SUSPENDED); + assert(s.last == 50); + + rt_run(&rt, 100); + assert(xstep_status(&s.base) == XSTEP_DEAD); + assert(s.last == 100); /* fired = 50 + period */ + + ticker_deinit(&ti); +} + +static void test_ticker_deinit_before_fire(void) { + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + rt_attach_timers(&rt, &h.base); + + ticker_t ti; + ticker_init(&ti, &h.base, 100, 100); + assert(ti.timer.in_heap); + + ticker_deinit(&ti); + assert(!ti.timer.in_heap); + + uint64_t out; + assert(!timers_peek(&h.base, &out)); + rt_run(&rt, 10000); +} + +static void test_ticker_select(void) { + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + rt_attach_timers(&rt, &h.base); + + ticker_t ti; ticker_init(&ti, &h.base, 100, 100); + cancel_t c; cancel_init(&c); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, ticker_event(&ti), &c); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + + rt_run(&rt, 100); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == WAIT_OK); + assert(inputs[WAIT_OK].value == 100); + + select_event_deinit(&sel); + ticker_deinit(&ti); +} + +static void test_ticker_try_always_false(void) { + pairing_heap_t h; pairing_heap_init(&h); + ticker_t ti; ticker_init(&ti, &h.base, 100, 100); + assert(!event_try(ticker_event(&ti), NULL)); + ticker_deinit(&ti); +} + +/* ---- Task group ---------------------------------------------------- */ + +static void test_task_group_empty_join(void) { + /* An empty (no-attach) group's join is not fired — fan-in semantics + * require at least one attach. */ + task_group_t g; task_group_init(&g); + assert(!event_try(task_group_join_event(&g), NULL)); +} + +static void test_task_group_join_after_all_finish(void) { + runtime_t rt; rt_init(&rt); (void)rt; + + taskbody_t a_body, b_body; + task_t a, b; + taskbody_init(&a_body, &a, 1, 0xA); + task_init(&a, &a_body.base); + taskbody_init(&b_body, &b, 1, 0xB); + task_init(&b, &b_body.base); + + task_group_t g; task_group_init(&g); + group_attach_t sa, sb; + task_group_attach(&g, &a, &sa); + task_group_attach(&g, &b, &sb); + assert(!event_try(task_group_join_event(&g), NULL)); + + while (xstep_status(&a_body.base) != XSTEP_DEAD) xstep(&a_body.base, 0); + assert(task_finished(&a)); + assert(!event_try(task_group_join_event(&g), NULL)); + + while (xstep_status(&b_body.base) != XSTEP_DEAD) xstep(&b_body.base, 0); + assert(task_finished(&b)); + uintptr_t v; + assert(event_try(task_group_join_event(&g), &v)); + assert(v == 0); +} + +static void test_task_group_cancel_fanout(void) { + runtime_t rt; rt_init(&rt); (void)rt; + + taskbody_t a_body, b_body; + task_t a, b; + taskbody_init(&a_body, &a, 100, 0xA); + task_init(&a, &a_body.base); + taskbody_init(&b_body, &b, 100, 0xB); + task_init(&b, &b_body.base); + + task_group_t g; task_group_init(&g); + group_attach_t sa, sb; + task_group_attach(&g, &a, &sa); + task_group_attach(&g, &b, &sb); + + task_group_cancel(&g); + assert(task_is_cancelled(&a)); + assert(task_is_cancelled(&b)); + assert(cancel_is_set(task_group_cancel_handle(&g))); + + while (xstep_status(&a_body.base) != XSTEP_DEAD) xstep(&a_body.base, 0); + while (xstep_status(&b_body.base) != XSTEP_DEAD) xstep(&b_body.base, 0); + + assert(event_try(task_group_join_event(&g), NULL)); +} + +static void test_task_group_finished_detaches(void) { + taskbody_t a_body, b_body; + task_t a, b; + taskbody_init(&a_body, &a, 1, 0xA); + task_init(&a, &a_body.base); + taskbody_init(&b_body, &b, 100, 0xB); + task_init(&b, &b_body.base); + + task_group_t g; task_group_init(&g); + group_attach_t sa, sb; + task_group_attach(&g, &a, &sa); + task_group_attach(&g, &b, &sb); + assert(g.head == &sa && g.tail == &sb); + + while (xstep_status(&a_body.base) != XSTEP_DEAD) xstep(&a_body.base, 0); + /* a's bridge has fired → sa is detached. */ + assert(g.head == &sb && g.tail == &sb); + assert(sa.next == NULL && sa.prev == NULL); + + /* Cancel only touches still-attached b. */ + task_group_cancel(&g); + assert(task_is_cancelled(&b)); +} + /* ---- Runtime test -------------------------------------------------- */ static void test_runtime_drains(void) { @@ -2006,6 +2846,50 @@ int main(void) { test_task_state_machine_join(); test_task_state_machine_cancel(); + test_countdown_basic(); + test_countdown_zero_init(); + test_countdown_add(); + test_countdown_park_wake(); + + test_notify_try_always_false(); + test_notify_one_fifo(); + test_notify_all(); + test_notify_select(); + test_notify_unpark(); + + test_mutex_basic(); + + test_queue_send_op_inline(); + test_queue_send_op_blocks(); + test_queue_send_op_drop_inline(); + test_queue_send_op_select_loses(); + + test_chan_close_empty(); + test_chan_close_wakes_receiver(); + test_chan_close_drains_senders(); + test_chan_try_send_after_close(); + test_chan_send_op_close_inline(); + test_chan_send_op_close_drain(); + test_chan_send_op_delivered_payload(); + + test_queue_close_drains_buffered_then_eof(); + test_queue_close_wakes_receiver(); + test_queue_close_drains_senders(); + test_queue_try_send_after_close_block(); + test_queue_try_send_after_close_drop(); + test_queue_send_op_close_drain(); + + test_ticker_single_tick(); + test_ticker_multiple_ticks(); + test_ticker_deinit_before_fire(); + test_ticker_select(); + test_ticker_try_always_false(); + + test_task_group_empty_join(); + test_task_group_join_after_all_finish(); + test_task_group_cancel_fanout(); + test_task_group_finished_detaches(); + printf("ok\n"); return 0; }