xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit 3a82709a0f8258ffa4e0a224ce0a45dfc493ce46
parent 76cdbc983cbfd158c6114e33a7252c576ed6b63c
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 03:01:20 -0700

Add timers: pluggable timer source with pairing-heap default

timer_t is a sticky event keyed on a u64 deadline. The library never
reads a clock — the caller passes now to timers_advance (or rt_run,
which forwards it to any attached source). Storage is pluggable via a
timers_t vtable; the in-tree pairing heap gives O(log n) amortized
cancel, which matters because timeouts mostly don't fire.

Runtime gains an optional timers_t pointer; rt_run(rt, now) advances
it each pass before draining the ready queue, so a timer firing from
inside rt_run drains its waiters in the same call.

timeout_t bundles a timer with a cancel_t and a bridge waker that
fires cancel_set on expiration — composes with wait_or_cancel out of
the box.

Diffstat:
Mevent.c | 222+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mevent.h | 130++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtests/test_event.c | 392++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
3 files changed, 713 insertions(+), 31 deletions(-)

diff --git a/event.c b/event.c @@ -25,13 +25,25 @@ static waker_t *rt_dequeue(runtime_t *rt) { return w; } -void rt_run(runtime_t *rt) { +void rt_run(runtime_t *rt, uint64_t now) { /* The runtime ready queue holds only step-wakers. Other waker * shapes (e.g. select_input) fire synchronously inside event - * notify paths and never reach here. */ - for (waker_t *w; (w = rt_dequeue(rt));) { - step_waker_t *sw = (step_waker_t *)w; - xstep(sw->step, sw->resume_value); + * notify paths and never reach here. + * + * If a timer source is attached, advance it at the top of each + * iteration: any timer whose deadline <= now fires, enqueueing its + * step-wakers, which the inner loop then drains. A step may insert + * a fresh already-expired timer; the outer loop catches it on the + * next pass. Termination: each pass either drains a non-empty + * queue or exits, and advance only fires timers it then removes, + * so total work is bounded. */ + for (;;) { + if (rt->timers) timers_advance(rt->timers, now); + if (!rt->head) return; + for (waker_t *w; (w = rt_dequeue(rt));) { + step_waker_t *sw = (step_waker_t *)w; + xstep(sw->step, sw->resume_value); + } } } @@ -335,3 +347,203 @@ void chan_send_op_deinit(chan_send_op_t *op) { if (op->done.set) return; chan_unpark_send(op->chan, &op->csw); } + +/* ---- Timer ------------------------------------------------------------ */ + +/* The timer-as-event surface is just the embedded latch's vtable: try + * checks done.set, park/unpark mutate done.waiters. The timer source + * triggers the latch from advance(). No bespoke timer vtable needed. */ + +void timer_init(timer_t *t, timers_t *ts, uint64_t deadline) { + latch_init(&t->done); + t->deadline = deadline; + t->src = ts; + t->in_heap = false; + t->child = NULL; + t->prev = NULL; + t->next = NULL; + timers_insert(ts, t); /* sets in_heap = true */ +} + +void timer_deinit(timer_t *t) { + /* Idempotent. After fire, in_heap is false (advance popped us); + * after explicit cancel, also false. Otherwise we're still queued. */ + if (t->in_heap) timers_cancel(t->src, t); +} + +/* ---- Pairing heap ----------------------------------------------------- */ + +/* Standard intrusive pairing heap. Each node carries three link fields: + * + * child : head of children sibling list (NULL if leaf). + * prev : parent if this node is its parent's first child; + * previous sibling otherwise; NULL only for a detached tree + * root (including h->root). + * next : next sibling, or NULL for the last child / a detached root. + * + * The "prev points to either parent or a sibling" trick lets us splice + * a node out in O(1) without an extra parent pointer: parent->child==n + * distinguishes "first child" from "non-first sibling." + * + * meld picks the smaller-deadline root as winner and grafts the loser + * as its new first child. Pop-min and remove both rebuild via the + * classic two-pass pairwise merge of the resulting children list. */ + +/* Merge two detached subtree roots (each with prev=next=NULL). Returns + * the merged root (also detached: prev=next=NULL). */ +static timer_t *ph_meld(timer_t *a, timer_t *b) { + if (!a) return b; + if (!b) return a; + timer_t *small, *large; + if (a->deadline <= b->deadline) { small = a; large = b; } + else { small = b; large = a; } + /* Graft `large` as the new first child of `small`. */ + large->next = small->child; + if (small->child) small->child->prev = large; + large->prev = small; /* parent link via prev */ + small->child = large; + small->prev = NULL; + small->next = NULL; + return small; +} + +/* Two-pass pairwise meld of a children sibling list. Detaches each node + * before melding so meld inputs satisfy its prev=next=NULL contract. + * The output has prev=next=NULL. */ +static timer_t *ph_merge_pairs(timer_t *first) { + /* Pass 1: walk the sibling list left-to-right, melding consecutive + * pairs. Chain results via `next` (ab)use as a temporary list link. */ + timer_t *list = NULL; + while (first) { + timer_t *a = first; + timer_t *b = a->next; + timer_t *rest = b ? b->next : NULL; + a->prev = a->next = NULL; + if (b) { b->prev = b->next = NULL; } + timer_t *m = ph_meld(a, b); + m->next = list; /* prepend to pass-1 list */ + list = m; + first = rest; + } + /* Pass 2: meld the pass-1 list into a single root. */ + timer_t *acc = NULL; + while (list) { + timer_t *nxt = list->next; + list->prev = NULL; + list->next = NULL; + acc = ph_meld(acc, list); + list = nxt; + } + return acc; +} + +/* Detach n from the tree (must currently be in the heap). Returns the + * (possibly new) main-heap root. n is left fully detached: child still + * points to its subtree, but prev/next are NULL — caller decides what + * to do with that subtree. */ +static timer_t *ph_detach(pairing_heap_t *h, timer_t *n) { + if (h->root == n) { + /* n is the main root; pop it and rebuild from its children. */ + timer_t *new_root = ph_merge_pairs(n->child); + n->child = NULL; + n->prev = NULL; + n->next = NULL; + return new_root; + } + /* n has a parent (recorded via prev — either as its parent's first + * child or as some sibling's successor). Splice out of the sibling + * list. */ + if (n->prev->child == n) { + /* First child: parent's child link skips us. */ + n->prev->child = n->next; + } else { + /* Mid/last sibling: previous sibling's next skips us. */ + n->prev->next = n->next; + } + if (n->next) n->next->prev = n->prev; + n->prev = NULL; + n->next = NULL; + /* The main-heap root is unchanged structurally; the caller of this + * function decides how (or whether) to reintroduce n's subtree. */ + return h->root; +} + +static void ph_insert(timers_t *ts, timer_t *t) { + pairing_heap_t *h = (pairing_heap_t *)ts; + /* Singleton tree (prev/next/child already NULL via timer_init). */ + h->root = ph_meld(h->root, t); + t->in_heap = true; +} + +static void ph_cancel(timers_t *ts, timer_t *t) { + pairing_heap_t *h = (pairing_heap_t *)ts; + if (!t->in_heap) return; + h->root = ph_detach(h, t); + /* Now meld t's subtree (its children) back into the main heap. */ + timer_t *sub = ph_merge_pairs(t->child); + t->child = NULL; + h->root = ph_meld(h->root, sub); + t->in_heap = false; +} + +static void ph_advance(timers_t *ts, uint64_t now) { + pairing_heap_t *h = (pairing_heap_t *)ts; + /* Pop while the min-key timer is due. Each fire may run callbacks + * that insert *new* timers (with later deadlines, normally) — those + * land back in the heap, and we keep checking the root. */ + while (h->root && h->root->deadline <= now) { + timer_t *t = h->root; + h->root = ph_merge_pairs(t->child); + t->child = NULL; + t->prev = NULL; + t->next = NULL; + t->in_heap = false; + /* Trigger the latch: drains the waitlist and delivers the + * deadline as the fire payload. */ + latch_set(&t->done, (uintptr_t)t->deadline); + } +} + +static bool ph_peek(const timers_t *ts, uint64_t *out) { + const pairing_heap_t *h = (const pairing_heap_t *)ts; + if (!h->root) return false; + if (out) *out = h->root->deadline; + return true; +} + +const timers_vtable_t _pairing_heap_vt = { + .insert = ph_insert, + .cancel = ph_cancel, + .advance = ph_advance, + .peek = ph_peek, +}; + +/* ---- Timeout ---------------------------------------------------------- */ + +/* Bridge waker: parked on the timer's latch, fires the cancel when the + * timer fires. Two-step indirection so cancel can already have its own + * waiters (e.g. a wait_or_cancel select) without the timer's waitlist + * caring about cancel internals. */ +static void _timeout_bridge_fire(waker_t *w, uintptr_t value) { + (void)value; + timeout_t *to = (timeout_t *)((char *)w - offsetof(timeout_t, bridge)); + cancel_set(&to->cancel); +} + +void timeout_init(timeout_t *to, timers_t *ts, uint64_t deadline) { + timer_init(&to->timer, ts, deadline); + cancel_init(&to->cancel); + to->bridge.next = NULL; + to->bridge.prev = NULL; + to->bridge.fire = _timeout_bridge_fire; + /* Park the bridge on the timer. If the timer fires, latch_set + * detaches the bridge and calls our fire callback inline. */ + event_park(timer_event(&to->timer), &to->bridge); +} + +void timeout_deinit(timeout_t *to) { + /* unpark is idempotent (no-op if the bridge already fired or was + * never parked). timer_deinit is idempotent on the in_heap flag. */ + event_unpark(timer_event(&to->timer), &to->bridge); + timer_deinit(&to->timer); +} diff --git a/event.h b/event.h @@ -106,11 +106,26 @@ static inline void event_unpark(event_t *e, waker_t *w) { e->vt->unpark(e, w); } /* ---- Runtime ---------------------------------------------------------- */ +/* Forward-declared: the optional timer source attached to the runtime. + * Defined in the timer section below. */ +typedef struct timers timers_t; + typedef struct runtime { - waker_t *head, *tail; + waker_t *head, *tail; + timers_t *timers; /* optional; advanced inside rt_run */ } runtime_t; -static inline void rt_init(runtime_t *rt) { rt->head = rt->tail = NULL; } +static inline void rt_init(runtime_t *rt) { + rt->head = rt->tail = NULL; + rt->timers = NULL; +} + +/* Attach (or detach with NULL) a timer source. While attached, rt_run + * advances it each pass with the now value the caller supplied; firing + * timers may enqueue more wakers, which the same rt_run call then drains. */ +static inline void rt_attach_timers(runtime_t *rt, timers_t *ts) { + rt->timers = ts; +} /* Append w to the ready queue. Used by step_waker_fire and by anyone * else that wants a waker resumed by the scheduler. */ @@ -123,8 +138,11 @@ static inline void rt_enqueue(runtime_t *rt, waker_t *w) { /* Drain the ready queue, resuming each step-waker's xstep, until empty. * Steps may re-arm on events (and thus leave the queue) or enqueue - * other steps; rt_run keeps going until quiescent. */ -void rt_run(runtime_t *rt); + * other steps; rt_run keeps going until quiescent. now is forwarded to + * any attached timer source's advance(); pass 0 (or anything) when no + * source is attached. The library never reads a clock — now is always + * caller-supplied. */ +void rt_run(runtime_t *rt, uint64_t now); /* The canonical bridge between events and the scheduler. When fired, * stashes the value and enqueues itself onto rt; rt_run pops it and @@ -359,4 +377,108 @@ static inline void wait_or_cancel(select_event_t *out, select_event_init(out, inputs, 2, srcs); } +/* ---- Timers ----------------------------------------------------------- */ + +/* A timer is a sticky event keyed on a u64 deadline. It fires (exactly + * once) when the attached timer source is advanced past that deadline. + * The library never reads a clock; the caller provides `now` to + * timers_advance (or via rt_run). + * + * Storage is pluggable through the timers vtable so callers can swap a + * pairing heap (in-tree, O(log n) amortized everywhere including cancel) + * for a wheel or other structure without touching the timer/timeout + * surface. The timer struct holds the heap link fields inline; the source + * impl interprets them. + * + * Lifecycle: + * timer_init(t, ts, deadline) // inserts into ts + * ... wait on timer_event(t), or compose into select/wait_or_cancel ... + * timer_deinit(t) // removes from ts if not yet fired + * + * Fire payload is the deadline. Re-arming = reinit a fresh timer. */ + +typedef struct timer timer_t; + +typedef struct { + /* Insert t into the source. t must be initialized but not yet + * inserted; insert sets t's heap link fields. */ + void (*insert) (timers_t *ts, timer_t *t); + /* Remove t from the source if currently inserted. Caller must + * ensure t was inserted into this same source. */ + void (*cancel) (timers_t *ts, timer_t *t); + /* Fire every timer whose deadline <= now, in deadline order, popping + * each from the source. Each fire drains the timer's waiter list. */ + void (*advance)(timers_t *ts, uint64_t now); + /* If any timer is queued, write its deadline to *out and return + * true. *out untouched on false. */ + bool (*peek) (const timers_t *ts, uint64_t *out); +} timers_vtable_t; + +struct timers { const timers_vtable_t *vt; }; + +static inline void timers_insert (timers_t *ts, timer_t *t) { ts->vt->insert (ts, t); } +static inline void timers_cancel (timers_t *ts, timer_t *t) { ts->vt->cancel (ts, t); } +static inline void timers_advance(timers_t *ts, uint64_t now) { ts->vt->advance(ts, now); } +static inline bool timers_peek (const timers_t *ts, uint64_t *o) { return ts->vt->peek(ts, o); } + +/* Concrete timer. Embeds a latch so try/park/unpark and the fire-all + * waitlist handling come for free; the timer source manipulates only + * the heap link fields and triggers the latch on fire. The latch's + * payload after fire is the timer's deadline. */ +struct timer { + latch_t done; /* fires once, payload = deadline */ + uint64_t deadline; + timers_t *src; /* source this timer is registered with */ + bool in_heap; /* true between insert and fire/cancel */ + /* Pairing-heap link fields; opaque to anyone but the source impl. + * prev is parent if first child, else previous sibling, else NULL. */ + timer_t *child, *prev, *next; +}; + +static inline event_t *timer_event(timer_t *t) { return &t->done.base; } +static inline bool timer_fired(const timer_t *t) { return t->done.set; } + +void timer_init (timer_t *t, timers_t *ts, uint64_t deadline); +void timer_deinit(timer_t *t); + +/* In-tree timer source: intrusive pairing heap. O(1) amortized insert + * and meld; O(log n) amortized advance and cancel. No per-source + * allocation — the heap is just a root pointer; nodes live in the + * caller's timer_t's. */ +typedef struct { + timers_t base; + timer_t *root; +} pairing_heap_t; + +extern const timers_vtable_t _pairing_heap_vt; + +static inline void pairing_heap_init(pairing_heap_t *h) { + h->base.vt = &_pairing_heap_vt; + h->root = NULL; +} + +/* ---- Timeout ---------------------------------------------------------- */ + +/* Bundle: a timer that fires a cancel_t on expiration. The natural + * pairing for "await ev, or be cancelled by deadline": + * + * timeout_t to; + * timeout_init(&to, ts, now + budget); + * select_event_t sel; select_input_t inputs[2]; + * wait_or_cancel(&sel, inputs, ev, &to.cancel); + * ... wait on &sel.done.base ... + * select_event_deinit(&sel); + * timeout_deinit(&to); // safe whether the timer fired or not + * + * The bridge waker is parked on the timer; when it fires it sets the + * cancel. Bridge fire is idempotent vs cancel_set (a sticky latch). */ +typedef struct timeout { + timer_t timer; + cancel_t cancel; + waker_t bridge; +} timeout_t; + +void timeout_init (timeout_t *to, timers_t *ts, uint64_t deadline); +void timeout_deinit(timeout_t *to); + #endif /* EVENT_H */ diff --git a/tests/test_event.c b/tests/test_event.c @@ -73,7 +73,7 @@ static void test_latch_wake(void) { assert(l.waiters == NULL); /* drained on fire */ assert(rt.head != NULL); /* fire enqueued the step */ - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 42); assert(rt.head == NULL); @@ -112,7 +112,7 @@ static void test_latch_multi_waiter(void) { assert(xstep_status(&c.base) == XSTEP_SUSPENDED); latch_set(&l, 99); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&a.base) == XSTEP_DEAD && a.got == 99); assert(xstep_status(&b.base) == XSTEP_DEAD && b.got == 99); @@ -183,7 +183,7 @@ static void test_select_winner(void) { assert(a.waiters && b.waiters && c.waiters); /* all armed */ latch_set(&b, 0xBBB); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 1); /* b's index */ @@ -257,7 +257,7 @@ static void test_select_compose(void) { assert(xstep_status(&w.base) == XSTEP_SUSPENDED); latch_set(&a, 0); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 0); /* outer winner: inner (index 0) */ @@ -296,17 +296,17 @@ static void test_allof_basic(void) { assert(xstep_status(&w.base) == XSTEP_SUSPENDED); latch_set(&a, 0xAAA); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_SUSPENDED); /* still waiting */ assert(!s.done.set); latch_set(&b, 0xBBB); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_SUSPENDED); assert(!s.done.set); latch_set(&c, 0xCCC); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 2); /* closing index = c */ @@ -420,7 +420,7 @@ static void test_allof_compose_with_select(void) { /* Fire both halves of the allof; the select then sees its first input ready. */ latch_set(&a, 1); latch_set(&b, 2); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 0); /* allof side won the select */ @@ -494,7 +494,7 @@ static void test_chan_send_blocks_until_recv(void) { assert(c.send_head == NULL); /* Sender's resumption is queued by the recv-side fire. */ - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&snd.base) == XSTEP_DEAD); assert(snd.done); } @@ -513,7 +513,7 @@ static void test_chan_recv_blocks_until_send(void) { assert(delivered); assert(c.recv_head == NULL); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&r.base) == XSTEP_DEAD); assert(r.got == 0xCAFE); } @@ -546,7 +546,7 @@ static void test_chan_fifo(void) { } assert(c.send_head == NULL); - rt_run(&rt); + rt_run(&rt, 0); for (int i = 0; i < 3; i++) { assert(xstep_status(&s[i].base) == XSTEP_DEAD); assert(s[i].done); @@ -578,7 +578,7 @@ static void test_chan_unpark_send(void) { /* a and d resume; b stays SUSPENDED (its waker was unparked * without firing). Drain so it doesn't dangle. */ - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&a.base) == XSTEP_DEAD); assert(xstep_status(&d.base) == XSTEP_DEAD); assert(xstep_status(&b.base) == XSTEP_SUSPENDED); @@ -604,7 +604,7 @@ static void test_chan_select_recv(void) { bool delivered = chan_try_send(&c, 0xABCDEF); assert(delivered); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 0); /* chan_recv won (index 0) */ assert(inputs[0].value == 0xABCDEF); /* captured value */ @@ -638,7 +638,7 @@ static void test_chan_recv_fifo(void) { } assert(c.recv_head == NULL); - rt_run(&rt); + rt_run(&rt, 0); for (int i = 0; i < 3; i++) { assert(xstep_status(&r[i].base) == XSTEP_DEAD); assert(r[i].got == (uintptr_t)(200 + i)); @@ -661,7 +661,7 @@ static void test_chan_send_op_inline(void) { assert(op.done.set); /* delivered inline */ assert(c.send_head == NULL); /* nothing parked */ - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&r.base) == XSTEP_DEAD); assert(r.got == 0xFEED); @@ -711,7 +711,7 @@ static void test_chan_select_send(void) { assert(event_try(&c.recv, &v)); assert(v == 0x5EED); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 0); /* send op won */ assert(timeout.waiters == NULL); /* loser disarmed */ @@ -740,7 +740,7 @@ static void test_chan_select_send_loses(void) { xstep(&w.base, 0); latch_set(&l, 0xDEAD); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == 1); /* latch (index 1) won */ assert(inputs[1].value == 0xDEAD); @@ -773,7 +773,7 @@ static void test_chan_select_recv_fast_path(void) { assert(winner == 0); assert(inputs[0].value == 0x12345); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&snd.base) == XSTEP_DEAD); select_event_deinit(&sel); } @@ -812,7 +812,7 @@ static void test_wait_or_cancel_ev_wins(void) { assert(l.waiters && c.waiters); /* both armed */ latch_set(&l, 0xF00D); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == WAIT_OK); assert(inputs[WAIT_OK].value == 0xF00D); @@ -836,7 +836,7 @@ static void test_wait_or_cancel_cancel_wins(void) { assert(xstep_status(&w.base) == XSTEP_SUSPENDED); cancel_set(&c); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == WAIT_CANCELLED); assert(l.waiters == NULL); /* ev disarmed */ @@ -902,7 +902,7 @@ static void test_wait_or_cancel_chan_recv(void) { assert(xstep_status(&w.base) == XSTEP_SUSPENDED); cancel_set(&c); - rt_run(&rt); + rt_run(&rt, 0); assert(xstep_status(&w.base) == XSTEP_DEAD); assert(w.got == WAIT_CANCELLED); assert(ch.recv_head == NULL); /* select_input_fire disarmed */ @@ -941,6 +941,337 @@ static void test_wait_or_cancel_send_op(void) { assert(ch.send_head == NULL); } +/* ---- Timers -------------------------------------------------------- */ + +static void test_timer_basic(void) { + /* Insert one timer; advance past its deadline; fire payload is the + * deadline value. */ + pairing_heap_t h; pairing_heap_init(&h); + timer_t t; + timer_init(&t, &h.base, 100); + assert(!timer_fired(&t)); + assert(t.in_heap); + + /* Not yet expired: advance is a no-op. */ + timers_advance(&h.base, 50); + assert(!timer_fired(&t)); + + /* Expired: fires. */ + timers_advance(&h.base, 100); + assert(timer_fired(&t)); + assert(!t.in_heap); + + uintptr_t v; + assert(event_try(timer_event(&t), &v)); + assert(v == 100); + + /* Idempotent deinit (already fired). */ + timer_deinit(&t); +} + +static void test_timer_peek(void) { + pairing_heap_t h; pairing_heap_init(&h); + uint64_t out = 0xDEAD; + assert(!timers_peek(&h.base, &out)); + assert(out == 0xDEAD); /* untouched on empty */ + + timer_t a, b, c; + timer_init(&a, &h.base, 300); + timer_init(&b, &h.base, 100); + timer_init(&c, &h.base, 200); + + assert(timers_peek(&h.base, &out)); + assert(out == 100); /* earliest deadline */ + + timers_advance(&h.base, 250); + /* b and c fired; a remains. */ + assert(timer_fired(&b)); + assert(timer_fired(&c)); + assert(!timer_fired(&a)); + assert(timers_peek(&h.base, &out)); + assert(out == 300); + + timer_deinit(&a); + timer_deinit(&b); + timer_deinit(&c); +} + +static void test_timer_cancel(void) { + /* Cancel before fire; later advance must not fire it. */ + pairing_heap_t h; pairing_heap_init(&h); + timer_t a, b, c; + timer_init(&a, &h.base, 100); + timer_init(&b, &h.base, 200); + timer_init(&c, &h.base, 300); + + timer_deinit(&b); /* cancel middle */ + assert(!b.in_heap); + assert(!timer_fired(&b)); + + timers_advance(&h.base, 1000); + assert(timer_fired(&a)); + assert(!timer_fired(&b)); /* cancelled — never fired */ + assert(timer_fired(&c)); + + timer_deinit(&a); + timer_deinit(&c); +} + +static void test_timer_park_wake(void) { + /* A waiter parked on a timer wakes when the timer fires. */ + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + timer_t t; + timer_init(&t, &h.base, 500); + + waiter_t w; waiter_init(&w, &rt, timer_event(&t)); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + assert(t.done.waiters == &w.sw.base); + + timers_advance(&h.base, 500); + /* fire enqueued the step. */ + assert(rt.head != NULL); + rt_run(&rt, 500); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 500); /* deadline as payload */ + + timer_deinit(&t); +} + +static void test_timer_select(void) { + /* Compose a timer into a select. */ + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + timer_t t; timer_init(&t, &h.base, 200); + latch_t l; latch_init(&l); + + select_event_t sel; + select_input_t inputs[2]; + event_t *srcs[2] = {timer_event(&t), &l.base}; + select_event_init(&sel, inputs, 2, srcs); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + timers_advance(&h.base, 200); + rt_run(&rt, 200); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 0); /* timer (input 0) won */ + assert(inputs[0].value == 200); /* deadline captured */ + assert(l.waiters == NULL); /* loser disarmed */ + + select_event_deinit(&sel); + timer_deinit(&t); +} + +static void test_timer_pairing_heap_order(void) { + /* Insert many timers with non-sorted deadlines; advance must pop + * them in deadline order. Stress the heap structure. */ + pairing_heap_t h; pairing_heap_init(&h); + enum { N = 32 }; + timer_t ts[N]; + /* A scrambled sequence — interleave halves so the insert order + * exercises the heap's restructuring. */ + uint64_t deadlines[N] = { + 17, 3, 25, 9, 31, 1, 28, 12, + 20, 7, 22, 5, 30, 14, 27, 10, + 16, 2, 24, 8, 32, 11, 29, 13, + 19, 6, 21, 4, 26, 15, 23, 18, + }; + + for (int i = 0; i < N; i++) timer_init(&ts[i], &h.base, deadlines[i]); + + /* Advance one tick at a time; verify exactly one timer fires per + * matching deadline (since deadlines 1..32 are unique). */ + int fired = 0; + for (uint64_t now = 1; now <= N; now++) { + timers_advance(&h.base, now); + int fired_now = 0; + for (int i = 0; i < N; i++) { + if (timer_fired(&ts[i]) && ts[i].deadline == now) fired_now++; + } + assert(fired_now == 1); + fired++; + } + assert(fired == N); + for (int i = 0; i < N; i++) assert(timer_fired(&ts[i])); +} + +static void test_timer_cancel_stress(void) { + /* Insert N timers; cancel every other one; advance and verify only + * the survivors fire, in order. Exercises non-root cancel paths. */ + pairing_heap_t h; pairing_heap_init(&h); + enum { N = 16 }; + timer_t ts[N]; + /* Deadlines: 1..N. Inserted in order so the heap shape is a long + * single-spine; cancels hit non-root nodes. */ + for (int i = 0; i < N; i++) timer_init(&ts[i], &h.base, (uint64_t)(i + 1)); + + /* Cancel even indices (deadlines 2,4,6,...). */ + for (int i = 0; i < N; i += 2) timer_deinit(&ts[i]); + + timers_advance(&h.base, N); + for (int i = 0; i < N; i++) { + if (i % 2 == 0) assert(!timer_fired(&ts[i])); + else assert( timer_fired(&ts[i])); + } + + for (int i = 1; i < N; i += 2) timer_deinit(&ts[i]); +} + +static void test_timer_deinit_idempotent(void) { + /* Deinit after fire is a no-op; deinit twice (without fire) is too. */ + pairing_heap_t h; pairing_heap_init(&h); + timer_t a, b; + timer_init(&a, &h.base, 10); + timer_init(&b, &h.base, 20); + + timers_advance(&h.base, 10); + assert(timer_fired(&a) && !timer_fired(&b)); + timer_deinit(&a); /* already fired */ + timer_deinit(&a); /* twice — still no-op */ + + timer_deinit(&b); /* cancel before fire */ + timer_deinit(&b); /* twice — must not corrupt */ + timers_advance(&h.base, 1000); + assert(!timer_fired(&b)); /* never fired */ +} + +/* ---- Timeout ------------------------------------------------------- */ + +static void test_timeout_fires(void) { + /* Bare timeout: advance past deadline, cancel becomes set. */ + pairing_heap_t h; pairing_heap_init(&h); + timeout_t to; + timeout_init(&to, &h.base, 100); + assert(!cancel_is_set(&to.cancel)); + + timers_advance(&h.base, 100); + assert(cancel_is_set(&to.cancel)); /* bridge fired */ + assert(timer_fired(&to.timer)); + + timeout_deinit(&to); /* idempotent */ +} + +static void test_timeout_deinit_before_fire(void) { + /* Deinit a timeout before its deadline: cancel must remain unset + * and the timer must be removed from the source so a later advance + * doesn't fire freed memory. */ + pairing_heap_t h; pairing_heap_init(&h); + timeout_t to; + timeout_init(&to, &h.base, 100); + timeout_deinit(&to); + assert(!cancel_is_set(&to.cancel)); + + /* Advance past the original deadline: nothing left to fire. */ + timers_advance(&h.base, 1000); + assert(!cancel_is_set(&to.cancel)); +} + +static void test_timeout_with_wait_or_cancel(void) { + /* Canonical pattern: a waiter blocks on (ev, timeout.cancel). The + * timeout fires first; waiter resumes with WAIT_CANCELLED. */ + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + rt_attach_timers(&rt, &h.base); + + latch_t ev; latch_init(&ev); + timeout_t to; + timeout_init(&to, &h.base, 50); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &ev.base, &to.cancel); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + /* rt_run advances the timer source itself: now=50 fires the timer, + * which fires the bridge, which sets cancel, which fires sel, + * which enqueues the waiter — all in one rt_run call. */ + rt_run(&rt, 50); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == WAIT_CANCELLED); + + select_event_deinit(&sel); + timeout_deinit(&to); +} + +static void test_timeout_ev_wins(void) { + /* Event wins the race; timeout should be deinit'd cleanly without + * having fired. */ + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + rt_attach_timers(&rt, &h.base); + + latch_t ev; latch_init(&ev); + timeout_t to; + timeout_init(&to, &h.base, 1000); + + select_event_t sel; + select_input_t inputs[2]; + wait_or_cancel(&sel, inputs, &ev.base, &to.cancel); + + waiter_t w; waiter_init(&w, &rt, &sel.done.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + latch_set(&ev, 0xF00D); + rt_run(&rt, 50); /* now < deadline; timer untouched */ + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == WAIT_OK); + assert(inputs[WAIT_OK].value == 0xF00D); + assert(!cancel_is_set(&to.cancel)); + + select_event_deinit(&sel); + timeout_deinit(&to); + /* Subsequent advance must not fire anything (timer was removed). */ + rt_run(&rt, 10000); + assert(!cancel_is_set(&to.cancel)); +} + +/* ---- rt_run + timer integration ------------------------------------ */ + +static void test_rt_run_advances_timers(void) { + /* A waiter parked on a timer; rt_run with now=deadline advances the + * source, fires the timer, drains the waker — all in one call. */ + runtime_t rt; rt_init(&rt); + pairing_heap_t h; pairing_heap_init(&h); + rt_attach_timers(&rt, &h.base); + + timer_t t; timer_init(&t, &h.base, 42); + waiter_t w; waiter_init(&w, &rt, timer_event(&t)); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + assert(rt.head == NULL); /* parked, not queued */ + + rt_run(&rt, 42); + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 42); + assert(rt.head == NULL); + + timer_deinit(&t); +} + +static void test_rt_run_no_timers(void) { + /* Without an attached timer source, rt_run(rt, now) is the pure + * drainer regardless of `now`. */ + runtime_t rt; rt_init(&rt); + latch_t l; latch_init(&l); + + waiter_t w; waiter_init(&w, &rt, &l.base); + xstep(&w.base, 0); + assert(xstep_status(&w.base) == XSTEP_SUSPENDED); + + latch_set(&l, 7); + rt_run(&rt, 99999); /* now ignored */ + assert(xstep_status(&w.base) == XSTEP_DEAD); + assert(w.got == 7); +} + /* ---- Runtime test -------------------------------------------------- */ static void test_runtime_drains(void) { @@ -955,7 +1286,7 @@ static void test_runtime_drains(void) { for (int i = 0; i < 3; i++) xstep(&w[i].base, 0); latch_set(&l, 0xC0DE); - rt_run(&rt); + rt_run(&rt, 0); assert(rt.head == NULL); for (int i = 0; i < 3; i++) { @@ -1004,6 +1335,23 @@ int main(void) { test_wait_or_cancel_chan_recv(); test_wait_or_cancel_send_op(); + test_timer_basic(); + test_timer_peek(); + test_timer_cancel(); + test_timer_park_wake(); + test_timer_select(); + test_timer_pairing_heap_order(); + test_timer_cancel_stress(); + test_timer_deinit_idempotent(); + + test_timeout_fires(); + test_timeout_deinit_before_fire(); + test_timeout_with_wait_or_cancel(); + test_timeout_ev_wins(); + + test_rt_run_advances_timers(); + test_rt_run_no_timers(); + test_runtime_drains(); printf("ok\n");