xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit e2e8f4aa33773708c90d37689849c56cf3be6c24
parent 4d2f6297ebb294ab4f31bdaae93404c817e770df
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 11:06:34 -0700

consolidate into xco.h xco.c

Diffstat:
MMakefile | 25+++++++++++++------------
Darch/arm64/xco_arch.c | 132-------------------------------------------------------------------------------
Darch/arm64/xco_arch.h | 27---------------------------
Devent.c | 1166-------------------------------------------------------------------------------
Devent.h | 925-------------------------------------------------------------------------------
Aplatform/arm64/xco_platform.c | 132+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aplatform/arm64/xco_platform.h | 27+++++++++++++++++++++++++++
Aplatform/xco_platform_internal.h | 34++++++++++++++++++++++++++++++++++
Mtests/test_event.c | 3+--
Mxco.c | 1212++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mxco.h | 1019+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Dxco_platform.h | 34----------------------------------
Dxstep.h | 71-----------------------------------------------------------------------
13 files changed, 2404 insertions(+), 2403 deletions(-)

diff --git a/Makefile b/Makefile @@ -1,22 +1,23 @@ # xco — minimal asymmetric coroutines. # -# Per-arch selection: ARCH names a directory under arch/. The build -# adds -Iarch/$(ARCH) (so xco_arch.h resolves to that arch's copy) and -# compiles arch/$(ARCH)/xco_arch.c. +# Per-platform selection: PLATFORM names a directory under platform/. +# The build adds -Iplatform/$(PLATFORM) (so xco_platform_internal.h +# resolves to that platform's copy) and compiles +# platform/$(PLATFORM)/xco_platform.c. # # All build artifacts land under build/, mirroring the source tree. -CC ?= cc -AR ?= ar -CFLAGS ?= -std=c11 -Wall -Wextra -O2 -g +CC ?= cc +AR ?= ar +CFLAGS ?= -std=c11 -Wall -Wextra -O2 -g -ARCH ?= $(shell uname -m | sed 's/aarch64/arm64/') -ARCHDIR := arch/$(ARCH) -BUILD := build +PLATFORM ?= $(shell uname -m | sed 's/aarch64/arm64/') +PLATFORMDIR := platform/$(PLATFORM) +BUILD := build -CPPFLAGS += -I. -I$(ARCHDIR) +CPPFLAGS += -Iplatform -I$(PLATFORMDIR) -SRCS := xco.c event.c $(ARCHDIR)/xco_arch.c +SRCS := xco.c $(PLATFORMDIR)/xco_platform.c OBJS := $(SRCS:%.c=$(BUILD)/%.o) LIB := $(BUILD)/libxco.a @@ -34,7 +35,7 @@ $(BUILD)/%.o: %.c $(BUILD)/test_%: tests/test_%.c $(LIB) @mkdir -p $(dir $@) - $(CC) $(CPPFLAGS) $(CFLAGS) -o $@ $< $(LIB) + $(CC) -I. $(CPPFLAGS) $(CFLAGS) -o $@ $< $(LIB) test: $(TEST_BINS) @for t in $(TEST_BINS); do echo "==> $$t"; $$t || exit 1; done diff --git a/arch/arm64/xco_arch.c b/arch/arm64/xco_arch.c @@ -1,132 +0,0 @@ -/* - * arch/arm64/xco_arch.c — AArch64 init + switch + trampoline thunk. - * - * The switch primitive and the trampoline thunk are written as - * file-scope global asm rather than __attribute__((naked)) functions: - * GCC silently ignores `naked` on AArch64, so naked-function inline - * asm only works under Clang. File-scope __asm__ is portable across - * both GCC and Clang and keeps everything in one .c file. 
- * - * Layout offsets used in the asm: - * 0 regs[0..9] x19-x28 - * 80 regs[10..11] fp (x29), lr (x30) - * 96 regs[12] sp - * 104 fp_regs[0..7] d8-d15 - * - * Symbol naming: asm labels are built with the compiler-provided - * __USER_LABEL_PREFIX__ (empty on ELF, "_" on Mach-O) so the labels - * emitted here match what the C compiler generates for references to - * xco_platform_switch / xco_platform_trampoline_thunk on either OS. - */ - -#include "xco_arch.h" -#include "xco_platform.h" - -#include <assert.h> -#include <stddef.h> -#include <stdint.h> -#include <string.h> - -/* The platform context type forward-declared in xco_platform.h. */ -struct xco_platform_ctx { - uintptr_t regs[13]; - uint64_t fp_regs[8]; -} __attribute__((aligned(16))); - -/* ---- layout checks -------------------------------------------------- - * - * xco.c sized its embedded ctx buffer using these macros — verify - * they still match the real struct, and verify the byte offsets the - * asm hardcodes are still correct. - */ -_Static_assert(sizeof(struct xco_platform_ctx) == _XCO_CTX_SIZE, - "_XCO_CTX_SIZE out of sync with struct layout"); -_Static_assert(_Alignof(struct xco_platform_ctx) == _XCO_CTX_ALIGN, - "_XCO_CTX_ALIGN out of sync with struct layout"); -_Static_assert(sizeof(uintptr_t) == 8, "AArch64"); -_Static_assert(offsetof(struct xco_platform_ctx, regs) == 0, ""); -_Static_assert(offsetof(struct xco_platform_ctx, fp_regs) == 104, ""); - -/* Forward declaration: the symbol is defined by the file-scope asm - * below. Its address is taken in xco_platform_init. */ -extern void xco_platform_trampoline_thunk(void); - -/* ---- xco_platform_init --------------------------------------------- */ - -void xco_platform_init(xco_platform_ctx_t *ctx, - void *stack_base, size_t stack_len, - void (*entry)(uintptr_t)) { - assert(((uintptr_t)stack_base & (XCO_STACK_ALIGN - 1)) == 0); - - /* AArch64 stacks grow down. Compute and align the top. 
*/ - uintptr_t top = (uintptr_t)stack_base + stack_len; - top &= ~(uintptr_t)(XCO_STACK_ALIGN - 1); - - memset(ctx, 0, sizeof(*ctx)); - - /* The switch primitive will load these into x19, fp, lr, sp. */ - ctx->regs[0] = (uintptr_t)entry; /* x19 */ - ctx->regs[10] = 0; /* fp */ - ctx->regs[11] = (uintptr_t)xco_platform_trampoline_thunk; /* lr */ - ctx->regs[12] = top; /* sp */ -} - -/* ---- xco_platform_switch and trampoline thunk (file-scope asm) ----- - * - * xco_platform_switch(from x0, to x1, value x2) -> x0: - * saves callee-saved state into *from, restores from *to, and - * delivers `value` to the destination as x0 — which is the first-arg - * register on the trampoline thunk's first run, and the return-value - * register when resuming a previously-suspended switch. - * - * xco_platform_trampoline_thunk(): - * On entry x0 = value, x19 = entry; tail-calls entry(value); brk if - * it ever returns. - */ - -/* Stringify-after-expand so __USER_LABEL_PREFIX__ (a token, possibly - * empty) becomes a string literal we can concatenate into the asm. 
*/ -#define XCO_STR_(x) #x -#define XCO_STR(x) XCO_STR_(x) -#define XCO_SYM(name) XCO_STR(__USER_LABEL_PREFIX__) #name - -__asm__ ( - ".text\n" - ".align 4\n" - - ".globl " XCO_SYM(xco_platform_switch) "\n" - XCO_SYM(xco_platform_switch) ":\n" - " stp x19, x20, [x0, #0]\n" - " stp x21, x22, [x0, #16]\n" - " stp x23, x24, [x0, #32]\n" - " stp x25, x26, [x0, #48]\n" - " stp x27, x28, [x0, #64]\n" - " stp fp, lr, [x0, #80]\n" - " mov x9, sp\n" - " str x9, [x0, #96]\n" - " stp d8, d9, [x0, #104]\n" - " stp d10, d11, [x0, #120]\n" - " stp d12, d13, [x0, #136]\n" - " stp d14, d15, [x0, #152]\n" - - " ldp d8, d9, [x1, #104]\n" - " ldp d10, d11, [x1, #120]\n" - " ldp d12, d13, [x1, #136]\n" - " ldp d14, d15, [x1, #152]\n" - " ldp x19, x20, [x1, #0]\n" - " ldp x21, x22, [x1, #16]\n" - " ldp x23, x24, [x1, #32]\n" - " ldp x25, x26, [x1, #48]\n" - " ldp x27, x28, [x1, #64]\n" - " ldp fp, lr, [x1, #80]\n" - " ldr x9, [x1, #96]\n" - " mov sp, x9\n" - - " mov x0, x2\n" - " ret\n" - - ".globl " XCO_SYM(xco_platform_trampoline_thunk) "\n" - XCO_SYM(xco_platform_trampoline_thunk) ":\n" - " blr x19\n" - " brk #0\n" -); diff --git a/arch/arm64/xco_arch.h b/arch/arm64/xco_arch.h @@ -1,27 +0,0 @@ -/* - * xco_arch.h (arm64) — sizing and alignment constants. - * - * Found by the build's -I path (arch/arm64) and included by xco.h. - * Only #defines: the actual platform context struct definition is - * private to arch/arm64/xco_arch.c. - */ - -#ifndef XCO_ARCH_H -#define XCO_ARCH_H - -/* Stack alignment required by AAPCS at function call boundaries. */ -#define XCO_STACK_ALIGN 16 - -/* Implementation-detail constants used by xco.c to embed the platform - * context inside xco_impl_t without seeing the struct definition. - * arch/arm64/xco_arch.c verifies these match the real layout. */ -#define _XCO_CTX_SIZE 176 /* sizeof(struct xco_platform_ctx) */ -#define _XCO_CTX_ALIGN 16 /* _Alignof(struct xco_platform_ctx) */ - -/* Size and alignment of xco_t._private. 
Must hold an xco_impl_t - * (defined in xco.c): a platform context + ~3 words of bookkeeping. - * xco.c verifies sufficiency with _Static_assert. */ -#define XCO_SIZE (_XCO_CTX_SIZE + 48) -#define XCO_ALIGN _XCO_CTX_ALIGN - -#endif /* XCO_ARCH_H */ diff --git a/event.c b/event.c @@ -1,1166 +0,0 @@ -/* - * event.c — implementations for event.h. - * - * Each event type is a small struct with a static vtable. Waitlists - * are singly-linked LIFO (push at head, pop at head); fire-all detaches - * the whole list before iterating so callbacks can do anything, - * including re-park or unpark sibling wakers, without iterator hazards. - */ - -#include "event.h" - -#include <assert.h> -#include <stddef.h> - -/* ---- Runtime ---------------------------------------------------------- */ - -/* rt_init and rt_enqueue are defined inline in event.h. */ - -static waker_t *rt_dequeue(runtime_t *rt) { - waker_t *w = rt->head; - if (!w) return NULL; - rt->head = w->next; - if (!rt->head) rt->tail = NULL; - /* Hand the waker back fully detached. Every fire path already clears - * prev before firing (and we just walked off the ready-queue), so - * w->prev is NULL in practice — making it explicit here means - * step_waker users can re-park without re-init, regardless of which - * fire path resumed them. */ - w->next = NULL; - w->prev = NULL; - return w; -} - -void rt_run(runtime_t *rt, uint64_t now) { - /* The runtime ready queue holds only step-wakers. Other waker - * shapes (e.g. select_input) fire synchronously inside event - * notify paths and never reach here. - * - * If a timer source is attached, advance it at the top of each - * iteration: any timer whose deadline <= now fires, enqueueing its - * step-wakers, which the inner loop then drains. A step may insert - * a fresh already-expired timer; the outer loop catches it on the - * next pass. 
Termination: each pass either drains a non-empty - * queue or exits, and advance only fires timers it then removes, - * so total work is bounded. */ - for (;;) { - if (rt->timers) timers_advance(rt->timers, now); - if (!rt->head) return; - for (waker_t *w; (w = rt_dequeue(rt));) { - step_waker_t *sw = (step_waker_t *)w; - xstep(sw->step, sw->resume_value); - } - } -} - -/* ---- Step waker ------------------------------------------------------- */ - -/* Exposed (with leading underscore) so the inline step_waker_init in - * event.h can install it without dragging the body into the header. */ -void _step_waker_fire(waker_t *w, uintptr_t value) { - step_waker_t *sw = (step_waker_t *)w; - sw->resume_value = value; - rt_enqueue(sw->rt, w); -} - -/* ---- Latch ------------------------------------------------------------ */ - -static bool latch_try(event_t *e, uintptr_t *out) { - latch_t *l = (latch_t *)e; - if (!l->set) return false; - if (out) *out = l->value; - return true; -} - -static void latch_park(event_t *e, waker_t *w) { - latch_t *l = (latch_t *)e; - /* Single-threaded contract: caller must have just observed try=false. */ - assert(!l->set); - /* One-waker invariant: w must arrive clean. Detach paths (latch_set - * iterator, latch_unpark, select_event_deinit) all leave wakers in - * this state. A trip here means a double-park bug. */ - assert(!w->prev && !w->next); - w->next = l->waiters; - if (l->waiters) l->waiters->prev = w; - l->waiters = w; -} - -static void latch_unpark(event_t *e, waker_t *w) { - latch_t *l = (latch_t *)e; - /* Detect "not on this list" via the invariant maintained by park - * and the detach paths: a parked waker has prev set OR is the - * head; a detached one has prev == NULL and is not the head. 
*/ - if (!w->prev && l->waiters != w) return; - if (w->prev) w->prev->next = w->next; - else l->waiters = w->next; - if (w->next) w->next->prev = w->prev; - w->prev = w->next = NULL; -} - -/* Exposed so the inline latch_init in event.h can reference it. */ -const event_vtable_t _latch_vt = { - .try_ = latch_try, - .park = latch_park, - .unpark = latch_unpark, -}; - -void latch_set(latch_t *l, uintptr_t value) { - if (l->set) return; - l->set = true; - l->value = value; - - /* Detach the whole waitlist before firing. A waker's fire callback - * might do anything (including unpark a sibling on another event), - * but it cannot mutate this list — it's already gone. */ - waker_t *w = l->waiters; - l->waiters = NULL; - while (w) { - waker_t *next = w->next; /* save before waker_fire clears */ - waker_fire(w, value); - w = next; - } -} - -/* ---- Semaphore -------------------------------------------------------- */ - -/* FIFO doubly-linked waitlist, same shape as the chan_q_* helpers below - * but specialized to a semaphore_t (so we don't have to thread the - * head/tail pair through chan_q_*). 
*/ - -static void sem_q_push(semaphore_t *s, waker_t *w) { - assert(!w->prev && !w->next); - w->prev = s->tail; - w->next = NULL; - if (s->tail) s->tail->next = w; - else s->head = w; - s->tail = w; -} - -static waker_t *sem_q_pop(semaphore_t *s) { - waker_t *w = s->head; - if (!w) return NULL; - s->head = w->next; - if (s->head) s->head->prev = NULL; - else s->tail = NULL; - w->prev = w->next = NULL; - return w; -} - -static void sem_q_remove(semaphore_t *s, waker_t *w) { - if (!w->prev && s->head != w) return; - if (w->prev) w->prev->next = w->next; - else s->head = w->next; - if (w->next) w->next->prev = w->prev; - else s->tail = w->prev; - w->prev = w->next = NULL; -} - -static bool semaphore_try(event_t *e, uintptr_t *out) { - semaphore_t *s = (semaphore_t *)e; - if (s->permits == 0) return false; - s->permits--; - if (out) *out = 1; - return true; -} - -static void semaphore_park(event_t *e, waker_t *w) { - semaphore_t *s = (semaphore_t *)e; - /* Single-threaded contract: caller just observed try=false (permits=0). */ - assert(s->permits == 0); - sem_q_push(s, w); -} - -static void semaphore_unpark(event_t *e, waker_t *w) { - semaphore_t *s = (semaphore_t *)e; - sem_q_remove(s, w); -} - -const event_vtable_t _semaphore_acquire_vt = { - .try_ = semaphore_try, - .park = semaphore_park, - .unpark = semaphore_unpark, -}; - -void semaphore_release(semaphore_t *s, size_t n) { - /* Hand a permit directly to each FIFO waiter, then drop any leftover - * into the count. Direct handoff prevents a fresh try from jumping - * the queue: an arriving acquirer that called try_ would see permits=0 - * and park behind the existing waiters until everyone ahead has been - * served. */ - while (n > 0) { - waker_t *w = sem_q_pop(s); - if (!w) break; - n--; - /* Fire value is conventional 1 — "you got a permit". Step-waker - * users ignore the value; select inputs capture it as the input's - * value field. 
*/ - waker_fire(w, 1); - } - s->permits += n; -} - -/* ---- Select / all-of -------------------------------------------------- */ - -/* One fire callback serves both modes. A counter `remaining` is decremented - * on each fire; done is set when it hits 0. select inits remaining=1 (any - * one fire closes); allof inits remaining=n (every input must fire). The - * disarm-siblings loop is a no-op for already-fired wakers, so it runs - * uniformly: for select it cleans up still-parked losers, for allof it - * does nothing (every sibling is already detached by its source). */ -static void select_input_fire(waker_t *w, uintptr_t value) { - select_input_t *in = (select_input_t *)w; - select_event_t *s = in->parent; - - /* Defensive: guard against any straggler that escaped disarm. */ - if (s->done.set) return; - /* Capture the input's payload before resuming anyone. Sticky - * sources also keep it on themselves; transient sources (channels) - * deliver only here, so this is the only durable record. */ - in->value = value; - if (--s->remaining > 0) return; - - size_t i = (size_t)(in - s->inputs); - /* Disarm anyone still parked so their wakers don't dangle on input - * waitlists past s's lifetime. Idempotent on already-detached wakers. */ - for (size_t j = 0; j < s->n; j++) { - if (j != i) event_unpark(s->inputs[j].src, &s->inputs[j].w); - } - latch_set(&s->done, i); -} - -void select_event_init(select_event_t *s, - select_input_t *inputs, size_t n, - event_t *const *srcs) { - latch_init(&s->done); - s->inputs = inputs; - s->n = n; - s->remaining = 1; /* any one fire closes the wait */ - - /* Fast path: an input already ready. Fire and skip parking entirely - * so deinit has nothing to disarm. 
*/ - for (size_t i = 0; i < n; i++) { - uintptr_t v; - if (event_try(srcs[i], &v)) { - inputs[i].value = v; /* captured for inputs[winner].value */ - latch_set(&s->done, i); - return; - } - } - - for (size_t i = 0; i < n; i++) { - inputs[i].w.next = NULL; - inputs[i].w.prev = NULL; - inputs[i].w.fire = select_input_fire; - inputs[i].src = srcs[i]; - inputs[i].parent = s; - inputs[i].value = 0; - event_park(srcs[i], &inputs[i].w); - } -} - -void allof_event_init(select_event_t *s, - select_input_t *inputs, size_t n, - event_t *const *srcs) { - latch_init(&s->done); - s->inputs = inputs; - s->n = n; - s->remaining = n; /* every input must fire to close */ - - if (n == 0) { latch_set(&s->done, 0); return; } - - /* Initialize each input then try-or-park. An already-ready input is - * consumed inline (value captured, remaining--, no parking); the - * rest park. If everyone was inline, fire done at the end. */ - for (size_t i = 0; i < n; i++) { - inputs[i].w.next = NULL; - inputs[i].w.prev = NULL; - inputs[i].w.fire = select_input_fire; - inputs[i].src = srcs[i]; - inputs[i].parent = s; - inputs[i].value = 0; - - uintptr_t v; - if (event_try(srcs[i], &v)) { - inputs[i].value = v; - s->remaining--; - } else { - event_park(srcs[i], &inputs[i].w); - } - } - - /* All inline-ready: fire done with the last input's index, matching - * the "closing index" semantics of the parked path. */ - if (s->remaining == 0) latch_set(&s->done, n - 1); -} - -void select_event_deinit(select_event_t *s) { - /* done.set => the closing fire already disarmed everyone (or the - * fast path skipped parking entirely). Otherwise — possible after a - * partial allof — some inputs may still be parked; unpark is - * idempotent for already-detached wakers. 
*/ - if (s->done.set) return; - for (size_t i = 0; i < s->n; i++) { - event_unpark(s->inputs[i].src, &s->inputs[i].w); - } -} - -/* ---- Channel ---------------------------------------------------------- */ - -/* Doubly-linked FIFO push/pop on a channel waitlist. Same shape as - * latch's list operations but with an explicit tail so arrival order - * is preserved (unlike latch, where waiter order is irrelevant). */ - -static void chan_q_push(waker_t **head, waker_t **tail, waker_t *w) { - assert(!w->prev && !w->next); - w->prev = *tail; - w->next = NULL; - if (*tail) (*tail)->next = w; - else *head = w; - *tail = w; -} - -static waker_t *chan_q_pop(waker_t **head, waker_t **tail) { - waker_t *w = *head; - if (!w) return NULL; - *head = w->next; - if (*head) (*head)->prev = NULL; - else *tail = NULL; - w->prev = w->next = NULL; - return w; -} - -static void chan_q_remove(waker_t **head, waker_t **tail, waker_t *w) { - /* Same not-on-list test as latch_unpark: a queued waker has prev - * set OR is the head; a detached one has prev == NULL and isn't - * the head. */ - if (!w->prev && *head != w) return; - if (w->prev) w->prev->next = w->next; - else *head = w->next; - if (w->next) w->next->prev = w->prev; - else *tail = w->prev; - w->prev = w->next = NULL; -} - -/* Recover the chan_t from its embedded recv event. */ -static inline chan_t *chan_of_recv(event_t *e) { - return (chan_t *)((char *)e - offsetof(chan_t, recv)); -} - -static bool chan_recv_try(event_t *e, uintptr_t *out) { - chan_t *c = chan_of_recv(e); - waker_t *w = chan_q_pop(&c->send_head, &c->send_tail); - if (w) { - /* w is &csw->sw.base; sw is the first field of chan_send_waker_t, - * and base is the first field of step_waker_t, so addresses align. */ - chan_send_waker_t *csw = (chan_send_waker_t *)w; - if (out) *out = csw->value; - csw->delivered = true; - /* Resume the sender. The fire value is unused for step-wakers; - * for op senders, _chan_send_op_fire reads csw->delivered. 
*/ - waker_fire(w, 0); - return true; - } - /* Close makes the recv event "ready" with no value: the receiver is - * expected to call chan_recv to learn it's RECV_CLOSED. *out is - * undefined in that case (set to 0 here for determinism). */ - if (c->closed) { - if (out) *out = 0; - return true; - } - return false; -} - -static void chan_recv_park(event_t *e, waker_t *w) { - chan_t *c = chan_of_recv(e); - chan_q_push(&c->recv_head, &c->recv_tail, w); -} - -static void chan_recv_unpark(event_t *e, waker_t *w) { - chan_t *c = chan_of_recv(e); - chan_q_remove(&c->recv_head, &c->recv_tail, w); -} - -const event_vtable_t _chan_recv_vt = { - .try_ = chan_recv_try, - .park = chan_recv_park, - .unpark = chan_recv_unpark, -}; - -bool chan_try_send(chan_t *c, uintptr_t value) { - if (c->closed) return false; - waker_t *w = chan_q_pop(&c->recv_head, &c->recv_tail); - if (!w) return false; - /* Hand the value to the recv-side waker. step_waker stashes it as - * resume_value; select_input_fire stashes it in input.value. */ - waker_fire(w, value); - return true; -} - -void chan_park_send(chan_t *c, chan_send_waker_t *csw) { - /* park_send after close is UB — caller must check chan_is_closed - * (typically via chan_try_send returning false plus chan_is_closed). */ - assert(!c->closed); - chan_q_push(&c->send_head, &c->send_tail, &csw->sw.base); -} - -void chan_unpark_send(chan_t *c, chan_send_waker_t *csw) { - chan_q_remove(&c->send_head, &c->send_tail, &csw->sw.base); -} - -recv_status_t chan_recv(chan_t *c, uintptr_t *out) { - waker_t *w = chan_q_pop(&c->send_head, &c->send_tail); - if (w) { - chan_send_waker_t *csw = (chan_send_waker_t *)w; - if (out) *out = csw->value; - csw->delivered = true; - waker_fire(w, 0); - return RECV_GOT; - } - if (c->closed) return RECV_CLOSED; - return RECV_EMPTY; -} - -void chan_close(chan_t *c) { - if (c->closed) return; - c->closed = true; - - /* Drain parked senders with delivered=false. 
waker_fire detaches - * before invoking the callback, so re-park inside fire (e.g. to - * land on the runtime ready queue) is safe. */ - waker_t *w; - while ((w = chan_q_pop(&c->send_head, &c->send_tail)) != NULL) { - chan_send_waker_t *csw = (chan_send_waker_t *)w; - csw->delivered = false; - waker_fire(w, 0); - } - /* Wake parked receivers so they observe RECV_CLOSED via chan_recv. - * Fire value is irrelevant — the recv event_try will return true - * because c->closed is set, but receivers should use chan_recv. */ - while ((w = chan_q_pop(&c->recv_head, &c->recv_tail)) != NULL) { - waker_fire(w, 0); - } -} - -/* ---- Send op (selectable send) ---------------------------------------- */ - -void _chan_send_op_fire(waker_t *w, uintptr_t value) { - /* Receiver hands no payload to the sender on delivery — the sender - * learns whether its value reached someone via op->csw.delivered, - * set by chan_recv* (true) or chan_close (false). */ - (void)value; - /* csw is the first field of chan_send_op_t; sw is first of - * chan_send_waker_t; base is first of step_waker_t. All offsets - * coincide, so w aliases op. */ - chan_send_op_t *op = (chan_send_op_t *)w; - latch_set(&op->done, op->csw.delivered ? 1 : 0); -} - -void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value) { - /* The embedded chan_send_waker carries the value and provides the - * waker layout the chan's send list expects. rt/step are unused — - * fire goes straight to the latch, no scheduler hop. */ - chan_send_waker_init(&op->csw, NULL, NULL, value); - op->csw.sw.base.fire = _chan_send_op_fire; - op->chan = c; - latch_init(&op->done); - - if (c->closed) { - /* Closed channel: no delivery possible, resolve immediately. */ - latch_set(&op->done, 0); - return; - } - if (chan_try_send(c, value)) { - /* Inline delivery: no parking, done set immediately. 
*/ - op->csw.delivered = true; - latch_set(&op->done, 1); - return; - } - chan_park_send(c, &op->csw); -} - -void chan_send_op_deinit(chan_send_op_t *op) { - /* If delivered, the waker is already off the send list. If not - * (e.g., select cancellation), pull it off so it doesn't dangle. */ - if (op->done.set) return; - chan_unpark_send(op->chan, &op->csw); -} - -/* ---- Queue ------------------------------------------------------------ */ - -/* The FIFO list helpers (chan_q_push/pop/remove) are reused for the - * queue's send and recv waitlists — same shape, same invariants. The - * ring buffer lives in caller-provided storage; we just track head and - * len. cap == 0 leaves the buffer logic dormant: every send either - * direct-hands or parks, every recv either takes from a parked sender - * or parks — i.e. it degenerates to chan rendezvous. */ - -static inline queue_t *queue_of_recv(event_t *e) { - return (queue_t *)((char *)e - offsetof(queue_t, recv)); -} - -static inline void queue_push_buf(queue_t *q, uintptr_t v) { - assert(q->len < q->cap); - q->buf[(q->head + q->len) % q->cap] = v; - q->len++; -} - -static inline uintptr_t queue_pop_buf(queue_t *q) { - assert(q->len > 0); - uintptr_t v = q->buf[q->head]; - q->head = (q->head + 1) % q->cap; - q->len--; - return v; -} - -/* Pop one parked sender's value into the now-free buffer slot, firing - * the sender. Caller must have ensured a free slot exists (just popped - * from the buffer, or cap > len). Maintains FIFO across the buffer + - * sender-waitlist boundary: oldest buffered values come out before any - * sender's value (which was queued later). No-op if no sender parked. */ -static void queue_drain_one_sender(queue_t *q) { - if (!q->send_head) return; - waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); - queue_send_waker_t *qsw = (queue_send_waker_t *)w; - queue_push_buf(q, qsw->value); - qsw->delivered = true; - /* Fire after pushing so the sender sees its delivery as complete. 
*/ - waker_fire(w, 0); -} - -static bool queue_recv_try(event_t *e, uintptr_t *out) { - queue_t *q = queue_of_recv(e); - if (q->len > 0) { - uintptr_t v = queue_pop_buf(q); - if (out) *out = v; - queue_drain_one_sender(q); - return true; - } - /* Empty buffer. If a sender is parked here it can only mean cap==0 - * (otherwise the sender would have used the buffer). Hand directly. */ - if (q->send_head) { - waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); - queue_send_waker_t *qsw = (queue_send_waker_t *)w; - if (out) *out = qsw->value; - qsw->delivered = true; - waker_fire(w, 0); - return true; - } - /* Closed and drained: receivers learn EOF via queue_recv; out is - * undefined. */ - if (q->closed) { - if (out) *out = 0; - return true; - } - return false; -} - -static void queue_recv_park(event_t *e, waker_t *w) { - queue_t *q = queue_of_recv(e); - chan_q_push(&q->recv_head, &q->recv_tail, w); -} - -static void queue_recv_unpark(event_t *e, waker_t *w) { - queue_t *q = queue_of_recv(e); - chan_q_remove(&q->recv_head, &q->recv_tail, w); -} - -const event_vtable_t _queue_recv_vt = { - .try_ = queue_recv_try, - .park = queue_recv_park, - .unpark = queue_recv_unpark, -}; - -bool queue_try_send(queue_t *q, uintptr_t value) { - if (q->closed) { - /* Send-after-close. BLOCK: no delivery, signal failure. DROP_*: - * silently drop (queue policy already says "may be lost"). */ - if (q->policy == QUEUE_BLOCK) return false; - return true; - } - /* Direct handoff first: parked receivers always win over the buffer. - * This is the rendezvous case and the cap==0 case. */ - waker_t *w = chan_q_pop(&q->recv_head, &q->recv_tail); - if (w) { - waker_fire(w, value); - return true; - } - if (q->len < q->cap) { - queue_push_buf(q, value); - return true; - } - /* Buffer full and no waiting receiver. 
*/ - switch (q->policy) { - case QUEUE_BLOCK: - return false; - case QUEUE_DROP_NEWEST: - return true; - case QUEUE_DROP_OLDEST: - (void)queue_pop_buf(q); - queue_push_buf(q, value); - return true; - } - __builtin_unreachable(); -} - -void queue_park_send(queue_t *q, queue_send_waker_t *qsw) { - /* DROP_* never parks (try_send always returns true); only valid - * for BLOCK. park_send after close is UB. */ - assert(q->policy == QUEUE_BLOCK); - assert(!q->closed); - chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base); -} - -void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw) { - chan_q_remove(&q->send_head, &q->send_tail, &qsw->sw.base); -} - -recv_status_t queue_recv(queue_t *q, uintptr_t *out) { - if (q->len > 0) { - uintptr_t v = queue_pop_buf(q); - if (out) *out = v; - queue_drain_one_sender(q); - return RECV_GOT; - } - if (q->send_head) { - waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); - queue_send_waker_t *qsw = (queue_send_waker_t *)w; - if (out) *out = qsw->value; - qsw->delivered = true; - waker_fire(w, 0); - return RECV_GOT; - } - if (q->closed) return RECV_CLOSED; - return RECV_EMPTY; -} - -void queue_close(queue_t *q) { - if (q->closed) return; - q->closed = true; - - /* Drain parked senders with delivered=false. Senders only park - * under BLOCK, so this is no-op for DROP_* (their waitlist is - * always empty). */ - waker_t *w; - while ((w = chan_q_pop(&q->send_head, &q->send_tail)) != NULL) { - queue_send_waker_t *qsw = (queue_send_waker_t *)w; - qsw->delivered = false; - waker_fire(w, 0); - } - /* Wake parked receivers so they can observe closed via queue_recv. - * Receivers may still drain buffered values first — queue_recv's - * RECV_GOT path is hit before the RECV_CLOSED branch. 
*/ - while ((w = chan_q_pop(&q->recv_head, &q->recv_tail)) != NULL) { - waker_fire(w, 0); - } -} - -/* ---- Queue send op (selectable send) ---------------------------------- */ - -void _queue_send_op_fire(waker_t *w, uintptr_t value) { - (void)value; - queue_send_op_t *op = (queue_send_op_t *)w; - latch_set(&op->done, op->qsw.delivered ? 1 : 0); -} - -void queue_send_op_init(queue_send_op_t *op, queue_t *q, uintptr_t value) { - queue_send_waker_init(&op->qsw, NULL, NULL, value); - op->qsw.sw.base.fire = _queue_send_op_fire; - op->queue = q; - latch_init(&op->done); - - if (q->closed) { - /* BLOCK: no delivery; DROP_*: dropped per policy. Either way the - * value did not reach a receiver, so delivered=false. */ - latch_set(&op->done, 0); - return; - } - if (queue_try_send(q, value)) { - /* Inline accept: handoff to receiver, buffered, or DROP_* policy - * accepted it. */ - op->qsw.delivered = true; - latch_set(&op->done, 1); - return; - } - /* Only BLOCK policy with full buffer reaches here. */ - queue_park_send(q, &op->qsw); -} - -void queue_send_op_deinit(queue_send_op_t *op) { - if (op->done.set) return; - queue_unpark_send(op->queue, &op->qsw); -} - -/* ---- Broadcast (slot) ------------------------------------------------- */ - -/* The waitlist uses the same doubly-linked LIFO shape as latch — there - * is no FIFO requirement because publish wakes everyone at once. The - * key difference from latch is publish vs set: publish never marks a - * sticky bit on the event, so try always returns false (subscribers - * wait for the *next* publish), and the waitlist is reusable across - * publishes — subscribers re-park to receive subsequent values. 
*/ - -static bool broadcast_try(event_t *e, uintptr_t *out) { - (void)e; (void)out; - return false; -} - -static void broadcast_park(event_t *e, waker_t *w) { - broadcast_t *b = (broadcast_t *)e; - assert(!w->prev && !w->next); - w->next = b->waiters; - if (b->waiters) b->waiters->prev = w; - b->waiters = w; -} - -static void broadcast_unpark(event_t *e, waker_t *w) { - broadcast_t *b = (broadcast_t *)e; - if (!w->prev && b->waiters != w) return; - if (w->prev) w->prev->next = w->next; - else b->waiters = w->next; - if (w->next) w->next->prev = w->prev; - w->prev = w->next = NULL; -} - -const event_vtable_t _broadcast_vt = { - .try_ = broadcast_try, - .park = broadcast_park, - .unpark = broadcast_unpark, -}; - -void broadcast_publish(broadcast_t *b, uintptr_t value) { - b->has_value = true; - b->value = value; - - /* Detach the waitlist before iterating — same hazard-free pattern as - * latch_set. A waker's fire callback may re-park itself on us (the - * common case for a re-arming subscriber); decoupling means that - * re-park lands on a fresh waitlist, not on the snapshot we're - * walking. */ - waker_t *w = b->waiters; - b->waiters = NULL; - while (w) { - waker_t *next = w->next; /* save before waker_fire clears */ - waker_fire(w, value); - w = next; - } -} - -/* ---- Notify ----------------------------------------------------------- */ - -/* Doubly-linked FIFO waitlist (same shape as the chan/queue waitlists). - * notify_one fires the head; notify_all detaches the whole list before - * iterating so callbacks can re-park onto a fresh waitlist without - * iterator hazards (same pattern as latch_set). event_try is always - * false: notify is purely transient. 
*/ - -static bool notify_try(event_t *e, uintptr_t *out) { - (void)e; (void)out; - return false; -} - -static void notify_park(event_t *e, waker_t *w) { - notify_t *n = (notify_t *)e; - chan_q_push(&n->head, &n->tail, w); -} - -static void notify_unpark(event_t *e, waker_t *w) { - notify_t *n = (notify_t *)e; - chan_q_remove(&n->head, &n->tail, w); -} - -const event_vtable_t _notify_vt = { - .try_ = notify_try, - .park = notify_park, - .unpark = notify_unpark, -}; - -void notify_one(notify_t *n) { - waker_t *w = chan_q_pop(&n->head, &n->tail); - if (!w) return; - waker_fire(w, 0); -} - -void notify_all(notify_t *n) { - /* Detach before iterating: re-parking inside fire lands on a fresh - * (empty) list. Walk the snapshot via saved next pointers. */ - waker_t *w = n->head; - n->head = n->tail = NULL; - while (w) { - waker_t *next = w->next; - waker_fire(w, 0); - w = next; - } -} - -/* ---- Timer ------------------------------------------------------------ */ - -/* The timer-as-event surface is just the embedded latch's vtable: try - * checks done.set, park/unpark mutate done.waiters. The timer source - * triggers the latch from advance(). No bespoke timer vtable needed. */ - -void timer_init(timer_t *t, timers_t *ts, uint64_t deadline) { - latch_init(&t->done); - t->deadline = deadline; - t->src = ts; - t->in_heap = false; - t->child = NULL; - t->prev = NULL; - t->next = NULL; - timers_insert(ts, t); /* sets in_heap = true */ -} - -void timer_deinit(timer_t *t) { - /* Idempotent. After fire, in_heap is false (advance popped us); - * after explicit cancel, also false. Otherwise we're still queued. */ - if (t->in_heap) timers_cancel(t->src, t); -} - -/* ---- Pairing heap ----------------------------------------------------- */ - -/* Standard intrusive pairing heap. Each node carries three link fields: - * - * child : head of children sibling list (NULL if leaf). 
- * prev : parent if this node is its parent's first child; - * previous sibling otherwise; NULL only for a detached tree - * root (including h->root). - * next : next sibling, or NULL for the last child / a detached root. - * - * The "prev points to either parent or a sibling" trick lets us splice - * a node out in O(1) without an extra parent pointer: parent->child==n - * distinguishes "first child" from "non-first sibling." - * - * meld picks the smaller-deadline root as winner and grafts the loser - * as its new first child. Pop-min and remove both rebuild via the - * classic two-pass pairwise merge of the resulting children list. */ - -/* Merge two detached subtree roots (each with prev=next=NULL). Returns - * the merged root (also detached: prev=next=NULL). */ -static timer_t *ph_meld(timer_t *a, timer_t *b) { - if (!a) return b; - if (!b) return a; - timer_t *small, *large; - if (a->deadline <= b->deadline) { small = a; large = b; } - else { small = b; large = a; } - /* Graft `large` as the new first child of `small`. */ - large->next = small->child; - if (small->child) small->child->prev = large; - large->prev = small; /* parent link via prev */ - small->child = large; - small->prev = NULL; - small->next = NULL; - return small; -} - -/* Two-pass pairwise meld of a children sibling list. Detaches each node - * before melding so meld inputs satisfy its prev=next=NULL contract. - * The output has prev=next=NULL. */ -static timer_t *ph_merge_pairs(timer_t *first) { - /* Pass 1: walk the sibling list left-to-right, melding consecutive - * pairs. Chain results via `next` (ab)use as a temporary list link. */ - timer_t *list = NULL; - while (first) { - timer_t *a = first; - timer_t *b = a->next; - timer_t *rest = b ? b->next : NULL; - a->prev = a->next = NULL; - if (b) { b->prev = b->next = NULL; } - timer_t *m = ph_meld(a, b); - m->next = list; /* prepend to pass-1 list */ - list = m; - first = rest; - } - /* Pass 2: meld the pass-1 list into a single root. 
*/ - timer_t *acc = NULL; - while (list) { - timer_t *nxt = list->next; - list->prev = NULL; - list->next = NULL; - acc = ph_meld(acc, list); - list = nxt; - } - return acc; -} - -/* Detach n from the tree (must currently be in the heap). Returns the - * (possibly new) main-heap root. n is left fully detached: child still - * points to its subtree, but prev/next are NULL — caller decides what - * to do with that subtree. */ -static timer_t *ph_detach(pairing_heap_t *h, timer_t *n) { - if (h->root == n) { - /* n is the main root; pop it and rebuild from its children. */ - timer_t *new_root = ph_merge_pairs(n->child); - n->child = NULL; - n->prev = NULL; - n->next = NULL; - return new_root; - } - /* n has a parent (recorded via prev — either as its parent's first - * child or as some sibling's successor). Splice out of the sibling - * list. */ - if (n->prev->child == n) { - /* First child: parent's child link skips us. */ - n->prev->child = n->next; - } else { - /* Mid/last sibling: previous sibling's next skips us. */ - n->prev->next = n->next; - } - if (n->next) n->next->prev = n->prev; - n->prev = NULL; - n->next = NULL; - /* The main-heap root is unchanged structurally; the caller of this - * function decides how (or whether) to reintroduce n's subtree. */ - return h->root; -} - -static void ph_insert(timers_t *ts, timer_t *t) { - pairing_heap_t *h = (pairing_heap_t *)ts; - /* Singleton tree (prev/next/child already NULL via timer_init). */ - h->root = ph_meld(h->root, t); - t->in_heap = true; -} - -static void ph_cancel(timers_t *ts, timer_t *t) { - pairing_heap_t *h = (pairing_heap_t *)ts; - if (!t->in_heap) return; - h->root = ph_detach(h, t); - /* Now meld t's subtree (its children) back into the main heap. 
*/ - timer_t *sub = ph_merge_pairs(t->child); - t->child = NULL; - h->root = ph_meld(h->root, sub); - t->in_heap = false; -} - -static void ph_advance(timers_t *ts, uint64_t now) { - pairing_heap_t *h = (pairing_heap_t *)ts; - /* Pop while the min-key timer is due. Each fire may run callbacks - * that insert *new* timers (with later deadlines, normally) — those - * land back in the heap, and we keep checking the root. */ - while (h->root && h->root->deadline <= now) { - timer_t *t = h->root; - h->root = ph_merge_pairs(t->child); - t->child = NULL; - t->prev = NULL; - t->next = NULL; - t->in_heap = false; - /* Trigger the latch: drains the waitlist and delivers the - * deadline as the fire payload. */ - latch_set(&t->done, (uintptr_t)t->deadline); - } -} - -static bool ph_peek(const timers_t *ts, uint64_t *out) { - const pairing_heap_t *h = (const pairing_heap_t *)ts; - if (!h->root) return false; - if (out) *out = h->root->deadline; - return true; -} - -const timers_vtable_t _pairing_heap_vt = { - .insert = ph_insert, - .cancel = ph_cancel, - .advance = ph_advance, - .peek = ph_peek, -}; - -/* ---- Timeout ---------------------------------------------------------- */ - -/* Bridge waker: parked on the timer's latch, fires the cancel when the - * timer fires. Two-step indirection so cancel can already have its own - * waiters (e.g. a wait_or_cancel select) without the timer's waitlist - * caring about cancel internals. */ -static void _timeout_bridge_fire(waker_t *w, uintptr_t value) { - (void)value; - timeout_t *to = (timeout_t *)((char *)w - offsetof(timeout_t, bridge)); - cancel_set(&to->cancel); -} - -void timeout_init(timeout_t *to, timers_t *ts, uint64_t deadline) { - timer_init(&to->timer, ts, deadline); - cancel_init(&to->cancel); - to->bridge.next = NULL; - to->bridge.prev = NULL; - to->bridge.fire = _timeout_bridge_fire; - /* Park the bridge on the timer. If the timer fires, latch_set - * detaches the bridge and calls our fire callback inline. 
*/ - event_park(timer_event(&to->timer), &to->bridge); -} - -void timeout_deinit(timeout_t *to) { - /* unpark is idempotent (no-op if the bridge already fired or was - * never parked). timer_deinit is idempotent on the in_heap flag. */ - event_unpark(timer_event(&to->timer), &to->bridge); - timer_deinit(&to->timer); -} - -/* ---- Ticker ----------------------------------------------------------- */ - -/* The ticker's event surface uses the broadcast-style LIFO doubly-linked - * waitlist: subscribers are fired all-at-once on each tick, so order - * doesn't matter; doubly-linked gives O(1) unpark for cancellation. */ - -static bool ticker_try(event_t *e, uintptr_t *out) { - (void)e; (void)out; - return false; /* transient — wait for the next tick */ -} - -static void ticker_park(event_t *e, waker_t *w) { - ticker_t *t = (ticker_t *)((char *)e - offsetof(ticker_t, base)); - assert(!w->prev && !w->next); - w->next = t->waiters; - if (t->waiters) t->waiters->prev = w; - t->waiters = w; -} - -static void ticker_unpark(event_t *e, waker_t *w) { - ticker_t *t = (ticker_t *)((char *)e - offsetof(ticker_t, base)); - if (!w->prev && t->waiters != w) return; - if (w->prev) w->prev->next = w->next; - else t->waiters = w->next; - if (w->next) w->next->prev = w->prev; - w->prev = w->next = NULL; -} - -const event_vtable_t _ticker_vt = { - .try_ = ticker_try, - .park = ticker_park, - .unpark = ticker_unpark, -}; - -/* Bridge waker: parks on the underlying timer's latch. On fire, compute - * the next deadline (skip-ahead-safe), reinstall the timer, re-park the - * bridge on the new timer, then fire every parked subscriber with the - * just-fired deadline. The waitlist is detached before iteration so - * subscribers can re-park inside their fire callbacks. 
*/ -static void _ticker_bridge_fire(waker_t *w, uintptr_t value) { - ticker_t *t = (ticker_t *)((char *)w - offsetof(ticker_t, bridge)); - uint64_t fired = (uint64_t)value; - uint64_t next = fired + t->period; - /* Skip-ahead: in the rare overflow case (period = 0 or wraparound), - * step forward enough to keep next > fired. */ - if (next <= fired) { - next += ((fired - next) / t->period + 1) * t->period; - } - /* Reinstall the timer for the next tick. The latch's storage is - * reused — timer_init runs latch_init on it. */ - timer_init(&t->timer, t->src, next); - /* Bridge waker is fully detached (waker_fire just cleared its - * links); park it on the freshly-armed timer. */ - event_park(timer_event(&t->timer), &t->bridge); - - /* Fire the subscribers. Detach the waitlist first so re-park inside - * fire lands on the now-empty list. */ - waker_t *waiters = t->waiters; - t->waiters = NULL; - while (waiters) { - waker_t *nxt = waiters->next; - waker_fire(waiters, (uintptr_t)fired); - waiters = nxt; - } -} - -void ticker_init(ticker_t *t, timers_t *ts, - uint64_t period, uint64_t first_deadline) { - /* period must be positive — the skip-ahead computation in the bridge - * divides by period, and a zero-period ticker would loop forever - * inside ph_advance. */ - assert(period > 0); - t->base.vt = &_ticker_vt; - t->src = ts; - t->period = period; - t->waiters = NULL; - - timer_init(&t->timer, ts, first_deadline); - - t->bridge.next = NULL; - t->bridge.prev = NULL; - t->bridge.fire = _ticker_bridge_fire; - event_park(timer_event(&t->timer), &t->bridge); -} - -void ticker_deinit(ticker_t *t) { - /* Pull the bridge off the timer (no-op if already fired) and cancel - * the timer. Subscribers' wakers are the caller's storage; nothing - * to free here. 
*/ - event_unpark(timer_event(&t->timer), &t->bridge); - timer_deinit(&t->timer); -} - -/* ---- Task group ------------------------------------------------------- */ - -/* Each attach contributes one to the countdown and parks a bridge waker - * on the task's done event. Bridge fire splices the slot out of the - * group's list and decrements the countdown — the join event fires when - * the last attached task finishes. - * - * Cancellation is fan-out: walk the list, set each task's cancel, then - * set the group-level cancel. Bodies cooperate by composing their work - * with task_cancel(self); the group-level cancel is for non-task - * waiters that want to react to "the group has been told to stop." */ - -static void _task_group_detach_slot(task_group_t *g, group_attach_t *slot) { - /* Doubly-linked, head/tail tracked; same shape as other waitlists. */ - if (slot->prev) slot->prev->next = slot->next; - else g->head = slot->next; - if (slot->next) slot->next->prev = slot->prev; - else g->tail = slot->prev; - slot->prev = slot->next = NULL; -} - -static void _task_group_bridge_fire(waker_t *w, uintptr_t value) { - (void)value; - group_attach_t *slot = (group_attach_t *)((char *)w - offsetof(group_attach_t, bridge)); - task_group_t *g = slot->group; - _task_group_detach_slot(g, slot); - countdown_done(&g->pending); -} - -void task_group_init(task_group_t *g) { - /* Don't go through countdown_init(0) — that fires the latch - * immediately, which would make the very first attach's - * countdown_add UB. The group's join must remain not-fired until at - * least one attached task has finished, so we open with - * remaining=0 and an unset latch. The first attach lifts remaining - * to 1, and matching countdown_dones bring it back to 0, firing - * the latch. 
*/ - latch_init(&g->pending.done); - g->pending.remaining = 0; - cancel_init(&g->cancel); - g->head = g->tail = NULL; -} - -void task_group_attach(task_group_t *g, task_t *t, group_attach_t *slot) { - countdown_add(&g->pending, 1); - - slot->task = t; - slot->group = g; - - /* Append to the group's list (FIFO; ordering doesn't affect cancel - * fan-out semantics, but consistent with other waitlists in the - * codebase). */ - slot->prev = g->tail; - slot->next = NULL; - if (g->tail) g->tail->next = slot; - else g->head = slot; - g->tail = slot; - - slot->bridge.next = NULL; - slot->bridge.prev = NULL; - slot->bridge.fire = _task_group_bridge_fire; - - /* Park on the task's done event. If the task has already finished - * (re-attaching is UB per the contract, but if the task fired - * before attach finished initializing — e.g. an inline-spawned - * synchronous task), the latch_park assert would catch it. */ - event_park(task_done_event(t), &slot->bridge); -} - -void task_group_cancel(task_group_t *g) { - /* Fan-out cancel: signal each attached task. Walk the snapshot - * (cancel doesn't detach the slot — only task done does — so the - * list is stable across iteration). */ - for (group_attach_t *s = g->head; s; s = s->next) { - cancel_set(&s->task->cancel); - } - /* Group-level cancel for anyone awaiting "the group as a whole." */ - cancel_set(&g->cancel); -} diff --git a/event.h b/event.h @@ -1,925 +0,0 @@ -/* - * event.h — minimal pollable event substrate. C11. - * - * Three layers, built on xstep: - * - * waker_t Intrusive notification node. Holds a fire callback and a - * single next-pointer reused across event waitlists and the - * runtime's ready queue (a waker is on at most one list at - * a time). - * - * event_t Abstract pollable: try / park / unpark. Concrete events - * (latch, select, channel, timer, ...) embed event_t as - * their first member and supply a vtable. Every blocking - * primitive in this codebase is just an event. 
- * - * runtime_t Intrusive FIFO of step-wakers. rt_run drains it, - * resuming each xstep until none are ready. - * - * Concrete events provided here: - * latch_t One-shot sticky bool with optional payload. - * select_event_t Fires when any of N input events fires; carries - * the index of the winner. Composes (a select is - * itself an event_t). - * - * All storage is caller-provided. No allocation, no atomics. - * - * Single-threaded only — matches xco's thread affinity. The contract - * "try first, park only if try failed" is race-free because nothing - * else runs between the two calls. - * - * One-waker invariant. An xstep, while suspended, is parked on at - * most one event. Multi-wait is composed in the event graph: build a - * select_event (or any future combinator — all-of, timeout, ...) and - * park on that. The xstep never sees more than one event directly. - * This is what lets a single step_waker_t live inline in the xstep - * and a single next/prev pair serve both event waitlists and the - * runtime ready queue (the two list memberships are disjoint in time). - * - * Standard usage from a state machine: - * - * uintptr_t v; - * if (event_try(e, &v)) { ... use v ... } - * else { event_park(e, &my_waker.base); return SUSPENDED; } - * - * Standard usage from an xco coroutine wrapper: - * - * uintptr_t v; - * if (!event_try(e, &v)) { - * step_waker_t sw; - * step_waker_init(&sw, rt, &xco_self()->base); - * event_park(e, &sw.base); - * xco_suspend(0); - * (void)event_try(e, &v); // now ready - * } - */ - -#ifndef EVENT_H -#define EVENT_H - -#include <stdbool.h> -#include <stddef.h> -#include <stdint.h> - -#include "xstep.h" - -/* ---- Waker ------------------------------------------------------------ */ - -typedef struct waker waker_t; -struct waker { - /* Doubly-linked while parked on an event waitlist, so unpark is - * O(1). 
Reused as the singly-linked next pointer while on the - * runtime ready queue (FIFO, no removal from middle); prev is - * undefined in that state and reset on the next park. */ - waker_t *next; - waker_t *prev; - /* Fire callback. value is the event's payload at fire time — sticky - * events also store it on themselves, transient events (channels, - * one-shot signals) deliver only here. Wakers that don't care - * about the value just ignore the parameter. - * - * Invoke via waker_fire (below), not directly: the helper enforces - * the "fire receives a fully detached waker" contract that makes it - * safe to re-park inside the callback. */ - void (*fire)(waker_t *w, uintptr_t value); -}; - -/* Canonical way to invoke a waker's fire callback. Hands the callback a - * fully detached waker so the callback (or whatever the resumed step - * does) can re-park on a fresh waitlist without colliding with stale - * link state. Detachers that lead into fire (queue pops, latch drains, - * etc.) don't need to clear prev/next themselves. */ -static inline void waker_fire(waker_t *w, uintptr_t value) { - w->prev = NULL; - w->next = NULL; - w->fire(w, value); -} - -/* ---- Event ------------------------------------------------------------ */ - -typedef struct event event_t; - -typedef struct { - /* Inline-succeed if possible. *out receives the event's value when - * it has one (latch payload, select winner index, channel data). - * out may be NULL if the caller doesn't need the value. */ - bool (*try_)(event_t *e, uintptr_t *out); - /* Arm w on the event's waitlist. Caller's contract: try_ returned - * false. Single-threaded runtime guarantees no transition between. */ - void (*park)(event_t *e, waker_t *w); - /* Remove w from the waitlist. Idempotent: no-op if not parked. 
*/ - void (*unpark)(event_t *e, waker_t *w); -} event_vtable_t; - -struct event { const event_vtable_t *vt; }; - -static inline bool event_try(event_t *e, uintptr_t *out) { - return e->vt->try_(e, out); -} -static inline void event_park(event_t *e, waker_t *w) { e->vt->park(e, w); } -static inline void event_unpark(event_t *e, waker_t *w) { e->vt->unpark(e, w); } - -/* ---- Runtime ---------------------------------------------------------- */ - -/* Forward-declared: the optional timer source attached to the runtime. - * Defined in the timer section below. */ -typedef struct timers timers_t; - -typedef struct runtime { - waker_t *head, *tail; - timers_t *timers; /* optional; advanced inside rt_run */ -} runtime_t; - -static inline void rt_init(runtime_t *rt) { - rt->head = rt->tail = NULL; - rt->timers = NULL; -} - -/* Attach (or detach with NULL) a timer source. While attached, rt_run - * advances it each pass with the now value the caller supplied; firing - * timers may enqueue more wakers, which the same rt_run call then drains. */ -static inline void rt_attach_timers(runtime_t *rt, timers_t *ts) { - rt->timers = ts; -} - -/* Append w to the ready queue. Used by step_waker_fire and by anyone - * else that wants a waker resumed by the scheduler. */ -static inline void rt_enqueue(runtime_t *rt, waker_t *w) { - w->next = NULL; - if (rt->tail) rt->tail->next = w; - else rt->head = w; - rt->tail = w; -} - -/* Drain the ready queue, resuming each step-waker's xstep, until empty. - * Steps may re-arm on events (and thus leave the queue) or enqueue - * other steps; rt_run keeps going until quiescent. now is forwarded to - * any attached timer source's advance(); pass 0 (or anything) when no - * source is attached. The library never reads a clock — now is always - * caller-supplied. */ -void rt_run(runtime_t *rt, uint64_t now); - -/* The canonical bridge between events and the scheduler. 
When fired, - * stashes the value and enqueues itself onto rt; rt_run pops it and - * calls xstep(step, value), so the resumed step receives the event's - * payload directly without a re-try. - * - * Init once, re-park freely. The runtime hands the waker back fully - * detached (next/prev cleared) before invoking the resumed step, so a - * subscriber that wants to wait on the next event can call event_park - * directly — no re-init needed unless rt or step changes. */ -typedef struct { - waker_t base; - runtime_t *rt; - xstep_t *step; - uintptr_t resume_value; /* set by fire, consumed by rt_run */ -} step_waker_t; - -/* Defined in event.c; declared here so step_waker_init can install it. */ -void _step_waker_fire(waker_t *w, uintptr_t value); - -static inline void step_waker_init(step_waker_t *sw, runtime_t *rt, xstep_t *s) { - sw->base.next = NULL; - sw->base.prev = NULL; - sw->base.fire = _step_waker_fire; - sw->rt = rt; - sw->step = s; - sw->resume_value = 0; -} - -/* ---- Latch ------------------------------------------------------------ */ - -/* One-shot sticky event. set() flips the bit, stores the payload, and - * fires every waiter. Subsequent set() calls are ignored. To re-arm, - * reinitialize a fresh latch. */ -typedef struct { - event_t base; - bool set; - uintptr_t value; - waker_t *waiters; -} latch_t; - -/* Defined in event.c; referenced by latch_init. */ -extern const event_vtable_t _latch_vt; - -static inline void latch_init(latch_t *l) { - l->base.vt = &_latch_vt; - l->set = false; - l->value = 0; - l->waiters = NULL; -} - -void latch_set(latch_t *l, uintptr_t value); - -/* ---- Countdown -------------------------------------------------------- */ - -/* One-shot fan-in counter. Fires its embedded latch (payload 0) when - * remaining hits 0. countdown_add(n) is legal while remaining > 0; - * countdown_done decrements; both are UB once the latch has fired. - * - * Compose with the standard event API via countdown_event(). 
*/ -typedef struct countdown { - latch_t done; - size_t remaining; -} countdown_t; - -static inline void countdown_init(countdown_t *c, size_t n) { - latch_init(&c->done); - c->remaining = n; - if (n == 0) latch_set(&c->done, 0); -} - -static inline void countdown_add(countdown_t *c, size_t n) { - /* UB after fire — caller's contract. */ - c->remaining += n; -} - -static inline void countdown_done(countdown_t *c) { - /* UB at 0 — caller's contract. */ - if (--c->remaining == 0) latch_set(&c->done, 0); -} - -static inline event_t *countdown_event(countdown_t *c) { return &c->done.base; } -static inline bool countdown_fired(const countdown_t *c) { return c->done.set; } - -/* ---- Notify (wake-one / wake-all) ------------------------------------- */ - -/* Transient signal with no sticky state. notify_one fires (and detaches) - * the head of a FIFO waitlist; notify_all fires every parked waker. Both - * are no-ops when the waitlist is empty. Subscribers must re-park to see - * subsequent notifications. - * - * event_try always returns false: there is no "ready now" state — a - * subscriber waits for the *next* notify. */ -typedef struct notify { - event_t base; - waker_t *head, *tail; -} notify_t; - -extern const event_vtable_t _notify_vt; - -static inline void notify_init(notify_t *n) { - n->base.vt = &_notify_vt; - n->head = n->tail = NULL; -} - -static inline event_t *notify_event(notify_t *n) { return &n->base; } - -void notify_one(notify_t *n); -void notify_all(notify_t *n); - -/* ---- Semaphore -------------------------------------------------------- */ - -/* Counting semaphore. acquire is exposed as event_t (composable with - * select / wait_or_cancel): event_try succeeds and decrements when - * permits > 0; otherwise the waker parks FIFO. semaphore_release(n) - * hands one permit to each of up to n waiting wakers (each is fired, - * which the receiver treats as "you got a permit") before adding any - * leftover to the count. - * - * One permit per acquire. 
Bulk acquire isn't expressible in event_t's - * shape; if you need it, call sequentially. For binary use (mutex-style - * critical section across awaits) init with permits = 1. - * - * Fairness: FIFO at the waitlist. A racing inline event_try by a fresh - * caller can jump ahead of parked waiters when permits are released - * back to count rather than directly handed off — release prefers - * parked waiters first to avoid that. */ -typedef struct semaphore { - event_t acquire; - size_t permits; - waker_t *head, *tail; -} semaphore_t; - -extern const event_vtable_t _semaphore_acquire_vt; - -static inline void semaphore_init(semaphore_t *s, size_t initial) { - s->acquire.vt = &_semaphore_acquire_vt; - s->permits = initial; - s->head = s->tail = NULL; -} - -static inline event_t *semaphore_event(semaphore_t *s) { return &s->acquire; } - -void semaphore_release(semaphore_t *s, size_t n); - -/* ---- Mutex ------------------------------------------------------------ */ - -/* Binary semaphore wrapper for vocabulary at call sites. mutex_init is - * semaphore_init(s, 1); the event_t fires once per release; mutex_release - * hands the permit to the next waiter (or returns it to the count). */ -typedef semaphore_t mutex_t; - -static inline void mutex_init (mutex_t *m) { semaphore_init(m, 1); } -static inline event_t *mutex_event (mutex_t *m) { return semaphore_event(m); } -static inline void mutex_release(mutex_t *m) { semaphore_release(m, 1); } - -/* ---- Select / all-of -------------------------------------------------- */ - -/* Wait over N input events. 
Two semantics share the same storage shape, - * so a caller can switch between them by changing only the init call: - * - * select_event_init fires when ANY input fires (any-of) - * allof_event_init fires when ALL inputs fire (all-of) - * - * In both cases done's payload is the index of the input whose firing - * closed the wait — the winner for select, the last-to-fire for allof — - * and inputs[i].value carries each fired input's payload (works - * uniformly for sticky and transient sources, where re-trying the input - * would either succeed or fail). Composes: a select_event is itself an - * event. */ - -typedef struct select_event select_event_t; - -/* Per-input arming record. Caller-allocated as an array of n alongside - * the select_event. After fire, .value holds whatever the input - * delivered; other fields are internal. */ -typedef struct { - waker_t w; - event_t *src; - select_event_t *parent; - uintptr_t value; /* captured at fire time */ -} select_input_t; - -struct select_event { - latch_t done; /* fires with the closing input's index */ - select_input_t *inputs; - size_t n; - size_t remaining; /* counts down; done fires at 0 - * (select: starts at 1, allof: at n) */ -}; - -/* Initialize as a select (any-of). inputs[] is caller-provided storage - * for n nodes; srcs[] is the array of n input event pointers (read - * once during init). If any input is already ready, the select fires - * immediately and no wakers are parked. Use &s->done.base as the - * resulting event_t. */ -void select_event_init(select_event_t *s, - select_input_t *inputs, size_t n, - event_t *const *srcs); - -/* Initialize as an allof (all-of). Inputs already ready at init are - * consumed inline (no parking, value captured); if every input is - * ready, done fires immediately. n == 0 fires done with payload 0. */ -void allof_event_init(select_event_t *s, - select_input_t *inputs, size_t n, - event_t *const *srcs); - -/* Disarm any inputs still parked. 
Safe to call after fire (no-op) and - * after partial completion (allof). Required before s leaves scope if - * it has not yet fired. */ -void select_event_deinit(select_event_t *s); - -/* ---- Channel ---------------------------------------------------------- */ - -/* Unbuffered rendezvous channel carrying uintptr_t. Senders and receivers - * wait on each other; whichever arrives first parks until its peer - * shows up. The pending value lives in the sender's chan_send_waker_t - * for the duration of any wait — no per-channel buffer. - * - * Recv side is exposed as event_t (composable with select). Send side is - * a typed API because send carries a value that doesn't fit event_t's - * try(out) signature. Selecting on send is therefore not supported in - * this layer; if needed, wrap a send in a per-call op event. - * - * Rendezvous matrix: - * send + parked recv fire recv with value, sender continues inline. - * send + no recv sender parks (chan_park_send); peer pulls later. - * recv + parked sender read sender's value, fire sender (delivery - * confirmation), receiver continues inline. - * recv + no sender receiver parks (event_park on recv); peer - * delivers later. - * - * FIFO order on both waitlists. - * - * Close: optional EOF semantics. After chan_close, try_send fails (no - * delivery), parked senders are drained with delivered=false, and parked - * receivers are woken so they can observe RECV_CLOSED via chan_recv. The - * recv event is "ready" iff a value is available OR the channel is - * closed — receivers must call chan_recv to disambiguate value vs EOF. - * chan_park_send after close is UB. */ - -/* Result of a typed receive on a channel or queue. 
*/ -typedef enum { - RECV_GOT, /* *out holds the delivered value */ - RECV_EMPTY, /* nothing available right now; caller may park */ - RECV_CLOSED, /* peer closed and no values remain */ -} recv_status_t; - -typedef struct chan { - event_t recv; /* the recv-side event */ - waker_t *send_head, *send_tail; /* parked chan_send_waker_t bases */ - waker_t *recv_head, *recv_tail; /* parked recv-side wakers */ - bool closed; -} chan_t; - -extern const event_vtable_t _chan_recv_vt; - -static inline void chan_init(chan_t *c) { - c->recv.vt = &_chan_recv_vt; - c->send_head = c->send_tail = NULL; - c->recv_head = c->recv_tail = NULL; - c->closed = false; -} - -/* Sender-side waker. Embeds a step_waker (the actual scheduler bridge) - * and the value to deliver. The receiver reads .value, then fires the - * step_waker — the sender resumes via the runtime, no payload passed. - * - * `delivered` is set by the closing side before fire: true on a normal - * recv handoff, false when chan_close drains the parked-sender list. - * Senders read it after resume to know whether their value reached a - * receiver. */ -typedef struct { - step_waker_t sw; - uintptr_t value; - bool delivered; -} chan_send_waker_t; - -static inline void chan_send_waker_init(chan_send_waker_t *csw, - runtime_t *rt, xstep_t *s, - uintptr_t value) { - step_waker_init(&csw->sw, rt, s); - csw->value = value; - csw->delivered = false; -} - -/* Try to deliver value inline. Returns true iff a receiver was parked - * AND the channel is open; its waker fires with the value, the receiver - * becomes ready, and the sender continues without parking. Returns false - * after close (no delivery). */ -bool chan_try_send(chan_t *c, uintptr_t value); - -/* Park a sender. csw->value must already be set (use chan_send_waker_init). - * The sender's xstep is resumed when a receiver consumes the value, or - * when chan_close drains the list (sender resumes with csw->delivered - * false). 
Calling chan_park_send on a closed channel is UB. */ -void chan_park_send(chan_t *c, chan_send_waker_t *csw); - -/* Remove a parked sender (cancellation). No-op if not parked. */ -void chan_unpark_send(chan_t *c, chan_send_waker_t *csw); - -/* Typed receive. Disambiguates value vs EOF where event_try cannot. */ -recv_status_t chan_recv(chan_t *c, uintptr_t *out); - -/* Close the channel. Idempotent. Drains parked senders (each resumes - * with csw->delivered = false) and wakes parked receivers (which then - * observe RECV_CLOSED via chan_recv). Subsequent chan_try_send returns - * false; chan_park_send is UB. */ -void chan_close(chan_t *c); -static inline bool chan_is_closed(const chan_t *c) { return c->closed; } - -/* Selectable send op. A per-call object that holds the value, parks - * on the channel, and exposes &op->done.base as the event that fires - * when delivery resolves. Compose with select like any other event. - * - * The op embeds a chan_send_waker_t (so the chan's send list stays - * uniform — receivers read .value at the same offset for both direct - * and op senders) but rewires its fire callback: instead of resuming - * an xstep, fire sets op->done. Polymorphism via the function pointer. - * - * The done latch's payload is 1 on a successful delivery and 0 on - * close-drain (sender's value did not reach a receiver). The init path - * resolves to 0 inline if the channel is already closed. - * - * Lifecycle: init → wait on &op->done.base → deinit. Always deinit; - * it's a no-op after resolution and unparks the chan-side waker if not. */ -typedef struct { - chan_send_waker_t csw; /* parked on chan; fire overridden */ - chan_t *chan; - latch_t done; -} chan_send_op_t; - -/* Implementation detail: exposed so chan_send_op_init can install it. 
*/ -void _chan_send_op_fire(waker_t *w, uintptr_t value); - -void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value); -void chan_send_op_deinit(chan_send_op_t *op); - -/* ---- Queue ------------------------------------------------------------ */ - -/* Bounded FIFO of uintptr_t. Caller provides the ring buffer storage. - * Recv side is exposed as event_t (composable with select). Send side is - * a typed API (carries a value), shaped after chan's send. - * - * Three full-buffer policies, fixed at init: - * QUEUE_BLOCK senders park until a receiver makes room. - * QUEUE_DROP_NEWEST queue_try_send silently discards the new value. - * QUEUE_DROP_OLDEST queue_try_send evicts the head and pushes new tail. - * - * Senders never park under DROP_* policies — queue_park_send is only - * meaningful under QUEUE_BLOCK, and only after queue_try_send returned - * false. queue_unpark_send is idempotent (cancellation-safe). - * - * Direct-handoff: queue_try_send first checks for a parked receiver and - * delivers inline if present (payload bypasses the buffer), regardless - * of policy. Symmetric to chan. - * - * cap == 0 with QUEUE_BLOCK degenerates to a rendezvous channel; chan_t - * remains the more direct expression of that case. 
*/ - -typedef enum { - QUEUE_BLOCK, - QUEUE_DROP_NEWEST, - QUEUE_DROP_OLDEST, -} queue_policy_t; - -typedef struct queue { - event_t recv; - uintptr_t *buf; - size_t cap, head, len; - queue_policy_t policy; - waker_t *send_head, *send_tail; - waker_t *recv_head, *recv_tail; - bool closed; -} queue_t; - -extern const event_vtable_t _queue_recv_vt; - -static inline void queue_init(queue_t *q, uintptr_t *buf, size_t cap, - queue_policy_t policy) { - q->recv.vt = &_queue_recv_vt; - q->buf = buf; - q->cap = cap; - q->head = 0; - q->len = 0; - q->policy = policy; - q->send_head = q->send_tail = NULL; - q->recv_head = q->recv_tail = NULL; - q->closed = false; -} - -static inline event_t *queue_recv_event(queue_t *q) { return &q->recv; } - -/* Try to enqueue. Direct-delivers to a parked receiver if one is waiting. - * Returns: - * QUEUE_BLOCK true if delivered or buffered; false if full. - * QUEUE_DROP_NEWEST always true (silently drops if full). - * QUEUE_DROP_OLDEST always true (evicts head if full). */ -bool queue_try_send(queue_t *q, uintptr_t value); - -/* Sender-side waker for QUEUE_BLOCK. Same shape as chan_send_waker_t: - * a step_waker plus the value to deliver. Receivers read .value at the - * same offset so the queue's send list stays uniform. `delivered` is - * set by the closing side: true on a normal handoff, false on a close - * drain. */ -typedef struct { - step_waker_t sw; - uintptr_t value; - bool delivered; -} queue_send_waker_t; - -static inline void queue_send_waker_init(queue_send_waker_t *qsw, - runtime_t *rt, xstep_t *s, - uintptr_t value) { - step_waker_init(&qsw->sw, rt, s); - qsw->value = value; - qsw->delivered = false; -} - -void queue_park_send (queue_t *q, queue_send_waker_t *qsw); -void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw); - -/* Typed receive. Mirrors chan_recv: returns RECV_GOT (value popped from - * the buffer or directly from a parked sender), RECV_CLOSED (closed and - * drained), or RECV_EMPTY (caller may park). 
*/ -recv_status_t queue_recv(queue_t *q, uintptr_t *out); - -/* Close the queue. Idempotent. Drains parked senders (delivered=false) - * and wakes parked receivers. After close, queue_try_send under - * QUEUE_BLOCK returns false; under QUEUE_DROP_* the value is silently - * dropped (returns true). queue_park_send after close is UB. */ -void queue_close(queue_t *q); -static inline bool queue_is_closed(const queue_t *q) { return q->closed; } - -/* Selectable send op. Mirrors chan_send_op_t: a per-call object that - * holds the value, parks on the queue (only meaningful under QUEUE_BLOCK), - * and exposes &op->done.base as the event that fires when the send - * resolves. The done latch's payload is 1 on a successful delivery - * (handed to a receiver, buffered, or accepted under DROP_*) and 0 on - * close-drain. - * - * Under DROP_* policies the send always resolves inline at init: the - * try_send path returns true and op->done fires immediately. */ -typedef struct { - queue_send_waker_t qsw; /* parked on queue; fire overridden */ - queue_t *queue; - latch_t done; -} queue_send_op_t; - -void _queue_send_op_fire(waker_t *w, uintptr_t value); - -void queue_send_op_init (queue_send_op_t *op, queue_t *q, uintptr_t value); -void queue_send_op_deinit(queue_send_op_t *op); - -/* ---- Broadcast (slot) ------------------------------------------------- */ - -/* Re-armable signal carrying a "latest value" slot. Subscribers park on - * the event; broadcast_publish stores the new value, fires every parked - * subscriber with that value, and clears the waitlist — subscribers must - * re-park to see further publishes. Subscribers that aren't parked at - * publish time miss that publish but will see the next one. This is the - * coalescing "watch a slot" semantics, not lossless fan-out. - * - * event_try always returns false: there is no "ready now" state — a - * subscriber waits for the *next* publish. 
To read the latest published - * value at any time, use broadcast_value (valid once broadcast_has_value - * returns true). - * - * For lossless multi-consumer delivery, give each subscriber its own - * queue and have the producer write to all of them. */ - -typedef struct broadcast { - event_t base; - bool has_value; - uintptr_t value; - waker_t *waiters; -} broadcast_t; - -extern const event_vtable_t _broadcast_vt; - -static inline void broadcast_init(broadcast_t *b) { - b->base.vt = &_broadcast_vt; - b->has_value = false; - b->value = 0; - b->waiters = NULL; -} - -static inline event_t *broadcast_event (broadcast_t *b) { return &b->base; } -static inline bool broadcast_has_value(const broadcast_t *b) { return b->has_value; } -static inline uintptr_t broadcast_value (const broadcast_t *b) { return b->value; } - -void broadcast_publish(broadcast_t *b, uintptr_t value); - -/* ---- Cancellation ----------------------------------------------------- */ - -/* A cancellation token is a sticky latch — these aliases exist for - * vocabulary at call sites. cancel_set fires every parked waiter; the - * idempotency of latch_set means racing cancellers are fine. - * - * Pair a cancel_t with any blocking event via wait_or_cancel to get - * "await X, or be cancelled." - * - * Discipline: cancellation notifies; it never drops in-flight values. - * A cancelled await returns control to its caller, which is responsible - * for draining whatever it owns — deinit a pending chan_send_op so its - * value goes back to the sender, deinit a select_event so input wakers - * detach, drive a cancellable coroutine to XSTEP_DEAD before freeing - * its stack. The xco layer does no unwinding; the coroutine cooperates. 
*/ - -typedef latch_t cancel_t; - -static inline void cancel_init(cancel_t *c) { latch_init(c); } -static inline void cancel_set(cancel_t *c) { latch_set(c, 0); } -static inline bool cancel_is_set(const cancel_t *c) { return c->set; } -static inline event_t *cancel_event(cancel_t *c) { return &c->base; } - -/* Outcome indices for wait_or_cancel — match the inputs[] order so the - * value the resumer receives from the latched select reads as one of - * these directly. */ -enum { - WAIT_OK = 0, /* ev fired; inputs[0].value holds its payload */ - WAIT_CANCELLED = 1, /* cancel fired before ev */ -}; - -/* Build a select over (ev, cancel) using caller-provided storage. If - * either is already ready at init the select fast-paths and never parks - * anyone (ev is checked first, so an event that has already resolved - * wins over a concurrent cancel). Treat &out->done.base as the event - * to wait on. Always pair with select_event_deinit before storage - * leaves scope (no-op once fired). */ -static inline void wait_or_cancel(select_event_t *out, - select_input_t inputs[2], - event_t *ev, cancel_t *c) { - event_t *srcs[2] = {ev, cancel_event(c)}; - select_event_init(out, inputs, 2, srcs); -} - -/* ---- Timers ----------------------------------------------------------- */ - -/* A timer is a sticky event keyed on a u64 deadline. It fires (exactly - * once) when the attached timer source is advanced past that deadline. - * The library never reads a clock; the caller provides `now` to - * timers_advance (or via rt_run). - * - * Storage is pluggable through the timers vtable so callers can swap a - * pairing heap (in-tree, O(log n) amortized everywhere including cancel) - * for a wheel or other structure without touching the timer/timeout - * surface. The timer struct holds the heap link fields inline; the source - * impl interprets them. - * - * Lifecycle: - * timer_init(t, ts, deadline) // inserts into ts - * ... 
wait on timer_event(t), or compose into select/wait_or_cancel ... - * timer_deinit(t) // removes from ts if not yet fired - * - * Fire payload is the deadline. Re-arming = reinit a fresh timer. */ - -typedef struct timer timer_t; - -typedef struct { - /* Insert t into the source. t must be initialized but not yet - * inserted; insert sets t's heap link fields. */ - void (*insert) (timers_t *ts, timer_t *t); - /* Remove t from the source if currently inserted. Caller must - * ensure t was inserted into this same source. */ - void (*cancel) (timers_t *ts, timer_t *t); - /* Fire every timer whose deadline <= now, in deadline order, popping - * each from the source. Each fire drains the timer's waiter list. */ - void (*advance)(timers_t *ts, uint64_t now); - /* If any timer is queued, write its deadline to *out and return - * true. *out untouched on false. */ - bool (*peek) (const timers_t *ts, uint64_t *out); -} timers_vtable_t; - -struct timers { const timers_vtable_t *vt; }; - -static inline void timers_insert (timers_t *ts, timer_t *t) { ts->vt->insert (ts, t); } -static inline void timers_cancel (timers_t *ts, timer_t *t) { ts->vt->cancel (ts, t); } -static inline void timers_advance(timers_t *ts, uint64_t now) { ts->vt->advance(ts, now); } -static inline bool timers_peek (const timers_t *ts, uint64_t *o) { return ts->vt->peek(ts, o); } - -/* Concrete timer. Embeds a latch so try/park/unpark and the fire-all - * waitlist handling come for free; the timer source manipulates only - * the heap link fields and triggers the latch on fire. The latch's - * payload after fire is the timer's deadline. */ -struct timer { - latch_t done; /* fires once, payload = deadline */ - uint64_t deadline; - timers_t *src; /* source this timer is registered with */ - bool in_heap; /* true between insert and fire/cancel */ - /* Pairing-heap link fields; opaque to anyone but the source impl. - * prev is parent if first child, else previous sibling, else NULL. 
*/ - timer_t *child, *prev, *next; -}; - -static inline event_t *timer_event(timer_t *t) { return &t->done.base; } -static inline bool timer_fired(const timer_t *t) { return t->done.set; } - -void timer_init (timer_t *t, timers_t *ts, uint64_t deadline); -void timer_deinit(timer_t *t); - -/* In-tree timer source: intrusive pairing heap. O(1) amortized insert - * and meld; O(log n) amortized advance and cancel. No per-source - * allocation — the heap is just a root pointer; nodes live in the - * caller's timer_t's. */ -typedef struct { - timers_t base; - timer_t *root; -} pairing_heap_t; - -extern const timers_vtable_t _pairing_heap_vt; - -static inline void pairing_heap_init(pairing_heap_t *h) { - h->base.vt = &_pairing_heap_vt; - h->root = NULL; -} - -/* ---- Timeout ---------------------------------------------------------- */ - -/* Bundle: a timer that fires a cancel_t on expiration. The natural - * pairing for "await ev, or be cancelled by deadline": - * - * timeout_t to; - * timeout_init(&to, ts, now + budget); - * select_event_t sel; select_input_t inputs[2]; - * wait_or_cancel(&sel, inputs, ev, &to.cancel); - * ... wait on &sel.done.base ... - * select_event_deinit(&sel); - * timeout_deinit(&to); // safe whether the timer fired or not - * - * The bridge waker is parked on the timer; when it fires it sets the - * cancel. Bridge fire is idempotent vs cancel_set (a sticky latch). */ -typedef struct timeout { - timer_t timer; - cancel_t cancel; - waker_t bridge; -} timeout_t; - -void timeout_init (timeout_t *to, timers_t *ts, uint64_t deadline); -void timeout_deinit(timeout_t *to); - -/* ---- Ticker ----------------------------------------------------------- */ - -/* Re-armable transient signal driven by a timer source. 
Each time the - * underlying timer fires, the ticker computes the next deadline (period - * past the just-fired one, with skip-ahead for catch-up after overflow), - * reinstalls the timer, and fires every parked subscriber with the - * just-fired deadline as the payload. Subscribers that aren't parked at - * a fire miss it (transient — same coalescing semantics as broadcast). - * - * ticker_init(&t, ts, period, first_deadline); - * ... wait on ticker_event(&t), re-park to see further ticks ... - * ticker_deinit(&t); // cancels the in-flight timer - * - * event_try always returns false; subscribers wait for the *next* tick. */ -typedef struct ticker { - timer_t timer; - timers_t *src; - uint64_t period; - event_t base; - waker_t *waiters; - waker_t bridge; /* internal: parks on timer_event */ -} ticker_t; - -extern const event_vtable_t _ticker_vt; - -void ticker_init (ticker_t *t, timers_t *ts, - uint64_t period, uint64_t first_deadline); -void ticker_deinit(ticker_t *t); -static inline event_t *ticker_event(ticker_t *t) { return &t->base; } - -/* ---- Task ------------------------------------------------------------- */ - -/* Lifecycle handle for a running xstep. Bundles a done latch (fires when - * the xstep returns, payload = its return value) with a cancel latch - * (the canonical signal to ask the xstep to wind down). The xstep itself - * is caller-allocated; the task holds a pointer to it. - * - * Who fires done: - * - Hand-coded state machine: call task_done(t, ret) in the same arm - * that returns XSTEP_DEAD. - * - xco-backed task (see xco_task_t in xco.h): the trampoline calls - * task_done automatically with the coroutine's return value. - * - * Cooperation: cancellation only notifies — the xstep is responsible for - * draining what it owns and reaching XSTEP_DEAD. The task's cancel is a - * normal cancel_t, so the xstep typically composes wait_or_cancel against - * it on every blocking await. 
- * - * Joining: callers wait on task_done_event with the standard event API - * (try / park, or compose into select / wait_or_cancel). On fire the - * latch's payload is the xstep's return value. */ - -typedef struct task { - xstep_t *step; - latch_t done; - cancel_t cancel; -} task_t; - -static inline void task_init(task_t *t, xstep_t *step) { - t->step = step; - latch_init(&t->done); - cancel_init(&t->cancel); -} - -/* Mark the task complete with its return value. Idempotent (latch_set is). */ -static inline void task_done(task_t *t, uintptr_t value) { - latch_set(&t->done, value); -} - -static inline event_t *task_done_event (task_t *t) { return &t->done.base; } -static inline cancel_t *task_cancel (task_t *t) { return &t->cancel; } -static inline bool task_finished (const task_t *t) { return t->done.set; } -static inline bool task_is_cancelled(const task_t *t) { return cancel_is_set(&t->cancel); } -static inline xstep_t *task_step (task_t *t) { return t->step; } - -/* ---- Task group ------------------------------------------------------- */ - -/* Fan-in join + fan-out cancel for a dynamic set of tasks. Caller - * provides storage for each per-attachment record (group_attach_t), so - * the group itself does no allocation. - * - * task_group_attach(g, t, slot): - * countdown_add(g->pending, 1); slot's bridge waker parks on - * task_done_event(t); slot is appended to g's list. When the task's - * done fires, the bridge fires: it splices the slot out of g's list - * and calls countdown_done(&g->pending). Re-attaching a finished - * task is UB. - * - * task_group_cancel(g): walks the attachment list and cancel_set's - * each &slot->task->cancel, then cancel_set's g->cancel. Bodies that - * compose wait_or_cancel against task_cancel(t) wind down cooperatively; - * meanwhile, anything waiting on g->cancel observes the group-level - * signal directly. - * - * task_group_join_event(g): fires when every attached task has reached - * task_done. 
Compose with select / wait_or_cancel like any event. */ - -typedef struct group_attach group_attach_t; - -typedef struct task_group { - countdown_t pending; - cancel_t cancel; - group_attach_t *head, *tail; -} task_group_t; - -struct group_attach { - waker_t bridge; /* parked on task_done_event(task) */ - task_t *task; - task_group_t *group; - group_attach_t *next, *prev; -}; - -void task_group_init (task_group_t *g); -void task_group_attach (task_group_t *g, task_t *t, - group_attach_t *slot); -void task_group_cancel (task_group_t *g); -static inline event_t *task_group_join_event (task_group_t *g) { - return countdown_event(&g->pending); -} -static inline cancel_t *task_group_cancel_handle(task_group_t *g) { - return &g->cancel; -} - -#endif /* EVENT_H */ diff --git a/platform/arm64/xco_platform.c b/platform/arm64/xco_platform.c @@ -0,0 +1,132 @@ +/* + * platform/arm64/xco_platform.c — AArch64 init + switch + trampoline thunk. + * + * The switch primitive and the trampoline thunk are written as + * file-scope global asm rather than __attribute__((naked)) functions: + * GCC silently ignores `naked` on AArch64, so naked-function inline + * asm only works under Clang. File-scope __asm__ is portable across + * both GCC and Clang and keeps everything in one .c file. + * + * Layout offsets used in the asm: + * 0 regs[0..9] x19-x28 + * 80 regs[10..11] fp (x29), lr (x30) + * 96 regs[12] sp + * 104 fp_regs[0..7] d8-d15 + * + * Symbol naming: asm labels are built with the compiler-provided + * __USER_LABEL_PREFIX__ (empty on ELF, "_" on Mach-O) so the labels + * emitted here match what the C compiler generates for references to + * xco_platform_switch / xco_platform_trampoline_thunk on either OS. + */ + +#include "xco_platform_internal.h" +#include "xco_platform.h" + +#include <assert.h> +#include <stddef.h> +#include <stdint.h> +#include <string.h> + +/* The platform context type forward-declared in xco_platform.h. 
*/ +struct xco_platform_ctx { + uintptr_t regs[13]; + uint64_t fp_regs[8]; +} __attribute__((aligned(16))); + +/* ---- layout checks -------------------------------------------------- + * + * xco.c sized its embedded ctx buffer using these macros — verify + * they still match the real struct, and verify the byte offsets the + * asm hardcodes are still correct. + */ +_Static_assert(sizeof(struct xco_platform_ctx) == _XCO_CTX_SIZE, + "_XCO_CTX_SIZE out of sync with struct layout"); +_Static_assert(_Alignof(struct xco_platform_ctx) == _XCO_CTX_ALIGN, + "_XCO_CTX_ALIGN out of sync with struct layout"); +_Static_assert(sizeof(uintptr_t) == 8, "AArch64"); +_Static_assert(offsetof(struct xco_platform_ctx, regs) == 0, ""); +_Static_assert(offsetof(struct xco_platform_ctx, fp_regs) == 104, ""); + +/* Forward declaration: the symbol is defined by the file-scope asm + * below. Its address is taken in xco_platform_init. */ +extern void xco_platform_trampoline_thunk(void); + +/* ---- xco_platform_init --------------------------------------------- */ + +void xco_platform_init(xco_platform_ctx_t *ctx, + void *stack_base, size_t stack_len, + void (*entry)(uintptr_t)) { + assert(((uintptr_t)stack_base & (XCO_STACK_ALIGN - 1)) == 0); + + /* AArch64 stacks grow down. Compute and align the top. */ + uintptr_t top = (uintptr_t)stack_base + stack_len; + top &= ~(uintptr_t)(XCO_STACK_ALIGN - 1); + + memset(ctx, 0, sizeof(*ctx)); + + /* The switch primitive will load these into x19, fp, lr, sp. 
*/ + ctx->regs[0] = (uintptr_t)entry; /* x19 */ + ctx->regs[10] = 0; /* fp */ + ctx->regs[11] = (uintptr_t)xco_platform_trampoline_thunk; /* lr */ + ctx->regs[12] = top; /* sp */ +} + +/* ---- xco_platform_switch and trampoline thunk (file-scope asm) ----- + * + * xco_platform_switch(from x0, to x1, value x2) -> x0: + * saves callee-saved state into *from, restores from *to, and + * delivers `value` to the destination as x0 — which is the first-arg + * register on the trampoline thunk's first run, and the return-value + * register when resuming a previously-suspended switch. + * + * xco_platform_trampoline_thunk(): + * On entry x0 = value, x19 = entry; tail-calls entry(value); brk if + * it ever returns. + */ + +/* Stringify-after-expand so __USER_LABEL_PREFIX__ (a token, possibly + * empty) becomes a string literal we can concatenate into the asm. */ +#define XCO_STR_(x) #x +#define XCO_STR(x) XCO_STR_(x) +#define XCO_SYM(name) XCO_STR(__USER_LABEL_PREFIX__) #name + +__asm__ ( + ".text\n" + ".align 4\n" + + ".globl " XCO_SYM(xco_platform_switch) "\n" + XCO_SYM(xco_platform_switch) ":\n" + " stp x19, x20, [x0, #0]\n" + " stp x21, x22, [x0, #16]\n" + " stp x23, x24, [x0, #32]\n" + " stp x25, x26, [x0, #48]\n" + " stp x27, x28, [x0, #64]\n" + " stp fp, lr, [x0, #80]\n" + " mov x9, sp\n" + " str x9, [x0, #96]\n" + " stp d8, d9, [x0, #104]\n" + " stp d10, d11, [x0, #120]\n" + " stp d12, d13, [x0, #136]\n" + " stp d14, d15, [x0, #152]\n" + + " ldp d8, d9, [x1, #104]\n" + " ldp d10, d11, [x1, #120]\n" + " ldp d12, d13, [x1, #136]\n" + " ldp d14, d15, [x1, #152]\n" + " ldp x19, x20, [x1, #0]\n" + " ldp x21, x22, [x1, #16]\n" + " ldp x23, x24, [x1, #32]\n" + " ldp x25, x26, [x1, #48]\n" + " ldp x27, x28, [x1, #64]\n" + " ldp fp, lr, [x1, #80]\n" + " ldr x9, [x1, #96]\n" + " mov sp, x9\n" + + " mov x0, x2\n" + " ret\n" + + ".globl " XCO_SYM(xco_platform_trampoline_thunk) "\n" + XCO_SYM(xco_platform_trampoline_thunk) ":\n" + " blr x19\n" + " brk #0\n" +); diff --git 
a/platform/arm64/xco_platform.h b/platform/arm64/xco_platform.h @@ -0,0 +1,27 @@ +/* + * xco_platform.h (arm64) — sizing and alignment constants. + * + * Found by the build's -I path (platform/arm64) and included by xco.h. + * Only #defines: the actual platform context struct definition is + * private to platform/arm64/xco_platform.c. + */ + +#ifndef XCO_PLATFORM_INTERNAL_H +#define XCO_PLATFORM_INTERNAL_H + +/* Stack alignment required by AAPCS at function call boundaries. */ +#define XCO_STACK_ALIGN 16 + +/* Implementation-detail constants used by xco.c to embed the platform + * context inside xco_impl_t without seeing the struct definition. + * platform/arm64/xco_platform.c verifies these match the real layout. */ +#define _XCO_CTX_SIZE 176 /* sizeof(struct xco_platform_ctx) */ +#define _XCO_CTX_ALIGN 16 /* _Alignof(struct xco_platform_ctx) */ + +/* Size and alignment of xco_t._private. Must hold an xco_impl_t + * (defined in xco.c): a platform context + ~3 words of bookkeeping. + * xco.c verifies sufficiency with _Static_assert. */ +#define XCO_SIZE (_XCO_CTX_SIZE + 48) +#define XCO_ALIGN _XCO_CTX_ALIGN + +#endif /* XCO_PLATFORM_INTERNAL_H */ diff --git a/platform/xco_platform_internal.h b/platform/xco_platform_internal.h @@ -0,0 +1,34 @@ +/* + * xco_platform_internal.h — generic platform interface for xco.c. + * + * Declares the two primitives the platform layer must implement, plus + * the (forward-declared) context type. xco.c never derefs values of + * this type — it only passes pointers around — so the forward + * declaration is enough here. The full struct lives in the platform- + * specific xco_platform.c (under platform/<name>/). + */ + +#ifndef XCO_PLATFORM_H +#define XCO_PLATFORM_H + +#include <stddef.h> +#include <stdint.h> + +typedef struct xco_platform_ctx xco_platform_ctx_t; + +/* Initialize *ctx so that the next switch into it begins executing + * entry(value), where value is the uintptr_t handed to the switch. 
+ * stack_base must be XCO_STACK_ALIGN-aligned. entry must not return. */ +void xco_platform_init(xco_platform_ctx_t *ctx, + void *stack_base, size_t stack_len, + void (*entry)(uintptr_t)); + +/* Save callee-saved state into *from, restore it from *to, and hand + * `value` to *to — as the return value of *to's previous switch, or + * as entry's argument on first switch into a fresh context. Returns + * the value passed by the next switch back to *from. */ +uintptr_t xco_platform_switch(xco_platform_ctx_t *from, + xco_platform_ctx_t *to, + uintptr_t value); + +#endif /* XCO_PLATFORM_H */ diff --git a/tests/test_event.c b/tests/test_event.c @@ -6,8 +6,7 @@ * the event, park if not ready, re-try after wake. */ -#include "event.h" -#include "xstep.h" +#include "xco.h" #include <assert.h> #include <stdio.h> diff --git a/xco.c b/xco.c @@ -1,31 +1,1219 @@ /* - * xco.c — minimal asymmetric coroutines, C11. + * xco.c — implementation for xco.h. * - * The platform layer (arch/<name>/xco_arch.c) supplies the register - * save/restore primitive (xco_platform_switch) and the initial-context - * setup (xco_platform_init). Everything else — the state machine, the - * resume chain, the trampoline wrapping user fn entry/exit, and the - * xco_self() TLS pointer — lives here. + * Two parts: + * - Event substrate (runtime, latch, semaphore, select/allof, channel, + * queue, broadcast, notify, timer, pairing heap, timeout, ticker, + * task group). Each event type is a small struct with a static vtable. + * Waitlists are intrusive linked lists; fire-all detaches the whole + * list before iterating so callbacks can do anything, including + * re-park or unpark sibling wakers, without iterator hazards. * - * The xstep_fn registered into base.step (xco_step) is the entry point - * all callers see — coroutines are driven through xstep() exactly like - * any other xstep_t. + * - Stack-switching coroutines (xco). 
The platform layer + * (platform/<name>/xco_platform.c) supplies the register save/restore + * primitive (xco_platform_switch) and the initial-context setup + * (xco_platform_init). Everything else — the state machine, the + * resume chain, the trampoline wrapping user fn entry/exit, and + * the xco_self() TLS pointer — lives here. * * This translation unit is architecture-neutral. The platform context * type is forward-declared (in xco_platform_internal.h) and only ever referred - * to by pointer here, so its actual size and layout never cross into - * this file. We reserve raw space for it inside xco_impl_t using + * to by pointer, so its actual size and layout never cross into this + * file. We reserve raw space for it inside xco_impl_t using * _XCO_CTX_SIZE / _XCO_CTX_ALIGN from the platform's xco_platform.h, and cast * to xco_platform_ctx_t * when calling into the platform layer. */ #include "xco.h" -#include "xco_platform.h" +#include "xco_platform_internal.h" #include <assert.h> #include <stddef.h> #include <stdint.h> +/* ==================================================================== + * Runtime + * ==================================================================== */ + +/* rt_init and rt_enqueue are defined inline in xco.h. */ + +static waker_t *rt_dequeue(runtime_t *rt) { + waker_t *w = rt->head; + if (!w) return NULL; + rt->head = w->next; + if (!rt->head) rt->tail = NULL; + /* Hand the waker back fully detached. Every fire path already clears + * prev before firing (and we just walked off the ready-queue), so + * w->prev is NULL in practice — making it explicit here means + * step_waker users can re-park without re-init, regardless of which + * fire path resumed them. */ + w->next = NULL; + w->prev = NULL; + return w; +} + +void rt_run(runtime_t *rt, uint64_t now) { + /* The runtime ready queue holds only step-wakers. Other waker + * shapes (e.g. select_input) fire synchronously inside event + * notify paths and never reach here. 
+ * + * If a timer source is attached, advance it at the top of each + * iteration: any timer whose deadline <= now fires, enqueueing its + * step-wakers, which the inner loop then drains. A step may insert + * a fresh already-expired timer; the outer loop catches it on the + * next pass. Termination: each pass either drains a non-empty + * queue or exits, and advance only fires timers it then removes, + * so total work is bounded. */ + for (;;) { + if (rt->timers) timers_advance(rt->timers, now); + if (!rt->head) return; + for (waker_t *w; (w = rt_dequeue(rt));) { + step_waker_t *sw = (step_waker_t *)w; + xstep(sw->step, sw->resume_value); + } + } +} + +/* ---- Step waker ------------------------------------------------------- */ + +/* Exposed (with leading underscore) so the inline step_waker_init in + * xco.h can install it without dragging the body into the header. */ +void _step_waker_fire(waker_t *w, uintptr_t value) { + step_waker_t *sw = (step_waker_t *)w; + sw->resume_value = value; + rt_enqueue(sw->rt, w); +} + +/* ==================================================================== + * Latch + * ==================================================================== */ + +static bool latch_try(event_t *e, uintptr_t *out) { + latch_t *l = (latch_t *)e; + if (!l->set) return false; + if (out) *out = l->value; + return true; +} + +static void latch_park(event_t *e, waker_t *w) { + latch_t *l = (latch_t *)e; + /* Single-threaded contract: caller must have just observed try=false. */ + assert(!l->set); + /* One-waker invariant: w must arrive clean. Detach paths (latch_set + * iterator, latch_unpark, select_event_deinit) all leave wakers in + * this state. A trip here means a double-park bug. 
*/ + assert(!w->prev && !w->next); + w->next = l->waiters; + if (l->waiters) l->waiters->prev = w; + l->waiters = w; +} + +static void latch_unpark(event_t *e, waker_t *w) { + latch_t *l = (latch_t *)e; + /* Detect "not on this list" via the invariant maintained by park + * and the detach paths: a parked waker has prev set OR is the + * head; a detached one has prev == NULL and is not the head. */ + if (!w->prev && l->waiters != w) return; + if (w->prev) w->prev->next = w->next; + else l->waiters = w->next; + if (w->next) w->next->prev = w->prev; + w->prev = w->next = NULL; +} + +/* Exposed so the inline latch_init in xco.h can reference it. */ +const event_vtable_t _latch_vt = { + .try_ = latch_try, + .park = latch_park, + .unpark = latch_unpark, +}; + +void latch_set(latch_t *l, uintptr_t value) { + if (l->set) return; + l->set = true; + l->value = value; + + /* Detach the whole waitlist before firing. A waker's fire callback + * might do anything (including unpark a sibling on another event), + * but it cannot mutate this list — it's already gone. */ + waker_t *w = l->waiters; + l->waiters = NULL; + while (w) { + waker_t *next = w->next; /* save before waker_fire clears */ + waker_fire(w, value); + w = next; + } +} + +/* ==================================================================== + * Semaphore + * + * FIFO doubly-linked waitlist, same shape as the chan_q_* helpers below + * but specialized to a semaphore_t (so we don't have to thread the + * head/tail pair through chan_q_*). 
+ * ==================================================================== */ + +static void sem_q_push(semaphore_t *s, waker_t *w) { + assert(!w->prev && !w->next); + w->prev = s->tail; + w->next = NULL; + if (s->tail) s->tail->next = w; + else s->head = w; + s->tail = w; +} + +static waker_t *sem_q_pop(semaphore_t *s) { + waker_t *w = s->head; + if (!w) return NULL; + s->head = w->next; + if (s->head) s->head->prev = NULL; + else s->tail = NULL; + w->prev = w->next = NULL; + return w; +} + +static void sem_q_remove(semaphore_t *s, waker_t *w) { + if (!w->prev && s->head != w) return; + if (w->prev) w->prev->next = w->next; + else s->head = w->next; + if (w->next) w->next->prev = w->prev; + else s->tail = w->prev; + w->prev = w->next = NULL; +} + +static bool semaphore_try(event_t *e, uintptr_t *out) { + semaphore_t *s = (semaphore_t *)e; + if (s->permits == 0) return false; + s->permits--; + if (out) *out = 1; + return true; +} + +static void semaphore_park(event_t *e, waker_t *w) { + semaphore_t *s = (semaphore_t *)e; + /* Single-threaded contract: caller just observed try=false (permits=0). */ + assert(s->permits == 0); + sem_q_push(s, w); +} + +static void semaphore_unpark(event_t *e, waker_t *w) { + semaphore_t *s = (semaphore_t *)e; + sem_q_remove(s, w); +} + +const event_vtable_t _semaphore_acquire_vt = { + .try_ = semaphore_try, + .park = semaphore_park, + .unpark = semaphore_unpark, +}; + +void semaphore_release(semaphore_t *s, size_t n) { + /* Hand a permit directly to each FIFO waiter, then drop any leftover + * into the count. Direct handoff prevents a fresh try from jumping + * the queue: an arriving acquirer that called try_ would see permits=0 + * and park behind the existing waiters until everyone ahead has been + * served. */ + while (n > 0) { + waker_t *w = sem_q_pop(s); + if (!w) break; + n--; + /* Fire value is conventional 1 — "you got a permit". Step-waker + * users ignore the value; select inputs capture it as the input's + * value field. 
*/ + waker_fire(w, 1); + } + s->permits += n; +} + +/* ==================================================================== + * Select / all-of + * ==================================================================== */ + +/* One fire callback serves both modes. A counter `remaining` is decremented + * on each fire; done is set when it hits 0. select inits remaining=1 (any + * one fire closes); allof inits remaining=n (every input must fire). The + * disarm-siblings loop is a no-op for already-fired wakers, so it runs + * uniformly: for select it cleans up still-parked losers, for allof it + * does nothing (every sibling is already detached by its source). */ +static void select_input_fire(waker_t *w, uintptr_t value) { + select_input_t *in = (select_input_t *)w; + select_event_t *s = in->parent; + + /* Defensive: guard against any straggler that escaped disarm. */ + if (s->done.set) return; + /* Capture the input's payload before resuming anyone. Sticky + * sources also keep it on themselves; transient sources (channels) + * deliver only here, so this is the only durable record. */ + in->value = value; + if (--s->remaining > 0) return; + + size_t i = (size_t)(in - s->inputs); + /* Disarm anyone still parked so their wakers don't dangle on input + * waitlists past s's lifetime. Idempotent on already-detached wakers. */ + for (size_t j = 0; j < s->n; j++) { + if (j != i) event_unpark(s->inputs[j].src, &s->inputs[j].w); + } + latch_set(&s->done, i); +} + +void select_event_init(select_event_t *s, + select_input_t *inputs, size_t n, + event_t *const *srcs) { + latch_init(&s->done); + s->inputs = inputs; + s->n = n; + s->remaining = 1; /* any one fire closes the wait */ + + /* Fast path: an input already ready. Fire and skip parking entirely + * so deinit has nothing to disarm. 
*/ + for (size_t i = 0; i < n; i++) { + uintptr_t v; + if (event_try(srcs[i], &v)) { + inputs[i].value = v; /* captured for inputs[winner].value */ + latch_set(&s->done, i); + return; + } + } + + for (size_t i = 0; i < n; i++) { + inputs[i].w.next = NULL; + inputs[i].w.prev = NULL; + inputs[i].w.fire = select_input_fire; + inputs[i].src = srcs[i]; + inputs[i].parent = s; + inputs[i].value = 0; + event_park(srcs[i], &inputs[i].w); + } +} + +void allof_event_init(select_event_t *s, + select_input_t *inputs, size_t n, + event_t *const *srcs) { + latch_init(&s->done); + s->inputs = inputs; + s->n = n; + s->remaining = n; /* every input must fire to close */ + + if (n == 0) { latch_set(&s->done, 0); return; } + + /* Initialize each input then try-or-park. An already-ready input is + * consumed inline (value captured, remaining--, no parking); the + * rest park. If everyone was inline, fire done at the end. */ + for (size_t i = 0; i < n; i++) { + inputs[i].w.next = NULL; + inputs[i].w.prev = NULL; + inputs[i].w.fire = select_input_fire; + inputs[i].src = srcs[i]; + inputs[i].parent = s; + inputs[i].value = 0; + + uintptr_t v; + if (event_try(srcs[i], &v)) { + inputs[i].value = v; + s->remaining--; + } else { + event_park(srcs[i], &inputs[i].w); + } + } + + /* All inline-ready: fire done with the last input's index, matching + * the "closing index" semantics of the parked path. */ + if (s->remaining == 0) latch_set(&s->done, n - 1); +} + +void select_event_deinit(select_event_t *s) { + /* done.set => the closing fire already disarmed everyone (or the + * fast path skipped parking entirely). Otherwise — possible after a + * partial allof — some inputs may still be parked; unpark is + * idempotent for already-detached wakers. 
*/ + if (s->done.set) return; + for (size_t i = 0; i < s->n; i++) { + event_unpark(s->inputs[i].src, &s->inputs[i].w); + } +} + +/* ==================================================================== + * Channel + * + * Doubly-linked FIFO push/pop on a channel waitlist. Same shape as + * latch's list operations but with an explicit tail so arrival order + * is preserved (unlike latch, where waiter order is irrelevant). + * ==================================================================== */ + +static void chan_q_push(waker_t **head, waker_t **tail, waker_t *w) { + assert(!w->prev && !w->next); + w->prev = *tail; + w->next = NULL; + if (*tail) (*tail)->next = w; + else *head = w; + *tail = w; +} + +static waker_t *chan_q_pop(waker_t **head, waker_t **tail) { + waker_t *w = *head; + if (!w) return NULL; + *head = w->next; + if (*head) (*head)->prev = NULL; + else *tail = NULL; + w->prev = w->next = NULL; + return w; +} + +static void chan_q_remove(waker_t **head, waker_t **tail, waker_t *w) { + /* Same not-on-list test as latch_unpark: a queued waker has prev + * set OR is the head; a detached one has prev == NULL and isn't + * the head. */ + if (!w->prev && *head != w) return; + if (w->prev) w->prev->next = w->next; + else *head = w->next; + if (w->next) w->next->prev = w->prev; + else *tail = w->prev; + w->prev = w->next = NULL; +} + +/* Recover the chan_t from its embedded recv event. */ +static inline chan_t *chan_of_recv(event_t *e) { + return (chan_t *)((char *)e - offsetof(chan_t, recv)); +} + +static bool chan_recv_try(event_t *e, uintptr_t *out) { + chan_t *c = chan_of_recv(e); + waker_t *w = chan_q_pop(&c->send_head, &c->send_tail); + if (w) { + /* w is &csw->sw.base; sw is the first field of chan_send_waker_t, + * and base is the first field of step_waker_t, so addresses align. */ + chan_send_waker_t *csw = (chan_send_waker_t *)w; + if (out) *out = csw->value; + csw->delivered = true; + /* Resume the sender. 
The fire value is unused for step-wakers; + * for op senders, _chan_send_op_fire reads csw->delivered. */ + waker_fire(w, 0); + return true; + } + /* Close makes the recv event "ready" with no value: the receiver is + * expected to call chan_recv to learn it's RECV_CLOSED. *out is + * undefined in that case (set to 0 here for determinism). */ + if (c->closed) { + if (out) *out = 0; + return true; + } + return false; +} + +static void chan_recv_park(event_t *e, waker_t *w) { + chan_t *c = chan_of_recv(e); + chan_q_push(&c->recv_head, &c->recv_tail, w); +} + +static void chan_recv_unpark(event_t *e, waker_t *w) { + chan_t *c = chan_of_recv(e); + chan_q_remove(&c->recv_head, &c->recv_tail, w); +} + +const event_vtable_t _chan_recv_vt = { + .try_ = chan_recv_try, + .park = chan_recv_park, + .unpark = chan_recv_unpark, +}; + +bool chan_try_send(chan_t *c, uintptr_t value) { + if (c->closed) return false; + waker_t *w = chan_q_pop(&c->recv_head, &c->recv_tail); + if (!w) return false; + /* Hand the value to the recv-side waker. step_waker stashes it as + * resume_value; select_input_fire stashes it in input.value. */ + waker_fire(w, value); + return true; +} + +void chan_park_send(chan_t *c, chan_send_waker_t *csw) { + /* park_send after close is UB — caller must check chan_is_closed + * (typically via chan_try_send returning false plus chan_is_closed). 
*/ + assert(!c->closed); + chan_q_push(&c->send_head, &c->send_tail, &csw->sw.base); +} + +void chan_unpark_send(chan_t *c, chan_send_waker_t *csw) { + chan_q_remove(&c->send_head, &c->send_tail, &csw->sw.base); +} + +recv_status_t chan_recv(chan_t *c, uintptr_t *out) { + waker_t *w = chan_q_pop(&c->send_head, &c->send_tail); + if (w) { + chan_send_waker_t *csw = (chan_send_waker_t *)w; + if (out) *out = csw->value; + csw->delivered = true; + waker_fire(w, 0); + return RECV_GOT; + } + if (c->closed) return RECV_CLOSED; + return RECV_EMPTY; +} + +void chan_close(chan_t *c) { + if (c->closed) return; + c->closed = true; + + /* Drain parked senders with delivered=false. waker_fire detaches + * before invoking the callback, so re-park inside fire (e.g. to + * land on the runtime ready queue) is safe. */ + waker_t *w; + while ((w = chan_q_pop(&c->send_head, &c->send_tail)) != NULL) { + chan_send_waker_t *csw = (chan_send_waker_t *)w; + csw->delivered = false; + waker_fire(w, 0); + } + /* Wake parked receivers so they observe RECV_CLOSED via chan_recv. + * Fire value is irrelevant — the recv event_try will return true + * because c->closed is set, but receivers should use chan_recv. */ + while ((w = chan_q_pop(&c->recv_head, &c->recv_tail)) != NULL) { + waker_fire(w, 0); + } +} + +/* ---- Send op (selectable send) ---------------------------------------- */ + +void _chan_send_op_fire(waker_t *w, uintptr_t value) { + /* Receiver hands no payload to the sender on delivery — the sender + * learns whether its value reached someone via op->csw.delivered, + * set by chan_recv* (true) or chan_close (false). */ + (void)value; + /* csw is the first field of chan_send_op_t; sw is first of + * chan_send_waker_t; base is first of step_waker_t. All offsets + * coincide, so w aliases op. */ + chan_send_op_t *op = (chan_send_op_t *)w; + latch_set(&op->done, op->csw.delivered ? 
1 : 0); +} + +void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value) { + /* The embedded chan_send_waker carries the value and provides the + * waker layout the chan's send list expects. rt/step are unused — + * fire goes straight to the latch, no scheduler hop. */ + chan_send_waker_init(&op->csw, NULL, NULL, value); + op->csw.sw.base.fire = _chan_send_op_fire; + op->chan = c; + latch_init(&op->done); + + if (c->closed) { + /* Closed channel: no delivery possible, resolve immediately. */ + latch_set(&op->done, 0); + return; + } + if (chan_try_send(c, value)) { + /* Inline delivery: no parking, done set immediately. */ + op->csw.delivered = true; + latch_set(&op->done, 1); + return; + } + chan_park_send(c, &op->csw); +} + +void chan_send_op_deinit(chan_send_op_t *op) { + /* If delivered, the waker is already off the send list. If not + * (e.g., select cancellation), pull it off so it doesn't dangle. */ + if (op->done.set) return; + chan_unpark_send(op->chan, &op->csw); +} + +/* ==================================================================== + * Queue + * + * The FIFO list helpers (chan_q_push/pop/remove) are reused for the + * queue's send and recv waitlists — same shape, same invariants. The + * ring buffer lives in caller-provided storage; we just track head and + * len. cap == 0 leaves the buffer logic dormant: every send either + * direct-hands or parks, every recv either takes from a parked sender + * or parks — i.e. it degenerates to chan rendezvous. 
+ * ==================================================================== */ + +static inline queue_t *queue_of_recv(event_t *e) { + return (queue_t *)((char *)e - offsetof(queue_t, recv)); +} + +static inline void queue_push_buf(queue_t *q, uintptr_t v) { + assert(q->len < q->cap); + q->buf[(q->head + q->len) % q->cap] = v; + q->len++; +} + +static inline uintptr_t queue_pop_buf(queue_t *q) { + assert(q->len > 0); + uintptr_t v = q->buf[q->head]; + q->head = (q->head + 1) % q->cap; + q->len--; + return v; +} + +/* Pop one parked sender's value into the now-free buffer slot, firing + * the sender. Caller must have ensured a free slot exists (just popped + * from the buffer, or cap > len). Maintains FIFO across the buffer + + * sender-waitlist boundary: oldest buffered values come out before any + * sender's value (which was queued later). No-op if no sender parked. */ +static void queue_drain_one_sender(queue_t *q) { + if (!q->send_head) return; + waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + queue_push_buf(q, qsw->value); + qsw->delivered = true; + /* Fire after pushing so the sender sees its delivery as complete. */ + waker_fire(w, 0); +} + +static bool queue_recv_try(event_t *e, uintptr_t *out) { + queue_t *q = queue_of_recv(e); + if (q->len > 0) { + uintptr_t v = queue_pop_buf(q); + if (out) *out = v; + queue_drain_one_sender(q); + return true; + } + /* Empty buffer. If a sender is parked here it can only mean cap==0 + * (otherwise the sender would have used the buffer). Hand directly. */ + if (q->send_head) { + waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + if (out) *out = qsw->value; + qsw->delivered = true; + waker_fire(w, 0); + return true; + } + /* Closed and drained: receivers learn EOF via queue_recv; out is + * undefined. 
*/ + if (q->closed) { + if (out) *out = 0; + return true; + } + return false; +} + +static void queue_recv_park(event_t *e, waker_t *w) { + queue_t *q = queue_of_recv(e); + chan_q_push(&q->recv_head, &q->recv_tail, w); +} + +static void queue_recv_unpark(event_t *e, waker_t *w) { + queue_t *q = queue_of_recv(e); + chan_q_remove(&q->recv_head, &q->recv_tail, w); +} + +const event_vtable_t _queue_recv_vt = { + .try_ = queue_recv_try, + .park = queue_recv_park, + .unpark = queue_recv_unpark, +}; + +bool queue_try_send(queue_t *q, uintptr_t value) { + if (q->closed) { + /* Send-after-close. BLOCK: no delivery, signal failure. DROP_*: + * silently drop (queue policy already says "may be lost"). */ + if (q->policy == QUEUE_BLOCK) return false; + return true; + } + /* Direct handoff first: parked receivers always win over the buffer. + * This is the rendezvous case and the cap==0 case. */ + waker_t *w = chan_q_pop(&q->recv_head, &q->recv_tail); + if (w) { + waker_fire(w, value); + return true; + } + if (q->len < q->cap) { + queue_push_buf(q, value); + return true; + } + /* Buffer full and no waiting receiver. */ + switch (q->policy) { + case QUEUE_BLOCK: + return false; + case QUEUE_DROP_NEWEST: + return true; + case QUEUE_DROP_OLDEST: + (void)queue_pop_buf(q); + queue_push_buf(q, value); + return true; + } + __builtin_unreachable(); +} + +void queue_park_send(queue_t *q, queue_send_waker_t *qsw) { + /* DROP_* never parks (try_send always returns true); only valid + * for BLOCK. park_send after close is UB. 
*/ + assert(q->policy == QUEUE_BLOCK); + assert(!q->closed); + chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base); +} + +void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw) { + chan_q_remove(&q->send_head, &q->send_tail, &qsw->sw.base); +} + +recv_status_t queue_recv(queue_t *q, uintptr_t *out) { + if (q->len > 0) { + uintptr_t v = queue_pop_buf(q); + if (out) *out = v; + queue_drain_one_sender(q); + return RECV_GOT; + } + if (q->send_head) { + waker_t *w = chan_q_pop(&q->send_head, &q->send_tail); + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + if (out) *out = qsw->value; + qsw->delivered = true; + waker_fire(w, 0); + return RECV_GOT; + } + if (q->closed) return RECV_CLOSED; + return RECV_EMPTY; +} + +void queue_close(queue_t *q) { + if (q->closed) return; + q->closed = true; + + /* Drain parked senders with delivered=false. Senders only park + * under BLOCK, so this is no-op for DROP_* (their waitlist is + * always empty). */ + waker_t *w; + while ((w = chan_q_pop(&q->send_head, &q->send_tail)) != NULL) { + queue_send_waker_t *qsw = (queue_send_waker_t *)w; + qsw->delivered = false; + waker_fire(w, 0); + } + /* Wake parked receivers so they can observe closed via queue_recv. + * Receivers may still drain buffered values first — queue_recv's + * RECV_GOT path is hit before the RECV_CLOSED branch. */ + while ((w = chan_q_pop(&q->recv_head, &q->recv_tail)) != NULL) { + waker_fire(w, 0); + } +} + +/* ---- Queue send op (selectable send) ---------------------------------- */ + +void _queue_send_op_fire(waker_t *w, uintptr_t value) { + (void)value; + queue_send_op_t *op = (queue_send_op_t *)w; + latch_set(&op->done, op->qsw.delivered ? 1 : 0); +} + +void queue_send_op_init(queue_send_op_t *op, queue_t *q, uintptr_t value) { + queue_send_waker_init(&op->qsw, NULL, NULL, value); + op->qsw.sw.base.fire = _queue_send_op_fire; + op->queue = q; + latch_init(&op->done); + + if (q->closed) { + /* BLOCK: no delivery; DROP_*: dropped per policy. 
Either way the + * value did not reach a receiver, so delivered=false. */ + latch_set(&op->done, 0); + return; + } + if (queue_try_send(q, value)) { + /* Inline accept: handoff to receiver, buffered, or DROP_* policy + * accepted it. */ + op->qsw.delivered = true; + latch_set(&op->done, 1); + return; + } + /* Only BLOCK policy with full buffer reaches here. */ + queue_park_send(q, &op->qsw); +} + +void queue_send_op_deinit(queue_send_op_t *op) { + if (op->done.set) return; + queue_unpark_send(op->queue, &op->qsw); +} + +/* ==================================================================== + * Broadcast (slot) + * + * The waitlist uses the same doubly-linked LIFO shape as latch — there + * is no FIFO requirement because publish wakes everyone at once. The + * key difference from latch is publish vs set: publish never marks a + * sticky bit on the event, so try always returns false (subscribers + * wait for the *next* publish), and the waitlist is reusable across + * publishes — subscribers re-park to receive subsequent values. 
+ * ==================================================================== */ + +static bool broadcast_try(event_t *e, uintptr_t *out) { + (void)e; (void)out; + return false; +} + +static void broadcast_park(event_t *e, waker_t *w) { + broadcast_t *b = (broadcast_t *)e; + assert(!w->prev && !w->next); + w->next = b->waiters; + if (b->waiters) b->waiters->prev = w; + b->waiters = w; +} + +static void broadcast_unpark(event_t *e, waker_t *w) { + broadcast_t *b = (broadcast_t *)e; + if (!w->prev && b->waiters != w) return; + if (w->prev) w->prev->next = w->next; + else b->waiters = w->next; + if (w->next) w->next->prev = w->prev; + w->prev = w->next = NULL; +} + +const event_vtable_t _broadcast_vt = { + .try_ = broadcast_try, + .park = broadcast_park, + .unpark = broadcast_unpark, +}; + +void broadcast_publish(broadcast_t *b, uintptr_t value) { + b->has_value = true; + b->value = value; + + /* Detach the waitlist before iterating — same hazard-free pattern as + * latch_set. A waker's fire callback may re-park itself on us (the + * common case for a re-arming subscriber); decoupling means that + * re-park lands on a fresh waitlist, not on the snapshot we're + * walking. */ + waker_t *w = b->waiters; + b->waiters = NULL; + while (w) { + waker_t *next = w->next; /* save before waker_fire clears */ + waker_fire(w, value); + w = next; + } +} + +/* ==================================================================== + * Notify + * + * Doubly-linked FIFO waitlist (same shape as the chan/queue waitlists). + * notify_one fires the head; notify_all detaches the whole list before + * iterating so callbacks can re-park onto a fresh waitlist without + * iterator hazards (same pattern as latch_set). event_try is always + * false: notify is purely transient. 
+ * ==================================================================== */ + +static bool notify_try(event_t *e, uintptr_t *out) { + (void)e; (void)out; + return false; +} + +static void notify_park(event_t *e, waker_t *w) { + notify_t *n = (notify_t *)e; + chan_q_push(&n->head, &n->tail, w); +} + +static void notify_unpark(event_t *e, waker_t *w) { + notify_t *n = (notify_t *)e; + chan_q_remove(&n->head, &n->tail, w); +} + +const event_vtable_t _notify_vt = { + .try_ = notify_try, + .park = notify_park, + .unpark = notify_unpark, +}; + +void notify_one(notify_t *n) { + waker_t *w = chan_q_pop(&n->head, &n->tail); + if (!w) return; + waker_fire(w, 0); +} + +void notify_all(notify_t *n) { + /* Detach before iterating: re-parking inside fire lands on a fresh + * (empty) list. Walk the snapshot via saved next pointers. */ + waker_t *w = n->head; + n->head = n->tail = NULL; + while (w) { + waker_t *next = w->next; + waker_fire(w, 0); + w = next; + } +} + +/* ==================================================================== + * Timer + * + * The timer-as-event surface is just the embedded latch's vtable: try + * checks done.set, park/unpark mutate done.waiters. The timer source + * triggers the latch from advance(). No bespoke timer vtable needed. + * ==================================================================== */ + +void timer_init(timer_t *t, timers_t *ts, uint64_t deadline) { + latch_init(&t->done); + t->deadline = deadline; + t->src = ts; + t->in_heap = false; + t->child = NULL; + t->prev = NULL; + t->next = NULL; + timers_insert(ts, t); /* sets in_heap = true */ +} + +void timer_deinit(timer_t *t) { + /* Idempotent. After fire, in_heap is false (advance popped us); + * after explicit cancel, also false. Otherwise we're still queued. */ + if (t->in_heap) timers_cancel(t->src, t); +} + +/* ==================================================================== + * Pairing heap + * + * Standard intrusive pairing heap. 
Each node carries three link fields: + * + * child : head of children sibling list (NULL if leaf). + * prev : parent if this node is its parent's first child; + * previous sibling otherwise; NULL only for a detached tree + * root (including h->root). + * next : next sibling, or NULL for the last child / a detached root. + * + * The "prev points to either parent or a sibling" trick lets us splice + * a node out in O(1) without an extra parent pointer: parent->child==n + * distinguishes "first child" from "non-first sibling." + * + * meld picks the smaller-deadline root as winner and grafts the loser + * as its new first child. Pop-min and remove both rebuild via the + * classic two-pass pairwise merge of the resulting children list. + * ==================================================================== */ + +/* Merge two detached subtree roots (each with prev=next=NULL). Returns + * the merged root (also detached: prev=next=NULL). */ +static timer_t *ph_meld(timer_t *a, timer_t *b) { + if (!a) return b; + if (!b) return a; + timer_t *small, *large; + if (a->deadline <= b->deadline) { small = a; large = b; } + else { small = b; large = a; } + /* Graft `large` as the new first child of `small`. */ + large->next = small->child; + if (small->child) small->child->prev = large; + large->prev = small; /* parent link via prev */ + small->child = large; + small->prev = NULL; + small->next = NULL; + return small; +} + +/* Two-pass pairwise meld of a children sibling list. Detaches each node + * before melding so meld inputs satisfy its prev=next=NULL contract. + * The output has prev=next=NULL. */ +static timer_t *ph_merge_pairs(timer_t *first) { + /* Pass 1: walk the sibling list left-to-right, melding consecutive + * pairs. Chain results via `next` (ab)use as a temporary list link. */ + timer_t *list = NULL; + while (first) { + timer_t *a = first; + timer_t *b = a->next; + timer_t *rest = b ? 
b->next : NULL; + a->prev = a->next = NULL; + if (b) { b->prev = b->next = NULL; } + timer_t *m = ph_meld(a, b); + m->next = list; /* prepend to pass-1 list */ + list = m; + first = rest; + } + /* Pass 2: meld the pass-1 list into a single root. */ + timer_t *acc = NULL; + while (list) { + timer_t *nxt = list->next; + list->prev = NULL; + list->next = NULL; + acc = ph_meld(acc, list); + list = nxt; + } + return acc; +} + +/* Detach n from the tree (must currently be in the heap). Returns the + * (possibly new) main-heap root. n is left fully detached: child still + * points to its subtree, but prev/next are NULL — caller decides what + * to do with that subtree. */ +static timer_t *ph_detach(pairing_heap_t *h, timer_t *n) { + if (h->root == n) { + /* n is the main root; pop it and rebuild from its children. */ + timer_t *new_root = ph_merge_pairs(n->child); + n->child = NULL; + n->prev = NULL; + n->next = NULL; + return new_root; + } + /* n has a parent (recorded via prev — either as its parent's first + * child or as some sibling's successor). Splice out of the sibling + * list. */ + if (n->prev->child == n) { + /* First child: parent's child link skips us. */ + n->prev->child = n->next; + } else { + /* Mid/last sibling: previous sibling's next skips us. */ + n->prev->next = n->next; + } + if (n->next) n->next->prev = n->prev; + n->prev = NULL; + n->next = NULL; + /* The main-heap root is unchanged structurally; the caller of this + * function decides how (or whether) to reintroduce n's subtree. */ + return h->root; +} + +static void ph_insert(timers_t *ts, timer_t *t) { + pairing_heap_t *h = (pairing_heap_t *)ts; + /* Singleton tree (prev/next/child already NULL via timer_init). */ + h->root = ph_meld(h->root, t); + t->in_heap = true; +} + +static void ph_cancel(timers_t *ts, timer_t *t) { + pairing_heap_t *h = (pairing_heap_t *)ts; + if (!t->in_heap) return; + h->root = ph_detach(h, t); + /* Now meld t's subtree (its children) back into the main heap. 
*/ + timer_t *sub = ph_merge_pairs(t->child); + t->child = NULL; + h->root = ph_meld(h->root, sub); + t->in_heap = false; +} + +static void ph_advance(timers_t *ts, uint64_t now) { + pairing_heap_t *h = (pairing_heap_t *)ts; + /* Pop while the min-key timer is due. Each fire may run callbacks + * that insert *new* timers (with later deadlines, normally) — those + * land back in the heap, and we keep checking the root. */ + while (h->root && h->root->deadline <= now) { + timer_t *t = h->root; + h->root = ph_merge_pairs(t->child); + t->child = NULL; + t->prev = NULL; + t->next = NULL; + t->in_heap = false; + /* Trigger the latch: drains the waitlist and delivers the + * deadline as the fire payload. */ + latch_set(&t->done, (uintptr_t)t->deadline); + } +} + +static bool ph_peek(const timers_t *ts, uint64_t *out) { + const pairing_heap_t *h = (const pairing_heap_t *)ts; + if (!h->root) return false; + if (out) *out = h->root->deadline; + return true; +} + +const timers_vtable_t _pairing_heap_vt = { + .insert = ph_insert, + .cancel = ph_cancel, + .advance = ph_advance, + .peek = ph_peek, +}; + +/* ==================================================================== + * Timeout + * ==================================================================== */ + +/* Bridge waker: parked on the timer's latch, fires the cancel when the + * timer fires. Two-step indirection so cancel can already have its own + * waiters (e.g. a wait_or_cancel select) without the timer's waitlist + * caring about cancel internals. */ +static void _timeout_bridge_fire(waker_t *w, uintptr_t value) { + (void)value; + timeout_t *to = (timeout_t *)((char *)w - offsetof(timeout_t, bridge)); + cancel_set(&to->cancel); +} + +void timeout_init(timeout_t *to, timers_t *ts, uint64_t deadline) { + timer_init(&to->timer, ts, deadline); + cancel_init(&to->cancel); + to->bridge.next = NULL; + to->bridge.prev = NULL; + to->bridge.fire = _timeout_bridge_fire; + /* Park the bridge on the timer. 
If the timer fires, latch_set + * detaches the bridge and calls our fire callback inline. */ + event_park(timer_event(&to->timer), &to->bridge); +} + +void timeout_deinit(timeout_t *to) { + /* unpark is idempotent (no-op if the bridge already fired or was + * never parked). timer_deinit is idempotent on the in_heap flag. */ + event_unpark(timer_event(&to->timer), &to->bridge); + timer_deinit(&to->timer); +} + +/* ==================================================================== + * Ticker + * + * The ticker's event surface uses the broadcast-style LIFO doubly-linked + * waitlist: subscribers are fired all-at-once on each tick, so order + * doesn't matter; doubly-linked gives O(1) unpark for cancellation. + * ==================================================================== */ + +static bool ticker_try(event_t *e, uintptr_t *out) { + (void)e; (void)out; + return false; /* transient — wait for the next tick */ +} + +static void ticker_park(event_t *e, waker_t *w) { + ticker_t *t = (ticker_t *)((char *)e - offsetof(ticker_t, base)); + assert(!w->prev && !w->next); + w->next = t->waiters; + if (t->waiters) t->waiters->prev = w; + t->waiters = w; +} + +static void ticker_unpark(event_t *e, waker_t *w) { + ticker_t *t = (ticker_t *)((char *)e - offsetof(ticker_t, base)); + if (!w->prev && t->waiters != w) return; + if (w->prev) w->prev->next = w->next; + else t->waiters = w->next; + if (w->next) w->next->prev = w->prev; + w->prev = w->next = NULL; +} + +const event_vtable_t _ticker_vt = { + .try_ = ticker_try, + .park = ticker_park, + .unpark = ticker_unpark, +}; + +/* Bridge waker: parks on the underlying timer's latch. On fire, compute + * the next deadline (skip-ahead-safe), reinstall the timer, re-park the + * bridge on the new timer, then fire every parked subscriber with the + * just-fired deadline. The waitlist is detached before iteration so + * subscribers can re-park inside their fire callbacks. 
*/ +static void _ticker_bridge_fire(waker_t *w, uintptr_t value) { + ticker_t *t = (ticker_t *)((char *)w - offsetof(ticker_t, bridge)); + uint64_t fired = (uint64_t)value; + uint64_t next = fired + t->period; + /* Skip-ahead: in the rare overflow case (period = 0 or wraparound), + * step forward enough to keep next > fired. */ + if (next <= fired) { + next += ((fired - next) / t->period + 1) * t->period; + } + /* Reinstall the timer for the next tick. The latch's storage is + * reused — timer_init runs latch_init on it. */ + timer_init(&t->timer, t->src, next); + /* Bridge waker is fully detached (waker_fire just cleared its + * links); park it on the freshly-armed timer. */ + event_park(timer_event(&t->timer), &t->bridge); + + /* Fire the subscribers. Detach the waitlist first so re-park inside + * fire lands on the now-empty list. */ + waker_t *waiters = t->waiters; + t->waiters = NULL; + while (waiters) { + waker_t *nxt = waiters->next; + waker_fire(waiters, (uintptr_t)fired); + waiters = nxt; + } +} + +void ticker_init(ticker_t *t, timers_t *ts, + uint64_t period, uint64_t first_deadline) { + /* period must be positive — the skip-ahead computation in the bridge + * divides by period, and a zero-period ticker would loop forever + * inside ph_advance. */ + assert(period > 0); + t->base.vt = &_ticker_vt; + t->src = ts; + t->period = period; + t->waiters = NULL; + + timer_init(&t->timer, ts, first_deadline); + + t->bridge.next = NULL; + t->bridge.prev = NULL; + t->bridge.fire = _ticker_bridge_fire; + event_park(timer_event(&t->timer), &t->bridge); +} + +void ticker_deinit(ticker_t *t) { + /* Pull the bridge off the timer (no-op if already fired) and cancel + * the timer. Subscribers' wakers are the caller's storage; nothing + * to free here. 
*/ + event_unpark(timer_event(&t->timer), &t->bridge); + timer_deinit(&t->timer); +} + +/* ==================================================================== + * Task group + * + * Each attach contributes one to the countdown and parks a bridge waker + * on the task's done event. Bridge fire splices the slot out of the + * group's list and decrements the countdown — the join event fires when + * the last attached task finishes. + * + * Cancellation is fan-out: walk the list, set each task's cancel, then + * set the group-level cancel. Bodies cooperate by composing their work + * with task_cancel(self); the group-level cancel is for non-task + * waiters that want to react to "the group has been told to stop." + * ==================================================================== */ + +static void _task_group_detach_slot(task_group_t *g, group_attach_t *slot) { + /* Doubly-linked, head/tail tracked; same shape as other waitlists. */ + if (slot->prev) slot->prev->next = slot->next; + else g->head = slot->next; + if (slot->next) slot->next->prev = slot->prev; + else g->tail = slot->prev; + slot->prev = slot->next = NULL; +} + +static void _task_group_bridge_fire(waker_t *w, uintptr_t value) { + (void)value; + group_attach_t *slot = (group_attach_t *)((char *)w - offsetof(group_attach_t, bridge)); + task_group_t *g = slot->group; + _task_group_detach_slot(g, slot); + countdown_done(&g->pending); +} + +void task_group_init(task_group_t *g) { + /* Don't go through countdown_init(0) — that fires the latch + * immediately, which would make the very first attach's + * countdown_add UB. The group's join must remain not-fired until at + * least one attached task has finished, so we open with + * remaining=0 and an unset latch. The first attach lifts remaining + * to 1, and matching countdown_dones bring it back to 0, firing + * the latch. 
*/ + latch_init(&g->pending.done); + g->pending.remaining = 0; + cancel_init(&g->cancel); + g->head = g->tail = NULL; +} + +void task_group_attach(task_group_t *g, task_t *t, group_attach_t *slot) { + countdown_add(&g->pending, 1); + + slot->task = t; + slot->group = g; + + /* Append to the group's list (FIFO; ordering doesn't affect cancel + * fan-out semantics, but consistent with other waitlists in the + * codebase). */ + slot->prev = g->tail; + slot->next = NULL; + if (g->tail) g->tail->next = slot; + else g->head = slot; + g->tail = slot; + + slot->bridge.next = NULL; + slot->bridge.prev = NULL; + slot->bridge.fire = _task_group_bridge_fire; + + /* Park on the task's done event. If the task has already finished + * (re-attaching is UB per the contract, but if the task fired + * before attach finished initializing — e.g. an inline-spawned + * synchronous task), the latch_park assert would catch it. */ + event_park(task_done_event(t), &slot->bridge); +} + +void task_group_cancel(task_group_t *g) { + /* Fan-out cancel: signal each attached task. Walk the snapshot + * (cancel doesn't detach the slot — only task done does — so the + * list is stable across iteration). */ + for (group_attach_t *s = g->head; s; s = s->next) { + cancel_set(&s->task->cancel); + } + /* Group-level cancel for anyone awaiting "the group as a whole." */ + cancel_set(&g->cancel); +} + +/* ==================================================================== + * xco — coroutines + * ==================================================================== */ + typedef struct xco_impl { xstep_t base; /* must be first; aliases xco_t.base */ _Alignas(_XCO_CTX_ALIGN) unsigned char ctx_buf[_XCO_CTX_SIZE]; diff --git a/xco.h b/xco.h @@ -1,36 +1,1011 @@ /* - * xco.h — minimal asymmetric coroutines built atop xstep. C11. + * xco.h — minimal asymmetric coroutines + event substrate. C11. * - * A coroutine is a (program counter, stack) pair that can be resumed - * and suspended. 
xco_t embeds xstep_t as its first member, so a - * coroutine is one concrete kind of resumable function: generic code - * holding an xstep_t * works on coroutines and hand-coded state - * machines uniformly. Values pass between caller and coroutine - * through a single uintptr_t channel; pack richer data behind a - * pointer. + * Four layers in this header, bottom-up: * - * Asymmetric: xco_suspend always returns to the most recent resumer. - * Resumes nest like function calls. + * xstep_t Generic resumable function. A value that can be + * driven forward one step at a time; each step takes a + * uintptr_t in, returns one out, and reports whether + * the function suspended or finished. Substrate shared + * by stack-switching coroutines (xco) and hand-coded + * state machines. * - * Thread affinity: a coroutine must be resumed on the thread that - * initialized it. Cross-thread migration is not supported. + * waker / event / runtime + * Pollable event substrate (try / park / unpark) with a + * single-threaded FIFO ready queue. Concrete events: + * latch, countdown, notify, semaphore/mutex, select & + * allof, channel, queue, broadcast, cancel, timer, + * pairing-heap timer source, timeout, ticker, task, + * task_group. All storage caller-provided, no allocation, + * no atomics. * - * Teardown: this layer does not unwind a suspended coroutine's stack. - * Drive a coroutine to return (e.g. by passing a cancel sentinel it - * is expected to handle) before freeing its stack memory. + * xco_t Stack-switching coroutine. xco_t embeds xstep_t as + * its first member, so a coroutine is one concrete + * kind of resumable function: generic code holding an + * xstep_t * works on coroutines and hand-coded state + * machines uniformly. Values pass between caller and + * coroutine through a single uintptr_t channel; pack + * richer data behind a pointer. + * Asymmetric: xco_suspend always returns to the most + * recent resumer. Resumes nest like function calls. 
+ * Thread-affine: a coroutine must be resumed on the + * thread that initialized it. + * + * xco_task_t xco specialization of task_t. The trampoline calls + * fn(&xt->task, arg) and then task_done with the + * return value, so wait_or_cancel-style teardown works + * without the user wiring anything. + * + * Single-threaded only — matches xco's thread affinity. The contract + * "try first, park only if try failed" is race-free because nothing + * else runs between the two calls. + * + * One-waker invariant. An xstep, while suspended, is parked on at most + * one event. Multi-wait is composed in the event graph: build a + * select_event (or any other combinator — all-of, timeout, ...) and + * park on that. The xstep never sees more than one event directly. + * This is what lets a single step_waker_t live inline in the xstep + * and a single next/prev pair serve both event waitlists and the + * runtime ready queue (the two list memberships are disjoint in time). */ #ifndef XCO_H #define XCO_H +#include <stdbool.h> #include <stddef.h> #include <stdint.h> -#include "xstep.h" -#include "event.h" -/* Provides XCO_SIZE, XCO_ALIGN, XCO_STACK_ALIGN; resolved by the - * build to the arch-specific copy via the include path. */ -#include "xco_arch.h" +/* Provides XCO_SIZE, XCO_ALIGN, XCO_STACK_ALIGN, _XCO_CTX_SIZE, + * _XCO_CTX_ALIGN; resolved by the build to the platform-specific + * copy via the include path (-Iplatform/$(PLATFORM)). */ +#include "xco_platform.h" + +/* ==================================================================== + * xstep — generic resumable function interface. + * + * The first-member convention: embed xstep_t as the first field of + * your concrete type so a pointer to your type can be passed wherever + * an xstep_t * is expected. + * + * typedef struct { + * xstep_t base; + * int phase; + * ... 
+ * } parser_t; + * + * static xstep_result_t parser_step(xstep_t *s, uintptr_t v) { + * parser_t *p = (parser_t *)s; + * switch (p->phase) { + * case 0: p->phase = 1; return (xstep_result_t){v + 1, XSTEP_SUSPENDED}; + * case 1: return (xstep_result_t){v * 2, XSTEP_DEAD}; + * } + * __builtin_unreachable(); + * } + * + * parser_t p = { .base = {.step = parser_step, .status = XSTEP_INIT} }; + * xstep_result_t r = xstep(&p.base, 0); + * ==================================================================== */ + +typedef enum { + XSTEP_INIT, /* created, never stepped */ + XSTEP_RUNNING, /* inside step(), or in an active resume chain */ + XSTEP_SUSPENDED, /* yielded; resumable */ + XSTEP_DEAD, /* function returned */ +} xstep_status_t; + +typedef struct { + uintptr_t value; + xstep_status_t status; +} xstep_result_t; + +typedef struct xstep xstep_t; +typedef xstep_result_t (*xstep_fn)(xstep_t *s, uintptr_t value); + +struct xstep { + xstep_fn step; + xstep_status_t status; /* cached; xstep() syncs from each result */ +}; + +/* Drive one step. The wrapper updates s->status from the returned + * result so step implementations only need to populate the result. */ +static inline xstep_result_t xstep(xstep_t *s, uintptr_t value) { + xstep_result_t r = s->step(s, value); + s->status = r.status; + return r; +} + +static inline xstep_status_t xstep_status(const xstep_t *s) { + return s->status; +} + +/* ==================================================================== + * Event substrate. + * + * Standard usage from a state machine: + * + * uintptr_t v; + * if (event_try(e, &v)) { ... use v ... 
} + * else { event_park(e, &my_waker.base); return SUSPENDED; } + * + * Standard usage from an xco coroutine wrapper: + * + * uintptr_t v; + * if (!event_try(e, &v)) { + * step_waker_t sw; + * step_waker_init(&sw, rt, &xco_self()->base); + * event_park(e, &sw.base); + * xco_suspend(0); + * (void)event_try(e, &v); // now ready + * } + * ==================================================================== */ + +/* ---- Waker ------------------------------------------------------------ */ + +typedef struct waker waker_t; +struct waker { + /* Doubly-linked while parked on an event waitlist, so unpark is + * O(1). Reused as the singly-linked next pointer while on the + * runtime ready queue (FIFO, no removal from middle); prev is + * undefined in that state and reset on the next park. */ + waker_t *next; + waker_t *prev; + /* Fire callback. value is the event's payload at fire time — sticky + * events also store it on themselves, transient events (channels, + * one-shot signals) deliver only here. Wakers that don't care + * about the value just ignore the parameter. + * + * Invoke via waker_fire (below), not directly: the helper enforces + * the "fire receives a fully detached waker" contract that makes it + * safe to re-park inside the callback. */ + void (*fire)(waker_t *w, uintptr_t value); +}; + +/* Canonical way to invoke a waker's fire callback. Hands the callback a + * fully detached waker so the callback (or whatever the resumed step + * does) can re-park on a fresh waitlist without colliding with stale + * link state. Detachers that lead into fire (queue pops, latch drains, + * etc.) don't need to clear prev/next themselves. */ +static inline void waker_fire(waker_t *w, uintptr_t value) { + w->prev = NULL; + w->next = NULL; + w->fire(w, value); +} + +/* ---- Event ------------------------------------------------------------ */ + +typedef struct event event_t; + +typedef struct { + /* Inline-succeed if possible. 
*out receives the event's value when + * it has one (latch payload, select winner index, channel data). + * out may be NULL if the caller doesn't need the value. */ + bool (*try_)(event_t *e, uintptr_t *out); + /* Arm w on the event's waitlist. Caller's contract: try_ returned + * false. Single-threaded runtime guarantees no transition between. */ + void (*park)(event_t *e, waker_t *w); + /* Remove w from the waitlist. Idempotent: no-op if not parked. */ + void (*unpark)(event_t *e, waker_t *w); +} event_vtable_t; + +struct event { const event_vtable_t *vt; }; + +static inline bool event_try(event_t *e, uintptr_t *out) { + return e->vt->try_(e, out); +} +static inline void event_park(event_t *e, waker_t *w) { e->vt->park(e, w); } +static inline void event_unpark(event_t *e, waker_t *w) { e->vt->unpark(e, w); } + +/* ---- Runtime ---------------------------------------------------------- */ + +/* Forward-declared: the optional timer source attached to the runtime. + * Defined in the timer section below. */ +typedef struct timers timers_t; + +typedef struct runtime { + waker_t *head, *tail; + timers_t *timers; /* optional; advanced inside rt_run */ +} runtime_t; + +static inline void rt_init(runtime_t *rt) { + rt->head = rt->tail = NULL; + rt->timers = NULL; +} + +/* Attach (or detach with NULL) a timer source. While attached, rt_run + * advances it each pass with the now value the caller supplied; firing + * timers may enqueue more wakers, which the same rt_run call then drains. */ +static inline void rt_attach_timers(runtime_t *rt, timers_t *ts) { + rt->timers = ts; +} + +/* Append w to the ready queue. Used by step_waker_fire and by anyone + * else that wants a waker resumed by the scheduler. */ +static inline void rt_enqueue(runtime_t *rt, waker_t *w) { + w->next = NULL; + if (rt->tail) rt->tail->next = w; + else rt->head = w; + rt->tail = w; +} + +/* Drain the ready queue, resuming each step-waker's xstep, until empty. 
+ * Steps may re-arm on events (and thus leave the queue) or enqueue + * other steps; rt_run keeps going until quiescent. now is forwarded to + * any attached timer source's advance(); pass 0 (or anything) when no + * source is attached. The library never reads a clock — now is always + * caller-supplied. */ +void rt_run(runtime_t *rt, uint64_t now); + +/* The canonical bridge between events and the scheduler. When fired, + * stashes the value and enqueues itself onto rt; rt_run pops it and + * calls xstep(step, value), so the resumed step receives the event's + * payload directly without a re-try. + * + * Init once, re-park freely. The runtime hands the waker back fully + * detached (next/prev cleared) before invoking the resumed step, so a + * subscriber that wants to wait on the next event can call event_park + * directly — no re-init needed unless rt or step changes. */ +typedef struct { + waker_t base; + runtime_t *rt; + xstep_t *step; + uintptr_t resume_value; /* set by fire, consumed by rt_run */ +} step_waker_t; + +/* Defined in xco.c; declared here so step_waker_init can install it. */ +void _step_waker_fire(waker_t *w, uintptr_t value); + +static inline void step_waker_init(step_waker_t *sw, runtime_t *rt, xstep_t *s) { + sw->base.next = NULL; + sw->base.prev = NULL; + sw->base.fire = _step_waker_fire; + sw->rt = rt; + sw->step = s; + sw->resume_value = 0; +} + +/* ---- Latch ------------------------------------------------------------ */ + +/* One-shot sticky event. set() flips the bit, stores the payload, and + * fires every waiter. Subsequent set() calls are ignored. To re-arm, + * reinitialize a fresh latch. */ +typedef struct { + event_t base; + bool set; + uintptr_t value; + waker_t *waiters; +} latch_t; + +/* Defined in xco.c; referenced by latch_init. 
*/ +extern const event_vtable_t _latch_vt; + +static inline void latch_init(latch_t *l) { + l->base.vt = &_latch_vt; + l->set = false; + l->value = 0; + l->waiters = NULL; +} + +void latch_set(latch_t *l, uintptr_t value); + +/* ---- Countdown -------------------------------------------------------- */ + +/* One-shot fan-in counter. Fires its embedded latch (payload 0) when + * remaining hits 0. countdown_add(n) is legal while remaining > 0; + * countdown_done decrements; both are UB once the latch has fired. + * + * Compose with the standard event API via countdown_event(). */ +typedef struct countdown { + latch_t done; + size_t remaining; +} countdown_t; + +static inline void countdown_init(countdown_t *c, size_t n) { + latch_init(&c->done); + c->remaining = n; + if (n == 0) latch_set(&c->done, 0); +} + +static inline void countdown_add(countdown_t *c, size_t n) { + /* UB after fire — caller's contract. */ + c->remaining += n; +} + +static inline void countdown_done(countdown_t *c) { + /* UB at 0 — caller's contract. */ + if (--c->remaining == 0) latch_set(&c->done, 0); +} + +static inline event_t *countdown_event(countdown_t *c) { return &c->done.base; } +static inline bool countdown_fired(const countdown_t *c) { return c->done.set; } + +/* ---- Notify (wake-one / wake-all) ------------------------------------- */ + +/* Transient signal with no sticky state. notify_one fires (and detaches) + * the head of a FIFO waitlist; notify_all fires every parked waker. Both + * are no-ops when the waitlist is empty. Subscribers must re-park to see + * subsequent notifications. + * + * event_try always returns false: there is no "ready now" state — a + * subscriber waits for the *next* notify. 
*/ +typedef struct notify { + event_t base; + waker_t *head, *tail; +} notify_t; + +extern const event_vtable_t _notify_vt; + +static inline void notify_init(notify_t *n) { + n->base.vt = &_notify_vt; + n->head = n->tail = NULL; +} + +static inline event_t *notify_event(notify_t *n) { return &n->base; } + +void notify_one(notify_t *n); +void notify_all(notify_t *n); + +/* ---- Semaphore -------------------------------------------------------- */ + +/* Counting semaphore. acquire is exposed as event_t (composable with + * select / wait_or_cancel): event_try succeeds and decrements when + * permits > 0; otherwise the waker parks FIFO. semaphore_release(n) + * hands one permit to each of up to n waiting wakers (each is fired, + * which the receiver treats as "you got a permit") before adding any + * leftover to the count. + * + * One permit per acquire. Bulk acquire isn't expressible in event_t's + * shape; if you need it, call sequentially. For binary use (mutex-style + * critical section across awaits) init with permits = 1. + * + * Fairness: FIFO at the waitlist. A racing inline event_try by a fresh + * caller can jump ahead of parked waiters when permits are released + * back to count rather than directly handed off — release prefers + * parked waiters first to avoid that. */ +typedef struct semaphore { + event_t acquire; + size_t permits; + waker_t *head, *tail; +} semaphore_t; + +extern const event_vtable_t _semaphore_acquire_vt; + +static inline void semaphore_init(semaphore_t *s, size_t initial) { + s->acquire.vt = &_semaphore_acquire_vt; + s->permits = initial; + s->head = s->tail = NULL; +} + +static inline event_t *semaphore_event(semaphore_t *s) { return &s->acquire; } + +void semaphore_release(semaphore_t *s, size_t n); + +/* ---- Mutex ------------------------------------------------------------ */ + +/* Binary semaphore wrapper for vocabulary at call sites. 
mutex_init is + * semaphore_init(s, 1); the event_t fires once per release; mutex_release + * hands the permit to the next waiter (or returns it to the count). */ +typedef semaphore_t mutex_t; + +static inline void mutex_init (mutex_t *m) { semaphore_init(m, 1); } +static inline event_t *mutex_event (mutex_t *m) { return semaphore_event(m); } +static inline void mutex_release(mutex_t *m) { semaphore_release(m, 1); } + +/* ---- Select / all-of -------------------------------------------------- */ + +/* Wait over N input events. Two semantics share the same storage shape, + * so a caller can switch between them by changing only the init call: + * + * select_event_init fires when ANY input fires (any-of) + * allof_event_init fires when ALL inputs fire (all-of) + * + * In both cases done's payload is the index of the input whose firing + * closed the wait — the winner for select, the last-to-fire for allof — + * and inputs[i].value carries each fired input's payload (works + * uniformly for sticky and transient sources, where re-trying the input + * would either succeed or fail). Composes: a select_event is itself an + * event. */ + +typedef struct select_event select_event_t; + +/* Per-input arming record. Caller-allocated as an array of n alongside + * the select_event. After fire, .value holds whatever the input + * delivered; other fields are internal. */ +typedef struct { + waker_t w; + event_t *src; + select_event_t *parent; + uintptr_t value; /* captured at fire time */ +} select_input_t; + +struct select_event { + latch_t done; /* fires with the closing input's index */ + select_input_t *inputs; + size_t n; + size_t remaining; /* counts down; done fires at 0 + * (select: starts at 1, allof: at n) */ +}; + +/* Initialize as a select (any-of). inputs[] is caller-provided storage + * for n nodes; srcs[] is the array of n input event pointers (read + * once during init). If any input is already ready, the select fires + * immediately and no wakers are parked. 
Use &s->done.base as the + * resulting event_t. */ +void select_event_init(select_event_t *s, + select_input_t *inputs, size_t n, + event_t *const *srcs); + +/* Initialize as an allof (all-of). Inputs already ready at init are + * consumed inline (no parking, value captured); if every input is + * ready, done fires immediately. n == 0 fires done with payload 0. */ +void allof_event_init(select_event_t *s, + select_input_t *inputs, size_t n, + event_t *const *srcs); + +/* Disarm any inputs still parked. Safe to call after fire (no-op) and + * after partial completion (allof). Required before s leaves scope if + * it has not yet fired. */ +void select_event_deinit(select_event_t *s); + +/* ---- Channel ---------------------------------------------------------- */ + +/* Unbuffered rendezvous channel carrying uintptr_t. Senders and receivers + * wait on each other; whichever arrives first parks until its peer + * shows up. The pending value lives in the sender's chan_send_waker_t + * for the duration of any wait — no per-channel buffer. + * + * Recv side is exposed as event_t (composable with select). Send side is + * a typed API because send carries a value that doesn't fit event_t's + * try(out) signature. Selecting on a bare send is therefore not + * supported; to select on a send, wrap it in a per-call op event + * (chan_send_op_t). + * + * Rendezvous matrix: + * send + parked recv fire recv with value, sender continues inline. + * send + no recv sender parks (chan_park_send); peer pulls later. + * recv + parked sender read sender's value, fire sender (delivery + * confirmation), receiver continues inline. + * recv + no sender receiver parks (event_park on recv); peer + * delivers later. + * + * FIFO order on both waitlists. + * + * Close: optional EOF semantics. After chan_close, try_send fails (no + * delivery), parked senders are drained with delivered=false, and parked + * receivers are woken so they can observe RECV_CLOSED via chan_recv. 
The + * recv event is "ready" iff a value is available OR the channel is + * closed — receivers must call chan_recv to disambiguate value vs EOF. + * chan_park_send after close is UB. */ + +/* Result of a typed receive on a channel or queue. */ +typedef enum { + RECV_GOT, /* *out holds the delivered value */ + RECV_EMPTY, /* nothing available right now; caller may park */ + RECV_CLOSED, /* peer closed and no values remain */ +} recv_status_t; + +typedef struct chan { + event_t recv; /* the recv-side event */ + waker_t *send_head, *send_tail; /* parked chan_send_waker_t bases */ + waker_t *recv_head, *recv_tail; /* parked recv-side wakers */ + bool closed; +} chan_t; + +extern const event_vtable_t _chan_recv_vt; + +static inline void chan_init(chan_t *c) { + c->recv.vt = &_chan_recv_vt; + c->send_head = c->send_tail = NULL; + c->recv_head = c->recv_tail = NULL; + c->closed = false; +} + +/* Sender-side waker. Embeds a step_waker (the actual scheduler bridge) + * and the value to deliver. The receiver reads .value, then fires the + * step_waker — the sender resumes via the runtime, no payload passed. + * + * `delivered` is set by the closing side before fire: true on a normal + * recv handoff, false when chan_close drains the parked-sender list. + * Senders read it after resume to know whether their value reached a + * receiver. */ +typedef struct { + step_waker_t sw; + uintptr_t value; + bool delivered; +} chan_send_waker_t; + +static inline void chan_send_waker_init(chan_send_waker_t *csw, + runtime_t *rt, xstep_t *s, + uintptr_t value) { + step_waker_init(&csw->sw, rt, s); + csw->value = value; + csw->delivered = false; +} + +/* Try to deliver value inline. Returns true iff a receiver was parked + * AND the channel is open; its waker fires with the value, the receiver + * becomes ready, and the sender continues without parking. Returns false + * after close (no delivery). */ +bool chan_try_send(chan_t *c, uintptr_t value); + +/* Park a sender. 
csw->value must already be set (use chan_send_waker_init). + * The sender's xstep is resumed when a receiver consumes the value, or + * when chan_close drains the list (sender resumes with csw->delivered + * false). Calling chan_park_send on a closed channel is UB. */ +void chan_park_send(chan_t *c, chan_send_waker_t *csw); + +/* Remove a parked sender (cancellation). No-op if not parked. */ +void chan_unpark_send(chan_t *c, chan_send_waker_t *csw); + +/* Typed receive. Disambiguates value vs EOF where event_try cannot. */ +recv_status_t chan_recv(chan_t *c, uintptr_t *out); + +/* Close the channel. Idempotent. Drains parked senders (each resumes + * with csw->delivered = false) and wakes parked receivers (which then + * observe RECV_CLOSED via chan_recv). Subsequent chan_try_send returns + * false; chan_park_send is UB. */ +void chan_close(chan_t *c); +static inline bool chan_is_closed(const chan_t *c) { return c->closed; } + +/* Selectable send op. A per-call object that holds the value, parks + * on the channel, and exposes &op->done.base as the event that fires + * when delivery resolves. Compose with select like any other event. + * + * The op embeds a chan_send_waker_t (so the chan's send list stays + * uniform — receivers read .value at the same offset for both direct + * and op senders) but rewires its fire callback: instead of resuming + * an xstep, fire sets op->done. Polymorphism via the function pointer. + * + * The done latch's payload is 1 on a successful delivery and 0 on + * close-drain (sender's value did not reach a receiver). The init path + * resolves to 0 inline if the channel is already closed. + * + * Lifecycle: init → wait on &op->done.base → deinit. Always deinit; + * it's a no-op after resolution and unparks the chan-side waker if not. 
*/ +typedef struct { + chan_send_waker_t csw; /* parked on chan; fire overridden */ + chan_t *chan; + latch_t done; +} chan_send_op_t; + +/* Implementation detail: exposed so chan_send_op_init can install it. */ +void _chan_send_op_fire(waker_t *w, uintptr_t value); + +void chan_send_op_init(chan_send_op_t *op, chan_t *c, uintptr_t value); +void chan_send_op_deinit(chan_send_op_t *op); + +/* ---- Queue ------------------------------------------------------------ */ + +/* Bounded FIFO of uintptr_t. Caller provides the ring buffer storage. + * Recv side is exposed as event_t (composable with select). Send side is + * a typed API (carries a value), shaped after chan's send. + * + * Three full-buffer policies, fixed at init: + * QUEUE_BLOCK senders park until a receiver makes room. + * QUEUE_DROP_NEWEST queue_try_send silently discards the new value. + * QUEUE_DROP_OLDEST queue_try_send evicts the head and pushes new tail. + * + * Senders never park under DROP_* policies — queue_park_send is only + * meaningful under QUEUE_BLOCK, and only after queue_try_send returned + * false. queue_unpark_send is idempotent (cancellation-safe). + * + * Direct-handoff: queue_try_send first checks for a parked receiver and + * delivers inline if present (payload bypasses the buffer), regardless + * of policy. Symmetric to chan. + * + * cap == 0 with QUEUE_BLOCK degenerates to a rendezvous channel; chan_t + * remains the more direct expression of that case. 
*/ + +typedef enum { + QUEUE_BLOCK, + QUEUE_DROP_NEWEST, + QUEUE_DROP_OLDEST, +} queue_policy_t; + +typedef struct queue { + event_t recv; + uintptr_t *buf; + size_t cap, head, len; + queue_policy_t policy; + waker_t *send_head, *send_tail; + waker_t *recv_head, *recv_tail; + bool closed; +} queue_t; + +extern const event_vtable_t _queue_recv_vt; + +static inline void queue_init(queue_t *q, uintptr_t *buf, size_t cap, + queue_policy_t policy) { + q->recv.vt = &_queue_recv_vt; + q->buf = buf; + q->cap = cap; + q->head = 0; + q->len = 0; + q->policy = policy; + q->send_head = q->send_tail = NULL; + q->recv_head = q->recv_tail = NULL; + q->closed = false; +} + +static inline event_t *queue_recv_event(queue_t *q) { return &q->recv; } + +/* Try to enqueue. Direct-delivers to a parked receiver if one is waiting. + * Returns: + * QUEUE_BLOCK true if delivered or buffered; false if full. + * QUEUE_DROP_NEWEST always true (silently drops if full). + * QUEUE_DROP_OLDEST always true (evicts head if full). */ +bool queue_try_send(queue_t *q, uintptr_t value); + +/* Sender-side waker for QUEUE_BLOCK. Same shape as chan_send_waker_t: + * a step_waker plus the value to deliver. Receivers read .value at the + * same offset so the queue's send list stays uniform. `delivered` is + * set by the closing side: true on a normal handoff, false on a close + * drain. */ +typedef struct { + step_waker_t sw; + uintptr_t value; + bool delivered; +} queue_send_waker_t; + +static inline void queue_send_waker_init(queue_send_waker_t *qsw, + runtime_t *rt, xstep_t *s, + uintptr_t value) { + step_waker_init(&qsw->sw, rt, s); + qsw->value = value; + qsw->delivered = false; +} + +void queue_park_send (queue_t *q, queue_send_waker_t *qsw); +void queue_unpark_send(queue_t *q, queue_send_waker_t *qsw); + +/* Typed receive. Mirrors chan_recv: returns RECV_GOT (value popped from + * the buffer or directly from a parked sender), RECV_CLOSED (closed and + * drained), or RECV_EMPTY (caller may park). 
*/ +recv_status_t queue_recv(queue_t *q, uintptr_t *out); + +/* Close the queue. Idempotent. Drains parked senders (delivered=false) + * and wakes parked receivers. After close, queue_try_send under + * QUEUE_BLOCK returns false; under QUEUE_DROP_* the value is silently + * dropped (returns true). queue_park_send after close is UB. */ +void queue_close(queue_t *q); +static inline bool queue_is_closed(const queue_t *q) { return q->closed; } + +/* Selectable send op. Mirrors chan_send_op_t: a per-call object that + * holds the value, parks on the queue (only meaningful under QUEUE_BLOCK), + * and exposes &op->done.base as the event that fires when the send + * resolves. The done latch's payload is 1 on a successful delivery + * (handed to a receiver, buffered, or accepted under DROP_*) and 0 on + * close-drain. + * + * Under DROP_* policies the send always resolves inline at init: the + * try_send path returns true and op->done fires immediately. */ +typedef struct { + queue_send_waker_t qsw; /* parked on queue; fire overridden */ + queue_t *queue; + latch_t done; +} queue_send_op_t; + +void _queue_send_op_fire(waker_t *w, uintptr_t value); + +void queue_send_op_init (queue_send_op_t *op, queue_t *q, uintptr_t value); +void queue_send_op_deinit(queue_send_op_t *op); + +/* ---- Broadcast (slot) ------------------------------------------------- */ + +/* Re-armable signal carrying a "latest value" slot. Subscribers park on + * the event; broadcast_publish stores the new value, fires every parked + * subscriber with that value, and clears the waitlist — subscribers must + * re-park to see further publishes. Subscribers that aren't parked at + * publish time miss that publish but will see the next one. This is the + * coalescing "watch a slot" semantics, not lossless fan-out. + * + * event_try always returns false: there is no "ready now" state — a + * subscriber waits for the *next* publish. 
To read the latest published + * value at any time, use broadcast_value (valid once broadcast_has_value + * returns true). + * + * For lossless multi-consumer delivery, give each subscriber its own + * queue and have the producer write to all of them. */ + +typedef struct broadcast { + event_t base; + bool has_value; + uintptr_t value; + waker_t *waiters; +} broadcast_t; + +extern const event_vtable_t _broadcast_vt; + +static inline void broadcast_init(broadcast_t *b) { + b->base.vt = &_broadcast_vt; + b->has_value = false; + b->value = 0; + b->waiters = NULL; +} + +static inline event_t *broadcast_event (broadcast_t *b) { return &b->base; } +static inline bool broadcast_has_value(const broadcast_t *b) { return b->has_value; } +static inline uintptr_t broadcast_value (const broadcast_t *b) { return b->value; } + +void broadcast_publish(broadcast_t *b, uintptr_t value); + +/* ---- Cancellation ----------------------------------------------------- */ + +/* A cancellation token is a sticky latch — these aliases exist for + * vocabulary at call sites. cancel_set fires every parked waiter; the + * idempotency of latch_set means racing cancellers are fine. + * + * Pair a cancel_t with any blocking event via wait_or_cancel to get + * "await X, or be cancelled." + * + * Discipline: cancellation notifies; it never drops in-flight values. + * A cancelled await returns control to its caller, which is responsible + * for draining whatever it owns — deinit a pending chan_send_op so its + * value goes back to the sender, deinit a select_event so input wakers + * detach, drive a cancellable coroutine to XSTEP_DEAD before freeing + * its stack. The xco layer does no unwinding; the coroutine cooperates. 
*/ + +typedef latch_t cancel_t; + +static inline void cancel_init(cancel_t *c) { latch_init(c); } +static inline void cancel_set(cancel_t *c) { latch_set(c, 0); } +static inline bool cancel_is_set(const cancel_t *c) { return c->set; } +static inline event_t *cancel_event(cancel_t *c) { return &c->base; } + +/* Outcome indices for wait_or_cancel — match the inputs[] order so the + * value the resumer receives from the latched select reads as one of + * these directly. */ +enum { + WAIT_OK = 0, /* ev fired; inputs[0].value holds its payload */ + WAIT_CANCELLED = 1, /* cancel fired before ev */ +}; + +/* Build a select over (ev, cancel) using caller-provided storage. If + * either is already ready at init the select fast-paths and never parks + * anyone (ev is checked first, so an event that has already resolved + * wins over a concurrent cancel). Treat &out->done.base as the event + * to wait on. Always pair with select_event_deinit before storage + * leaves scope (no-op once fired). */ +static inline void wait_or_cancel(select_event_t *out, + select_input_t inputs[2], + event_t *ev, cancel_t *c) { + event_t *srcs[2] = {ev, cancel_event(c)}; + select_event_init(out, inputs, 2, srcs); +} + +/* ---- Timers ----------------------------------------------------------- */ + +/* A timer is a sticky event keyed on a u64 deadline. It fires (exactly + * once) when the attached timer source is advanced past that deadline. + * The library never reads a clock; the caller provides `now` to + * timers_advance (or via rt_run). + * + * Storage is pluggable through the timers vtable so callers can swap a + * pairing heap (in-tree, O(log n) amortized everywhere including cancel) + * for a wheel or other structure without touching the timer/timeout + * surface. The timer struct holds the heap link fields inline; the source + * impl interprets them. + * + * Lifecycle: + * timer_init(t, ts, deadline) // inserts into ts + * ... 
wait on timer_event(t), or compose into select/wait_or_cancel ... + * timer_deinit(t) // removes from ts if not yet fired + * + * Fire payload is the deadline. Re-arming = reinit a fresh timer. */ + +typedef struct timer timer_t; + +typedef struct { + /* Insert t into the source. t must be initialized but not yet + * inserted; insert sets t's heap link fields. */ + void (*insert) (timers_t *ts, timer_t *t); + /* Remove t from the source if currently inserted. Caller must + * ensure t was inserted into this same source. */ + void (*cancel) (timers_t *ts, timer_t *t); + /* Fire every timer whose deadline <= now, in deadline order, popping + * each from the source. Each fire drains the timer's waiter list. */ + void (*advance)(timers_t *ts, uint64_t now); + /* If any timer is queued, write its deadline to *out and return + * true. *out untouched on false. */ + bool (*peek) (const timers_t *ts, uint64_t *out); +} timers_vtable_t; + +struct timers { const timers_vtable_t *vt; }; + +static inline void timers_insert (timers_t *ts, timer_t *t) { ts->vt->insert (ts, t); } +static inline void timers_cancel (timers_t *ts, timer_t *t) { ts->vt->cancel (ts, t); } +static inline void timers_advance(timers_t *ts, uint64_t now) { ts->vt->advance(ts, now); } +static inline bool timers_peek (const timers_t *ts, uint64_t *o) { return ts->vt->peek(ts, o); } + +/* Concrete timer. Embeds a latch so try/park/unpark and the fire-all + * waitlist handling come for free; the timer source manipulates only + * the heap link fields and triggers the latch on fire. The latch's + * payload after fire is the timer's deadline. */ +struct timer { + latch_t done; /* fires once, payload = deadline */ + uint64_t deadline; + timers_t *src; /* source this timer is registered with */ + bool in_heap; /* true between insert and fire/cancel */ + /* Pairing-heap link fields; opaque to anyone but the source impl. + * prev is parent if first child, else previous sibling, else NULL. 
*/ + timer_t *child, *prev, *next; +}; + +static inline event_t *timer_event(timer_t *t) { return &t->done.base; } +static inline bool timer_fired(const timer_t *t) { return t->done.set; } + +void timer_init (timer_t *t, timers_t *ts, uint64_t deadline); +void timer_deinit(timer_t *t); + +/* In-tree timer source: intrusive pairing heap. O(1) amortized insert + * and meld; O(log n) amortized advance and cancel. No per-source + * allocation — the heap is just a root pointer; nodes live in the + * caller's timer_t's. */ +typedef struct { + timers_t base; + timer_t *root; +} pairing_heap_t; + +extern const timers_vtable_t _pairing_heap_vt; + +static inline void pairing_heap_init(pairing_heap_t *h) { + h->base.vt = &_pairing_heap_vt; + h->root = NULL; +} + +/* ---- Timeout ---------------------------------------------------------- */ + +/* Bundle: a timer that fires a cancel_t on expiration. The natural + * pairing for "await ev, or be cancelled by deadline": + * + * timeout_t to; + * timeout_init(&to, ts, now + budget); + * select_event_t sel; select_input_t inputs[2]; + * wait_or_cancel(&sel, inputs, ev, &to.cancel); + * ... wait on &sel.done.base ... + * select_event_deinit(&sel); + * timeout_deinit(&to); // safe whether the timer fired or not + * + * The bridge waker is parked on the timer; when it fires it sets the + * cancel. Bridge fire is idempotent vs cancel_set (a sticky latch). */ +typedef struct timeout { + timer_t timer; + cancel_t cancel; + waker_t bridge; +} timeout_t; + +void timeout_init (timeout_t *to, timers_t *ts, uint64_t deadline); +void timeout_deinit(timeout_t *to); + +/* ---- Ticker ----------------------------------------------------------- */ + +/* Re-armable transient signal driven by a timer source. 
Each time the + * underlying timer fires, the ticker computes the next deadline (period + * past the just-fired one, with skip-ahead for catch-up after overflow), + * reinstalls the timer, and fires every parked subscriber with the + * just-fired deadline as the payload. Subscribers that aren't parked at + * a fire miss it (transient — same coalescing semantics as broadcast). + * + * ticker_init(&t, ts, period, first_deadline); + * ... wait on ticker_event(&t), re-park to see further ticks ... + * ticker_deinit(&t); // cancels the in-flight timer + * + * event_try always returns false; subscribers wait for the *next* tick. */ +typedef struct ticker { + timer_t timer; + timers_t *src; + uint64_t period; + event_t base; + waker_t *waiters; + waker_t bridge; /* internal: parks on timer_event */ +} ticker_t; + +extern const event_vtable_t _ticker_vt; + +void ticker_init (ticker_t *t, timers_t *ts, + uint64_t period, uint64_t first_deadline); +void ticker_deinit(ticker_t *t); +static inline event_t *ticker_event(ticker_t *t) { return &t->base; } + +/* ---- Task ------------------------------------------------------------- */ + +/* Lifecycle handle for a running xstep. Bundles a done latch (fires when + * the xstep returns, payload = its return value) with a cancel latch + * (the canonical signal to ask the xstep to wind down). The xstep itself + * is caller-allocated; the task holds a pointer to it. + * + * Who fires done: + * - Hand-coded state machine: call task_done(t, ret) in the same arm + * that returns XSTEP_DEAD. + * - xco-backed task (see xco_task_t below): the trampoline calls + * task_done automatically with the coroutine's return value. + * + * Cooperation: cancellation only notifies — the xstep is responsible for + * draining what it owns and reaching XSTEP_DEAD. The task's cancel is a + * normal cancel_t, so the xstep typically composes wait_or_cancel against + * it on every blocking await. 
+ * + * Joining: callers wait on task_done_event with the standard event API + * (try / park, or compose into select / wait_or_cancel). On fire the + * latch's payload is the xstep's return value. */ + +typedef struct task { + xstep_t *step; + latch_t done; + cancel_t cancel; +} task_t; + +static inline void task_init(task_t *t, xstep_t *step) { + t->step = step; + latch_init(&t->done); + cancel_init(&t->cancel); +} + +/* Mark the task complete with its return value. Idempotent (latch_set is). */ +static inline void task_done(task_t *t, uintptr_t value) { + latch_set(&t->done, value); +} + +static inline event_t *task_done_event (task_t *t) { return &t->done.base; } +static inline cancel_t *task_cancel (task_t *t) { return &t->cancel; } +static inline bool task_finished (const task_t *t) { return t->done.set; } +static inline bool task_is_cancelled(const task_t *t) { return cancel_is_set(&t->cancel); } +static inline xstep_t *task_step (task_t *t) { return t->step; } + +/* ---- Task group ------------------------------------------------------- */ + +/* Fan-in join + fan-out cancel for a dynamic set of tasks. Caller + * provides storage for each per-attachment record (group_attach_t), so + * the group itself does no allocation. + * + * task_group_attach(g, t, slot): + * countdown_add(&g->pending, 1); slot's bridge waker parks on + * task_done_event(t); slot is appended to g's list. When the task's + * done fires, the bridge fires: it splices the slot out of g's list + * and calls countdown_done(&g->pending). Re-attaching a finished + * task is UB. + * + * task_group_cancel(g): walks the attachment list and cancel_set's + * each &slot->task->cancel, then cancel_set's &g->cancel. Bodies that + * compose wait_or_cancel against task_cancel(t) wind down cooperatively; + * meanwhile, anything waiting on g->cancel observes the group-level + * signal directly. + * + * task_group_join_event(g): fires when every attached task has reached + * task_done. 
Compose with select / wait_or_cancel like any event. */ + +typedef struct group_attach group_attach_t; + +typedef struct task_group { + countdown_t pending; + cancel_t cancel; + group_attach_t *head, *tail; +} task_group_t; + +struct group_attach { + waker_t bridge; /* parked on task_done_event(task) */ + task_t *task; + task_group_t *group; + group_attach_t *next, *prev; +}; + +void task_group_init (task_group_t *g); +void task_group_attach (task_group_t *g, task_t *t, + group_attach_t *slot); +void task_group_cancel (task_group_t *g); +static inline event_t *task_group_join_event (task_group_t *g) { + return countdown_event(&g->pending); +} +static inline cancel_t *task_group_cancel_handle(task_group_t *g) { + return &g->cancel; +} + +/* ==================================================================== + * xco — stack-switching coroutines. + * + * Teardown: this layer does not unwind a suspended coroutine's stack. + * Drive a coroutine to return (e.g. by passing a cancel sentinel it + * is expected to handle) before freeing its stack memory. + * ==================================================================== */ /* Coroutine entry point. The argument is the value supplied to the * first xstep on this xco. The return value is delivered to the resumer @@ -124,8 +1099,8 @@ static inline bool xco_await_or_cancel(runtime_t *rt, event_t *ev, /* ---- xco-backed task ------------------------------------------------- */ -/* xco specialization of task_t (event.h). Bundles the user-visible task - * handle with the xco that runs it; the trampoline calls fn(&xt->task, arg) +/* xco specialization of task_t. Bundles the user-visible task handle + * with the xco that runs it; the trampoline calls fn(&xt->task, arg) * and then task_done with its return value, so wait_or_cancel-style * teardown works without the user wiring anything. * diff --git a/xco_platform.h b/xco_platform.h @@ -1,34 +0,0 @@ -/* - * xco_platform.h — generic platform interface for xco.c. 
- * - * Declares the two primitives the platform layer must implement, plus - * the (forward-declared) context type. xco.c never derefs values of - * this type — it only passes pointers around — so the forward - * declaration is enough here. The full struct lives in the arch- - * specific xco_arch.c. - */ - -#ifndef XCO_PLATFORM_H -#define XCO_PLATFORM_H - -#include <stddef.h> -#include <stdint.h> - -typedef struct xco_platform_ctx xco_platform_ctx_t; - -/* Initialize *ctx so that the next switch into it begins executing - * entry(value), where value is the uintptr_t handed to the switch. - * stack_base must be XCO_STACK_ALIGN-aligned. entry must not return. */ -void xco_platform_init(xco_platform_ctx_t *ctx, - void *stack_base, size_t stack_len, - void (*entry)(uintptr_t)); - -/* Save callee-saved state into *from, restore it from *to, and hand - * `value` to *to — as the return value of *to's previous switch, or - * as entry's argument on first switch into a fresh context. Returns - * the value passed by the next switch back to *from. */ -uintptr_t xco_platform_switch(xco_platform_ctx_t *from, - xco_platform_ctx_t *to, - uintptr_t value); - -#endif /* XCO_PLATFORM_H */ diff --git a/xstep.h b/xstep.h @@ -1,71 +0,0 @@ -/* - * xstep.h — generic resumable function interface. C11. - * - * An xstep_t is a value that can be driven forward one step at a - * time. Each step takes a uintptr_t in, returns one out, and reports - * whether the function suspended (more steps to come) or finished. - * Pack richer data behind a pointer. - * - * This is the substrate two implementations share: stack-switching - * coroutines (xco) and hand-coded state machines. The first member - * convention — embed xstep_t as the first field of your concrete - * type — lets a pointer to your type be passed wherever an xstep_t * - * is expected. - * - * typedef struct { - * xstep_t base; - * int phase; - * ... 
- * } parser_t; - * - * static xstep_result_t parser_step(xstep_t *s, uintptr_t v) { - * parser_t *p = (parser_t *)s; - * switch (p->phase) { - * case 0: p->phase = 1; return (xstep_result_t){v + 1, XSTEP_SUSPENDED}; - * case 1: return (xstep_result_t){v * 2, XSTEP_DEAD}; - * } - * __builtin_unreachable(); - * } - * - * parser_t p = { .base = {.step = parser_step, .status = XSTEP_INIT} }; - * xstep_result_t r = xstep(&p.base, 0); - */ - -#ifndef XSTEP_H -#define XSTEP_H - -#include <stdint.h> - -typedef enum { - XSTEP_INIT, /* created, never stepped */ - XSTEP_RUNNING, /* inside step(), or in an active resume chain */ - XSTEP_SUSPENDED, /* yielded; resumable */ - XSTEP_DEAD, /* function returned */ -} xstep_status_t; - -typedef struct { - uintptr_t value; - xstep_status_t status; -} xstep_result_t; - -typedef struct xstep xstep_t; -typedef xstep_result_t (*xstep_fn)(xstep_t *s, uintptr_t value); - -struct xstep { - xstep_fn step; - xstep_status_t status; /* cached; xstep() syncs from each result */ -}; - -/* Drive one step. The wrapper updates s->status from the returned - * result so step implementations only need to populate the result. */ -static inline xstep_result_t xstep(xstep_t *s, uintptr_t value) { - xstep_result_t r = s->step(s, value); - s->status = r.status; - return r; -} - -static inline xstep_status_t xstep_status(const xstep_t *s) { - return s->status; -} - -#endif /* XSTEP_H */