xco.c (44826B)
1 /* 2 * xco.c — implementation for xco.h. 3 * 4 * Two parts: 5 * - Event substrate (runtime, latch, semaphore, select/allof, queue 6 * (chan is a typedef alias for the cap=0+BLOCK case), broadcast, 7 * notify, timer, pairing heap, timeout, ticker, task group). Each 8 * event type is a small struct with a static vtable. 9 * Waitlists are intrusive linked lists; fire-all detaches the whole 10 * list before iterating so callbacks can do anything, including 11 * re-park or unpark sibling waiters, without iterator hazards. 12 * 13 * - Stack-switching coroutines (xco). The platform layer 14 * (arch/<name>/xco_arch.c) supplies the register save/restore 15 * primitive (xco_platform_switch) and the initial-context setup 16 * (xco_platform_init). Everything else — the state machine, the 17 * resume chain, the xco_trampoline wrapping user fn entry/exit, and 18 * the xco_self() TLS pointer — lives here. 19 * 20 * This translation unit is architecture-neutral. The platform context 21 * type is forward-declared (in xco_platform.h) and only ever referred 22 * to by pointer, so its actual size and layout never cross into this 23 * file. We reserve raw space for it inside xco_impl_t using 24 * XCO__CTX_SIZE / XCO__CTX_ALIGN from the arch's xco_arch.h, and cast 25 * to xco_platform_ctx_t * when calling into the platform layer. 26 */ 27 28 #include "xco.h" 29 #include "xco_platform_internal.h" 30 31 #include <assert.h> 32 #include <stddef.h> 33 #include <stdint.h> 34 35 /* ==================================================================== 36 * Runtime 37 * ==================================================================== */ 38 39 /* xco_rt_init and xco_rt_enqueue are defined inline in xco.h. */ 40 41 static xco_waiter_t *xco_rt_dequeue(xco_runtime_t *rt) { 42 xco_waiter_t *w = rt->head; 43 if (!w) return NULL; 44 rt->head = w->next; 45 if (!rt->head) rt->tail = NULL; 46 /* Hand the waiter back fully detached. Every fire path already clears 47 * prev before firing (and we just walked off the ready-queue), so 48 * w->prev is NULL in practice — making it explicit here means 49 * waker users can re-park without re-init, regardless of which 50 * fire path resumed them. */ 51 w->next = NULL; 52 w->prev = NULL; 53 return w; 54 } 55 56 void xco_rt_run(xco_runtime_t *rt, uint64_t now) { 57 /* The runtime ready queue holds only wakers. Other waiter 58 * shapes (e.g. select_input) fire synchronously inside event 59 * notify paths and never reach here. 60 * 61 * If a timer source is attached, advance it at the top of each 62 * iteration: any timer whose deadline <= now fires, enqueueing its 63 * wakers, which the inner loop then drains. A step may insert 64 * a fresh already-expired timer; the outer loop catches it on the 65 * next pass. Termination: each pass either drains a non-empty 66 * queue or exits, and advance only fires timers it then removes, 67 * so total work is bounded. */ 68 for (;;) { 69 if (rt->timers) xco_timers_advance(rt->timers, now); 70 if (!rt->head) return; 71 for (xco_waiter_t *w; (w = xco_rt_dequeue(rt));) { 72 xco_waker_t *sw = (xco_waker_t *)w; 73 xco_step(sw->mach, sw->resume_value); 74 } 75 } 76 } 77 78 /* ---- Waker ------------------------------------------------------------ */ 79 80 /* Exposed (with leading underscore) so the inline xco_waker_init in 81 * xco.h can install it without dragging the body into the header. */ 82 void xco__waker_fire(xco_waiter_t *w, uintptr_t value) { 83 xco_waker_t *sw = (xco_waker_t *)w; 84 sw->resume_value = value; 85 xco_rt_enqueue(sw->rt, w); 86 } 87 88 /* ==================================================================== 89 * Latch 90 * ==================================================================== */ 91 92 static bool xco_latch_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { 93 xco_latch_t *l = (xco_latch_t *)e; 94 if (l->set) { 95 if (out) *out = l->value; 96 return true; 97 } 98 if (!w) return false; 99 /* One-waiter invariant: w must arrive clean. Detach paths (xco_latch_set 100 * iterator, xco_latch_unpark, xco_select_event_deinit) all leave waiters in 101 * this state. A trip here means a double-park bug. */ 102 assert(!w->prev && !w->next); 103 w->next = l->waiters; 104 if (l->waiters) l->waiters->prev = w; 105 l->waiters = w; 106 return false; 107 } 108 109 static void xco_latch_unpark(xco_event_t *e, xco_waiter_t *w) { 110 xco_latch_t *l = (xco_latch_t *)e; 111 /* Detect "not on this list" via the invariant maintained by park 112 * and the detach paths: a parked waiter has prev set OR is the 113 * head; a detached one has prev == NULL and is not the head. */ 114 if (!w->prev && l->waiters != w) return; 115 if (w->prev) w->prev->next = w->next; 116 else l->waiters = w->next; 117 if (w->next) w->next->prev = w->prev; 118 w->prev = w->next = NULL; 119 } 120 121 /* Exposed so the inline xco_latch_init in xco.h can reference it. */ 122 const xco_event_vtable_t xco__latch_vt = { 123 .poll = xco_latch_poll, 124 .unpark = xco_latch_unpark, 125 }; 126 127 void xco_latch_set(xco_latch_t *l, uintptr_t value) { 128 if (l->set) return; 129 l->set = true; 130 l->value = value; 131 132 /* Detach the whole waitlist before firing. A waiter's fire callback 133 * might do anything (including unpark a sibling on another event), 134 * but it cannot mutate this list — it's already gone. */ 135 xco_waiter_t *w = l->waiters; 136 l->waiters = NULL; 137 while (w) { 138 xco_waiter_t *next = w->next; /* save before xco_waiter_fire clears */ 139 xco_waiter_fire(w, value); 140 w = next; 141 } 142 } 143 144 /* ==================================================================== 145 * Semaphore 146 * 147 * FIFO doubly-linked waitlist, same shape as the chan_q_* helpers below 148 * but specialized to a xco_semaphore_t (so we don't have to thread the 149 * head/tail pair through chan_q_*). 150 * ==================================================================== */ 151 152 static void xco_sem_q_push(xco_semaphore_t *s, xco_waiter_t *w) { 153 assert(!w->prev && !w->next); 154 w->prev = s->tail; 155 w->next = NULL; 156 if (s->tail) s->tail->next = w; 157 else s->head = w; 158 s->tail = w; 159 } 160 161 static xco_waiter_t *xco_sem_q_pop(xco_semaphore_t *s) { 162 xco_waiter_t *w = s->head; 163 if (!w) return NULL; 164 s->head = w->next; 165 if (s->head) s->head->prev = NULL; 166 else s->tail = NULL; 167 w->prev = w->next = NULL; 168 return w; 169 } 170 171 static void xco_sem_q_remove(xco_semaphore_t *s, xco_waiter_t *w) { 172 if (!w->prev && s->head != w) return; 173 if (w->prev) w->prev->next = w->next; 174 else s->head = w->next; 175 if (w->next) w->next->prev = w->prev; 176 else s->tail = w->prev; 177 w->prev = w->next = NULL; 178 } 179 180 static bool xco_semaphore_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { 181 xco_semaphore_t *s = (xco_semaphore_t *)e; 182 if (s->permits > 0) { 183 s->permits--; 184 if (out) *out = 1; 185 return true; 186 } 187 if (!w) return false; 188 xco_sem_q_push(s, w); 189 return false; 190 } 191 192 static void xco_semaphore_unpark(xco_event_t *e, xco_waiter_t *w) { 193 xco_semaphore_t *s = (xco_semaphore_t *)e; 194 xco_sem_q_remove(s, w); 195 } 196 197 const xco_event_vtable_t xco__semaphore_acquire_vt = { 198 .poll = xco_semaphore_poll, 199 .unpark = xco_semaphore_unpark, 200 }; 201 202 void xco_semaphore_release(xco_semaphore_t *s, size_t n) { 203 /* Hand a permit directly to each FIFO waiter, then drop any leftover 204 * into the count. Direct handoff prevents a fresh try from jumping 205 * the queue: an arriving acquirer that called try_ would see permits=0 206 * and park behind the existing waiters until everyone ahead has been 207 * served. */ 208 while (n > 0) { 209 xco_waiter_t *w = xco_sem_q_pop(s); 210 if (!w) break; 211 n--; 212 /* Fire value is conventional 1 — "you got a permit". Step-waiter 213 * users ignore the value; select inputs capture it as the input's 214 * value field. */ 215 xco_waiter_fire(w, 1); 216 } 217 s->permits += n; 218 } 219 220 /* ==================================================================== 221 * Select / all-of 222 * ==================================================================== */ 223 224 /* One fire callback serves both modes. A counter `remaining` is decremented 225 * on each fire; done is set when it hits 0. select inits remaining=1 (any 226 * one fire closes); allof inits remaining=n (every input must fire). The 227 * disarm-siblings loop is a no-op for already-fired waiters, so it runs 228 * uniformly: for select it cleans up still-parked losers, for allof it 229 * does nothing (every sibling is already detached by its source). */ 230 static void xco_select_input_fire(xco_waiter_t *w, uintptr_t value) { 231 xco_select_input_t *in = (xco_select_input_t *)w; 232 xco_select_event_t *s = in->parent; 233 234 /* Defensive: guard against any straggler that escaped disarm. */ 235 if (s->done.set) return; 236 /* Capture the input's payload before resuming anyone. Sticky 237 * sources also keep it on themselves; transient sources (channels) 238 * deliver only here, so this is the only durable record. */ 239 in->value = value; 240 if (--s->remaining > 0) return; 241 242 size_t i = (size_t)(in - s->inputs); 243 /* Disarm anyone still parked so their waiters don't dangle on input 244 * waitlists past s's lifetime. Idempotent on already-detached waiters. */ 245 for (size_t j = 0; j < s->n; j++) { 246 if (j != i) xco_event_unpark(s->inputs[j].src, &s->inputs[j].w); 247 } 248 xco_latch_set(&s->done, i); 249 } 250 251 void xco_select_event_init(xco_select_event_t *s, 252 xco_select_input_t *inputs, size_t n, 253 xco_event_t *const *srcs) { 254 xco_latch_init(&s->done); 255 s->inputs = inputs; 256 s->n = n; 257 s->remaining = 1; /* any one fire closes the wait */ 258 259 /* Fast path: an input already ready. Fire and skip parking entirely 260 * so deinit has nothing to disarm. */ 261 for (size_t i = 0; i < n; i++) { 262 uintptr_t v; 263 if (xco_event_poll(srcs[i], &v, NULL)) { 264 inputs[i].value = v; /* captured for inputs[winner].value */ 265 xco_latch_set(&s->done, i); 266 return; 267 } 268 } 269 270 for (size_t i = 0; i < n; i++) { 271 inputs[i].w.next = NULL; 272 inputs[i].w.prev = NULL; 273 inputs[i].w.fire = xco_select_input_fire; 274 inputs[i].src = srcs[i]; 275 inputs[i].parent = s; 276 inputs[i].value = 0; 277 (void)xco_event_poll(srcs[i], NULL, &inputs[i].w); 278 } 279 } 280 281 void xco_allof_event_init(xco_select_event_t *s, 282 xco_select_input_t *inputs, size_t n, 283 xco_event_t *const *srcs) { 284 xco_latch_init(&s->done); 285 s->inputs = inputs; 286 s->n = n; 287 s->remaining = n; /* every input must fire to close */ 288 289 if (n == 0) { xco_latch_set(&s->done, 0); return; } 290 291 /* Initialize each input then poll. Fused: an already-ready input is 292 * consumed inline (value captured, remaining--, no parking); the 293 * rest end up parked by the same call. If everyone was inline, fire 294 * done at the end. */ 295 for (size_t i = 0; i < n; i++) { 296 inputs[i].w.next = NULL; 297 inputs[i].w.prev = NULL; 298 inputs[i].w.fire = xco_select_input_fire; 299 inputs[i].src = srcs[i]; 300 inputs[i].parent = s; 301 inputs[i].value = 0; 302 303 uintptr_t v; 304 if (xco_event_poll(srcs[i], &v, &inputs[i].w)) { 305 inputs[i].value = v; 306 s->remaining--; 307 } 308 } 309 310 /* All inline-ready: fire done with the last input's index, matching 311 * the "closing index" semantics of the parked path. */ 312 if (s->remaining == 0) xco_latch_set(&s->done, n - 1); 313 } 314 315 void xco_select_event_deinit(xco_select_event_t *s) { 316 /* done.set => the closing fire already disarmed everyone (or the 317 * fast path skipped parking entirely). Otherwise — possible after a 318 * partial allof — some inputs may still be parked; unpark is 319 * idempotent for already-detached waiters. */ 320 if (s->done.set) return; 321 for (size_t i = 0; i < s->n; i++) { 322 xco_event_unpark(s->inputs[i].src, &s->inputs[i].w); 323 } 324 } 325 326 /* ==================================================================== 327 * FIFO waitlist helpers (shared by queue and notify) 328 * 329 * Doubly-linked FIFO push/pop. Same shape as latch's list operations 330 * but with an explicit tail so arrival order is preserved (unlike 331 * latch, where waiter order is irrelevant). The xco_chan_q_* names are 332 * historical — the chan code that originally used them is now folded 333 * into queue (chan = queue at cap=0 + BLOCK). 334 * ==================================================================== */ 335 336 static void xco_chan_q_push(xco_waiter_t **head, xco_waiter_t **tail, xco_waiter_t *w) { 337 assert(!w->prev && !w->next); 338 w->prev = *tail; 339 w->next = NULL; 340 if (*tail) (*tail)->next = w; 341 else *head = w; 342 *tail = w; 343 } 344 345 static xco_waiter_t *xco_chan_q_pop(xco_waiter_t **head, xco_waiter_t **tail) { 346 xco_waiter_t *w = *head; 347 if (!w) return NULL; 348 *head = w->next; 349 if (*head) (*head)->prev = NULL; 350 else *tail = NULL; 351 w->prev = w->next = NULL; 352 return w; 353 } 354 355 static void xco_chan_q_remove(xco_waiter_t **head, xco_waiter_t **tail, xco_waiter_t *w) { 356 /* Same not-on-list test as xco_latch_unpark: a queued waiter has prev 357 * set OR is the head; a detached one has prev == NULL and isn't 358 * the head. */ 359 if (!w->prev && *head != w) return; 360 if (w->prev) w->prev->next = w->next; 361 else *head = w->next; 362 if (w->next) w->next->prev = w->prev; 363 else *tail = w->prev; 364 w->prev = w->next = NULL; 365 } 366 367 368 /* ==================================================================== 369 * Queue 370 * 371 * The FIFO list helpers (xco_chan_q_push/pop/remove) are reused for the 372 * queue's send and recv waitlists — same shape, same invariants. The 373 * ring buffer lives in caller-provided storage; we just track head and 374 * len. cap == 0 leaves the buffer logic dormant: every send either 375 * direct-hands or parks, every recv either takes from a parked sender 376 * or parks — i.e. it degenerates to chan rendezvous. 377 * ==================================================================== */ 378 379 static inline xco_queue_t *xco_queue_of_recv(xco_event_t *e) { 380 return (xco_queue_t *)((char *)e - offsetof(xco_queue_t, recv)); 381 } 382 383 static inline void xco_queue_push_buf(xco_queue_t *q, uintptr_t v) { 384 assert(q->len < q->cap); 385 q->buf[(q->head + q->len) % q->cap] = v; 386 q->len++; 387 } 388 389 static inline uintptr_t xco_queue_pop_buf(xco_queue_t *q) { 390 assert(q->len > 0); 391 uintptr_t v = q->buf[q->head]; 392 q->head = (q->head + 1) % q->cap; 393 q->len--; 394 return v; 395 } 396 397 /* Pop one parked sender's value into the now-free buffer slot, firing 398 * the sender. Caller must have ensured a free slot exists (just popped 399 * from the buffer, or cap > len). Maintains FIFO across the buffer + 400 * sender-waitlist boundary: oldest buffered values come out before any 401 * sender's value (which was queued later). No-op if no sender parked. */ 402 static void xco_queue_drain_one_sender(xco_queue_t *q) { 403 if (!q->send_head) return; 404 xco_waiter_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail); 405 xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w; 406 xco_queue_push_buf(q, qsw->value); 407 qsw->delivered = true; 408 /* Fire after pushing so the sender sees its delivery as complete. */ 409 xco_waiter_fire(w, 0); 410 } 411 412 static bool xco_queue_recv_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { 413 xco_queue_t *q = xco_queue_of_recv(e); 414 if (q->len > 0) { 415 uintptr_t v = xco_queue_pop_buf(q); 416 if (out) *out = v; 417 xco_queue_drain_one_sender(q); 418 return true; 419 } 420 /* Empty buffer. If a sender is parked here it can only mean cap==0 421 * (otherwise the sender would have used the buffer). Hand directly. */ 422 if (q->send_head) { 423 xco_waiter_t *sender = xco_chan_q_pop(&q->send_head, &q->send_tail); 424 xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)sender; 425 if (out) *out = qsw->value; 426 qsw->delivered = true; 427 xco_waiter_fire(sender, 0); 428 return true; 429 } 430 /* Closed and drained: receivers learn EOF via xco_queue_recv; out is 431 * undefined. */ 432 if (q->closed) { 433 if (out) *out = 0; 434 return true; 435 } 436 if (!w) return false; 437 xco_chan_q_push(&q->recv_head, &q->recv_tail, w); 438 return false; 439 } 440 441 static void xco_queue_recv_unpark(xco_event_t *e, xco_waiter_t *w) { 442 xco_queue_t *q = xco_queue_of_recv(e); 443 xco_chan_q_remove(&q->recv_head, &q->recv_tail, w); 444 } 445 446 const xco_event_vtable_t xco__queue_recv_vt = { 447 .poll = xco_queue_recv_poll, 448 .unpark = xco_queue_recv_unpark, 449 }; 450 451 xco_queue_send_status_t xco_queue_send_poll(xco_queue_t *q, uintptr_t value, 452 xco_queue_send_waiter_t *qsw) { 453 /* Closed is closed regardless of policy: caller learns the truth. */ 454 if (q->closed) return XCO_QSEND_CLOSED; 455 456 /* Direct handoff first: parked receivers always win over the buffer. 457 * This is the rendezvous case and the cap==0 case. */ 458 xco_waiter_t *w = xco_chan_q_pop(&q->recv_head, &q->recv_tail); 459 if (w) { 460 xco_waiter_fire(w, value); 461 return XCO_QSEND_ACCEPTED; 462 } 463 if (q->len < q->cap) { 464 xco_queue_push_buf(q, value); 465 return XCO_QSEND_ACCEPTED; 466 } 467 /* Buffer full and no waiting receiver. */ 468 switch (q->policy) { 469 case XCO_QUEUE_BLOCK: 470 if (!qsw) return XCO_QSEND_BLOCKED; 471 qsw->value = value; 472 xco_chan_q_push(&q->send_head, &q->send_tail, &qsw->sw.base); 473 return XCO_QSEND_BLOCKED; 474 case XCO_QUEUE_DROP_NEWEST: 475 return XCO_QSEND_ACCEPTED; 476 case XCO_QUEUE_DROP_OLDEST: 477 (void)xco_queue_pop_buf(q); 478 xco_queue_push_buf(q, value); 479 return XCO_QSEND_ACCEPTED; 480 } 481 __builtin_unreachable(); 482 } 483 484 void xco_queue_send_unpark(xco_queue_t *q, xco_queue_send_waiter_t *qsw) { 485 xco_chan_q_remove(&q->send_head, &q->send_tail, &qsw->sw.base); 486 } 487 488 xco_recv_status_t xco_queue_recv(xco_queue_t *q, uintptr_t *out) { 489 if (q->len > 0) { 490 uintptr_t v = xco_queue_pop_buf(q); 491 if (out) *out = v; 492 xco_queue_drain_one_sender(q); 493 return XCO_RECV_GOT; 494 } 495 if (q->send_head) { 496 xco_waiter_t *w = xco_chan_q_pop(&q->send_head, &q->send_tail); 497 xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w; 498 if (out) *out = qsw->value; 499 qsw->delivered = true; 500 xco_waiter_fire(w, 0); 501 return XCO_RECV_GOT; 502 } 503 if (q->closed) return XCO_RECV_CLOSED; 504 return XCO_RECV_EMPTY; 505 } 506 507 void xco_queue_close(xco_queue_t *q) { 508 if (q->closed) return; 509 q->closed = true; 510 511 /* Drain parked senders with delivered=false. Senders only park 512 * under BLOCK, so this is no-op for DROP_* (their waitlist is 513 * always empty). */ 514 xco_waiter_t *w; 515 while ((w = xco_chan_q_pop(&q->send_head, &q->send_tail)) != NULL) { 516 xco_queue_send_waiter_t *qsw = (xco_queue_send_waiter_t *)w; 517 qsw->delivered = false; 518 xco_waiter_fire(w, 0); 519 } 520 /* Wake parked receivers so they can observe closed via xco_queue_recv. 521 * Receivers may still drain buffered values first — xco_queue_recv's 522 * XCO_RECV_GOT path is hit before the XCO_RECV_CLOSED branch. */ 523 while ((w = xco_chan_q_pop(&q->recv_head, &q->recv_tail)) != NULL) { 524 xco_waiter_fire(w, 0); 525 } 526 } 527 528 /* ---- Queue send op (selectable send) ---------------------------------- */ 529 530 void xco__queue_send_op_fire(xco_waiter_t *w, uintptr_t value) { 531 (void)value; 532 xco_queue_send_op_t *op = (xco_queue_send_op_t *)w; 533 xco_latch_set(&op->done, op->qsw.delivered ? 1 : 0); 534 } 535 536 void xco_queue_send_op_init(xco_queue_send_op_t *op, xco_queue_t *q, uintptr_t value) { 537 xco_queue_send_waiter_init(&op->qsw, NULL, NULL); 538 op->qsw.sw.base.fire = xco__queue_send_op_fire; 539 op->queue = q; 540 xco_latch_init(&op->done); 541 542 switch (xco_queue_send_poll(q, value, &op->qsw)) { 543 case XCO_QSEND_ACCEPTED: 544 op->qsw.delivered = true; 545 xco_latch_set(&op->done, 1); 546 return; 547 case XCO_QSEND_CLOSED: 548 xco_latch_set(&op->done, 0); 549 return; 550 case XCO_QSEND_BLOCKED: 551 /* Only BLOCK + full buffer; parked. */ 552 return; 553 } 554 } 555 556 /* ==================================================================== 557 * Broadcast (slot) 558 * 559 * The waitlist uses the same doubly-linked LIFO shape as latch — there 560 * is no FIFO requirement because publish wakes everyone at once. The 561 * key difference from latch is publish vs set: publish never marks a 562 * sticky bit on the event, so try always returns false (subscribers 563 * wait for the *next* publish), and the waitlist is reusable across 564 * publishes — subscribers re-park to receive subsequent values. 565 * ==================================================================== */ 566 567 static bool xco_broadcast_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { 568 (void)out; 569 /* Transient: never reports ready; just parks if asked. */ 570 if (!w) return false; 571 xco_broadcast_t *b = (xco_broadcast_t *)e; 572 assert(!w->prev && !w->next); 573 w->next = b->waiters; 574 if (b->waiters) b->waiters->prev = w; 575 b->waiters = w; 576 return false; 577 } 578 579 static void xco_broadcast_unpark(xco_event_t *e, xco_waiter_t *w) { 580 xco_broadcast_t *b = (xco_broadcast_t *)e; 581 if (!w->prev && b->waiters != w) return; 582 if (w->prev) w->prev->next = w->next; 583 else b->waiters = w->next; 584 if (w->next) w->next->prev = w->prev; 585 w->prev = w->next = NULL; 586 } 587 588 const xco_event_vtable_t xco__broadcast_vt = { 589 .poll = xco_broadcast_poll, 590 .unpark = xco_broadcast_unpark, 591 }; 592 593 void xco_broadcast_publish(xco_broadcast_t *b, uintptr_t value) { 594 b->has_value = true; 595 b->value = value; 596 597 /* Detach the waitlist before iterating — same hazard-free pattern as 598 * xco_latch_set. A waiter's fire callback may re-park itself on us (the 599 * common case for a re-arming subscriber); decoupling means that 600 * re-park lands on a fresh waitlist, not on the snapshot we're 601 * walking. */ 602 xco_waiter_t *w = b->waiters; 603 b->waiters = NULL; 604 while (w) { 605 xco_waiter_t *next = w->next; /* save before xco_waiter_fire clears */ 606 xco_waiter_fire(w, value); 607 w = next; 608 } 609 } 610 611 /* ==================================================================== 612 * Notify 613 * 614 * Doubly-linked FIFO waitlist (same shape as the chan/queue waitlists). 615 * xco_notify_one fires the head; xco_notify_all detaches the whole list before 616 * iterating so callbacks can re-park onto a fresh waitlist without 617 * iterator hazards (same pattern as xco_latch_set). xco_event_poll never 618 * reports ready: notify is purely transient. 619 * ==================================================================== */ 620 621 static bool xco_notify_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { 622 (void)out; 623 if (!w) return false; 624 xco_notify_t *n = (xco_notify_t *)e; 625 xco_chan_q_push(&n->head, &n->tail, w); 626 return false; 627 } 628 629 static void xco_notify_unpark(xco_event_t *e, xco_waiter_t *w) { 630 xco_notify_t *n = (xco_notify_t *)e; 631 xco_chan_q_remove(&n->head, &n->tail, w); 632 } 633 634 const xco_event_vtable_t xco__notify_vt = { 635 .poll = xco_notify_poll, 636 .unpark = xco_notify_unpark, 637 }; 638 639 void xco_notify_one(xco_notify_t *n) { 640 xco_waiter_t *w = xco_chan_q_pop(&n->head, &n->tail); 641 if (!w) return; 642 xco_waiter_fire(w, 0); 643 } 644 645 void xco_notify_all(xco_notify_t *n) { 646 /* Detach before iterating: re-parking inside fire lands on a fresh 647 * (empty) list. Walk the snapshot via saved next pointers. */ 648 xco_waiter_t *w = n->head; 649 n->head = n->tail = NULL; 650 while (w) { 651 xco_waiter_t *next = w->next; 652 xco_waiter_fire(w, 0); 653 w = next; 654 } 655 } 656 657 /* ==================================================================== 658 * Pairing heap 659 * 660 * Standard intrusive pairing heap. Each node carries three link fields: 661 * 662 * child : head of children sibling list (NULL if leaf). 663 * prev : parent if this node is its parent's first child; 664 * previous sibling otherwise; NULL only for a detached tree 665 * root (including h->root). 666 * next : next sibling, or NULL for the last child / a detached root. 667 * 668 * The "prev points to either parent or a sibling" trick lets us splice 669 * a node out in O(1) without an extra parent pointer: parent->child==n 670 * distinguishes "first child" from "non-first sibling." 671 * 672 * meld picks the smaller-deadline root as winner and grafts the loser 673 * as its new first child. Pop-min and remove both rebuild via the 674 * classic two-pass pairwise merge of the resulting children list. 675 * ==================================================================== */ 676 677 /* Merge two detached subtree roots (each with prev=next=NULL). Returns 678 * the merged root (also detached: prev=next=NULL). */ 679 static xco_timer_t *xco_ph_meld(xco_timer_t *a, xco_timer_t *b) { 680 if (!a) return b; 681 if (!b) return a; 682 xco_timer_t *small, *large; 683 if (a->deadline <= b->deadline) { small = a; large = b; } 684 else { small = b; large = a; } 685 /* Graft `large` as the new first child of `small`. */ 686 large->next = small->child; 687 if (small->child) small->child->prev = large; 688 large->prev = small; /* parent link via prev */ 689 small->child = large; 690 small->prev = NULL; 691 small->next = NULL; 692 return small; 693 } 694 695 /* Two-pass pairwise meld of a children sibling list. Detaches each node 696 * before melding so meld inputs satisfy its prev=next=NULL contract. 697 * The output has prev=next=NULL. */ 698 static xco_timer_t *xco_ph_merge_pairs(xco_timer_t *first) { 699 /* Pass 1: walk the sibling list left-to-right, melding consecutive 700 * pairs. Chain results via `next` (ab)use as a temporary list link. */ 701 xco_timer_t *list = NULL; 702 while (first) { 703 xco_timer_t *a = first; 704 xco_timer_t *b = a->next; 705 xco_timer_t *rest = b ? b->next : NULL; 706 a->prev = a->next = NULL; 707 if (b) { b->prev = b->next = NULL; } 708 xco_timer_t *m = xco_ph_meld(a, b); 709 m->next = list; /* prepend to pass-1 list */ 710 list = m; 711 first = rest; 712 } 713 /* Pass 2: meld the pass-1 list into a single root. */ 714 xco_timer_t *acc = NULL; 715 while (list) { 716 xco_timer_t *nxt = list->next; 717 list->prev = NULL; 718 list->next = NULL; 719 acc = xco_ph_meld(acc, list); 720 list = nxt; 721 } 722 return acc; 723 } 724 725 /* Detach n from the tree (must currently be in the heap). Returns the 726 * (possibly new) main-heap root. n is left fully detached: child still 727 * points to its subtree, but prev/next are NULL — caller decides what 728 * to do with that subtree. */ 729 static xco_timer_t *xco_ph_detach(xco_pairing_heap_t *h, xco_timer_t *n) { 730 if (h->root == n) { 731 /* n is the main root; pop it and rebuild from its children. */ 732 xco_timer_t *new_root = xco_ph_merge_pairs(n->child); 733 n->child = NULL; 734 n->prev = NULL; 735 n->next = NULL; 736 return new_root; 737 } 738 /* n has a parent (recorded via prev — either as its parent's first 739 * child or as some sibling's successor). Splice out of the sibling 740 * list. */ 741 if (n->prev->child == n) { 742 /* First child: parent's child link skips us. */ 743 n->prev->child = n->next; 744 } else { 745 /* Mid/last sibling: previous sibling's next skips us. */ 746 n->prev->next = n->next; 747 } 748 if (n->next) n->next->prev = n->prev; 749 n->prev = NULL; 750 n->next = NULL; 751 /* The main-heap root is unchanged structurally; the caller of this 752 * function decides how (or whether) to reintroduce n's subtree. */ 753 return h->root; 754 } 755 756 static void xco_ph_insert(xco_timers_t *ts, xco_timer_t *t) { 757 xco_pairing_heap_t *h = (xco_pairing_heap_t *)ts; 758 /* Singleton tree (prev/next/child already NULL via xco_timer_init). */ 759 h->root = xco_ph_meld(h->root, t); 760 t->in_heap = true; 761 } 762 763 static void xco_ph_cancel(xco_timers_t *ts, xco_timer_t *t) { 764 xco_pairing_heap_t *h = (xco_pairing_heap_t *)ts; 765 if (!t->in_heap) return; 766 h->root = xco_ph_detach(h, t); 767 /* Now meld t's subtree (its children) back into the main heap. */ 768 xco_timer_t *sub = xco_ph_merge_pairs(t->child); 769 t->child = NULL; 770 h->root = xco_ph_meld(h->root, sub); 771 t->in_heap = false; 772 } 773 774 static void xco_ph_advance(xco_timers_t *ts, uint64_t now) { 775 xco_pairing_heap_t *h = (xco_pairing_heap_t *)ts; 776 /* Pop while the min-key timer is due. Each fire may run callbacks 777 * that insert *new* timers (with later deadlines, normally) — those 778 * land back in the heap, and we keep checking the root. */ 779 while (h->root && h->root->deadline <= now) { 780 xco_timer_t *t = h->root; 781 h->root = xco_ph_merge_pairs(t->child); 782 t->child = NULL; 783 t->prev = NULL; 784 t->next = NULL; 785 t->in_heap = false; 786 /* Trigger the latch: drains the waitlist and delivers the 787 * deadline as the fire payload. */ 788 xco_latch_set(&t->done, (uintptr_t)t->deadline); 789 } 790 } 791 792 static uint64_t xco_ph_peek(const xco_timers_t *ts) { 793 const xco_pairing_heap_t *h = (const xco_pairing_heap_t *)ts; 794 return h->root ? h->root->deadline : UINT64_MAX; 795 } 796 797 const xco_timers_vtable_t xco__pairing_heap_vt = { 798 .insert = xco_ph_insert, 799 .cancel = xco_ph_cancel, 800 .advance = xco_ph_advance, 801 .peek = xco_ph_peek, 802 }; 803 804 /* ==================================================================== 805 * Timeout 806 * ==================================================================== */ 807 808 /* Bridge waiter: parked on the timer's latch, fires the cancel when the 809 * timer fires. Two-step indirection so cancel can already have its own 810 * waiters (e.g. a xco_wait_or_cancel select) without the timer's waitlist 811 * caring about cancel internals. */ 812 static void xco__timeout_bridge_fire(xco_waiter_t *w, uintptr_t value) { 813 (void)value; 814 xco_timeout_t *to = (xco_timeout_t *)((char *)w - offsetof(xco_timeout_t, bridge)); 815 xco_cancel_set(&to->cancel); 816 } 817 818 void xco_timeout_init(xco_timeout_t *to, xco_timers_t *ts, uint64_t deadline) { 819 xco_timer_init(&to->timer, ts, deadline); 820 xco_cancel_init(&to->cancel); 821 to->bridge.next = NULL; 822 to->bridge.prev = NULL; 823 to->bridge.fire = xco__timeout_bridge_fire; 824 /* Park the bridge on the timer. If the timer fires, xco_latch_set 825 * detaches the bridge and calls our fire callback inline. The timer 826 * was just inserted into the heap and hasn't been advanced, so poll 827 * always parks here (return value ignored). */ 828 (void)xco_event_poll(xco_timer_event(&to->timer), NULL, &to->bridge); 829 } 830 831 /* ==================================================================== 832 * Ticker 833 * 834 * The ticker's event surface uses the broadcast-style LIFO doubly-linked 835 * waitlist: subscribers are fired all-at-once on each tick, so order 836 * doesn't matter; doubly-linked gives O(1) unpark for cancellation. 837 * ==================================================================== */ 838 839 static bool xco_ticker_poll(xco_event_t *e, uintptr_t *out, xco_waiter_t *w) { 840 (void)out; 841 /* Transient — never reports ready; just parks if asked. */ 842 if (!w) return false; 843 xco_ticker_t *t = (xco_ticker_t *)((char *)e - offsetof(xco_ticker_t, base)); 844 assert(!w->prev && !w->next); 845 w->next = t->waiters; 846 if (t->waiters) t->waiters->prev = w; 847 t->waiters = w; 848 return false; 849 } 850 851 static void xco_ticker_unpark(xco_event_t *e, xco_waiter_t *w) { 852 xco_ticker_t *t = (xco_ticker_t *)((char *)e - offsetof(xco_ticker_t, base)); 853 if (!w->prev && t->waiters != w) return; 854 if (w->prev) w->prev->next = w->next; 855 else t->waiters = w->next; 856 if (w->next) w->next->prev = w->prev; 857 w->prev = w->next = NULL; 858 } 859 860 const xco_event_vtable_t xco__ticker_vt = { 861 .poll = xco_ticker_poll, 862 .unpark = xco_ticker_unpark, 863 }; 864 865 /* Bridge waiter: parks on the underlying timer's latch. On fire, compute 866 * the next deadline (skip-ahead-safe), reinstall the timer, re-park the 867 * bridge on the new timer, then fire every parked subscriber with the 868 * just-fired deadline. The waitlist is detached before iteration so 869 * subscribers can re-park inside their fire callbacks. */ 870 static void xco__ticker_bridge_fire(xco_waiter_t *w, uintptr_t value) { 871 xco_ticker_t *t = (xco_ticker_t *)((char *)w - offsetof(xco_ticker_t, bridge)); 872 uint64_t fired = (uint64_t)value; 873 uint64_t next = fired + t->period; 874 /* Skip-ahead: in the rare overflow case (period = 0 or wraparound), 875 * step forward enough to keep next > fired. */ 876 if (next <= fired) { 877 next += ((fired - next) / t->period + 1) * t->period; 878 } 879 /* Reinstall the timer for the next tick. The latch's storage is 880 * reused — xco_timer_init runs xco_latch_init on it. */ 881 xco_timer_init(&t->timer, t->src, next); 882 /* Bridge waiter is fully detached (xco_waiter_fire just cleared its 883 * links); park it on the freshly-armed timer (which has not yet been 884 * advanced, so poll parks). */ 885 (void)xco_event_poll(xco_timer_event(&t->timer), NULL, &t->bridge); 886 887 /* Fire the subscribers. Detach the waitlist first so re-park inside 888 * fire lands on the now-empty list. */ 889 xco_waiter_t *waiters = t->waiters; 890 t->waiters = NULL; 891 while (waiters) { 892 xco_waiter_t *nxt = waiters->next; 893 xco_waiter_fire(waiters, (uintptr_t)fired); 894 waiters = nxt; 895 } 896 } 897 898 void xco_ticker_init(xco_ticker_t *t, xco_timers_t *ts, 899 uint64_t period, uint64_t first_deadline) { 900 /* period must be positive — the skip-ahead computation in the bridge 901 * divides by period, and a zero-period ticker would loop forever 902 * inside xco_ph_advance. */ 903 assert(period > 0); 904 t->base.vt = &xco__ticker_vt; 905 t->src = ts; 906 t->period = period; 907 t->waiters = NULL; 908 909 xco_timer_init(&t->timer, ts, first_deadline); 910 911 t->bridge.next = NULL; 912 t->bridge.prev = NULL; 913 t->bridge.fire = xco__ticker_bridge_fire; 914 (void)xco_event_poll(xco_timer_event(&t->timer), NULL, &t->bridge); 915 } 916 917 void xco_ticker_deinit(xco_ticker_t *t) { 918 /* Pull the bridge off the timer (no-op if already fired) and cancel 919 * the timer. Subscribers' waiters are the caller's storage; nothing 920 * to free here. */ 921 xco_event_unpark(xco_timer_event(&t->timer), &t->bridge); 922 xco_timer_deinit(&t->timer); 923 } 924 925 /* ==================================================================== 926 * Task group 927 * 928 * Each attach contributes one to the countdown and parks a bridge waiter 929 * on the task's done event. Bridge fire splices the slot out of the 930 * group's list and decrements the countdown — the join event fires when 931 * the last attached task finishes. 932 * 933 * Cancellation is fan-out: walk the list, set each task's cancel, then 934 * set the group-level cancel. Bodies cooperate by composing their work 935 * with xco_task_cancel(self); the group-level cancel is for non-task 936 * waiters that want to react to "the group has been told to stop." 937 * ==================================================================== */ 938 939 static void xco__task_group_detach_slot(xco_task_group_t *g, xco_group_attach_t *slot) { 940 /* Doubly-linked, head/tail tracked; same shape as other waitlists. */ 941 if (slot->prev) slot->prev->next = slot->next; 942 else g->head = slot->next; 943 if (slot->next) slot->next->prev = slot->prev; 944 else g->tail = slot->prev; 945 slot->prev = slot->next = NULL; 946 } 947 948 static void xco__task_group_bridge_fire(xco_waiter_t *w, uintptr_t value) { 949 (void)value; 950 xco_group_attach_t *slot = (xco_group_attach_t *)((char *)w - offsetof(xco_group_attach_t, bridge)); 951 xco_task_group_t *g = slot->group; 952 xco__task_group_detach_slot(g, slot); 953 xco_countdown_done(&g->pending); 954 } 955 956 void xco_task_group_init(xco_task_group_t *g) { 957 /* Don't go through xco_countdown_init(0) — that fires the latch 958 * immediately, which would make the very first attach's 959 * xco_countdown_add UB. The group's join must remain not-fired until at 960 * least one attached task has finished, so we open with 961 * remaining=0 and an unset latch. The first attach lifts remaining 962 * to 1, and matching countdown_dones bring it back to 0, firing 963 * the latch. */ 964 xco_latch_init(&g->pending.done); 965 g->pending.remaining = 0; 966 xco_cancel_init(&g->cancel); 967 g->head = g->tail = NULL; 968 } 969 970 void xco_task_group_attach(xco_task_group_t *g, xco_task_t *t, xco_group_attach_t *slot) { 971 xco_countdown_add(&g->pending, 1); 972 973 slot->task = t; 974 slot->group = g; 975 976 /* Append to the group's list (FIFO; ordering doesn't affect cancel 977 * fan-out semantics, but consistent with other waitlists in the 978 * codebase). */ 979 slot->prev = g->tail; 980 slot->next = NULL; 981 if (g->tail) g->tail->next = slot; 982 else g->head = slot; 983 g->tail = slot; 984 985 slot->bridge.next = NULL; 986 slot->bridge.prev = NULL; 987 slot->bridge.fire = xco__task_group_bridge_fire; 988 989 /* Park on the task's done event. Re-attaching a finished task is UB 990 * per the contract; the latch's clean-waiter assert inside poll 991 * would catch a double-park. */ 992 (void)xco_event_poll(xco_task_done_event(t), NULL, &slot->bridge); 993 } 994 995 void xco_task_group_cancel(xco_task_group_t *g) { 996 /* Fan-out cancel: signal each attached task. Walk the snapshot 997 * (cancel doesn't detach the slot — only task done does — so the 998 * list is stable across iteration). */ 999 for (xco_group_attach_t *s = g->head; s; s = s->next) { 1000 xco_cancel_set(&s->task->cancel); 1001 } 1002 /* Group-level cancel for anyone awaiting "the group as a whole." */ 1003 xco_cancel_set(&g->cancel); 1004 } 1005 1006 /* ==================================================================== 1007 * xco — coroutines 1008 * ==================================================================== */ 1009 1010 typedef struct xco_impl { 1011 xco_mach_t base; /* must be first; aliases xco_coro_t.base */ 1012 _Alignas(XCO__CTX_ALIGN) unsigned char ctx_buf[XCO__CTX_SIZE]; 1013 xco_platform_ctx_t *resumer_ctx; /* where suspend/return goes */ 1014 xco_fn fn; 1015 } xco_impl_t; 1016 1017 _Static_assert(sizeof(xco_impl_t) <= sizeof(xco_coro_t), "xco_coro_t too small"); 1018 _Static_assert(_Alignof(xco_impl_t) <= _Alignof(xco_coro_t), "xco_coro_t under-aligned"); 1019 1020 static inline xco_impl_t *xco_impl_of(xco_coro_t *c) { return (xco_impl_t *)c; } 1021 static inline xco_platform_ctx_t *xco_ctx_of(xco_impl_t *ci) { 1022 return (xco_platform_ctx_t *)ci->ctx_buf; 1023 } 1024 1025 /* Per-thread state. */ 1026 static _Thread_local xco_impl_t *xco_coro_current = NULL; 1027 static _Thread_local _Alignas(XCO__CTX_ALIGN) 1028 unsigned char xco_t_main_ctx_buf[XCO__CTX_SIZE]; 1029 static inline xco_platform_ctx_t *xco_main_ctx(void) { 1030 return (xco_platform_ctx_t *)xco_t_main_ctx_buf; 1031 } 1032 1033 /* Trampoline: runs on the coroutine's own stack, invoked by the 1034 * platform layer on the first switch into a fresh context. The 1035 * argument is the value passed to that first xco_step. The coroutine 1036 * identifies itself via xco_coro_current, set by the resumer just before 1037 * switching. */ 1038 static void xco_trampoline(uintptr_t arg) { 1039 xco_impl_t *self = xco_coro_current; 1040 uintptr_t ret = self->fn(arg); 1041 1042 self->base.status = XCO_STEP_DEAD; 1043 (void)xco_platform_switch(xco_ctx_of(self), self->resumer_ctx, ret); 1044 __builtin_unreachable(); 1045 } 1046 1047 /* xco_step_fn entry point wired into base.step at init time. All callers 1048 * — generic xco_step consumers and xco-aware code alike — route through 1049 * here. */ 1050 static xco_step_result_t xco_co_step(xco_mach_t *s, uintptr_t value) { 1051 xco_impl_t *next = (xco_impl_t *)s; 1052 assert(next->base.status == XCO_STEP_INIT || next->base.status == XCO_STEP_SUSPENDED); 1053 1054 xco_impl_t *prev = xco_coro_current; 1055 next->resumer_ctx = prev ? xco_ctx_of(prev) : xco_main_ctx(); 1056 next->base.status = XCO_STEP_RUNNING; 1057 xco_coro_current = next; 1058 1059 uintptr_t back = xco_platform_switch(next->resumer_ctx, 1060 xco_ctx_of(next), value); 1061 1062 /* Coroutine has either suspended or returned; status is already 1063 * set correctly by xco_suspend or by the xco_trampoline. */ 1064 xco_coro_current = prev; 1065 return (xco_step_result_t){ .value = back, .status = next->base.status }; 1066 } 1067 1068 void xco_init(xco_coro_t *c, xco_fn fn, 1069 void *stack_base, size_t stack_len) { 1070 xco_impl_t *ci = xco_impl_of(c); 1071 ci->base.step = xco_co_step; 1072 ci->base.status = XCO_STEP_INIT; 1073 ci->fn = fn; 1074 ci->resumer_ctx = NULL; 1075 xco_platform_init(xco_ctx_of(ci), stack_base, stack_len, xco_trampoline); 1076 } 1077 1078 uintptr_t xco_suspend(uintptr_t value) { 1079 xco_impl_t *self = xco_coro_current; 1080 assert(self != NULL); 1081 self->base.status = XCO_STEP_SUSPENDED; 1082 return xco_platform_switch(xco_ctx_of(self), self->resumer_ctx, value); 1083 } 1084 1085 xco_coro_t *xco_self(void) { 1086 return (xco_coro_t *)xco_coro_current; 1087 } 1088 1089 /* ---- xco-backed task -------------------------------------------------- */ 1090 1091 /* The xco_trampoline: runs as the coroutine's xco_fn. Recovers the owning 1092 * xco_cotask_t via container_of on the embedded co (xco_self() returns 1093 * the running xco), dispatches the user fn, and surfaces the return 1094 * value through xco_task_done — so a joiner waiting on xco_task_done_event 1095 * wakes with the body's return without the body needing to know about 1096 * the task surface at all. */ 1097 static uintptr_t xco_cotask_trampoline(uintptr_t arg) { 1098 xco_coro_t *self = xco_self(); 1099 xco_cotask_t *xt = (xco_cotask_t *)((char *)self - offsetof(xco_cotask_t, co)); 1100 uintptr_t r = xt->fn(&xt->task, arg); 1101 xco_task_done(&xt->task, r); 1102 return r; 1103 } 1104 1105 void xco_cotask_init(xco_cotask_t *xt, xco_cotask_fn fn, 1106 void *stack_base, size_t stack_len) { 1107 xco_task_init(&xt->task, &xt->co.base); 1108 xt->fn = fn; 1109 xco_init(&xt->co, xco_cotask_trampoline, stack_base, stack_len); 1110 } 1111 1112 /* ==================================================================== 1113 * xco_op — generic effect/IO layer 1114 * 1115 * The runtime owns a doubly-linked pending-ops list (op_head/op_tail). 1116 * Submit appends; cancel-while-PENDING splices in O(1); take detaches 1117 * the whole list in O(1) and lets the host iterate via the next pointers 1118 * (which are then the host's to reuse for in-flight tracking). 1119 * 1120 * Resolution always goes through xco_latch_set on op->done; awaiters 1121 * compose with the standard event API. Status is delivered as the latch 1122 * payload. 1123 * ==================================================================== */ 1124 1125 void xco_op_submit(xco_runtime_t *rt, xco_op_t *op) { 1126 xco_latch_init(&op->done); 1127 op->cancel_requested = false; 1128 op->rt = rt; 1129 op->epoch = rt->op_epoch; /* matches → PENDING */ 1130 op->next = NULL; 1131 op->prev = rt->op_tail; 1132 if (rt->op_tail) rt->op_tail->next = op; 1133 else rt->op_head = op; 1134 rt->op_tail = op; 1135 } 1136 1137 void xco_op_cancel(xco_op_t *op) { 1138 if (op->done.set) return; /* RESOLVED — no-op */ 1139 if (xco_op_is_pending(op)) { 1140 /* PENDING: splice from rt's pending list and resolve as cancelled. */ 1141 xco_runtime_t *rt = op->rt; 1142 if (op->prev) op->prev->next = op->next; 1143 else rt->op_head = op->next; 1144 if (op->next) op->next->prev = op->prev; 1145 else rt->op_tail = op->prev; 1146 op->prev = op->next = NULL; 1147 xco_latch_set(&op->done, (uintptr_t)XCO_OP_CANCELLED); 1148 return; 1149 } 1150 /* IN_FLIGHT: advisory. Host sees cancel_requested and decides whether 1151 * to honor it; xco_op_complete is still the final word either way. */ 1152 op->cancel_requested = true; 1153 } 1154 1155 void xco_op_complete(xco_op_t *op, xco_op_status_t status) { 1156 /* xco_latch_set is idempotent; second/late completion is a no-op. 1157 * Per the contract, host calls this only after take, so op is not 1158 * on rt's pending list. */ 1159 xco_latch_set(&op->done, (uintptr_t)status); 1160 } 1161 1162 xco_op_t *xco_rt_take_ops(xco_runtime_t *rt, xco_op_t **tail_out) { 1163 xco_op_t *head = rt->op_head; 1164 if (tail_out) *tail_out = rt->op_tail; 1165 rt->op_head = rt->op_tail = NULL; 1166 /* Bump the epoch: every op currently on the (now-detached) batch had 1167 * its epoch field set to the old value at submit, so they all flip to 1168 * IN_FLIGHT in O(1). Submits after this point land in the new 1169 * generation. */ 1170 rt->op_epoch++; 1171 return head; 1172 }