xco.h (54688B)
1 /* 2 * xco.h — minimal C11 concurrency library. 3 * 4 * Four layers in this header, bottom-up: 5 * 6 * xco_mach_t Generic resumable function. A value that can be 7 * driven forward one step at a time; each step takes a 8 * uintptr_t in, returns one out, and reports whether 9 * the function suspended or finished. Substrate shared 10 * by stack-switching coroutines (xco) and hand-coded 11 * state machines. 12 * 13 * waiter / event / runtime 14 * Pollable event substrate (poll / unpark) with a 15 * single-threaded FIFO ready queue. Concrete events: 16 * latch, countdown, notify, semaphore/mutex, select, 17 * allof, channel, queue, broadcast, cancel, timer, 18 * pairing-heap timer source, timeout, ticker. All 19 * storage caller-provided, no allocation, no atomics. 20 * 21 * xco_task_t Lifecycle handle for a running xco_step. Bundles a 22 * done latch (fires with the step's return value) and 23 * a cancel latch (the cooperative wind-down signal). 24 * xco_task_group_t fans these in/out across a dynamic 25 * set of tasks. Storage caller-provided; the step 26 * itself lives wherever the caller put it. 27 * 28 * xco_coro_t Stack-switching coroutine. xco_coro_t embeds xco_mach_t as 29 * its first member, so a coroutine is one concrete 30 * kind of resumable function: generic code holding an 31 * xco_mach_t* works on coroutines and hand-coded state 32 * machines uniformly. Values pass between caller and 33 * coroutine through a single uintptr_t channel; pack 34 * richer data behind a pointer. 35 * Asymmetric: xco_suspend always returns to the most 36 * recent resumer. Resumes nest like function calls. 37 * Thread portability: the platform switch saves only 38 * callee-saved regs (no TLS register, no signal mask), 39 * so a fully-suspended coroutine could in principle be 40 * resumed on another thread. 
In practice don't: the 41 * runtime/event/waiter substrate is single-threaded 42 * (no atomics), so a coroutine parked on any event is 43 * tethered to that runtime's thread; and user code 44 * silently rebinds errno / _Thread_local / thread-affine 45 * OS handles to whichever thread resumed it. 46 * 47 * xco_cotask_t xco specialization of xco_task_t. The xco_trampoline calls 48 * fn(&xt->task, arg) and then xco_task_done with the 49 * return value, so xco_wait_or_cancel-style teardown works 50 * without the user wiring anything. 51 * 52 * One-waiter invariant. An xco_step, while suspended, is parked on at most 53 * one event. Multi-wait is composed in the event graph: build a 54 * select_event (or any future combinator — all-of, timeout, ...) and 55 * park on that. The xco_step never sees more than one event directly. 56 * This is what lets a single xco_waker_t live inline in the xco_step 57 * and a single next/prev pair serve both event waitlists and the 58 * runtime ready queue (the two list memberships are disjoint in time). 59 */ 60 61 #ifndef XCO_H 62 #define XCO_H 63 64 #include <assert.h> 65 #include <stdbool.h> 66 #include <stddef.h> 67 #include <stdint.h> 68 69 /* Provides XCO_SIZE, XCO_ALIGN, XCO_STACK_ALIGN, XCO__CTX_SIZE, 70 * XCO__CTX_ALIGN; resolved by the build to the platform-specific 71 * copy via the include path (-Iplatform/$(PLATFORM)). */ 72 #include "xco_platform.h" 73 74 /* ==================================================================== 75 * xco_step — generic resumable function interface. 76 * 77 * The first-member convention: embed xco_mach_t as the first field of 78 * your concrete type so a pointer to your type can be passed wherever 79 * an xco_mach_t * is expected. 80 * 81 * typedef struct { 82 * xco_mach_t base; 83 * int phase; 84 * ... 
85 * } parser_t; 86 * 87 * static xco_step_result_t parser_step(xco_mach_t *s, uintptr_t v) { 88 * parser_t *p = (parser_t *)s; 89 * switch (p->phase) { 90 * case 0: p->phase = 1; return (xco_step_result_t){v + 1, XCO_STEP_SUSPENDED}; 91 * case 1: return (xco_step_result_t){v * 2, XCO_STEP_DEAD}; 92 * } 93 * __builtin_unreachable(); 94 * } 95 * 96 * parser_t p = { .base = {.step = parser_step, .status = XCO_STEP_INIT} }; 97 * xco_step_result_t r = xco_step(&p.base, 0); 98 * ==================================================================== */ 99 100 typedef enum { 101 XCO_STEP_INIT, /* created, never stepped */ 102 XCO_STEP_RUNNING, /* inside step(), or in an active resume chain */ 103 XCO_STEP_SUSPENDED, /* yielded; resumable */ 104 XCO_STEP_DEAD, /* function returned */ 105 } xco_mach_status_t; 106 107 typedef struct { 108 uintptr_t value; 109 xco_mach_status_t status; 110 } xco_step_result_t; 111 112 typedef struct xco_mach xco_mach_t; 113 typedef xco_step_result_t (*xco_step_fn)(xco_mach_t *s, uintptr_t value); 114 115 struct xco_mach { 116 xco_step_fn step; 117 xco_mach_status_t status; /* cached; xco_step() syncs from each result */ 118 }; 119 120 /* Drive one step. The wrapper updates s->status from the returned 121 * result so step implementations only need to populate the result. */ 122 static inline xco_step_result_t xco_step(xco_mach_t *s, uintptr_t value) { 123 xco_step_result_t r = s->step(s, value); 124 s->status = r.status; 125 return r; 126 } 127 128 static inline xco_mach_status_t xco_mach_status(const xco_mach_t *s) { 129 return s->status; 130 } 131 132 /* ==================================================================== 133 * Event substrate. 134 * 135 * Fused try-or-park: xco_event_poll(e, &out, w) returns true if the 136 * event is ready (writes the value to *out, w is NOT parked) and false 137 * otherwise (parks w on the waitlist iff w is non-NULL). 
The two 138 * degenerate forms: 139 * 140 * xco_event_poll(e, &v, NULL) — pure try (peek; never parks). 141 * xco_event_poll(e, NULL, w) — park-if-not-ready (out discarded). 142 * 143 * Fusing closes the try/park TOCTOU window an MT impl would otherwise 144 * have to internally re-check, and lets each event arm a waiter 145 * atomically with its readiness check. 146 * 147 * Standard usage from a state machine: 148 * 149 * uintptr_t v; 150 * if (xco_event_poll(e, &v, &my_waiter.base)) { ... use v ... } 151 * else { return SUSPENDED; } 152 * 153 * Standard usage from an xco coroutine wrapper: 154 * 155 * uintptr_t v; 156 * xco_waker_t sw; 157 * xco_waker_init(&sw, rt, &xco_self()->base); 158 * if (!xco_event_poll(e, &v, &sw.base)) { 159 * xco_suspend(0); 160 * (void)xco_event_poll(e, &v, NULL); // sticky: now ready 161 * } 162 * ==================================================================== */ 163 164 /* ---- Waiter ------------------------------------------------------------ */ 165 166 typedef struct xco_waiter xco_waiter_t; 167 struct xco_waiter { 168 /* Doubly-linked while parked on an event waitlist, so unpark is 169 * O(1). Reused as the singly-linked next pointer while on the 170 * runtime ready queue (FIFO, no removal from middle); prev is 171 * undefined in that state and reset on the next park. */ 172 xco_waiter_t *next; 173 xco_waiter_t *prev; 174 /* Fire callback. value is the event's payload at fire time — sticky 175 * events also store it on themselves, transient events (channels, 176 * one-shot signals) deliver only here. Waiters that don't care 177 * about the value just ignore the parameter. 178 * 179 * Invoke via xco_waiter_fire (below), not directly: the helper enforces 180 * the "fire receives a fully detached waiter" contract that makes it 181 * safe to re-park inside the callback. */ 182 void (*fire)(xco_waiter_t *w, uintptr_t value); 183 }; 184 185 /* Canonical way to invoke a waiter's fire callback. 
Hands the callback a 186 * fully detached waiter so the callback (or whatever the resumed step 187 * does) can re-park on a fresh waitlist without colliding with stale 188 * link state. Detachers that lead into fire (queue pops, latch drains, 189 * etc.) don't need to clear prev/next themselves. */ 190 static inline void xco_waiter_fire(xco_waiter_t *w, uintptr_t value) { 191 w->prev = NULL; 192 w->next = NULL; 193 w->fire(w, value); 194 } 195 196 /* ---- Event ------------------------------------------------------------ */ 197 198 typedef struct xco_event xco_event_t; 199 200 typedef struct { 201 /* Fused try + park. If the event is ready, write its value to *out 202 * (when out != NULL), do NOT park w, and return true. Otherwise, if 203 * w != NULL park it on the waitlist and return false; if w == NULL 204 * just return false without parking. *out is left untouched on the 205 * not-ready path. */ 206 bool (*poll)(xco_event_t *e, uintptr_t *out, xco_waiter_t *w); 207 /* Remove w from the waitlist. Idempotent: no-op if not parked. */ 208 void (*unpark)(xco_event_t *e, xco_waiter_t *w); 209 } xco_event_vtable_t; 210 211 struct xco_event { const xco_event_vtable_t *vt; }; 212 213 static inline bool xco_event_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { 214 return e->vt->poll(e, out, w); 215 } 216 static inline void xco_event_unpark(xco_event_t *e, xco_waiter_t *w) { e->vt->unpark(e, w); } 217 218 /* ---- Runtime ---------------------------------------------------------- */ 219 220 /* Forward-declared: the optional timer source attached to the runtime. 221 * Defined in the timer section below. */ 222 typedef struct xco_timers xco_timers_t; 223 224 typedef struct xco_op xco_op_t; 225 226 typedef struct xco_runtime { 227 xco_waiter_t *head, *tail; 228 xco_timers_t *timers; /* optional; advanced inside xco_rt_run */ 229 /* Pending ops list (xco_op layer). 
The runtime never inspects ops; it 230 * only owns this list-head pair so the host can pull a batch via 231 * xco_rt_take_ops. op_epoch is bumped each take and recorded on each 232 * submit; PENDING vs IN_FLIGHT is a generation match (see xco_op). */ 233 xco_op_t *op_head, *op_tail; 234 uint64_t op_epoch; 235 } xco_runtime_t; 236 237 static inline void xco_rt_init(xco_runtime_t *rt) { 238 rt->head = rt->tail = NULL; 239 rt->timers = NULL; 240 rt->op_head = rt->op_tail = NULL; 241 rt->op_epoch = 0; 242 } 243 244 /* Attach (or detach with NULL) a timer source. While attached, xco_rt_run 245 * advances it each pass with the now value the caller supplied; firing 246 * timers may enqueue more waiters, which the same xco_rt_run call then drains. */ 247 static inline void xco_rt_attach_timers(xco_runtime_t *rt, xco_timers_t *ts) { 248 rt->timers = ts; 249 } 250 251 /* Append w to the ready queue. Used by xco__waker_fire and by anyone 252 * else that wants a waiter resumed by the scheduler. */ 253 static inline void xco_rt_enqueue(xco_runtime_t *rt, xco_waiter_t *w) { 254 w->next = NULL; 255 if (rt->tail) rt->tail->next = w; 256 else rt->head = w; 257 rt->tail = w; 258 } 259 260 /* Drain the ready queue, resuming each waker's xco_step, until empty. 261 * Steps may re-arm on events (and thus leave the queue) or enqueue 262 * other steps; xco_rt_run keeps going until quiescent. now is forwarded to 263 * any attached timer source's advance(); pass 0 (or anything) when no 264 * source is attached. The library never reads a clock — now is always 265 * caller-supplied. */ 266 void xco_rt_run(xco_runtime_t *rt, uint64_t now); 267 268 /* The canonical bridge between events and the scheduler. When fired, 269 * stashes the value and enqueues itself onto rt; xco_rt_run pops it and 270 * calls xco_step(mach, value), so the resumed step receives the event's 271 * payload directly without a re-try. 272 * 273 * Init once, re-park freely. 
The runtime hands the waiter back fully 274 * detached (next/prev cleared) before invoking the resumed step, so a 275 * subscriber that wants to wait on the next event can call xco_event_poll 276 * directly — no re-init needed unless rt or step changes. */ 277 typedef struct { 278 xco_waiter_t base; 279 xco_runtime_t *rt; 280 xco_mach_t *mach; 281 uintptr_t resume_value; /* set by fire, consumed by xco_rt_run */ 282 } xco_waker_t; 283 284 /* Defined in xco.c; declared here so xco_waker_init can install it. */ 285 void xco__waker_fire(xco_waiter_t *w, uintptr_t value); 286 287 static inline void xco_waker_init(xco_waker_t *sw, xco_runtime_t *rt, xco_mach_t *m) { 288 sw->base.next = NULL; 289 sw->base.prev = NULL; 290 sw->base.fire = xco__waker_fire; 291 sw->rt = rt; 292 sw->mach = m; 293 sw->resume_value = 0; 294 } 295 296 /* ---- Latch ------------------------------------------------------------ */ 297 298 /* One-shot sticky event. set() flips the bit, stores the payload, and 299 * fires every waiter. Subsequent set() calls are ignored. To re-arm, 300 * reinitialize a fresh latch. */ 301 typedef struct { 302 xco_event_t base; 303 bool set; 304 uintptr_t value; 305 xco_waiter_t *waiters; 306 } xco_latch_t; 307 308 /* Defined in xco.c; referenced by xco_latch_init. */ 309 extern const xco_event_vtable_t xco__latch_vt; 310 311 static inline void xco_latch_init(xco_latch_t *l) { 312 l->base.vt = &xco__latch_vt; 313 l->set = false; 314 l->value = 0; 315 l->waiters = NULL; 316 } 317 318 void xco_latch_set(xco_latch_t *l, uintptr_t value); 319 320 /* ---- Countdown -------------------------------------------------------- */ 321 322 /* One-shot fan-in counter. Fires its embedded latch (payload 0) when 323 * remaining hits 0. xco_countdown_add(n) is legal while remaining > 0; 324 * xco_countdown_done decrements; both are UB once the latch has fired. 325 * 326 * Compose with the standard event API via xco_countdown_event(). 
*/ 327 typedef struct xco_countdown { 328 xco_latch_t done; 329 size_t remaining; 330 } xco_countdown_t; 331 332 static inline void xco_countdown_init(xco_countdown_t *c, size_t n) { 333 xco_latch_init(&c->done); 334 c->remaining = n; 335 if (n == 0) xco_latch_set(&c->done, 0); 336 } 337 338 static inline void xco_countdown_add(xco_countdown_t *c, size_t n) { 339 assert(!c->done.set); 340 c->remaining += n; 341 } 342 343 static inline void xco_countdown_done(xco_countdown_t *c) { 344 assert(c->remaining > 0); 345 if (--c->remaining == 0) xco_latch_set(&c->done, 0); 346 } 347 348 static inline xco_event_t *xco_countdown_event(xco_countdown_t *c) { return &c->done.base; } 349 static inline bool xco_countdown_fired(const xco_countdown_t *c) { return c->done.set; } 350 351 /* ---- Notify (wake-one / wake-all) ------------------------------------- */ 352 353 /* Transient signal with no sticky state. xco_notify_one fires (and detaches) 354 * the head of a FIFO waitlist; xco_notify_all fires every parked waiter. Both 355 * are no-ops when the waitlist is empty. Subscribers must re-park to see 356 * subsequent notifications. 357 * 358 * xco_event_poll never reports ready: there is no "ready now" state — a 359 * subscriber waits for the *next* notify. */ 360 typedef struct xco_notify { 361 xco_event_t base; 362 xco_waiter_t *head, *tail; 363 } xco_notify_t; 364 365 extern const xco_event_vtable_t xco__notify_vt; 366 367 static inline void xco_notify_init(xco_notify_t *n) { 368 n->base.vt = &xco__notify_vt; 369 n->head = n->tail = NULL; 370 } 371 372 static inline xco_event_t *xco_notify_event(xco_notify_t *n) { return &n->base; } 373 374 void xco_notify_one(xco_notify_t *n); 375 void xco_notify_all(xco_notify_t *n); 376 377 /* ---- Semaphore -------------------------------------------------------- */ 378 379 /* Counting semaphore. 
acquire is exposed as xco_event_t (composable with 380 * select / xco_wait_or_cancel): xco_event_poll succeeds and decrements when 381 * permits > 0; otherwise the waiter parks FIFO. xco_semaphore_release(n) 382 * hands one permit to each of up to n waiting waiters (each is fired, 383 * which the receiver treats as "you got a permit") before adding any 384 * leftover to the count. 385 * 386 * One permit per acquire. Bulk acquire isn't expressible in xco_event_t's 387 * shape; if you need it, call sequentially. For binary use (mutex-style 388 * critical section across awaits) init with permits = 1. 389 * 390 * Fairness: FIFO at the waitlist. A racing inline xco_event_poll by a fresh 391 * caller can jump ahead of parked waiters when permits are released 392 * back to count rather than directly handed off — release prefers 393 * parked waiters first to avoid that. */ 394 typedef struct xco_semaphore { 395 xco_event_t acquire; 396 size_t permits; 397 xco_waiter_t *head, *tail; 398 } xco_semaphore_t; 399 400 extern const xco_event_vtable_t xco__semaphore_acquire_vt; 401 402 static inline void xco_semaphore_init(xco_semaphore_t *s, size_t initial) { 403 s->acquire.vt = &xco__semaphore_acquire_vt; 404 s->permits = initial; 405 s->head = s->tail = NULL; 406 } 407 408 static inline xco_event_t *xco_semaphore_event(xco_semaphore_t *s) { return &s->acquire; } 409 410 void xco_semaphore_release(xco_semaphore_t *s, size_t n); 411 412 /* ---- Mutex ------------------------------------------------------------ */ 413 414 /* Binary semaphore wrapper for vocabulary at call sites. xco_mutex_init is 415 * xco_semaphore_init(s, 1); the xco_event_t fires once per release; xco_mutex_release 416 * hands the permit to the next waiter (or returns it to the count). 
*/ 417 typedef xco_semaphore_t xco_mutex_t; 418 419 static inline void xco_mutex_init (xco_mutex_t *m) { xco_semaphore_init(m, 1); } 420 static inline xco_event_t *xco_mutex_event (xco_mutex_t *m) { return xco_semaphore_event(m); } 421 static inline void xco_mutex_release(xco_mutex_t *m) { xco_semaphore_release(m, 1); } 422 423 /* ---- Select / all-of -------------------------------------------------- */ 424 425 /* Wait over N input events. Two semantics share the same storage shape, 426 * so a caller can switch between them by changing only the init call: 427 * 428 * xco_select_event_init fires when ANY input fires (any-of) 429 * xco_allof_event_init fires when ALL inputs fire (all-of) 430 * 431 * In both cases done's payload is the index of the input whose firing 432 * closed the wait — the winner for select, the last-to-fire for allof — 433 * and inputs[i].value carries each fired input's payload (works 434 * uniformly for sticky and transient sources, where re-trying the input 435 * would either succeed or fail). 436 * 437 * Composes: a select_event is itself an event. */ 438 439 typedef struct xco_select_event xco_select_event_t; 440 441 /* Per-input arming record. Caller-allocated as an array of n alongside 442 * the select_event. After fire, .value holds whatever the input 443 * delivered; other fields are internal. */ 444 typedef struct { 445 xco_waiter_t w; 446 xco_event_t *src; 447 xco_select_event_t *parent; 448 uintptr_t value; /* captured at fire time */ 449 } xco_select_input_t; 450 451 struct xco_select_event { 452 xco_latch_t done; /* fires with the closing input's index */ 453 xco_select_input_t *inputs; 454 size_t n; 455 size_t remaining; /* counts down; done fires at 0 456 * (select: starts at 1, allof: at n) */ 457 }; 458 459 /* Initialize as a select (any-of). inputs[] is caller-provided storage 460 * for n nodes; srcs[] is the array of n input event pointers (read 461 * once during init). 
If any input is already ready, the select fires 462 * immediately and no waiters are parked. Use &s->done.base as the 463 * resulting xco_event_t. */ 464 void xco_select_event_init(xco_select_event_t *s, 465 xco_select_input_t *inputs, size_t n, 466 xco_event_t *const *srcs); 467 468 /* Initialize as an allof (all-of). Inputs already ready at init are 469 * consumed inline (no parking, value captured); if every input is 470 * ready, done fires immediately. n == 0 fires done with payload 0. */ 471 void xco_allof_event_init(xco_select_event_t *s, 472 xco_select_input_t *inputs, size_t n, 473 xco_event_t *const *srcs); 474 475 /* Disarm any inputs still parked. Safe to call after fire (no-op) and 476 * after partial completion (allof). Required before s leaves scope if 477 * it has not yet fired. */ 478 void xco_select_event_deinit(xco_select_event_t *s); 479 480 481 /* ---- Queue ------------------------------------------------------------ */ 482 483 /* Bounded FIFO of uintptr_t. Caller provides the ring buffer storage. 484 * Recv side is exposed as xco_event_t (composable with select). Send side 485 * is a typed API (carries a value), shaped after the event-poll fusion: 486 * fused try + park, NULL-waiter degenerates to pure-try. 487 * 488 * Three full-buffer policies, fixed at init: 489 * XCO_QUEUE_BLOCK senders park until a receiver makes room. 490 * XCO_QUEUE_DROP_NEWEST xco_queue_send_poll silently discards the new value. 491 * XCO_QUEUE_DROP_OLDEST xco_queue_send_poll evicts the head and pushes new tail. 492 * 493 * Senders never park under DROP_* policies — passing a non-NULL qsw is 494 * only meaningful under XCO_QUEUE_BLOCK. xco_queue_send_unpark is 495 * idempotent (cancellation-safe). 496 * 497 * Direct-handoff: xco_queue_send_poll first checks for a parked receiver and 498 * delivers inline if present (payload bypasses the buffer), regardless 499 * of policy. 
 *
 * Rendezvous matrix (BLOCK + cap=0; the xco_chan_* aliases in the
 * Channel section below name this configuration explicitly):
 *     send + parked recv     fire recv with value, sender continues inline.
 *     send + no recv         sender parks (xco_queue_send_poll); peer pulls later.
 *     recv + parked sender   read sender's value, fire sender (delivery
 *                            confirmation), receiver continues inline.
 *     recv + no sender       receiver parks (xco_event_poll on recv); peer
 *                            delivers later.
 *
 * FIFO order on both waitlists.
 *
 * Close: optional EOF semantics. After xco_queue_close, xco_queue_send_poll
 * returns XCO_QSEND_CLOSED regardless of policy (no delivery), parked
 * senders are drained with delivered=false, and parked receivers are
 * woken so they can observe XCO_RECV_CLOSED via xco_queue_recv. The recv
 * event is "ready" iff a value is available OR the queue is closed —
 * receivers must call xco_queue_recv to disambiguate value vs EOF. */

/* Result of a typed receive on a queue (or chan, which is just a queue).
*/ 520 typedef enum { 521 XCO_RECV_GOT, /* *out holds the delivered value */ 522 XCO_RECV_EMPTY, /* nothing available right now; caller may park */ 523 XCO_RECV_CLOSED, /* peer closed and no values remain */ 524 } xco_recv_status_t; 525 526 typedef enum { 527 XCO_QUEUE_BLOCK, 528 XCO_QUEUE_DROP_NEWEST, 529 XCO_QUEUE_DROP_OLDEST, 530 } xco_queue_policy_t; 531 532 typedef struct xco_queue { 533 xco_event_t recv; 534 uintptr_t *buf; 535 size_t cap, head, len; 536 xco_queue_policy_t policy; 537 xco_waiter_t *send_head, *send_tail; 538 xco_waiter_t *recv_head, *recv_tail; 539 bool closed; 540 } xco_queue_t; 541 542 extern const xco_event_vtable_t xco__queue_recv_vt; 543 544 static inline void xco_queue_init(xco_queue_t *q, uintptr_t *buf, size_t cap, 545 xco_queue_policy_t policy) { 546 q->recv.vt = &xco__queue_recv_vt; 547 q->buf = buf; 548 q->cap = cap; 549 q->head = 0; 550 q->len = 0; 551 q->policy = policy; 552 q->send_head = q->send_tail = NULL; 553 q->recv_head = q->recv_tail = NULL; 554 q->closed = false; 555 } 556 557 static inline xco_event_t *xco_queue_recv_event(xco_queue_t *q) { return &q->recv; } 558 559 /* Sender-side waiter for XCO_QUEUE_BLOCK. Same shape as xco_chan_send_waiter_t: 560 * a waker plus a value slot the receiver / close-drain reads back on the 561 * park path. `delivered` is set by the closing side: true on a normal 562 * handoff, false on a close drain. */ 563 typedef struct { 564 xco_waker_t sw; 565 uintptr_t value; /* set by xco_queue_send_poll on the park path */ 566 bool delivered; 567 } xco_queue_send_waiter_t; 568 569 static inline void xco_queue_send_waiter_init(xco_queue_send_waiter_t *qsw, 570 xco_runtime_t *rt, xco_mach_t *m) { 571 xco_waker_init(&qsw->sw, rt, m); 572 qsw->value = 0; 573 qsw->delivered = false; 574 } 575 576 /* Result of xco_queue_send_poll. 
*/ 577 typedef enum { 578 XCO_QSEND_ACCEPTED, /* delivered to a parked receiver, buffered, or 579 accepted-by-policy (silently dropped under 580 DROP_NEWEST, evicted-and-pushed under DROP_OLDEST) */ 581 XCO_QSEND_BLOCKED, /* BLOCK + full; parked iff qsw != NULL */ 582 XCO_QSEND_CLOSED, /* queue closed; never parks. Returned regardless 583 of policy — closed is closed. */ 584 } xco_queue_send_status_t; 585 586 /* Fused try + park for a sender. Direct-delivers to a parked receiver if 587 * one is waiting (returns XCO_QSEND_ACCEPTED, qsw not parked). Otherwise: 588 * 589 * XCO_QUEUE_BLOCK + room buffered → XCO_QSEND_ACCEPTED. 590 * XCO_QUEUE_BLOCK + full XCO_QSEND_BLOCKED; if qsw != NULL the 591 * sender's value is stashed in qsw->value 592 * and qsw is parked. 593 * XCO_QUEUE_DROP_NEWEST + full silently drops → XCO_QSEND_ACCEPTED. 594 * XCO_QUEUE_DROP_OLDEST + full evicts head, pushes new tail → ACCEPTED. 595 * closed (any policy) XCO_QSEND_CLOSED; never parks. 596 * 597 * The two degenerate forms mirror xco_event_poll: 598 * xco_queue_send_poll(q, v, NULL) — pure try (peek; never parks). 599 * xco_queue_send_poll(q, v, qsw) — fused try-or-park (BLOCK only). */ 600 xco_queue_send_status_t xco_queue_send_poll(xco_queue_t *q, uintptr_t value, 601 xco_queue_send_waiter_t *qsw); 602 603 void xco_queue_send_unpark(xco_queue_t *q, xco_queue_send_waiter_t *qsw); 604 605 /* Typed receive. Disambiguates value vs EOF where xco_event_poll cannot: 606 * returns XCO_RECV_GOT (value popped from the buffer or directly from a 607 * parked sender), XCO_RECV_CLOSED (closed and drained), or 608 * XCO_RECV_EMPTY (caller may park). */ 609 xco_recv_status_t xco_queue_recv(xco_queue_t *q, uintptr_t *out); 610 611 /* Close the queue. Idempotent. Drains parked senders (delivered=false) 612 * and wakes parked receivers. After close, xco_queue_send_poll returns 613 * XCO_QSEND_CLOSED regardless of policy. 
*/ 614 void xco_queue_close(xco_queue_t *q); 615 static inline bool xco_queue_is_closed(const xco_queue_t *q) { return q->closed; } 616 617 /* Selectable send op. A per-call object that holds the value, parks on 618 * the queue (only meaningful under XCO_QUEUE_BLOCK), and exposes 619 * &op->done.base as the event that fires when the send resolves. 620 * 621 * The op embeds a xco_queue_send_waiter_t (so the queue's send list stays 622 * uniform — receivers read .value at the same offset for both direct 623 * and op senders) but rewires its fire callback: instead of resuming an 624 * xco_step, fire sets op->done. Polymorphism via the function pointer. 625 * 626 * The done latch's payload is 1 on XCO_QSEND_ACCEPTED (handed to a 627 * receiver, buffered, or accepted under DROP_*) and 0 on 628 * XCO_QSEND_CLOSED / close-drain. 629 * 630 * Under DROP_* policies the send always resolves inline at init (the 631 * poll returns ACCEPTED and op->done fires immediately). 632 * 633 * Lifecycle: init → wait on &op->done.base → deinit. Always deinit; 634 * it's a no-op after resolution and unparks the queue-side waiter if not. */ 635 typedef struct { 636 xco_queue_send_waiter_t qsw; /* parked on queue; fire overridden */ 637 xco_queue_t *queue; 638 xco_latch_t done; 639 } xco_queue_send_op_t; 640 641 void xco__queue_send_op_fire(xco_waiter_t *w, uintptr_t value); 642 643 void xco_queue_send_op_init(xco_queue_send_op_t *op, xco_queue_t *q, uintptr_t value); 644 static inline void xco_queue_send_op_deinit(xco_queue_send_op_t *op) { 645 if (op->done.set) return; 646 xco_queue_send_unpark(op->queue, &op->qsw); 647 } 648 649 /* ---- Channel (alias) -------------------------------------------------- */ 650 651 /* Unbuffered rendezvous channel: a queue at cap=0 with XCO_QUEUE_BLOCK 652 * policy. Senders and receivers wait on each other; whichever arrives 653 * first parks until its peer shows up. 
The pending value lives in the 654 * sender's xco_chan_send_waiter_t for the duration of any wait — no 655 * per-channel buffer storage. 656 * 657 * The xco_chan_* names are thin aliases over the queue API: a chan IS a 658 * queue. They exist so call sites can name "rendezvous" explicitly 659 * rather than spelling out cap=0+BLOCK. The queue's recv event, send 660 * poll, close, recv, and selectable send op all carry over unchanged. 661 * 662 * Storage: an xco_chan_t carries the queue's buffer/cap/policy fields 663 * even though they're inert at cap=0 (~40 bytes of overhead vs a 664 * dedicated rendezvous struct). Below the noise floor for typical use. */ 665 666 typedef xco_queue_t xco_chan_t; 667 typedef xco_queue_send_waiter_t xco_chan_send_waiter_t; 668 typedef xco_queue_send_status_t xco_chan_send_status_t; 669 typedef xco_queue_send_op_t xco_chan_send_op_t; 670 671 /* Status aliases. Same enumerators, vocabulary at call sites: a chan 672 * "delivers" rather than "accepts" — but the underlying constants are 673 * the queue's. 
*/ 674 #define XCO_SEND_DELIVERED XCO_QSEND_ACCEPTED 675 #define XCO_SEND_BLOCKED XCO_QSEND_BLOCKED 676 #define XCO_SEND_CLOSED XCO_QSEND_CLOSED 677 678 static inline void xco_chan_init(xco_chan_t *c) { 679 xco_queue_init(c, NULL, 0, XCO_QUEUE_BLOCK); 680 } 681 static inline xco_event_t *xco_chan_recv_event(xco_chan_t *c) { 682 return xco_queue_recv_event(c); 683 } 684 static inline void xco_chan_send_waiter_init(xco_chan_send_waiter_t *csw, 685 xco_runtime_t *rt, xco_mach_t *m) { 686 xco_queue_send_waiter_init(csw, rt, m); 687 } 688 static inline xco_chan_send_status_t xco_chan_send_poll(xco_chan_t *c, uintptr_t value, 689 xco_chan_send_waiter_t *csw) { 690 return xco_queue_send_poll(c, value, csw); 691 } 692 static inline void xco_chan_send_unpark(xco_chan_t *c, xco_chan_send_waiter_t *csw) { 693 xco_queue_send_unpark(c, csw); 694 } 695 static inline xco_recv_status_t xco_chan_recv(xco_chan_t *c, uintptr_t *out) { 696 return xco_queue_recv(c, out); 697 } 698 static inline void xco_chan_close(xco_chan_t *c) { xco_queue_close(c); } 699 static inline bool xco_chan_is_closed(const xco_chan_t *c) { 700 return xco_queue_is_closed(c); 701 } 702 static inline void xco_chan_send_op_init(xco_chan_send_op_t *op, xco_chan_t *c, uintptr_t value) { 703 xco_queue_send_op_init(op, c, value); 704 } 705 static inline void xco_chan_send_op_deinit(xco_chan_send_op_t *op) { 706 xco_queue_send_op_deinit(op); 707 } 708 709 /* ---- Broadcast (slot) ------------------------------------------------- */ 710 711 /* Re-armable signal carrying a "latest value" slot. Subscribers park on 712 * the event; xco_broadcast_publish stores the new value, fires every parked 713 * subscriber with that value, and clears the waitlist — subscribers must 714 * re-park to see further publishes. Subscribers that aren't parked at 715 * publish time miss that publish but will see the next one. This is the 716 * coalescing "watch a slot" semantics, not lossless fan-out. 
717 * 718 * xco_event_poll never reports ready: there is no "ready now" state — a 719 * subscriber waits for the *next* publish. To read the latest published 720 * value at any time, use xco_broadcast_value (valid once xco_broadcast_has_value 721 * returns true). 722 * 723 * For lossless multi-consumer delivery, give each subscriber its own 724 * queue and have the producer write to all of them. */ 725 726 typedef struct xco_broadcast { 727 xco_event_t base; 728 bool has_value; 729 uintptr_t value; 730 xco_waiter_t *waiters; 731 } xco_broadcast_t; 732 733 extern const xco_event_vtable_t xco__broadcast_vt; 734 735 static inline void xco_broadcast_init(xco_broadcast_t *b) { 736 b->base.vt = &xco__broadcast_vt; 737 b->has_value = false; 738 b->value = 0; 739 b->waiters = NULL; 740 } 741 742 static inline xco_event_t *xco_broadcast_event (xco_broadcast_t *b) { return &b->base; } 743 static inline bool xco_broadcast_has_value(const xco_broadcast_t *b) { return b->has_value; } 744 static inline uintptr_t xco_broadcast_value (const xco_broadcast_t *b) { return b->value; } 745 746 void xco_broadcast_publish(xco_broadcast_t *b, uintptr_t value); 747 748 /* ---- Cancellation ----------------------------------------------------- */ 749 750 /* A cancellation token is a sticky latch — these aliases exist for 751 * vocabulary at call sites. xco_cancel_set fires every parked waiter; the 752 * idempotency of xco_latch_set means racing cancellers are fine. 753 * 754 * Pair a xco_cancel_t with any blocking event via xco_wait_or_cancel to get 755 * "await X, or be cancelled." 756 * 757 * Discipline: cancellation notifies; it never drops in-flight values. 758 * A cancelled await returns control to its caller, which is responsible 759 * for draining whatever it owns — deinit a pending chan_send_op so its 760 * value goes back to the sender, deinit a select_event so input waiters 761 * detach, drive a cancellable coroutine to XCO_STEP_DEAD before freeing 762 * its stack. 
The xco layer does no unwinding; the coroutine cooperates. */ 763 764 typedef xco_latch_t xco_cancel_t; 765 766 static inline void xco_cancel_init(xco_cancel_t *c) { xco_latch_init(c); } 767 static inline void xco_cancel_set(xco_cancel_t *c) { xco_latch_set(c, 0); } 768 static inline bool xco_cancel_is_set(const xco_cancel_t *c) { return c->set; } 769 static inline xco_event_t *xco_cancel_event(xco_cancel_t *c) { return &c->base; } 770 771 /* Outcome indices for xco_wait_or_cancel — match the inputs[] order so the 772 * value the resumer receives from the latched select reads as one of 773 * these directly. */ 774 enum { 775 XCO_WAIT_OK = 0, /* ev fired; inputs[0].value holds its payload */ 776 XCO_WAIT_CANCELLED = 1, /* cancel fired before ev */ 777 }; 778 779 /* Build a select over (ev, cancel) using caller-provided storage. If 780 * either is already ready at init the select fast-paths and never parks 781 * anyone (ev is checked first, so an event that has already resolved 782 * wins over a concurrent cancel). Treat &out->done.base as the event 783 * to wait on. Always pair with xco_select_event_deinit before storage 784 * leaves scope (no-op once fired). */ 785 static inline void xco_wait_or_cancel(xco_select_event_t *out, 786 xco_select_input_t inputs[2], 787 xco_event_t *ev, xco_cancel_t *c) { 788 xco_event_t *srcs[2] = {ev, xco_cancel_event(c)}; 789 xco_select_event_init(out, inputs, 2, srcs); 790 } 791 792 /* ---- Timers ----------------------------------------------------------- */ 793 794 /* A timer is a sticky event keyed on a u64 deadline. It fires (exactly 795 * once) when the attached timer source is advanced past that deadline. 796 * The library never reads a clock; the caller provides `now` to 797 * xco_timers_advance (or via xco_rt_run). 
798 * 799 * Storage is pluggable through the timers vtable so callers can swap a 800 * pairing heap (in-tree, O(log n) amortized everywhere including cancel) 801 * for a wheel or other structure without touching the timer/timeout 802 * surface. The timer struct holds the heap link fields inline; the source 803 * impl interprets them. 804 * 805 * Lifecycle: 806 * xco_timer_init(t, ts, deadline) // inserts into ts 807 * ... wait on xco_timer_event(t), or compose into select/xco_wait_or_cancel ... 808 * xco_timer_deinit(t) // removes from ts if not yet fired 809 * 810 * Fire payload is the deadline. Re-arming = reinit a fresh timer. */ 811 812 typedef struct xco_timer xco_timer_t; 813 814 typedef struct { 815 /* Insert t into the source. t must be initialized but not yet 816 * inserted; insert sets t's heap link fields. */ 817 void (*insert) (xco_timers_t *ts, xco_timer_t *t); 818 /* Remove t from the source if currently inserted. Caller must 819 * ensure t was inserted into this same source. */ 820 void (*cancel) (xco_timers_t *ts, xco_timer_t *t); 821 /* Fire every timer whose deadline <= now, in deadline order, popping 822 * each from the source. Each fire drains the timer's waiter list. */ 823 void (*advance)(xco_timers_t *ts, uint64_t now); 824 /* Return the earliest queued deadline, or UINT64_MAX if no timer 825 * is queued. 
*/ 826 uint64_t (*peek)(const xco_timers_t *ts); 827 } xco_timers_vtable_t; 828 829 struct xco_timers { 830 const xco_timers_vtable_t *vt; 831 uint64_t now; /* most recent advance() input; monotonic */ 832 }; 833 834 static inline void xco_timers_insert (xco_timers_t *ts, xco_timer_t *t) { ts->vt->insert (ts, t); } 835 static inline void xco_timers_cancel (xco_timers_t *ts, xco_timer_t *t) { ts->vt->cancel (ts, t); } 836 static inline void xco_timers_advance(xco_timers_t *ts, uint64_t now) { 837 assert(now >= ts->now); 838 ts->now = now; 839 ts->vt->advance(ts, now); 840 } 841 static inline uint64_t xco_timers_peek (const xco_timers_t *ts) { return ts->vt->peek(ts); } 842 static inline uint64_t xco_timers_now (const xco_timers_t *ts) { return ts->now; } 843 844 /* Concrete timer. Embeds a latch so try/park/unpark and the fire-all 845 * waitlist handling come for free; the timer source manipulates only 846 * the heap link fields and triggers the latch on fire. The latch's 847 * payload after fire is the timer's deadline. */ 848 struct xco_timer { 849 xco_latch_t done; /* fires once, payload = deadline */ 850 uint64_t deadline; 851 xco_timers_t *src; /* source this timer is registered with */ 852 bool in_heap; /* true between insert and fire/cancel */ 853 /* Pairing-heap link fields; opaque to anyone but the source impl. 854 * prev is parent if first child, else previous sibling, else NULL. 
*/ 855 xco_timer_t *child, *prev, *next; 856 }; 857 858 static inline xco_event_t *xco_timer_event(xco_timer_t *t) { return &t->done.base; } 859 static inline bool xco_timer_fired(const xco_timer_t *t) { return t->done.set; } 860 861 static inline void xco_timer_init(xco_timer_t *t, xco_timers_t *ts, uint64_t deadline) { 862 xco_latch_init(&t->done); 863 t->deadline = deadline; 864 t->src = ts; 865 t->in_heap = false; 866 t->child = NULL; 867 t->prev = NULL; 868 t->next = NULL; 869 xco_timers_insert(ts, t); 870 } 871 872 static inline void xco_timer_deinit(xco_timer_t *t) { 873 if (t->in_heap) xco_timers_cancel(t->src, t); 874 } 875 876 /* In-tree timer source: intrusive pairing heap. O(1) amortized insert 877 * and meld; O(log n) amortized advance and cancel. No per-source 878 * allocation — the heap is just a root pointer; nodes live in the 879 * caller's xco_timer_t's. */ 880 typedef struct { 881 xco_timers_t base; 882 xco_timer_t *root; 883 } xco_pairing_heap_t; 884 885 extern const xco_timers_vtable_t xco__pairing_heap_vt; 886 887 static inline void xco_pairing_heap_init(xco_pairing_heap_t *h) { 888 h->base.vt = &xco__pairing_heap_vt; 889 h->base.now = 0; 890 h->root = NULL; 891 } 892 893 /* ---- Timeout ---------------------------------------------------------- */ 894 895 /* Bundle: a timer that fires a xco_cancel_t on expiration. The natural 896 * pairing for "await ev, or be cancelled by deadline": 897 * 898 * xco_timeout_t to; 899 * xco_timeout_init(&to, ts, now + budget); 900 * xco_select_event_t sel; xco_select_input_t inputs[2]; 901 * xco_wait_or_cancel(&sel, inputs, ev, &to.cancel); 902 * ... wait on &sel.done.base ... 903 * xco_select_event_deinit(&sel); 904 * xco_timeout_deinit(&to); // safe whether the timer fired or not 905 * 906 * The bridge waiter is parked on the timer; when it fires it sets the 907 * cancel. Bridge fire is idempotent vs xco_cancel_set (a sticky latch). 
*/ 908 typedef struct xco_timeout { 909 xco_timer_t timer; 910 xco_cancel_t cancel; 911 xco_waiter_t bridge; 912 } xco_timeout_t; 913 914 void xco_timeout_init(xco_timeout_t *to, xco_timers_t *ts, uint64_t deadline); 915 static inline void xco_timeout_deinit(xco_timeout_t *to) { 916 xco_event_unpark(xco_timer_event(&to->timer), &to->bridge); 917 xco_timer_deinit(&to->timer); 918 } 919 920 /* ---- Ticker ----------------------------------------------------------- */ 921 922 /* Re-armable transient signal driven by a timer source. Each time the 923 * underlying timer fires, the ticker computes the next deadline (period 924 * past the just-fired one, with skip-ahead for catch-up after overflow), 925 * reinstalls the timer, and fires every parked subscriber with the 926 * just-fired deadline as the payload. Subscribers that aren't parked at 927 * a fire miss it (transient — same coalescing semantics as broadcast). 928 * 929 * xco_ticker_init(&t, ts, period, first_deadline); 930 * ... wait on xco_ticker_event(&t), re-park to see further ticks ... 931 * xco_ticker_deinit(&t); // cancels the in-flight timer 932 * 933 * xco_event_poll never reports ready; subscribers wait for the *next* tick. */ 934 typedef struct xco_ticker { 935 xco_timer_t timer; 936 xco_timers_t *src; 937 uint64_t period; 938 xco_event_t base; 939 xco_waiter_t *waiters; 940 xco_waiter_t bridge; /* internal: parks on xco_timer_event */ 941 } xco_ticker_t; 942 943 extern const xco_event_vtable_t xco__ticker_vt; 944 945 void xco_ticker_init (xco_ticker_t *t, xco_timers_t *ts, 946 uint64_t period, uint64_t first_deadline); 947 void xco_ticker_deinit(xco_ticker_t *t); 948 static inline xco_event_t *xco_ticker_event(xco_ticker_t *t) { return &t->base; } 949 950 /* ---- Task ------------------------------------------------------------- */ 951 952 /* Lifecycle handle for a running xco_step. 
Bundles a done latch (fires when 953 * the xco_step returns, payload = its return value) with a cancel latch 954 * (the canonical signal to ask the xco_step to wind down). The xco_step itself 955 * is caller-allocated; the task holds a pointer to it. 956 * 957 * Who fires done: 958 * - Hand-coded state machine: call xco_task_done(t, ret) in the same arm 959 * that returns XCO_STEP_DEAD. 960 * - xco-backed task (see xco_cotask_t below): the xco_trampoline calls 961 * xco_task_done automatically with the coroutine's return value. 962 * 963 * Cooperation: cancellation only notifies — the xco_step is responsible for 964 * draining what it owns and reaching XCO_STEP_DEAD. The task's cancel is a 965 * normal xco_cancel_t, so the xco_step typically composes xco_wait_or_cancel against 966 * it on every blocking await. 967 * 968 * Joining: callers wait on xco_task_done_event with the standard event API 969 * (try / park, or compose into select / xco_wait_or_cancel). On fire the 970 * latch's payload is the xco_step's return value. */ 971 972 typedef struct xco_task { 973 xco_mach_t *mach; 974 xco_latch_t done; 975 xco_cancel_t cancel; 976 } xco_task_t; 977 978 static inline void xco_task_init(xco_task_t *t, xco_mach_t *mach) { 979 t->mach = mach; 980 xco_latch_init(&t->done); 981 xco_cancel_init(&t->cancel); 982 } 983 984 /* Mark the task complete with its return value. Idempotent (xco_latch_set is). 
*/ 985 static inline void xco_task_done(xco_task_t *t, uintptr_t value) { 986 xco_latch_set(&t->done, value); 987 } 988 989 static inline xco_event_t *xco_task_done_event (xco_task_t *t) { return &t->done.base; } 990 static inline xco_cancel_t *xco_task_cancel (xco_task_t *t) { return &t->cancel; } 991 static inline bool xco_task_finished (const xco_task_t *t) { return t->done.set; } 992 static inline bool xco_task_is_cancelled(const xco_task_t *t) { return xco_cancel_is_set(&t->cancel); } 993 static inline xco_mach_t *xco_task_mach (xco_task_t *t) { return t->mach; } 994 995 /* ---- Task group ------------------------------------------------------- */ 996 997 /* Fan-in join + fan-out cancel for a dynamic set of tasks. Caller 998 * provides storage for each per-attachment record (xco_group_attach_t), so 999 * the group itself does no allocation. 1000 * 1001 * xco_task_group_attach(g, t, slot): 1002 * xco_countdown_add(g->pending, 1); slot's bridge waiter parks on 1003 * xco_task_done_event(t); slot is appended to g's list. When the task's 1004 * done fires, the bridge fires: it splices the slot out of g's list 1005 * and calls xco_countdown_done(&g->pending). Re-attaching a finished 1006 * task is UB. 1007 * 1008 * xco_task_group_cancel(g): walks the attachment list and xco_cancel_set's 1009 * each &slot->task->cancel, then xco_cancel_set's g->cancel. Bodies that 1010 * compose xco_wait_or_cancel against xco_task_cancel(t) wind down cooperatively; 1011 * meanwhile, anything waiting on g->cancel observes the group-level 1012 * signal directly. 1013 * 1014 * xco_task_group_join_event(g): fires when every attached task has reached 1015 * xco_task_done. Compose with select / xco_wait_or_cancel like any event. 
*/ 1016 1017 typedef struct xco_group_attach xco_group_attach_t; 1018 1019 typedef struct xco_task_group { 1020 xco_countdown_t pending; 1021 xco_cancel_t cancel; 1022 xco_group_attach_t *head, *tail; 1023 } xco_task_group_t; 1024 1025 struct xco_group_attach { 1026 xco_waiter_t bridge; /* parked on xco_task_done_event(task) */ 1027 xco_task_t *task; 1028 xco_task_group_t *group; 1029 xco_group_attach_t *next, *prev; 1030 }; 1031 1032 void xco_task_group_init (xco_task_group_t *g); 1033 void xco_task_group_attach (xco_task_group_t *g, xco_task_t *t, 1034 xco_group_attach_t *slot); 1035 void xco_task_group_cancel (xco_task_group_t *g); 1036 static inline xco_event_t *xco_task_group_join_event (xco_task_group_t *g) { 1037 return xco_countdown_event(&g->pending); 1038 } 1039 static inline xco_cancel_t *xco_task_group_cancel_handle(xco_task_group_t *g) { 1040 return &g->cancel; 1041 } 1042 1043 /* ==================================================================== 1044 * xco — stack-switching coroutines. 1045 * 1046 * Teardown: this layer does not unwind a suspended coroutine's stack. 1047 * Drive a coroutine to return (e.g. by passing a cancel sentinel it 1048 * is expected to handle) before freeing its stack memory. 1049 * ==================================================================== */ 1050 1051 /* Coroutine entry point. The argument is the value supplied to the 1052 * first xco_step on this xco. The return value is delivered to the resumer 1053 * as the final xco_step_result, with status XCO_STEP_DEAD. */ 1054 typedef uintptr_t (*xco_fn)(uintptr_t arg); 1055 1056 /* Coroutine control block. Allocate anywhere — on a stack, in a 1057 * struct, on the heap. xco_mach_t base is first so xco_coro_t * can be passed 1058 * directly to xco_step() or any generic xco_mach_t * consumer. The trailing 1059 * priv storage holds the saved register context and bookkeeping; 1060 * its contents are private to the implementation. 
*/ 1061 typedef struct xco_coro { 1062 xco_mach_t base; 1063 _Alignas(XCO_ALIGN) unsigned char priv[XCO_SIZE]; 1064 } xco_coro_t; 1065 1066 /* Initialize *c to run fn on [stack_base, stack_base + stack_len). 1067 * stack_base must be XCO_STACK_ALIGN-aligned; the runtime picks the 1068 * starting SP based on the architecture's stack growth direction. 1069 * Status after init is XCO_STEP_INIT. */ 1070 void xco_init(xco_coro_t *c, xco_fn fn, 1071 void *stack_base, size_t stack_len); 1072 1073 /* Suspend the currently running coroutine, returning value to its 1074 * resumer. Returns the value passed by the next resume. Undefined if 1075 * called outside a coroutine. */ 1076 uintptr_t xco_suspend(uintptr_t value); 1077 1078 /* The currently running coroutine, or NULL if the caller is not in 1079 * one. The runtime maintains this for xco_suspend. */ 1080 xco_coro_t *xco_self(void); 1081 1082 /* Resuming a coroutine is just driving its xco_step: callers use 1083 * xco_step(&c->base, v) directly. Reading status without resuming is 1084 * xco_mach_status(&c->base). The xco layer adds no separate vocabulary 1085 * for these — that's the unification with hand-coded state machines. 1086 * Resuming a coroutine that is not XCO_STEP_INIT or XCO_STEP_SUSPENDED is 1087 * undefined. */ 1088 1089 /* Convenience: init then first-step in one call. */ 1090 static inline xco_step_result_t xco_spawn(xco_coro_t *c, xco_fn fn, 1091 void *stack_base, size_t stack_len, 1092 uintptr_t arg) { 1093 xco_init(c, fn, stack_base, stack_len); 1094 return xco_step(&c->base, arg); 1095 } 1096 1097 /* Cooperative yield. Enqueues self on rt's ready queue and suspends; 1098 * the next xco_rt_run pass resumes us. Useful for fairness when a coroutine 1099 * wants to give other ready work a turn between long-running steps. 1100 * Must be called from inside a coroutine driven by rt. 
*/ 1101 static inline void xco_yield(xco_runtime_t *rt) { 1102 xco_coro_t *self = xco_self(); 1103 assert(self != NULL); 1104 xco_waker_t sw; 1105 xco_waker_init(&sw, rt, &self->base); 1106 xco_rt_enqueue(rt, &sw.base); 1107 xco_suspend(0); 1108 } 1109 1110 /* Await an event from inside a coroutine. The standard poll-suspend 1111 * dance, in one call. Returns the event's value (delivered by fire on 1112 * the slow path, by the inline poll on the fast path). Must be called 1113 * from inside a coroutine driven by rt. */ 1114 static inline uintptr_t xco_await(xco_runtime_t *rt, xco_event_t *e) { 1115 xco_coro_t *self = xco_self(); 1116 assert(self != NULL); 1117 uintptr_t v; 1118 xco_waker_t sw; 1119 xco_waker_init(&sw, rt, &self->base); 1120 if (xco_event_poll(e, &v, &sw.base)) return v; 1121 xco_suspend(0); 1122 (void)xco_event_poll(e, &v, NULL); 1123 return v; 1124 } 1125 1126 /* Await ev or be cancelled by c. Returns true if ev fired (its payload 1127 * is written to *out, which may be NULL); false if cancelled. The 1128 * internal select_event is always cleaned up before return. 1129 * 1130 * The canonical shape for cooperative work inside a task body — pair 1131 * with xco_task_cancel(self) on every blocking await. */ 1132 static inline bool xco_await_or_cancel(xco_runtime_t *rt, xco_event_t *ev, 1133 xco_cancel_t *c, uintptr_t *out) { 1134 xco_select_event_t sel; 1135 xco_select_input_t inputs[2]; 1136 xco_wait_or_cancel(&sel, inputs, ev, c); 1137 uintptr_t winner = xco_await(rt, &sel.done.base); 1138 bool ok = (winner == XCO_WAIT_OK); 1139 if (ok && out) *out = inputs[XCO_WAIT_OK].value; 1140 xco_select_event_deinit(&sel); 1141 return ok; 1142 } 1143 1144 /* ---- xco-backed task ------------------------------------------------- */ 1145 1146 /* xco specialization of xco_task_t. 
Bundles the user-visible task handle 1147 * with the xco that runs it; the xco_trampoline calls fn(&xt->task, arg) 1148 * and then xco_task_done with its return value, so xco_wait_or_cancel-style 1149 * teardown works without the user wiring anything. 1150 * 1151 * The xco_trampoline recovers xt from xco_self() at first entry (container_of 1152 * on the embedded co), so the first-resume uintptr_t is preserved as 1153 * fn's arg under normal xco semantics. Subsequent resumes pass values 1154 * to the coroutine in the usual way. 1155 * 1156 * Storage shape: caller allocates xco_cotask_t and a stack. The xco_task_t 1157 * inside is the handle to wait/cancel on; cancel via xco_cancel_set on 1158 * &xt->task.cancel and the body is expected to observe it (typically 1159 * by composing xco_wait_or_cancel against xco_task_cancel(&xt->task)). */ 1160 1161 typedef uintptr_t (*xco_cotask_fn)(xco_task_t *t, uintptr_t arg); 1162 1163 typedef struct xco_cotask { 1164 xco_task_t task; 1165 xco_coro_t co; 1166 xco_cotask_fn fn; 1167 } xco_cotask_t; 1168 1169 /* Initialize xt to run fn on the given stack. After this the embedded 1170 * xco is XCO_STEP_INIT; drive it with xco_step(&xt->co.base, v) or use 1171 * xco_cotask_spawn for the init-and-first-step convenience. */ 1172 void xco_cotask_init(xco_cotask_t *xt, xco_cotask_fn fn, 1173 void *stack_base, size_t stack_len); 1174 1175 /* Convenience: init then first-step in one call. arg is delivered as 1176 * fn's argument. */ 1177 static inline xco_step_result_t xco_cotask_spawn(xco_cotask_t *xt, xco_cotask_fn fn, 1178 void *stack_base, size_t stack_len, 1179 uintptr_t arg) { 1180 xco_cotask_init(xt, fn, stack_base, stack_len); 1181 return xco_step(&xt->co.base, arg); 1182 } 1183 1184 /* ==================================================================== 1185 * xco_op — generic effect/IO layer. 
1186 * 1187 * Coroutines submit ops describing work to do; the runtime accumulates 1188 * them on a pending list; after xco_rt_run quiesces, the host pulls the 1189 * batch via xco_rt_take_ops, executes them however it likes (io_uring, 1190 * threads, mocks, replay), and injects completions back via 1191 * xco_op_complete. The xco library itself reads no clocks and makes no 1192 * syscalls — that property extends to IO via this layer. 1193 * 1194 * An op is just an event with a side-channel describing what to do. 1195 * Embed xco_op_t as the first member of a kind-specific payload struct; 1196 * the host pattern-matches on `kind` (an open tag space — the runtime 1197 * never inspects it). 1198 * 1199 * State machine: 1200 * PENDING op->epoch == op->rt->op_epoch, done unset (on rt's list) 1201 * IN_FLIGHT op->epoch != op->rt->op_epoch, done unset (host has taken) 1202 * RESOLVED done.set (terminal) 1203 * 1204 * The epoch counter on the runtime is bumped each xco_rt_take_ops, so 1205 * "still on the pending list" is a generation match — submit + take are 1206 * both O(1) and the runtime never walks the batch. 1207 * 1208 * Threading: single-threaded, same as the rest of xco. submit, cancel, 1209 * and complete must all be called on the runtime's thread. 
1210 * ==================================================================== */ 1211 1212 typedef enum { 1213 XCO_OP_PENDING, /* not used as a fire payload, but conceptual */ 1214 XCO_OP_COMPLETED, /* host finished; result lives in embedder fields */ 1215 XCO_OP_CANCELLED, /* resolved without a real result */ 1216 } xco_op_status_t; 1217 1218 struct xco_op { 1219 xco_latch_t done; /* fires once, payload = status */ 1220 xco_op_t *next, *prev; /* intrusive on rt's pending list */ 1221 xco_runtime_t *rt; /* set on submit; stays set after take */ 1222 uint64_t epoch; /* matches rt->op_epoch iff PENDING */ 1223 uint32_t kind; /* open tag space; host pattern-matches */ 1224 bool cancel_requested; /* advisory to host post-take */ 1225 }; 1226 1227 /* True iff op is still on rt's current pending batch (i.e. PENDING). 1228 * False for IN_FLIGHT or RESOLVED. O(1). */ 1229 static inline bool xco_op_is_pending(const xco_op_t *op) { 1230 return op->rt && op->epoch == op->rt->op_epoch && !op->done.set; 1231 } 1232 1233 /* Submit op to rt's pending list. Initializes done, clears 1234 * cancel_requested, and sets op->rt = rt. The caller pre-populates op->kind 1235 * and any embedder fields (fd, buf, len, ...) before submit. After submit 1236 * the awaiter typically waits on &op->done.base — composes with select, 1237 * xco_wait_or_cancel, etc. */ 1238 void xco_op_submit (xco_runtime_t *rt, xco_op_t *op); 1239 1240 /* Request cancellation. Behavior depends on state: 1241 * PENDING splice from rt list, fire done(CANCELLED). 1242 * IN_FLIGHT set cancel_requested=true (advisory; host decides). 1243 * RESOLVED no-op. 1244 * Idempotent. */ 1245 void xco_op_cancel (xco_op_t *op); 1246 1247 /* Host's final word for an op it took via xco_rt_take_ops. Fires done 1248 * with the given status. Idempotent (the latch is). */ 1249 void xco_op_complete(xco_op_t *op, xco_op_status_t status); 1250 1251 /* Detach the runtime's pending ops list and return its head. 
Bumps the 1252 * runtime's op_epoch, transitioning the whole batch to IN_FLIGHT in O(1) 1253 * (no walk — each op's stored epoch now no longer matches). The host 1254 * owns the returned list — the next/prev fields are the host's to reuse 1255 * however it wants for in-flight tracking. Returns NULL if the list is 1256 * empty. 1257 * 1258 * If tail_out is non-NULL, *tail_out receives the list's tail (or NULL 1259 * for an empty list) — handy for splicing the batch onto a host-side 1260 * in-flight list in O(1) without walking. */ 1261 xco_op_t *xco_rt_take_ops(xco_runtime_t *rt, xco_op_t **tail_out); 1262 1263 #endif /* XCO_H */