test_event.c (96706B)
1 /* 2 * test_event.c — exercises xco_latch_t, xco_select_event_t, xco_runtime_t. 3 * 4 * Drives hand-coded xco_step state machines through the event substrate 5 * — no coroutines required. A "waiter" is the canonical pattern: try 6 * the event, park if not ready, re-try after wake. 7 */ 8 9 #include "xco.h" 10 11 #include <assert.h> 12 #include <stdio.h> 13 14 /* ---- Waiter: blocks on one event, captures its value -------------- */ 15 16 typedef struct { 17 xco_mach_t base; 18 xco_event_t *e; 19 xco_runtime_t *rt; 20 xco_waker_t sw; 21 int phase; 22 uintptr_t got; 23 } waiter_t; 24 25 static xco_step_result_t waiter_step(xco_mach_t *s, uintptr_t v) { 26 waiter_t *w = (waiter_t *)s; 27 switch (w->phase) { 28 case 0: { 29 uintptr_t out; 30 if (xco_event_poll(w->e, &out, NULL)) { 31 w->got = out; 32 w->phase = 2; 33 return (xco_step_result_t){out, XCO_STEP_DEAD}; 34 } 35 xco_waker_init(&w->sw, w->rt, &w->base); 36 xco_event_poll(w->e, NULL, &w->sw.base); 37 w->phase = 1; 38 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 39 } 40 case 1: 41 /* v is the value handed in by the fire site (latch payload, or 42 * for select, the winning index). No re-try needed. */ 43 w->got = v; 44 w->phase = 2; 45 return (xco_step_result_t){v, XCO_STEP_DEAD}; 46 } 47 __builtin_unreachable(); 48 } 49 50 static void waiter_init(waiter_t *w, xco_runtime_t *rt, xco_event_t *e) { 51 w->base = (xco_mach_t){.step = waiter_step, .status = XCO_STEP_INIT}; 52 w->e = e; 53 w->rt = rt; 54 w->phase = 0; 55 w->got = 0; 56 } 57 58 /* ---- Latch tests --------------------------------------------------- */ 59 60 static void test_latch_wake(void) { 61 xco_runtime_t rt; xco_rt_init(&rt); 62 xco_latch_t l; xco_latch_init(&l); 63 64 waiter_t w; waiter_init(&w, &rt, &l.base); 65 66 xco_step_result_t r = xco_step(&w.base, 0); 67 assert(r.status == XCO_STEP_SUSPENDED); 68 assert(rt.head == NULL); /* parked, not queued */ 69 assert(l.waiters == &w.sw.base); /* armed on the latch */ 70 71 xco_latch_set(&l, 42); 72 assert(l.waiters == NULL); /* drained on fire */ 73 assert(rt.head != NULL); /* fire enqueued the step */ 74 75 xco_rt_run(&rt, 0); 76 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 77 assert(w.got == 42); 78 assert(rt.head == NULL); 79 } 80 81 static void test_latch_already_set(void) { 82 /* Already-set latch: try succeeds inline, no park, no queueing. */ 83 xco_runtime_t rt; xco_rt_init(&rt); 84 xco_latch_t l; xco_latch_init(&l); 85 xco_latch_set(&l, 7); 86 87 waiter_t w; waiter_init(&w, &rt, &l.base); 88 89 xco_step_result_t r = xco_step(&w.base, 0); 90 assert(r.status == XCO_STEP_DEAD); 91 assert(r.value == 7); 92 assert(w.got == 7); 93 assert(rt.head == NULL); 94 } 95 96 static void test_latch_multi_waiter(void) { 97 /* One xco_latch_set wakes every waiter. */ 98 xco_runtime_t rt; xco_rt_init(&rt); 99 xco_latch_t l; xco_latch_init(&l); 100 101 waiter_t a, b, c; 102 waiter_init(&a, &rt, &l.base); 103 waiter_init(&b, &rt, &l.base); 104 waiter_init(&c, &rt, &l.base); 105 106 xco_step(&a.base, 0); 107 xco_step(&b.base, 0); 108 xco_step(&c.base, 0); 109 assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); 110 assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); 111 assert(xco_mach_status(&c.base) == XCO_STEP_SUSPENDED); 112 113 xco_latch_set(&l, 99); 114 xco_rt_run(&rt, 0); 115 116 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD && a.got == 99); 117 assert(xco_mach_status(&b.base) == XCO_STEP_DEAD && b.got == 99); 118 assert(xco_mach_status(&c.base) == XCO_STEP_DEAD && c.got == 99); 119 } 120 121 static void test_latch_set_idempotent(void) { 122 xco_latch_t l; xco_latch_init(&l); 123 xco_latch_set(&l, 1); 124 xco_latch_set(&l, 2); /* ignored */ 125 126 uintptr_t v = 0; 127 assert(xco_event_poll(&l.base, &v, NULL)); 128 assert(v == 1); 129 } 130 131 static void test_latch_unpark(void) { 132 /* Manually park then unpark — verify the waiter is removed cleanly. */ 133 xco_runtime_t rt; xco_rt_init(&rt); 134 xco_latch_t l; xco_latch_init(&l); 135 136 xco_waker_t sw1, sw2, sw3; 137 /* Step pointer is just a sentinel here; we never run them. */ 138 xco_waker_init(&sw1, &rt, (xco_mach_t *)0x1); 139 xco_waker_init(&sw2, &rt, (xco_mach_t *)0x2); 140 xco_waker_init(&sw3, &rt, (xco_mach_t *)0x3); 141 xco_event_poll(&l.base, NULL, &sw1.base); 142 xco_event_poll(&l.base, NULL, &sw2.base); 143 xco_event_poll(&l.base, NULL, &sw3.base); 144 145 /* Remove the middle one. */ 146 xco_event_unpark(&l.base, &sw2.base); 147 assert(sw2.base.next == NULL); 148 149 /* Unpark of an already-removed waiter is a no-op. */ 150 xco_event_unpark(&l.base, &sw2.base); 151 152 /* sw1 and sw3 still on the list. */ 153 int seen1 = 0, seen3 = 0; 154 for (xco_waiter_t *w = l.waiters; w; w = w->next) { 155 if (w == &sw1.base) seen1 = 1; 156 if (w == &sw3.base) seen3 = 1; 157 assert(w != &sw2.base); 158 } 159 assert(seen1 && seen3); 160 161 /* Drain so we don't leave dangling armed waiters. */ 162 xco_event_unpark(&l.base, &sw1.base); 163 xco_event_unpark(&l.base, &sw3.base); 164 assert(l.waiters == NULL); 165 } 166 167 /* ---- Select tests -------------------------------------------------- */ 168 169 static void test_select_winner(void) { 170 xco_runtime_t rt; xco_rt_init(&rt); 171 xco_latch_t a, b, c; 172 xco_latch_init(&a); xco_latch_init(&b); xco_latch_init(&c); 173 174 xco_select_event_t s; 175 xco_select_input_t inputs[3]; 176 xco_event_t *srcs[3] = {&a.base, &b.base, &c.base}; 177 xco_select_event_init(&s, inputs, 3, srcs); 178 179 waiter_t w; waiter_init(&w, &rt, &s.done.base); 180 xco_step(&w.base, 0); 181 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 182 assert(a.waiters && b.waiters && c.waiters); /* all armed */ 183 184 xco_latch_set(&b, 0xBBB); 185 xco_rt_run(&rt, 0); 186 187 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 188 assert(w.got == 1); /* b's index */ 189 190 /* Losers were disarmed by xco_select_input_fire. */ 191 assert(a.waiters == NULL); 192 assert(c.waiters == NULL); 193 194 /* Re-trying the winning input yields its actual payload. */ 195 uintptr_t v; 196 assert(xco_event_poll(&b.base, &v, NULL)); 197 assert(v == 0xBBB); 198 199 xco_select_event_deinit(&s); 200 } 201 202 static void test_select_fast_path(void) { 203 /* Input already set at init: fire immediately, never park anyone. */ 204 xco_latch_t a, b; 205 xco_latch_init(&a); xco_latch_init(&b); 206 xco_latch_set(&a, 0xAAA); 207 208 xco_select_event_t s; 209 xco_select_input_t inputs[2]; 210 xco_event_t *srcs[2] = {&a.base, &b.base}; 211 xco_select_event_init(&s, inputs, 2, srcs); 212 213 uintptr_t v; 214 assert(xco_event_poll(&s.done.base, &v, NULL)); 215 assert(v == 0); /* a wins */ 216 assert(b.waiters == NULL); /* nothing parked on the loser */ 217 218 xco_select_event_deinit(&s); 219 } 220 221 static void test_select_deinit_unparks(void) { 222 /* Selecting then dropping (no input fires) must release input waiters. */ 223 xco_latch_t a, b; 224 xco_latch_init(&a); xco_latch_init(&b); 225 226 xco_select_event_t s; 227 xco_select_input_t inputs[2]; 228 xco_event_t *srcs[2] = {&a.base, &b.base}; 229 xco_select_event_init(&s, inputs, 2, srcs); 230 assert(a.waiters != NULL); 231 assert(b.waiters != NULL); 232 233 xco_select_event_deinit(&s); 234 assert(a.waiters == NULL); 235 assert(b.waiters == NULL); 236 } 237 238 static void test_select_compose(void) { 239 /* select(select(A, B), C): firing A propagates through both layers. */ 240 xco_runtime_t rt; xco_rt_init(&rt); 241 xco_latch_t a, b, c; 242 xco_latch_init(&a); xco_latch_init(&b); xco_latch_init(&c); 243 244 xco_select_event_t inner; 245 xco_select_input_t in_inputs[2]; 246 xco_event_t *in_srcs[2] = {&a.base, &b.base}; 247 xco_select_event_init(&inner, in_inputs, 2, in_srcs); 248 249 xco_select_event_t outer; 250 xco_select_input_t out_inputs[2]; 251 xco_event_t *out_srcs[2] = {&inner.done.base, &c.base}; 252 xco_select_event_init(&outer, out_inputs, 2, out_srcs); 253 254 waiter_t w; waiter_init(&w, &rt, &outer.done.base); 255 xco_step(&w.base, 0); 256 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 257 258 xco_latch_set(&a, 0); 259 xco_rt_run(&rt, 0); 260 261 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 262 assert(w.got == 0); /* outer winner: inner (index 0) */ 263 264 uintptr_t v; 265 assert(xco_event_poll(&inner.done.base, &v, NULL)); 266 assert(v == 0); /* inner winner: a (index 0) */ 267 268 /* The unrelated source c was disarmed when outer fired. */ 269 assert(c.waiters == NULL); 270 /* And b was disarmed when inner fired. */ 271 assert(b.waiters == NULL); 272 273 xco_select_event_deinit(&outer); 274 xco_select_event_deinit(&inner); 275 } 276 277 /* ---- All-of -------------------------------------------------------- */ 278 279 static void test_allof_basic(void) { 280 /* Three latches: done fires only after all three set. Storage is the 281 * same xco_select_event_t / xco_select_input_t — only the init call differs. */ 282 xco_runtime_t rt; xco_rt_init(&rt); 283 xco_latch_t a, b, c; 284 xco_latch_init(&a); xco_latch_init(&b); xco_latch_init(&c); 285 286 xco_select_event_t s; 287 xco_select_input_t inputs[3]; 288 xco_event_t *srcs[3] = {&a.base, &b.base, &c.base}; 289 xco_allof_event_init(&s, inputs, 3, srcs); 290 assert(!s.done.set); 291 assert(a.waiters && b.waiters && c.waiters); 292 293 waiter_t w; waiter_init(&w, &rt, &s.done.base); 294 xco_step(&w.base, 0); 295 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 296 297 xco_latch_set(&a, 0xAAA); 298 xco_rt_run(&rt, 0); 299 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); /* still waiting */ 300 assert(!s.done.set); 301 302 xco_latch_set(&b, 0xBBB); 303 xco_rt_run(&rt, 0); 304 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 305 assert(!s.done.set); 306 307 xco_latch_set(&c, 0xCCC); 308 xco_rt_run(&rt, 0); 309 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 310 assert(w.got == 2); /* closing index = c */ 311 312 assert(inputs[0].value == 0xAAA); 313 assert(inputs[1].value == 0xBBB); 314 assert(inputs[2].value == 0xCCC); 315 316 xco_select_event_deinit(&s); 317 } 318 319 static void test_allof_fast_path_partial(void) { 320 /* Some ready at init: those are captured inline, no parking. */ 321 xco_latch_t a, b; 322 xco_latch_init(&a); xco_latch_init(&b); 323 xco_latch_set(&a, 0xAAA); 324 325 xco_select_event_t s; 326 xco_select_input_t inputs[2]; 327 xco_event_t *srcs[2] = {&a.base, &b.base}; 328 xco_allof_event_init(&s, inputs, 2, srcs); 329 330 assert(!s.done.set); /* still need b */ 331 assert(a.waiters == NULL); /* a was inline */ 332 assert(b.waiters == &inputs[1].w); /* b is parked */ 333 assert(inputs[0].value == 0xAAA); 334 335 xco_latch_set(&b, 0xBBB); 336 assert(s.done.set); 337 assert(inputs[1].value == 0xBBB); 338 uintptr_t v; 339 assert(xco_event_poll(&s.done.base, &v, NULL)); 340 assert(v == 1); /* b closed the wait */ 341 342 xco_select_event_deinit(&s); 343 } 344 345 static void test_allof_fast_path_all(void) { 346 /* All ready at init: done fires immediately, no waiter is ever parked. */ 347 xco_latch_t a, b; 348 xco_latch_init(&a); xco_latch_init(&b); 349 xco_latch_set(&a, 0xAAA); 350 xco_latch_set(&b, 0xBBB); 351 352 xco_select_event_t s; 353 xco_select_input_t inputs[2]; 354 xco_event_t *srcs[2] = {&a.base, &b.base}; 355 xco_allof_event_init(&s, inputs, 2, srcs); 356 357 assert(s.done.set); 358 uintptr_t v; 359 assert(xco_event_poll(&s.done.base, &v, NULL)); 360 assert(v == 1); /* last input's index */ 361 assert(inputs[0].value == 0xAAA); 362 assert(inputs[1].value == 0xBBB); 363 364 xco_select_event_deinit(&s); 365 } 366 367 static void test_allof_empty(void) { 368 /* n=0: fires immediately with payload 0. */ 369 xco_select_event_t s; 370 xco_allof_event_init(&s, NULL, 0, NULL); 371 assert(s.done.set); 372 uintptr_t v; 373 assert(xco_event_poll(&s.done.base, &v, NULL)); 374 assert(v == 0); 375 xco_select_event_deinit(&s); 376 } 377 378 static void test_allof_deinit_partial(void) { 379 /* One input fired, one still parked: deinit must release the parked 380 * waiter without disturbing the fired one. */ 381 xco_latch_t a, b; 382 xco_latch_init(&a); xco_latch_init(&b); 383 384 xco_select_event_t s; 385 xco_select_input_t inputs[2]; 386 xco_event_t *srcs[2] = {&a.base, &b.base}; 387 xco_allof_event_init(&s, inputs, 2, srcs); 388 389 xco_latch_set(&a, 0xAAA); 390 assert(!s.done.set); 391 assert(a.waiters == NULL); /* a's waiter detached on fire */ 392 assert(b.waiters == &inputs[1].w); /* b still parked */ 393 394 xco_select_event_deinit(&s); 395 assert(b.waiters == NULL); /* released */ 396 } 397 398 static void test_allof_compose_with_select(void) { 399 /* An allof-mode event is still a xco_select_event_t — feed it into a 400 * select-mode wait. */ 401 xco_runtime_t rt; xco_rt_init(&rt); 402 xco_latch_t a, b, c; 403 xco_latch_init(&a); xco_latch_init(&b); xco_latch_init(&c); 404 405 xco_select_event_t all; 406 xco_select_input_t all_inputs[2]; 407 xco_event_t *all_srcs[2] = {&a.base, &b.base}; 408 xco_allof_event_init(&all, all_inputs, 2, all_srcs); 409 410 xco_select_event_t sel; 411 xco_select_input_t sel_inputs[2]; 412 xco_event_t *sel_srcs[2] = {&all.done.base, &c.base}; 413 xco_select_event_init(&sel, sel_inputs, 2, sel_srcs); 414 415 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 416 xco_step(&w.base, 0); 417 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 418 419 /* Fire both halves of the allof; the select then sees its first input ready. */ 420 xco_latch_set(&a, 1); 421 xco_latch_set(&b, 2); 422 xco_rt_run(&rt, 0); 423 424 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 425 assert(w.got == 0); /* allof side won the select */ 426 assert(all.done.set); 427 assert(c.waiters == NULL); /* select disarmed the loser */ 428 429 xco_select_event_deinit(&sel); 430 xco_select_event_deinit(&all); 431 } 432 433 /* ---- Channel ------------------------------------------------------- */ 434 435 /* Sender state machine: poll; if blocked, the poll already parked us. */ 436 typedef struct { 437 xco_mach_t base; 438 xco_chan_t *c; 439 xco_runtime_t *rt; 440 xco_chan_send_waiter_t csw; 441 uintptr_t value; 442 int phase; 443 bool done; 444 } sender_t; 445 446 static xco_step_result_t sender_step(xco_mach_t *s, uintptr_t v) { 447 sender_t *snd = (sender_t *)s; 448 (void)v; 449 switch (snd->phase) { 450 case 0: 451 xco_chan_send_waiter_init(&snd->csw, snd->rt, &snd->base); 452 switch (xco_chan_send_poll(snd->c, snd->value, &snd->csw)) { 453 case XCO_SEND_DELIVERED: 454 case XCO_SEND_CLOSED: 455 snd->done = true; 456 snd->phase = 2; 457 return (xco_step_result_t){0, XCO_STEP_DEAD}; 458 case XCO_SEND_BLOCKED: 459 snd->phase = 1; 460 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 461 } 462 __builtin_unreachable(); 463 case 1: 464 snd->done = true; 465 snd->phase = 2; 466 return (xco_step_result_t){0, XCO_STEP_DEAD}; 467 } 468 __builtin_unreachable(); 469 } 470 471 static void sender_init(sender_t *snd, xco_runtime_t *rt, xco_chan_t *c, uintptr_t value) { 472 snd->base = (xco_mach_t){.step = sender_step, .status = XCO_STEP_INIT}; 473 snd->c = c; 474 snd->rt = rt; 475 snd->value = value; 476 snd->phase = 0; 477 snd->done = false; 478 } 479 480 static void test_chan_send_blocks_until_recv(void) { 481 /* Sender arrives first, parks; receiver pulls and both finish. */ 482 xco_runtime_t rt; xco_rt_init(&rt); 483 xco_chan_t c; xco_chan_init(&c); 484 485 sender_t snd; sender_init(&snd, &rt, &c, 0xDEADBEEF); 486 xco_step(&snd.base, 0); 487 assert(xco_mach_status(&snd.base) == XCO_STEP_SUSPENDED); 488 assert(c.send_head == &snd.csw.sw.base); 489 assert(c.send_tail == &snd.csw.sw.base); 490 491 waiter_t r; waiter_init(&r, &rt, &c.recv); 492 xco_step_result_t rr = xco_step(&r.base, 0); 493 assert(rr.status == XCO_STEP_DEAD); 494 assert(rr.value == 0xDEADBEEF); 495 assert(r.got == 0xDEADBEEF); 496 assert(c.send_head == NULL); 497 498 /* Sender's resumption is queued by the recv-side fire. */ 499 xco_rt_run(&rt, 0); 500 assert(xco_mach_status(&snd.base) == XCO_STEP_DEAD); 501 assert(snd.done); 502 } 503 504 static void test_chan_recv_blocks_until_send(void) { 505 /* Receiver arrives first, parks; sender delivers inline. */ 506 xco_runtime_t rt; xco_rt_init(&rt); 507 xco_chan_t c; xco_chan_init(&c); 508 509 waiter_t r; waiter_init(&r, &rt, &c.recv); 510 xco_step(&r.base, 0); 511 assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); 512 assert(c.recv_head == &r.sw.base); 513 514 assert(xco_chan_send_poll(&c, 0xCAFE, NULL) == XCO_SEND_DELIVERED); 515 assert(c.recv_head == NULL); 516 517 xco_rt_run(&rt, 0); 518 assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); 519 assert(r.got == 0xCAFE); 520 } 521 522 static void test_chan_send_try_no_recv(void) { 523 /* No receiver parked: pure-try poll reports BLOCKED without parking. */ 524 xco_chan_t c; xco_chan_init(&c); 525 assert(xco_chan_send_poll(&c, 1, NULL) == XCO_SEND_BLOCKED); 526 assert(c.send_head == NULL && c.recv_head == NULL); 527 } 528 529 static void test_chan_fifo(void) { 530 /* Three senders park; three receives pull values in arrival order. */ 531 xco_runtime_t rt; xco_rt_init(&rt); 532 xco_chan_t c; xco_chan_init(&c); 533 534 sender_t s[3]; 535 for (int i = 0; i < 3; i++) { 536 sender_init(&s[i], &rt, &c, (uintptr_t)(100 + i)); 537 xco_step(&s[i].base, 0); 538 assert(xco_mach_status(&s[i].base) == XCO_STEP_SUSPENDED); 539 } 540 assert(c.send_head == &s[0].csw.sw.base); 541 assert(c.send_tail == &s[2].csw.sw.base); 542 543 for (int i = 0; i < 3; i++) { 544 uintptr_t v; 545 assert(xco_event_poll(&c.recv, &v, NULL)); 546 assert(v == (uintptr_t)(100 + i)); 547 } 548 assert(c.send_head == NULL); 549 550 xco_rt_run(&rt, 0); 551 for (int i = 0; i < 3; i++) { 552 assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); 553 assert(s[i].done); 554 } 555 } 556 557 static void test_chan_send_unpark(void) { 558 /* Cancel a parked sender — list is repaired; other waiters intact. */ 559 xco_runtime_t rt; xco_rt_init(&rt); 560 xco_chan_t c; xco_chan_init(&c); 561 562 sender_t a, b, d; 563 sender_init(&a, &rt, &c, 1); 564 sender_init(&b, &rt, &c, 2); 565 sender_init(&d, &rt, &c, 3); 566 xco_step(&a.base, 0); xco_step(&b.base, 0); xco_step(&d.base, 0); 567 assert(c.send_head == &a.csw.sw.base); 568 assert(c.send_tail == &d.csw.sw.base); 569 570 xco_chan_send_unpark(&c, &b.csw); /* middle */ 571 /* Idempotent: removing again is a no-op. */ 572 xco_chan_send_unpark(&c, &b.csw); 573 574 /* Order preserved: a, then d. */ 575 uintptr_t v; 576 assert(xco_event_poll(&c.recv, &v, NULL) && v == 1); 577 assert(xco_event_poll(&c.recv, &v, NULL) && v == 3); 578 assert(c.send_head == NULL); 579 580 /* a and d resume; b stays SUSPENDED (its waiter was unparked 581 * without firing). Drain so it doesn't dangle. */ 582 xco_rt_run(&rt, 0); 583 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); 584 assert(xco_mach_status(&d.base) == XCO_STEP_DEAD); 585 assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); 586 } 587 588 static void test_chan_select_recv(void) { 589 /* select(xco_chan_recv, latch). Sender delivers; select fires with 590 * xco_chan_recv's index, and the value is captured in inputs[winner].value. */ 591 xco_runtime_t rt; xco_rt_init(&rt); 592 xco_chan_t c; xco_chan_init(&c); 593 xco_latch_t l; xco_latch_init(&l); 594 595 xco_select_event_t sel; 596 xco_select_input_t inputs[2]; 597 xco_event_t *srcs[2] = {&c.recv, &l.base}; 598 xco_select_event_init(&sel, inputs, 2, srcs); 599 600 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 601 xco_step(&w.base, 0); 602 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 603 assert(c.recv_head != NULL); /* select's input waiter parked here */ 604 605 assert(xco_chan_send_poll(&c, 0xABCDEF, NULL) == XCO_SEND_DELIVERED); 606 607 xco_rt_run(&rt, 0); 608 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 609 assert(w.got == 0); /* xco_chan_recv won (index 0) */ 610 assert(inputs[0].value == 0xABCDEF); /* captured value */ 611 612 /* Loser was disarmed; deinit is safe (no-op since fired). */ 613 assert(l.waiters == NULL); 614 xco_select_event_deinit(&sel); 615 } 616 617 static void test_chan_recv_fifo(void) { 618 /* Three receivers park; three sends deliver in arrival order. 619 * The two waitlists are mutually exclusive (try-then-park is 620 * atomic in single-threaded use), so the channel state at any 621 * moment has at most one side queued. */ 622 xco_runtime_t rt; xco_rt_init(&rt); 623 xco_chan_t c; xco_chan_init(&c); 624 625 waiter_t r[3]; 626 for (int i = 0; i < 3; i++) { 627 waiter_init(&r[i], &rt, &c.recv); 628 xco_step(&r[i].base, 0); 629 assert(xco_mach_status(&r[i].base) == XCO_STEP_SUSPENDED); 630 } 631 assert(c.recv_head == &r[0].sw.base); 632 assert(c.recv_tail == &r[2].sw.base); 633 assert(c.send_head == NULL); /* never both sides */ 634 635 for (int i = 0; i < 3; i++) { 636 assert(xco_chan_send_poll(&c, (uintptr_t)(200 + i), NULL) == XCO_SEND_DELIVERED); 637 } 638 assert(c.recv_head == NULL); 639 640 xco_rt_run(&rt, 0); 641 for (int i = 0; i < 3; i++) { 642 assert(xco_mach_status(&r[i].base) == XCO_STEP_DEAD); 643 assert(r[i].got == (uintptr_t)(200 + i)); 644 } 645 } 646 647 /* ---- Send op (selectable send) ------------------------------------- */ 648 649 static void test_chan_send_op_inline(void) { 650 /* Receiver already parked: op_init delivers immediately, done set. */ 651 xco_runtime_t rt; xco_rt_init(&rt); 652 xco_chan_t c; xco_chan_init(&c); 653 654 waiter_t r; waiter_init(&r, &rt, &c.recv); 655 xco_step(&r.base, 0); 656 assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); 657 658 xco_chan_send_op_t op; 659 xco_chan_send_op_init(&op, &c, 0xFEED); 660 assert(op.done.set); /* delivered inline */ 661 assert(c.send_head == NULL); /* nothing parked */ 662 663 xco_rt_run(&rt, 0); 664 assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); 665 assert(r.got == 0xFEED); 666 667 xco_chan_send_op_deinit(&op); 668 } 669 670 static void test_chan_send_op_blocks(void) { 671 /* No receiver: op parks. Receiver arrives later, op.done fires. */ 672 xco_chan_t c; xco_chan_init(&c); 673 674 xco_chan_send_op_t op; 675 xco_chan_send_op_init(&op, &c, 0xBEAD); 676 assert(!op.done.set); 677 assert(c.send_head == &op.qsw.sw.base); 678 679 /* Recv pulls value and fires op's waiter → sets op.done. */ 680 uintptr_t v; 681 assert(xco_event_poll(&c.recv, &v, NULL)); 682 assert(v == 0xBEAD); 683 assert(op.done.set); 684 assert(c.send_head == NULL); 685 686 xco_chan_send_op_deinit(&op); 687 } 688 689 static void test_chan_select_send(void) { 690 /* select(send_op, latch). Receiver pulls; send wins. */ 691 xco_runtime_t rt; xco_rt_init(&rt); 692 xco_chan_t c; xco_chan_init(&c); 693 xco_latch_t timeout; xco_latch_init(&timeout); 694 695 xco_chan_send_op_t op; 696 xco_chan_send_op_init(&op, &c, 0x5EED); 697 assert(!op.done.set); /* parked, no recv */ 698 699 xco_select_event_t sel; 700 xco_select_input_t inputs[2]; 701 xco_event_t *srcs[2] = {&op.done.base, &timeout.base}; 702 xco_select_event_init(&sel, inputs, 2, srcs); 703 704 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 705 xco_step(&w.base, 0); 706 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 707 708 /* A receiver arrives and pulls. op.done fires → select fires. */ 709 uintptr_t v; 710 assert(xco_event_poll(&c.recv, &v, NULL)); 711 assert(v == 0x5EED); 712 713 xco_rt_run(&rt, 0); 714 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 715 assert(w.got == 0); /* send op won */ 716 assert(timeout.waiters == NULL); /* loser disarmed */ 717 718 xco_select_event_deinit(&sel); 719 xco_chan_send_op_deinit(&op); 720 } 721 722 static void test_chan_select_send_loses(void) { 723 /* select(send_op, latch). Latch fires first; send is canceled 724 * cleanly via op_deinit (chan side disarmed, no dangling waiter). */ 725 xco_runtime_t rt; xco_rt_init(&rt); 726 xco_chan_t c; xco_chan_init(&c); 727 xco_latch_t l; xco_latch_init(&l); 728 729 xco_chan_send_op_t op; 730 xco_chan_send_op_init(&op, &c, 0xABBA); 731 assert(c.send_head == &op.qsw.sw.base); 732 733 xco_select_event_t sel; 734 xco_select_input_t inputs[2]; 735 xco_event_t *srcs[2] = {&op.done.base, &l.base}; 736 xco_select_event_init(&sel, inputs, 2, srcs); 737 738 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 739 xco_step(&w.base, 0); 740 741 xco_latch_set(&l, 0xDEAD); 742 xco_rt_run(&rt, 0); 743 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 744 assert(w.got == 1); /* latch (index 1) won */ 745 assert(inputs[1].value == 0xDEAD); 746 747 /* send didn't happen — op still parked on chan. deinit unparks. */ 748 assert(c.send_head == &op.qsw.sw.base); 749 xco_select_event_deinit(&sel); 750 xco_chan_send_op_deinit(&op); 751 assert(c.send_head == NULL); 752 } 753 754 static void test_chan_select_recv_fast_path(void) { 755 /* Sender already parked when select arms: fast path captures value. */ 756 xco_runtime_t rt; xco_rt_init(&rt); 757 xco_chan_t c; xco_chan_init(&c); 758 759 sender_t snd; sender_init(&snd, &rt, &c, 0x12345); 760 xco_step(&snd.base, 0); 761 assert(xco_mach_status(&snd.base) == XCO_STEP_SUSPENDED); 762 763 xco_latch_t l; xco_latch_init(&l); 764 xco_select_event_t sel; 765 xco_select_input_t inputs[2]; 766 xco_event_t *srcs[2] = {&c.recv, &l.base}; 767 xco_select_event_init(&sel, inputs, 2, srcs); 768 769 /* Fast path fired immediately. */ 770 uintptr_t winner; 771 assert(xco_event_poll(&sel.done.base, &winner, NULL)); 772 assert(winner == 0); 773 assert(inputs[0].value == 0x12345); 774 775 xco_rt_run(&rt, 0); 776 assert(xco_mach_status(&snd.base) == XCO_STEP_DEAD); 777 xco_select_event_deinit(&sel); 778 } 779 780 /* ---- Cancellation -------------------------------------------------- */ 781 782 static void test_cancel_basic(void) { 783 /* The alias is a thin rename over latch: not-set, set, idempotent set, 784 * and xco_event_poll yields the latch's payload of 0. */ 785 xco_cancel_t c; xco_cancel_init(&c); 786 assert(!xco_cancel_is_set(&c)); 787 xco_cancel_set(&c); 788 assert(xco_cancel_is_set(&c)); 789 xco_cancel_set(&c); /* idempotent */ 790 assert(xco_cancel_is_set(&c)); 791 792 uintptr_t v = 0xBADD; 793 assert(xco_event_poll(xco_cancel_event(&c), &v, NULL)); 794 assert(v == 0); 795 } 796 797 static void test_wait_or_cancel_ev_wins(void) { 798 /* Event resolves first: waiter sees XCO_WAIT_OK and inputs[0].value 799 * carries the event's payload. The cancel side is disarmed. */ 800 xco_runtime_t rt; xco_rt_init(&rt); 801 xco_latch_t l; xco_latch_init(&l); 802 xco_cancel_t c; xco_cancel_init(&c); 803 804 xco_select_event_t sel; 805 xco_select_input_t inputs[2]; 806 xco_wait_or_cancel(&sel, inputs, &l.base, &c); 807 808 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 809 xco_step(&w.base, 0); 810 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 811 assert(l.waiters && c.waiters); /* both armed */ 812 813 xco_latch_set(&l, 0xF00D); 814 xco_rt_run(&rt, 0); 815 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 816 assert(w.got == XCO_WAIT_OK); 817 assert(inputs[XCO_WAIT_OK].value == 0xF00D); 818 assert(c.waiters == NULL); /* cancel disarmed */ 819 820 xco_select_event_deinit(&sel); 821 } 822 823 static void test_wait_or_cancel_cancel_wins(void) { 824 /* Cancel fires while parked: waiter sees XCO_WAIT_CANCELLED, ev disarmed. */ 825 xco_runtime_t rt; xco_rt_init(&rt); 826 xco_latch_t l; xco_latch_init(&l); 827 xco_cancel_t c; xco_cancel_init(&c); 828 829 xco_select_event_t sel; 830 xco_select_input_t inputs[2]; 831 xco_wait_or_cancel(&sel, inputs, &l.base, &c); 832 833 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 834 xco_step(&w.base, 0); 835 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 836 837 xco_cancel_set(&c); 838 xco_rt_run(&rt, 0); 839 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 840 assert(w.got == XCO_WAIT_CANCELLED); 841 assert(l.waiters == NULL); /* ev disarmed */ 842 843 xco_select_event_deinit(&sel); 844 } 845 846 static void test_wait_or_cancel_already_cancelled(void) { 847 /* Pre-set cancel: select fast-path fires XCO_WAIT_CANCELLED at init time; 848 * ev is never parked, so deinit has nothing to disarm. */ 849 xco_latch_t l; xco_latch_init(&l); 850 xco_cancel_t c; xco_cancel_init(&c); 851 xco_cancel_set(&c); 852 853 xco_select_event_t sel; 854 xco_select_input_t inputs[2]; 855 xco_wait_or_cancel(&sel, inputs, &l.base, &c); 856 857 uintptr_t v; 858 assert(xco_event_poll(&sel.done.base, &v, NULL)); 859 assert(v == XCO_WAIT_CANCELLED); 860 assert(l.waiters == NULL); /* never parked */ 861 862 xco_select_event_deinit(&sel); 863 } 864 865 static void test_wait_or_cancel_ev_precedes_cancel(void) { 866 /* Both ready at init: ev wins (it's checked first in the fast path). 867 * This is the right semantic — if the work has already resolved, 868 * a concurrent cancel doesn't retroactively undo it. */ 869 xco_latch_t l; xco_latch_init(&l); 870 xco_cancel_t c; xco_cancel_init(&c); 871 xco_latch_set(&l, 0x600D); 872 xco_cancel_set(&c); 873 874 xco_select_event_t sel; 875 xco_select_input_t inputs[2]; 876 xco_wait_or_cancel(&sel, inputs, &l.base, &c); 877 878 uintptr_t v; 879 assert(xco_event_poll(&sel.done.base, &v, NULL)); 880 assert(v == XCO_WAIT_OK); 881 assert(inputs[XCO_WAIT_OK].value == 0x600D); 882 883 xco_select_event_deinit(&sel); 884 } 885 886 static void test_wait_or_cancel_chan_recv(void) { 887 /* Waiter blocks on xco_chan_recv via xco_wait_or_cancel; cancel fires while 888 * parked. After resume + deinit, the chan's recv list must be empty 889 * so a future send doesn't deliver to a freed waiter. */ 890 xco_runtime_t rt; xco_rt_init(&rt); 891 xco_chan_t ch; xco_chan_init(&ch); 892 xco_cancel_t c; xco_cancel_init(&c); 893 894 xco_select_event_t sel; 895 xco_select_input_t inputs[2]; 896 xco_wait_or_cancel(&sel, inputs, &ch.recv, &c); 897 assert(ch.recv_head == &inputs[0].w); /* select's input parked */ 898 899 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 900 xco_step(&w.base, 0); 901 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 902 903 xco_cancel_set(&c); 904 xco_rt_run(&rt, 0); 905 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 906 assert(w.got == XCO_WAIT_CANCELLED); 907 assert(ch.recv_head == NULL); /* xco_select_input_fire disarmed */ 908 909 xco_select_event_deinit(&sel); 910 /* No stale waiter lingering: pure-try poll reports BLOCKED, fires nothing. */ 911 assert(xco_chan_send_poll(&ch, 1, NULL) == XCO_SEND_BLOCKED); 912 } 913 914 static void test_wait_or_cancel_send_op(void) { 915 /* Selectable send under cancel: cancel wins, the send_op is still 916 * parked on the chan — xco_chan_send_op_deinit must release it so the 917 * sender's value is not silently dropped from the chan's list. */ 918 xco_chan_t ch; xco_chan_init(&ch); 919 xco_cancel_t c; xco_cancel_init(&c); 920 921 xco_chan_send_op_t op; 922 xco_chan_send_op_init(&op, &ch, 0xABBA); 923 assert(!op.done.set); 924 assert(ch.send_head == &op.qsw.sw.base); 925 926 xco_select_event_t sel; 927 xco_select_input_t inputs[2]; 928 xco_wait_or_cancel(&sel, inputs, &op.done.base, &c); 929 930 xco_cancel_set(&c); /* fires sel synchronously */ 931 uintptr_t v; 932 assert(xco_event_poll(&sel.done.base, &v, NULL)); 933 assert(v == XCO_WAIT_CANCELLED); 934 935 /* Send didn't happen — op still parked on the chan. The select 936 * winning doesn't drain the chan's send list; deinit does. */ 937 assert(ch.send_head == &op.qsw.sw.base); 938 xco_select_event_deinit(&sel); 939 xco_chan_send_op_deinit(&op); 940 assert(ch.send_head == NULL); 941 } 942 943 /* ---- Timers -------------------------------------------------------- */ 944 945 static void test_timer_basic(void) { 946 /* Insert one timer; advance past its deadline; fire payload is the 947 * deadline value. */ 948 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 949 xco_timer_t t; 950 xco_timer_init(&t, &h.base, 100); 951 assert(!xco_timer_fired(&t)); 952 assert(t.in_heap); 953 954 /* Not yet expired: advance is a no-op. */ 955 xco_timers_advance(&h.base, 50); 956 assert(!xco_timer_fired(&t)); 957 958 /* Expired: fires. */ 959 xco_timers_advance(&h.base, 100); 960 assert(xco_timer_fired(&t)); 961 assert(!t.in_heap); 962 963 uintptr_t v; 964 assert(xco_event_poll(xco_timer_event(&t), &v, NULL)); 965 assert(v == 100); 966 967 /* Idempotent deinit (already fired). */ 968 xco_timer_deinit(&t); 969 } 970 971 static void test_timer_peek(void) { 972 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 973 assert(xco_timers_peek(&h.base) == UINT64_MAX); 974 975 xco_timer_t a, b, c; 976 xco_timer_init(&a, &h.base, 300); 977 xco_timer_init(&b, &h.base, 100); 978 xco_timer_init(&c, &h.base, 200); 979 980 assert(xco_timers_peek(&h.base) == 100); /* earliest deadline */ 981 982 xco_timers_advance(&h.base, 250); 983 /* b and c fired; a remains. */ 984 assert(xco_timer_fired(&b)); 985 assert(xco_timer_fired(&c)); 986 assert(!xco_timer_fired(&a)); 987 assert(xco_timers_peek(&h.base) == 300); 988 989 xco_timer_deinit(&a); 990 xco_timer_deinit(&b); 991 xco_timer_deinit(&c); 992 } 993 994 static void test_timer_cancel(void) { 995 /* Cancel before fire; later advance must not fire it. */ 996 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 997 xco_timer_t a, b, c; 998 xco_timer_init(&a, &h.base, 100); 999 xco_timer_init(&b, &h.base, 200); 1000 xco_timer_init(&c, &h.base, 300); 1001 1002 xco_timer_deinit(&b); /* cancel middle */ 1003 assert(!b.in_heap); 1004 assert(!xco_timer_fired(&b)); 1005 1006 xco_timers_advance(&h.base, 1000); 1007 assert(xco_timer_fired(&a)); 1008 assert(!xco_timer_fired(&b)); /* cancelled — never fired */ 1009 assert(xco_timer_fired(&c)); 1010 1011 xco_timer_deinit(&a); 1012 xco_timer_deinit(&c); 1013 } 1014 1015 static void test_timer_park_wake(void) { 1016 /* A waiter parked on a timer wakes when the timer fires. */ 1017 xco_runtime_t rt; xco_rt_init(&rt); 1018 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1019 xco_timer_t t; 1020 xco_timer_init(&t, &h.base, 500); 1021 1022 waiter_t w; waiter_init(&w, &rt, xco_timer_event(&t)); 1023 xco_step(&w.base, 0); 1024 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1025 assert(t.done.waiters == &w.sw.base); 1026 1027 xco_timers_advance(&h.base, 500); 1028 /* fire enqueued the step. */ 1029 assert(rt.head != NULL); 1030 xco_rt_run(&rt, 500); 1031 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1032 assert(w.got == 500); /* deadline as payload */ 1033 1034 xco_timer_deinit(&t); 1035 } 1036 1037 static void test_timer_select(void) { 1038 /* Compose a timer into a select. */ 1039 xco_runtime_t rt; xco_rt_init(&rt); 1040 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1041 xco_timer_t t; xco_timer_init(&t, &h.base, 200); 1042 xco_latch_t l; xco_latch_init(&l); 1043 1044 xco_select_event_t sel; 1045 xco_select_input_t inputs[2]; 1046 xco_event_t *srcs[2] = {xco_timer_event(&t), &l.base}; 1047 xco_select_event_init(&sel, inputs, 2, srcs); 1048 1049 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 1050 xco_step(&w.base, 0); 1051 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1052 1053 xco_timers_advance(&h.base, 200); 1054 xco_rt_run(&rt, 200); 1055 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1056 assert(w.got == 0); /* timer (input 0) won */ 1057 assert(inputs[0].value == 200); /* deadline captured */ 1058 assert(l.waiters == NULL); /* loser disarmed */ 1059 1060 xco_select_event_deinit(&sel); 1061 xco_timer_deinit(&t); 1062 } 1063 1064 static void test_timer_pairing_heap_order(void) { 1065 /* Insert many timers with non-sorted deadlines; advance must pop 1066 * them in deadline order. Stress the heap structure. */ 1067 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1068 enum { N = 32 }; 1069 xco_timer_t ts[N]; 1070 /* A scrambled sequence — interleave halves so the insert order 1071 * exercises the heap's restructuring. */ 1072 uint64_t deadlines[N] = { 1073 17, 3, 25, 9, 31, 1, 28, 12, 1074 20, 7, 22, 5, 30, 14, 27, 10, 1075 16, 2, 24, 8, 32, 11, 29, 13, 1076 19, 6, 21, 4, 26, 15, 23, 18, 1077 }; 1078 1079 for (int i = 0; i < N; i++) xco_timer_init(&ts[i], &h.base, deadlines[i]); 1080 1081 /* Advance one tick at a time; verify exactly one timer fires per 1082 * matching deadline (since deadlines 1..32 are unique). */ 1083 int fired = 0; 1084 for (uint64_t now = 1; now <= N; now++) { 1085 xco_timers_advance(&h.base, now); 1086 int fired_now = 0; 1087 for (int i = 0; i < N; i++) { 1088 if (xco_timer_fired(&ts[i]) && ts[i].deadline == now) fired_now++; 1089 } 1090 assert(fired_now == 1); 1091 fired++; 1092 } 1093 assert(fired == N); 1094 for (int i = 0; i < N; i++) assert(xco_timer_fired(&ts[i])); 1095 } 1096 1097 static void test_timer_cancel_stress(void) { 1098 /* Insert N timers; cancel every other one; advance and verify only 1099 * the survivors fire, in order. Exercises non-root cancel paths. */ 1100 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1101 enum { N = 16 }; 1102 xco_timer_t ts[N]; 1103 /* Deadlines: 1..N. Inserted in order so the heap shape is a long 1104 * single-spine; cancels hit non-root nodes. */ 1105 for (int i = 0; i < N; i++) xco_timer_init(&ts[i], &h.base, (uint64_t)(i + 1)); 1106 1107 /* Cancel even indices (deadlines 2,4,6,...). */ 1108 for (int i = 0; i < N; i += 2) xco_timer_deinit(&ts[i]); 1109 1110 xco_timers_advance(&h.base, N); 1111 for (int i = 0; i < N; i++) { 1112 if (i % 2 == 0) assert(!xco_timer_fired(&ts[i])); 1113 else assert( xco_timer_fired(&ts[i])); 1114 } 1115 1116 for (int i = 1; i < N; i += 2) xco_timer_deinit(&ts[i]); 1117 } 1118 1119 static void test_timer_deinit_idempotent(void) { 1120 /* Deinit after fire is a no-op; deinit twice (without fire) is too. */ 1121 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1122 xco_timer_t a, b; 1123 xco_timer_init(&a, &h.base, 10); 1124 xco_timer_init(&b, &h.base, 20); 1125 1126 xco_timers_advance(&h.base, 10); 1127 assert(xco_timer_fired(&a) && !xco_timer_fired(&b)); 1128 xco_timer_deinit(&a); /* already fired */ 1129 xco_timer_deinit(&a); /* twice — still no-op */ 1130 1131 xco_timer_deinit(&b); /* cancel before fire */ 1132 xco_timer_deinit(&b); /* twice — must not corrupt */ 1133 xco_timers_advance(&h.base, 1000); 1134 assert(!xco_timer_fired(&b)); /* never fired */ 1135 } 1136 1137 /* ---- Timeout ------------------------------------------------------- */ 1138 1139 static void test_timeout_fires(void) { 1140 /* Bare timeout: advance past deadline, cancel becomes set. */ 1141 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1142 xco_timeout_t to; 1143 xco_timeout_init(&to, &h.base, 100); 1144 assert(!xco_cancel_is_set(&to.cancel)); 1145 1146 xco_timers_advance(&h.base, 100); 1147 assert(xco_cancel_is_set(&to.cancel)); /* bridge fired */ 1148 assert(xco_timer_fired(&to.timer)); 1149 1150 xco_timeout_deinit(&to); /* idempotent */ 1151 } 1152 1153 static void test_timeout_deinit_before_fire(void) { 1154 /* Deinit a timeout before its deadline: cancel must remain unset 1155 * and the timer must be removed from the source so a later advance 1156 * doesn't fire freed memory. */ 1157 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1158 xco_timeout_t to; 1159 xco_timeout_init(&to, &h.base, 100); 1160 xco_timeout_deinit(&to); 1161 assert(!xco_cancel_is_set(&to.cancel)); 1162 1163 /* Advance past the original deadline: nothing left to fire. */ 1164 xco_timers_advance(&h.base, 1000); 1165 assert(!xco_cancel_is_set(&to.cancel)); 1166 } 1167 1168 static void test_timeout_with_wait_or_cancel(void) { 1169 /* Canonical pattern: a waiter blocks on (ev, timeout.cancel). The 1170 * timeout fires first; waiter resumes with XCO_WAIT_CANCELLED. */ 1171 xco_runtime_t rt; xco_rt_init(&rt); 1172 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1173 xco_rt_attach_timers(&rt, &h.base); 1174 1175 xco_latch_t ev; xco_latch_init(&ev); 1176 xco_timeout_t to; 1177 xco_timeout_init(&to, &h.base, 50); 1178 1179 xco_select_event_t sel; 1180 xco_select_input_t inputs[2]; 1181 xco_wait_or_cancel(&sel, inputs, &ev.base, &to.cancel); 1182 1183 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 1184 xco_step(&w.base, 0); 1185 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1186 1187 /* xco_rt_run advances the timer source itself: now=50 fires the timer, 1188 * which fires the bridge, which sets cancel, which fires sel, 1189 * which enqueues the waiter — all in one xco_rt_run call. */ 1190 xco_rt_run(&rt, 50); 1191 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1192 assert(w.got == XCO_WAIT_CANCELLED); 1193 1194 xco_select_event_deinit(&sel); 1195 xco_timeout_deinit(&to); 1196 } 1197 1198 static void test_timeout_ev_wins(void) { 1199 /* Event wins the race; timeout should be deinit'd cleanly without 1200 * having fired. */ 1201 xco_runtime_t rt; xco_rt_init(&rt); 1202 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1203 xco_rt_attach_timers(&rt, &h.base); 1204 1205 xco_latch_t ev; xco_latch_init(&ev); 1206 xco_timeout_t to; 1207 xco_timeout_init(&to, &h.base, 1000); 1208 1209 xco_select_event_t sel; 1210 xco_select_input_t inputs[2]; 1211 xco_wait_or_cancel(&sel, inputs, &ev.base, &to.cancel); 1212 1213 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 1214 xco_step(&w.base, 0); 1215 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1216 1217 xco_latch_set(&ev, 0xF00D); 1218 xco_rt_run(&rt, 50); /* now < deadline; timer untouched */ 1219 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1220 assert(w.got == XCO_WAIT_OK); 1221 assert(inputs[XCO_WAIT_OK].value == 0xF00D); 1222 assert(!xco_cancel_is_set(&to.cancel)); 1223 1224 xco_select_event_deinit(&sel); 1225 xco_timeout_deinit(&to); 1226 /* Subsequent advance must not fire anything (timer was removed). */ 1227 xco_rt_run(&rt, 10000); 1228 assert(!xco_cancel_is_set(&to.cancel)); 1229 } 1230 1231 /* ---- xco_rt_run + timer integration ------------------------------------ */ 1232 1233 static void test_rt_run_advances_timers(void) { 1234 /* A waiter parked on a timer; xco_rt_run with now=deadline advances the 1235 * source, fires the timer, drains the waiter — all in one call. */ 1236 xco_runtime_t rt; xco_rt_init(&rt); 1237 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 1238 xco_rt_attach_timers(&rt, &h.base); 1239 1240 xco_timer_t t; xco_timer_init(&t, &h.base, 42); 1241 waiter_t w; waiter_init(&w, &rt, xco_timer_event(&t)); 1242 xco_step(&w.base, 0); 1243 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1244 assert(rt.head == NULL); /* parked, not queued */ 1245 1246 xco_rt_run(&rt, 42); 1247 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1248 assert(w.got == 42); 1249 assert(rt.head == NULL); 1250 1251 xco_timer_deinit(&t); 1252 } 1253 1254 static void test_rt_run_no_timers(void) { 1255 /* Without an attached timer source, xco_rt_run(rt, now) is the pure 1256 * drainer regardless of `now`. */ 1257 xco_runtime_t rt; xco_rt_init(&rt); 1258 xco_latch_t l; xco_latch_init(&l); 1259 1260 waiter_t w; waiter_init(&w, &rt, &l.base); 1261 xco_step(&w.base, 0); 1262 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1263 1264 xco_latch_set(&l, 7); 1265 xco_rt_run(&rt, 99999); /* now ignored */ 1266 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1267 assert(w.got == 7); 1268 } 1269 1270 /* ---- Semaphore ----------------------------------------------------- */ 1271 1272 /* Acquirer state machine: one-shot acquire of 1 permit. Mirrors waiter_t 1273 * but specialized for the "permit handed at fire time" semantics — the 1274 * resume is itself the proof of acquisition. */ 1275 typedef struct { 1276 xco_mach_t base; 1277 xco_semaphore_t *sem; 1278 xco_runtime_t *rt; 1279 xco_waker_t sw; 1280 int phase; 1281 bool got; 1282 } sem_acquirer_t; 1283 1284 static xco_step_result_t sem_acquirer_step(xco_mach_t *s, uintptr_t v) { 1285 sem_acquirer_t *a = (sem_acquirer_t *)s; 1286 (void)v; 1287 switch (a->phase) { 1288 case 0: 1289 if (xco_event_poll(xco_semaphore_event(a->sem), NULL, NULL)) { 1290 a->got = true; 1291 a->phase = 2; 1292 return (xco_step_result_t){0, XCO_STEP_DEAD}; 1293 } 1294 xco_waker_init(&a->sw, a->rt, &a->base); 1295 xco_event_poll(xco_semaphore_event(a->sem), NULL, &a->sw.base); 1296 a->phase = 1; 1297 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 1298 case 1: 1299 a->got = true; 1300 a->phase = 2; 1301 return (xco_step_result_t){0, XCO_STEP_DEAD}; 1302 } 1303 __builtin_unreachable(); 1304 } 1305 1306 static void sem_acquirer_init(sem_acquirer_t *a, xco_runtime_t *rt, xco_semaphore_t *s) { 1307 a->base = (xco_mach_t){.step = sem_acquirer_step, .status = XCO_STEP_INIT}; 1308 a->sem = s; 1309 a->rt = rt; 1310 a->phase = 0; 1311 a->got = false; 1312 } 1313 1314 static void test_semaphore_inline_acquire(void) { 1315 /* permits > 0: try succeeds and decrements. */ 1316 xco_semaphore_t s; xco_semaphore_init(&s, 2); 1317 assert(xco_event_poll(xco_semaphore_event(&s), NULL, NULL)); 1318 assert(s.permits == 1); 1319 assert(xco_event_poll(xco_semaphore_event(&s), NULL, NULL)); 1320 assert(s.permits == 0); 1321 assert(!xco_event_poll(xco_semaphore_event(&s), NULL, NULL)); 1322 assert(s.permits == 0); 1323 } 1324 1325 static void test_semaphore_park_then_release(void) { 1326 /* Empty sem: acquirer parks; release wakes it with a permit. */ 1327 xco_runtime_t rt; xco_rt_init(&rt); 1328 xco_semaphore_t s; xco_semaphore_init(&s, 0); 1329 1330 sem_acquirer_t a; sem_acquirer_init(&a, &rt, &s); 1331 xco_step(&a.base, 0); 1332 assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); 1333 assert(s.head == &a.sw.base); 1334 1335 xco_semaphore_release(&s, 1); 1336 /* Release prefers parked waiters: permit went straight to a, count stays 0. */ 1337 assert(s.permits == 0); 1338 assert(s.head == NULL); 1339 1340 xco_rt_run(&rt, 0); 1341 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); 1342 assert(a.got); 1343 } 1344 1345 static void test_semaphore_fifo(void) { 1346 /* Three parked acquirers; release(2) wakes the first two in order; 1347 * the third stays parked. release(1) again wakes it. */ 1348 xco_runtime_t rt; xco_rt_init(&rt); 1349 xco_semaphore_t s; xco_semaphore_init(&s, 0); 1350 1351 sem_acquirer_t a[3]; 1352 for (int i = 0; i < 3; i++) { 1353 sem_acquirer_init(&a[i], &rt, &s); 1354 xco_step(&a[i].base, 0); 1355 } 1356 assert(s.head == &a[0].sw.base); 1357 assert(s.tail == &a[2].sw.base); 1358 1359 xco_semaphore_release(&s, 2); 1360 xco_rt_run(&rt, 0); 1361 assert(xco_mach_status(&a[0].base) == XCO_STEP_DEAD); 1362 assert(xco_mach_status(&a[1].base) == XCO_STEP_DEAD); 1363 assert(xco_mach_status(&a[2].base) == XCO_STEP_SUSPENDED); 1364 assert(s.head == &a[2].sw.base); 1365 1366 xco_semaphore_release(&s, 1); 1367 xco_rt_run(&rt, 0); 1368 assert(xco_mach_status(&a[2].base) == XCO_STEP_DEAD); 1369 assert(s.head == NULL); 1370 assert(s.permits == 0); 1371 } 1372 1373 static void test_semaphore_release_overflow_to_count(void) { 1374 /* release(n) where n > waiters: hand to all waiters first, then add 1375 * the leftover to permits. */ 1376 xco_runtime_t rt; xco_rt_init(&rt); 1377 xco_semaphore_t s; xco_semaphore_init(&s, 0); 1378 1379 sem_acquirer_t a; sem_acquirer_init(&a, &rt, &s); 1380 xco_step(&a.base, 0); 1381 1382 xco_semaphore_release(&s, 5); 1383 /* a got 1; 4 leftover sat as permits. */ 1384 assert(s.permits == 4); 1385 xco_rt_run(&rt, 0); 1386 assert(a.got); 1387 assert(s.permits == 4); 1388 } 1389 1390 static void test_semaphore_select_acquire(void) { 1391 /* Compose a sem acquire with a cancel via xco_wait_or_cancel: cancel wins. */ 1392 xco_runtime_t rt; xco_rt_init(&rt); 1393 xco_semaphore_t s; xco_semaphore_init(&s, 0); 1394 xco_cancel_t c; xco_cancel_init(&c); 1395 1396 xco_select_event_t sel; 1397 xco_select_input_t inputs[2]; 1398 xco_wait_or_cancel(&sel, inputs, xco_semaphore_event(&s), &c); 1399 1400 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 1401 xco_step(&w.base, 0); 1402 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1403 assert(s.head != NULL); /* sem-side waiter parked */ 1404 1405 xco_cancel_set(&c); 1406 xco_rt_run(&rt, 0); 1407 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1408 assert(w.got == XCO_WAIT_CANCELLED); 1409 assert(s.head == NULL); /* xco_select_input_fire detached us */ 1410 1411 xco_select_event_deinit(&sel); 1412 /* No leak: a future release does not over-grant past the count. */ 1413 xco_semaphore_release(&s, 1); 1414 assert(s.permits == 1); 1415 } 1416 1417 static void test_semaphore_binary_mutex(void) { 1418 /* Binary semaphore as mutex: init permits=1; first take succeeds inline, 1419 * second parks, release on the way out wakes the waiter. */ 1420 xco_runtime_t rt; xco_rt_init(&rt); 1421 xco_semaphore_t mu; xco_semaphore_init(&mu, 1); 1422 1423 /* Holder takes it inline. */ 1424 assert(xco_event_poll(xco_semaphore_event(&mu), NULL, NULL)); 1425 assert(mu.permits == 0); 1426 1427 /* Contender parks. */ 1428 sem_acquirer_t b; sem_acquirer_init(&b, &rt, &mu); 1429 xco_step(&b.base, 0); 1430 assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); 1431 1432 /* Holder releases on exit. */ 1433 xco_semaphore_release(&mu, 1); 1434 xco_rt_run(&rt, 0); 1435 assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); 1436 assert(b.got); 1437 } 1438 1439 /* ---- Queue --------------------------------------------------------- */ 1440 1441 /* Queue sender: same shape as the chan sender_t. */ 1442 typedef struct { 1443 xco_mach_t base; 1444 xco_queue_t *q; 1445 xco_runtime_t *rt; 1446 xco_queue_send_waiter_t qsw; 1447 uintptr_t value; 1448 int phase; 1449 bool done; 1450 } queue_sender_t; 1451 1452 static xco_step_result_t queue_sender_step(xco_mach_t *s, uintptr_t v) { 1453 queue_sender_t *snd = (queue_sender_t *)s; 1454 (void)v; 1455 switch (snd->phase) { 1456 case 0: 1457 xco_queue_send_waiter_init(&snd->qsw, snd->rt, &snd->base); 1458 switch (xco_queue_send_poll(snd->q, snd->value, &snd->qsw)) { 1459 case XCO_QSEND_ACCEPTED: 1460 case XCO_QSEND_CLOSED: 1461 snd->done = true; 1462 snd->phase = 2; 1463 return (xco_step_result_t){0, XCO_STEP_DEAD}; 1464 case XCO_QSEND_BLOCKED: 1465 snd->phase = 1; 1466 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 1467 } 1468 __builtin_unreachable(); 1469 case 1: 1470 snd->done = true; 1471 snd->phase = 2; 1472 return (xco_step_result_t){0, XCO_STEP_DEAD}; 1473 } 1474 __builtin_unreachable(); 1475 } 1476 1477 static void queue_sender_init(queue_sender_t *s, xco_runtime_t *rt, xco_queue_t *q, uintptr_t value) { 1478 s->base = (xco_mach_t){.step = queue_sender_step, .status = XCO_STEP_INIT}; 1479 s->q = q; 1480 s->rt = rt; 1481 s->value = value; 1482 s->phase = 0; 1483 s->done = false; 1484 } 1485 1486 static void test_queue_block_buffered(void) { 1487 /* cap=3, BLOCK: three sends fill the buffer; three recvs drain in order. */ 1488 xco_runtime_t rt; xco_rt_init(&rt); 1489 uintptr_t buf[3]; 1490 xco_queue_t q; xco_queue_init(&q, buf, 3, XCO_QUEUE_BLOCK); 1491 1492 assert(xco_queue_send_poll(&q, 10, NULL) == XCO_QSEND_ACCEPTED); 1493 assert(xco_queue_send_poll(&q, 20, NULL) == XCO_QSEND_ACCEPTED); 1494 assert(xco_queue_send_poll(&q, 30, NULL) == XCO_QSEND_ACCEPTED); 1495 assert(q.len == 3); 1496 /* Buffer full under BLOCK: pure-try poll reports BLOCKED. */ 1497 assert(xco_queue_send_poll(&q, 40, NULL) == XCO_QSEND_BLOCKED); 1498 1499 uintptr_t v; 1500 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 10); 1501 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 20); 1502 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 30); 1503 assert(!xco_event_poll(xco_queue_recv_event(&q), &v, NULL)); /* empty */ 1504 assert(q.len == 0); 1505 (void)rt; 1506 } 1507 1508 static void test_queue_block_sender_parks(void) { 1509 /* Buffer fills, next sender parks; a recv unblocks it. */ 1510 xco_runtime_t rt; xco_rt_init(&rt); 1511 uintptr_t buf[2]; 1512 xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_BLOCK); 1513 1514 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 1515 assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); 1516 1517 queue_sender_t s; queue_sender_init(&s, &rt, &q, 3); 1518 xco_step(&s.base, 0); 1519 assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); 1520 assert(q.send_head == &s.qsw.sw.base); 1521 1522 /* Recv pops 1; 3 should slot into the buffer and the parked sender wakes. */ 1523 uintptr_t v; 1524 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); 1525 xco_rt_run(&rt, 0); 1526 assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); 1527 assert(s.done); 1528 assert(q.send_head == NULL); 1529 1530 /* Buffer should now hold 2, 3. */ 1531 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); 1532 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 3); 1533 assert(q.len == 0); 1534 } 1535 1536 static void test_queue_drop_newest(void) { 1537 /* When full, send_try returns true but silently discards. */ 1538 uintptr_t buf[2]; 1539 xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_DROP_NEWEST); 1540 1541 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 1542 assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); 1543 assert(xco_queue_send_poll(&q, 3, NULL) == XCO_QSEND_ACCEPTED); /* full → drop 3 */ 1544 assert(xco_queue_send_poll(&q, 4, NULL) == XCO_QSEND_ACCEPTED); /* still full → drop 4 */ 1545 1546 uintptr_t v; 1547 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); 1548 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); 1549 assert(!xco_event_poll(xco_queue_recv_event(&q), &v, NULL)); 1550 } 1551 1552 static void test_queue_drop_oldest(void) { 1553 /* When full, send_try returns true and evicts head, pushes new tail. */ 1554 uintptr_t buf[2]; 1555 xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_DROP_OLDEST); 1556 1557 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 1558 assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); 1559 assert(xco_queue_send_poll(&q, 3, NULL) == XCO_QSEND_ACCEPTED); /* full → evict 1, buffer = [2, 3] */ 1560 assert(xco_queue_send_poll(&q, 4, NULL) == XCO_QSEND_ACCEPTED); /* full → evict 2, buffer = [3, 4] */ 1561 1562 uintptr_t v; 1563 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 3); 1564 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 4); 1565 assert(!xco_event_poll(xco_queue_recv_event(&q), &v, NULL)); 1566 } 1567 1568 static void test_queue_direct_handoff(void) { 1569 /* Receiver parked; send_try hands off directly, bypassing the buffer. 1570 * Works the same regardless of policy — exercise BLOCK here. */ 1571 xco_runtime_t rt; xco_rt_init(&rt); 1572 uintptr_t buf[2]; 1573 xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_BLOCK); 1574 1575 waiter_t r; waiter_init(&r, &rt, xco_queue_recv_event(&q)); 1576 xco_step(&r.base, 0); 1577 assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); 1578 assert(q.recv_head == &r.sw.base); 1579 1580 assert(xco_queue_send_poll(&q, 0xCAFE, NULL) == XCO_QSEND_ACCEPTED); 1581 assert(q.len == 0); /* didn't touch the buffer */ 1582 assert(q.recv_head == NULL); 1583 1584 xco_rt_run(&rt, 0); 1585 assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); 1586 assert(r.got == 0xCAFE); 1587 } 1588 1589 static void test_queue_recv_blocks_then_drains_buffered(void) { 1590 /* Receivers wait when empty. After buffer warms up, recvs pop FIFO 1591 * — no value-skipping when both sides race past one another. */ 1592 xco_runtime_t rt; xco_rt_init(&rt); 1593 uintptr_t buf[2]; 1594 xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_BLOCK); 1595 1596 /* 2 receivers parked. */ 1597 waiter_t r1, r2; 1598 waiter_init(&r1, &rt, xco_queue_recv_event(&q)); 1599 waiter_init(&r2, &rt, xco_queue_recv_event(&q)); 1600 xco_step(&r1.base, 0); 1601 xco_step(&r2.base, 0); 1602 1603 /* Two sends → direct handoff to the two parked receivers, not buffered. */ 1604 assert(xco_queue_send_poll(&q, 100, NULL) == XCO_QSEND_ACCEPTED); 1605 assert(xco_queue_send_poll(&q, 200, NULL) == XCO_QSEND_ACCEPTED); 1606 assert(q.len == 0); 1607 1608 xco_rt_run(&rt, 0); 1609 assert(xco_mach_status(&r1.base) == XCO_STEP_DEAD && r1.got == 100); 1610 assert(xco_mach_status(&r2.base) == XCO_STEP_DEAD && r2.got == 200); 1611 } 1612 1613 static void test_queue_select_recv(void) { 1614 /* Queue recv composes into select. */ 1615 xco_runtime_t rt; xco_rt_init(&rt); 1616 uintptr_t buf[1]; 1617 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 1618 xco_latch_t l; xco_latch_init(&l); 1619 1620 xco_select_event_t sel; 1621 xco_select_input_t inputs[2]; 1622 xco_event_t *srcs[2] = {xco_queue_recv_event(&q), &l.base}; 1623 xco_select_event_init(&sel, inputs, 2, srcs); 1624 1625 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 1626 xco_step(&w.base, 0); 1627 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1628 1629 assert(xco_queue_send_poll(&q, 0xBEEF, NULL) == XCO_QSEND_ACCEPTED); 1630 xco_rt_run(&rt, 0); 1631 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1632 assert(w.got == 0); 1633 assert(inputs[0].value == 0xBEEF); 1634 1635 xco_select_event_deinit(&sel); 1636 } 1637 1638 static void test_queue_send_unpark(void) { 1639 /* Cancel a parked sender; remaining FIFO order intact. */ 1640 xco_runtime_t rt; xco_rt_init(&rt); 1641 uintptr_t buf[1]; 1642 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 1643 1644 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); /* fills buffer */ 1645 1646 queue_sender_t a, b, d; 1647 queue_sender_init(&a, &rt, &q, 2); 1648 queue_sender_init(&b, &rt, &q, 3); 1649 queue_sender_init(&d, &rt, &q, 4); 1650 xco_step(&a.base, 0); 1651 xco_step(&b.base, 0); 1652 xco_step(&d.base, 0); 1653 1654 xco_queue_send_unpark(&q, &b.qsw); 1655 xco_queue_send_unpark(&q, &b.qsw); /* idempotent */ 1656 1657 /* Drain: 1, then a's 2 (slots into buffer when 1 popped), then d's 4. */ 1658 uintptr_t v; 1659 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); 1660 xco_rt_run(&rt, 0); 1661 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); 1662 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); 1663 xco_rt_run(&rt, 0); 1664 assert(xco_mach_status(&d.base) == XCO_STEP_DEAD); 1665 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 4); 1666 /* b never sent its value, stayed SUSPENDED. */ 1667 assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); 1668 } 1669 1670 /* ---- Broadcast ----------------------------------------------------- */ 1671 1672 /* Subscriber: parks on the broadcast, captures fire payload, re-parks 1673 * on resume. n_seen counts how many publishes it received. */ 1674 typedef struct { 1675 xco_mach_t base; 1676 xco_broadcast_t *b; 1677 xco_runtime_t *rt; 1678 xco_waker_t sw; 1679 uintptr_t last; 1680 int n_seen; 1681 int target; /* unsubscribe after this many */ 1682 } bcast_sub_t; 1683 1684 static xco_step_result_t bcast_sub_step(xco_mach_t *s, uintptr_t v) { 1685 bcast_sub_t *b = (bcast_sub_t *)s; 1686 if (b->n_seen > 0) { 1687 /* Resume from a fire — capture and decide whether to re-park. */ 1688 b->last = v; 1689 } 1690 if (b->n_seen >= b->target) { 1691 return (xco_step_result_t){b->last, XCO_STEP_DEAD}; 1692 } 1693 /* Re-park: the runtime returned sw fully detached, so xco_event_poll 1694 * works without re-init. (Init was done once in bcast_sub_init.) */ 1695 xco_event_poll(xco_broadcast_event(b->b), NULL, &b->sw.base); 1696 b->n_seen++; 1697 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 1698 } 1699 1700 static void bcast_sub_init(bcast_sub_t *s, xco_runtime_t *rt, xco_broadcast_t *b, int target) { 1701 s->base = (xco_mach_t){.step = bcast_sub_step, .status = XCO_STEP_INIT}; 1702 s->b = b; 1703 s->rt = rt; 1704 s->last = 0; 1705 s->n_seen = 0; 1706 s->target = target; 1707 xco_waker_init(&s->sw, rt, &s->base); 1708 } 1709 1710 static void test_broadcast_try_always_false(void) { 1711 /* No "ready now" state; subscribers always wait for the next publish. */ 1712 xco_broadcast_t b; xco_broadcast_init(&b); 1713 assert(!xco_event_poll(xco_broadcast_event(&b), NULL, NULL)); 1714 xco_broadcast_publish(&b, 7); 1715 /* Even after a publish, try is still false — value is read via accessor. */ 1716 assert(!xco_event_poll(xco_broadcast_event(&b), NULL, NULL)); 1717 assert(xco_broadcast_has_value(&b)); 1718 assert(xco_broadcast_value(&b) == 7); 1719 } 1720 1721 static void test_broadcast_publish_wakes_all(void) { 1722 /* Three subscribers parked; one publish wakes all with the value. */ 1723 xco_runtime_t rt; xco_rt_init(&rt); 1724 xco_broadcast_t b; xco_broadcast_init(&b); 1725 1726 bcast_sub_t s[3]; 1727 for (int i = 0; i < 3; i++) { 1728 bcast_sub_init(&s[i], &rt, &b, 1); /* one publish then exit */ 1729 xco_step(&s[i].base, 0); 1730 assert(xco_mach_status(&s[i].base) == XCO_STEP_SUSPENDED); 1731 } 1732 1733 xco_broadcast_publish(&b, 0xBEEF); 1734 assert(b.waiters == NULL); /* drained */ 1735 1736 xco_rt_run(&rt, 0); 1737 for (int i = 0; i < 3; i++) { 1738 assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); 1739 assert(s[i].last == 0xBEEF); 1740 } 1741 } 1742 1743 static void test_broadcast_rearm(void) { 1744 /* Subscriber parks for two publishes; receives both values in order. */ 1745 xco_runtime_t rt; xco_rt_init(&rt); 1746 xco_broadcast_t b; xco_broadcast_init(&b); 1747 1748 bcast_sub_t s; bcast_sub_init(&s, &rt, &b, 2); 1749 xco_step(&s.base, 0); 1750 1751 xco_broadcast_publish(&b, 11); 1752 xco_rt_run(&rt, 0); 1753 assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); /* re-armed */ 1754 assert(s.last == 11); 1755 1756 xco_broadcast_publish(&b, 22); 1757 xco_rt_run(&rt, 0); 1758 assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); 1759 assert(s.last == 22); 1760 } 1761 1762 static void test_broadcast_missed_publish(void) { 1763 /* Subscriber not parked at publish time misses it; the next publish 1764 * wakes them with that value (no replay, no coalescing into one). */ 1765 xco_runtime_t rt; xco_rt_init(&rt); 1766 xco_broadcast_t b; xco_broadcast_init(&b); 1767 1768 /* Publish before anyone subscribes — value is in the slot but no one wakes. */ 1769 xco_broadcast_publish(&b, 1); 1770 assert(xco_broadcast_value(&b) == 1); 1771 1772 bcast_sub_t s; bcast_sub_init(&s, &rt, &b, 1); 1773 xco_step(&s.base, 0); 1774 assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); /* didn't see the prior */ 1775 1776 xco_broadcast_publish(&b, 2); 1777 xco_rt_run(&rt, 0); 1778 assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); 1779 assert(s.last == 2); 1780 } 1781 1782 static void test_broadcast_unpark(void) { 1783 /* xco_event_unpark removes a subscriber cleanly. */ 1784 xco_runtime_t rt; xco_rt_init(&rt); 1785 xco_broadcast_t b; xco_broadcast_init(&b); 1786 1787 xco_waker_t sw1, sw2, sw3; 1788 xco_waker_init(&sw1, &rt, (xco_mach_t *)0x1); 1789 xco_waker_init(&sw2, &rt, (xco_mach_t *)0x2); 1790 xco_waker_init(&sw3, &rt, (xco_mach_t *)0x3); 1791 xco_event_poll(xco_broadcast_event(&b), NULL, &sw1.base); 1792 xco_event_poll(xco_broadcast_event(&b), NULL, &sw2.base); 1793 xco_event_poll(xco_broadcast_event(&b), NULL, &sw3.base); 1794 1795 xco_event_unpark(xco_broadcast_event(&b), &sw2.base); 1796 xco_event_unpark(xco_broadcast_event(&b), &sw2.base); /* idempotent */ 1797 1798 int seen1 = 0, seen3 = 0; 1799 for (xco_waiter_t *w = b.waiters; w; w = w->next) { 1800 if (w == &sw1.base) seen1 = 1; 1801 if (w == &sw3.base) seen3 = 1; 1802 assert(w != &sw2.base); 1803 } 1804 assert(seen1 && seen3); 1805 1806 xco_event_unpark(xco_broadcast_event(&b), &sw1.base); 1807 xco_event_unpark(xco_broadcast_event(&b), &sw3.base); 1808 } 1809 1810 /* ---- Task (state-machine xco_step) ------------------------------------ */ 1811 1812 /* A countdown SM that completes after N steps, returning a final value. 1813 * On every step it checks task->cancel via xco_event_poll and bails early 1814 * (winding down to XCO_STEP_DEAD) if it's set. Demonstrates the cooperation 1815 * pattern: cancel is a signal, the SM owns the unwind. */ 1816 typedef struct { 1817 xco_mach_t base; 1818 xco_task_t *task; 1819 int remaining; 1820 uintptr_t final; 1821 } taskbody_t; 1822 1823 static xco_step_result_t taskbody_step(xco_mach_t *s, uintptr_t v) { 1824 taskbody_t *cd = (taskbody_t *)s; 1825 (void)v; 1826 if (xco_task_is_cancelled(cd->task)) { 1827 xco_task_done(cd->task, 0); /* cooperative unwind */ 1828 return (xco_step_result_t){0, XCO_STEP_DEAD}; 1829 } 1830 if (cd->remaining-- == 0) { 1831 xco_task_done(cd->task, cd->final); 1832 return (xco_step_result_t){cd->final, XCO_STEP_DEAD}; 1833 } 1834 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 1835 } 1836 1837 static void taskbody_init(taskbody_t *cd, xco_task_t *t, int n, uintptr_t final) { 1838 cd->base = (xco_mach_t){.step = taskbody_step, .status = XCO_STEP_INIT}; 1839 cd->task = t; 1840 cd->remaining = n; 1841 cd->final = final; 1842 } 1843 1844 static void test_task_state_machine_join(void) { 1845 /* Drive a countdown SM as a task; join via xco_task_done_event. */ 1846 xco_runtime_t rt; xco_rt_init(&rt); 1847 1848 taskbody_t cd; 1849 xco_task_t t; 1850 taskbody_init(&cd, &t, 3, 0xAAAA); 1851 xco_task_init(&t, &cd.base); 1852 1853 assert(!xco_task_finished(&t)); 1854 1855 /* Pump it manually. */ 1856 while (xco_mach_status(&cd.base) != XCO_STEP_DEAD) { 1857 xco_step(&cd.base, 0); 1858 } 1859 assert(xco_task_finished(&t)); 1860 1861 /* A latched join: try the done event for the return value. */ 1862 uintptr_t v; 1863 assert(xco_event_poll(xco_task_done_event(&t), &v, NULL)); 1864 assert(v == 0xAAAA); 1865 } 1866 1867 static void test_task_state_machine_cancel(void) { 1868 /* Cancel mid-flight; SM observes and unwinds. Joiner sees done with 1869 * the cooperative-unwind sentinel value (here, 0). */ 1870 xco_runtime_t rt; xco_rt_init(&rt); 1871 1872 taskbody_t cd; 1873 xco_task_t t; 1874 taskbody_init(&cd, &t, 100, 0xAAAA); 1875 xco_task_init(&t, &cd.base); 1876 1877 /* A second xco_step waiting for completion via xco_wait_or_cancel-style 1878 * shape — here, just a direct waiter on the done event. */ 1879 waiter_t joiner; waiter_init(&joiner, &rt, xco_task_done_event(&t)); 1880 xco_step(&joiner.base, 0); 1881 assert(xco_mach_status(&joiner.base) == XCO_STEP_SUSPENDED); 1882 1883 /* Step the task a few times. */ 1884 xco_step(&cd.base, 0); 1885 xco_step(&cd.base, 0); 1886 assert(!xco_task_finished(&t)); 1887 1888 xco_cancel_set(xco_task_cancel(&t)); 1889 /* Next step observes cancel and dies. */ 1890 xco_step(&cd.base, 0); 1891 assert(xco_mach_status(&cd.base) == XCO_STEP_DEAD); 1892 assert(xco_task_finished(&t)); 1893 1894 /* Joiner wakes with the done payload (0 from the cooperative unwind). */ 1895 xco_rt_run(&rt, 0); 1896 assert(xco_mach_status(&joiner.base) == XCO_STEP_DEAD); 1897 assert(joiner.got == 0); 1898 } 1899 1900 /* ---- Countdown ----------------------------------------------------- */ 1901 1902 static void test_countdown_basic(void) { 1903 xco_countdown_t cd; xco_countdown_init(&cd, 3); 1904 assert(!xco_countdown_fired(&cd)); 1905 xco_countdown_done(&cd); 1906 xco_countdown_done(&cd); 1907 assert(!xco_countdown_fired(&cd)); 1908 xco_countdown_done(&cd); 1909 assert(xco_countdown_fired(&cd)); 1910 1911 uintptr_t v = 0xBAD; 1912 assert(xco_event_poll(xco_countdown_event(&cd), &v, NULL)); 1913 assert(v == 0); 1914 } 1915 1916 static void test_countdown_zero_init(void) { 1917 /* n=0: latch fires immediately. */ 1918 xco_countdown_t cd; xco_countdown_init(&cd, 0); 1919 assert(xco_countdown_fired(&cd)); 1920 } 1921 1922 static void test_countdown_add(void) { 1923 xco_countdown_t cd; xco_countdown_init(&cd, 1); 1924 xco_countdown_add(&cd, 2); 1925 xco_countdown_done(&cd); 1926 xco_countdown_done(&cd); 1927 assert(!xco_countdown_fired(&cd)); 1928 xco_countdown_done(&cd); 1929 assert(xco_countdown_fired(&cd)); 1930 } 1931 1932 static void test_countdown_park_wake(void) { 1933 xco_runtime_t rt; xco_rt_init(&rt); 1934 xco_countdown_t cd; xco_countdown_init(&cd, 2); 1935 1936 waiter_t w; waiter_init(&w, &rt, xco_countdown_event(&cd)); 1937 xco_step(&w.base, 0); 1938 assert(xco_mach_status(&w.base) == XCO_STEP_SUSPENDED); 1939 1940 xco_countdown_done(&cd); 1941 assert(rt.head == NULL); 1942 xco_countdown_done(&cd); 1943 xco_rt_run(&rt, 0); 1944 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 1945 assert(w.got == 0); 1946 } 1947 1948 /* ---- Notify -------------------------------------------------------- */ 1949 1950 static void test_notify_try_always_false(void) { 1951 xco_notify_t n; xco_notify_init(&n); 1952 assert(!xco_event_poll(xco_notify_event(&n), NULL, NULL)); 1953 xco_notify_one(&n); /* empty: no-op */ 1954 xco_notify_all(&n); /* empty: no-op */ 1955 assert(!xco_event_poll(xco_notify_event(&n), NULL, NULL)); 1956 } 1957 1958 static void test_notify_one_fifo(void) { 1959 xco_runtime_t rt; xco_rt_init(&rt); 1960 xco_notify_t n; xco_notify_init(&n); 1961 1962 waiter_t a, b, c; 1963 waiter_init(&a, &rt, xco_notify_event(&n)); 1964 waiter_init(&b, &rt, xco_notify_event(&n)); 1965 waiter_init(&c, &rt, xco_notify_event(&n)); 1966 xco_step(&a.base, 0); xco_step(&b.base, 0); xco_step(&c.base, 0); 1967 1968 xco_notify_one(&n); 1969 xco_rt_run(&rt, 0); 1970 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); 1971 assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); 1972 1973 xco_notify_one(&n); 1974 xco_rt_run(&rt, 0); 1975 assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); 1976 assert(xco_mach_status(&c.base) == XCO_STEP_SUSPENDED); 1977 1978 xco_notify_one(&n); 1979 xco_rt_run(&rt, 0); 1980 assert(xco_mach_status(&c.base) == XCO_STEP_DEAD); 1981 assert(n.head == NULL); 1982 1983 xco_notify_one(&n); /* empty: no-op */ 1984 assert(rt.head == NULL); 1985 } 1986 1987 static void test_notify_all(void) { 1988 xco_runtime_t rt; xco_rt_init(&rt); 1989 xco_notify_t n; xco_notify_init(&n); 1990 1991 waiter_t a, b, c; 1992 waiter_init(&a, &rt, xco_notify_event(&n)); 1993 waiter_init(&b, &rt, xco_notify_event(&n)); 1994 waiter_init(&c, &rt, xco_notify_event(&n)); 1995 xco_step(&a.base, 0); xco_step(&b.base, 0); xco_step(&c.base, 0); 1996 1997 xco_notify_all(&n); 1998 assert(n.head == NULL); 1999 xco_rt_run(&rt, 0); 2000 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); 2001 assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); 2002 assert(xco_mach_status(&c.base) == XCO_STEP_DEAD); 2003 } 2004 2005 static void test_notify_select(void) { 2006 xco_runtime_t rt; xco_rt_init(&rt); 2007 xco_notify_t n; xco_notify_init(&n); 2008 xco_latch_t l; xco_latch_init(&l); 2009 2010 xco_select_event_t sel; 2011 xco_select_input_t inputs[2]; 2012 xco_event_t *srcs[2] = {xco_notify_event(&n), &l.base}; 2013 xco_select_event_init(&sel, inputs, 2, srcs); 2014 2015 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 2016 xco_step(&w.base, 0); 2017 2018 xco_notify_one(&n); 2019 xco_rt_run(&rt, 0); 2020 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 2021 assert(w.got == 0); 2022 assert(l.waiters == NULL); 2023 2024 xco_select_event_deinit(&sel); 2025 } 2026 2027 static void test_notify_unpark(void) { 2028 xco_runtime_t rt; xco_rt_init(&rt); 2029 xco_notify_t n; xco_notify_init(&n); 2030 2031 xco_waker_t s1, s2, s3; 2032 xco_waker_init(&s1, &rt, (xco_mach_t *)0x1); 2033 xco_waker_init(&s2, &rt, (xco_mach_t *)0x2); 2034 xco_waker_init(&s3, &rt, (xco_mach_t *)0x3); 2035 xco_event_poll(xco_notify_event(&n), NULL, &s1.base); 2036 xco_event_poll(xco_notify_event(&n), NULL, &s2.base); 2037 xco_event_poll(xco_notify_event(&n), NULL, &s3.base); 2038 2039 xco_event_unpark(xco_notify_event(&n), &s2.base); 2040 xco_event_unpark(xco_notify_event(&n), &s2.base); /* idempotent */ 2041 2042 /* FIFO head is s1; pop s1, then s3 remains. */ 2043 xco_notify_one(&n); 2044 assert(n.head == &s3.base); 2045 xco_event_unpark(xco_notify_event(&n), &s3.base); 2046 assert(n.head == NULL); 2047 } 2048 2049 /* ---- Mutex --------------------------------------------------------- */ 2050 2051 static void test_mutex_basic(void) { 2052 xco_runtime_t rt; xco_rt_init(&rt); 2053 xco_mutex_t mu; xco_mutex_init(&mu); 2054 2055 assert(xco_event_poll(xco_mutex_event(&mu), NULL, NULL)); 2056 assert(!xco_event_poll(xco_mutex_event(&mu), NULL, NULL)); 2057 2058 sem_acquirer_t b; sem_acquirer_init(&b, &rt, &mu); 2059 xco_step(&b.base, 0); 2060 assert(xco_mach_status(&b.base) == XCO_STEP_SUSPENDED); 2061 2062 xco_mutex_release(&mu); 2063 xco_rt_run(&rt, 0); 2064 assert(xco_mach_status(&b.base) == XCO_STEP_DEAD); 2065 assert(b.got); 2066 } 2067 2068 /* ---- Queue send op ------------------------------------------------- */ 2069 2070 static void test_queue_send_op_inline(void) { 2071 /* BLOCK with room: init delivers immediately, payload 1. */ 2072 uintptr_t buf[2]; 2073 xco_queue_t q; xco_queue_init(&q, buf, 2, XCO_QUEUE_BLOCK); 2074 2075 xco_queue_send_op_t op; 2076 xco_queue_send_op_init(&op, &q, 0xAA); 2077 assert(op.done.set); 2078 uintptr_t v; 2079 assert(xco_event_poll(&op.done.base, &v, NULL)); 2080 assert(v == 1); 2081 assert(q.len == 1); 2082 2083 xco_queue_send_op_deinit(&op); 2084 } 2085 2086 static void test_queue_send_op_blocks(void) { 2087 /* BLOCK full: parks. Recv frees a slot, sender drains, op fires. */ 2088 uintptr_t buf[1]; 2089 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 2090 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 2091 2092 xco_queue_send_op_t op; 2093 xco_queue_send_op_init(&op, &q, 2); 2094 assert(!op.done.set); 2095 assert(q.send_head == &op.qsw.sw.base); 2096 2097 uintptr_t v; 2098 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 1); 2099 assert(op.done.set); 2100 uintptr_t pv; 2101 assert(xco_event_poll(&op.done.base, &pv, NULL)); 2102 assert(pv == 1); 2103 assert(xco_event_poll(xco_queue_recv_event(&q), &v, NULL) && v == 2); 2104 2105 xco_queue_send_op_deinit(&op); 2106 } 2107 2108 static void test_queue_send_op_drop_inline(void) { 2109 /* DROP_NEWEST: op resolves inline regardless of fullness. */ 2110 uintptr_t buf[1]; 2111 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_DROP_NEWEST); 2112 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 2113 2114 xco_queue_send_op_t op; 2115 xco_queue_send_op_init(&op, &q, 2); /* dropped */ 2116 assert(op.done.set); 2117 uintptr_t v; 2118 assert(xco_event_poll(&op.done.base, &v, NULL)); 2119 assert(v == 1); 2120 2121 xco_queue_send_op_deinit(&op); 2122 } 2123 2124 static void test_queue_send_op_select_loses(void) { 2125 xco_runtime_t rt; xco_rt_init(&rt); 2126 uintptr_t buf[1]; 2127 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 2128 xco_latch_t l; xco_latch_init(&l); 2129 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 2130 2131 xco_queue_send_op_t op; 2132 xco_queue_send_op_init(&op, &q, 2); 2133 assert(q.send_head == &op.qsw.sw.base); 2134 2135 xco_select_event_t sel; 2136 xco_select_input_t inputs[2]; 2137 xco_event_t *srcs[2] = {&op.done.base, &l.base}; 2138 xco_select_event_init(&sel, inputs, 2, srcs); 2139 2140 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 2141 xco_step(&w.base, 0); 2142 2143 xco_latch_set(&l, 0xCAFE); 2144 xco_rt_run(&rt, 0); 2145 assert(w.got == 1); 2146 assert(q.send_head == &op.qsw.sw.base); 2147 2148 xco_select_event_deinit(&sel); 2149 xco_queue_send_op_deinit(&op); 2150 assert(q.send_head == NULL); 2151 } 2152 2153 /* ---- Chan close ---------------------------------------------------- */ 2154 2155 typedef struct { 2156 xco_mach_t base; 2157 xco_chan_t *c; 2158 xco_runtime_t *rt; 2159 xco_waker_t sw; 2160 int phase; 2161 xco_recv_status_t status; 2162 uintptr_t value; 2163 } chan_rx_t; 2164 2165 static xco_step_result_t chan_rx_step(xco_mach_t *s, uintptr_t v) { 2166 chan_rx_t *r = (chan_rx_t *)s; 2167 (void)v; 2168 switch (r->phase) { 2169 case 0: { 2170 xco_recv_status_t st = xco_chan_recv(r->c, &r->value); 2171 if (st != XCO_RECV_EMPTY) { 2172 r->status = st; 2173 r->phase = 2; 2174 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2175 } 2176 xco_waker_init(&r->sw, r->rt, &r->base); 2177 xco_event_poll(&r->c->recv, NULL, &r->sw.base); 2178 r->phase = 1; 2179 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 2180 } 2181 case 1: 2182 r->status = xco_chan_recv(r->c, &r->value); 2183 r->phase = 2; 2184 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2185 } 2186 __builtin_unreachable(); 2187 } 2188 2189 static void chan_rx_init(chan_rx_t *r, xco_runtime_t *rt, xco_chan_t *c) { 2190 r->base = (xco_mach_t){.step = chan_rx_step, .status = XCO_STEP_INIT}; 2191 r->c = c; 2192 r->rt = rt; 2193 r->phase = 0; 2194 r->status = XCO_RECV_EMPTY; 2195 r->value = 0; 2196 } 2197 2198 static void test_chan_close_empty(void) { 2199 xco_chan_t c; xco_chan_init(&c); 2200 assert(!xco_chan_is_closed(&c)); 2201 xco_chan_close(&c); 2202 assert(xco_chan_is_closed(&c)); 2203 xco_chan_close(&c); /* idempotent */ 2204 2205 uintptr_t v; 2206 assert(xco_chan_recv(&c, &v) == XCO_RECV_CLOSED); 2207 } 2208 2209 static void test_chan_close_wakes_receiver(void) { 2210 xco_runtime_t rt; xco_rt_init(&rt); 2211 xco_chan_t c; xco_chan_init(&c); 2212 2213 chan_rx_t r; chan_rx_init(&r, &rt, &c); 2214 xco_step(&r.base, 0); 2215 assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); 2216 assert(c.recv_head == &r.sw.base); 2217 2218 xco_chan_close(&c); 2219 assert(c.recv_head == NULL); 2220 xco_rt_run(&rt, 0); 2221 assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); 2222 assert(r.status == XCO_RECV_CLOSED); 2223 } 2224 2225 typedef struct { 2226 xco_mach_t base; 2227 xco_chan_t *c; 2228 xco_runtime_t *rt; 2229 xco_chan_send_waiter_t csw; 2230 uintptr_t value; 2231 int phase; 2232 bool done; 2233 bool delivered; 2234 } chan_tx_t; 2235 2236 static xco_step_result_t chan_tx_step(xco_mach_t *s, uintptr_t v) { 2237 chan_tx_t *snd = (chan_tx_t *)s; 2238 (void)v; 2239 switch (snd->phase) { 2240 case 0: 2241 xco_chan_send_waiter_init(&snd->csw, snd->rt, &snd->base); 2242 switch (xco_chan_send_poll(snd->c, snd->value, &snd->csw)) { 2243 case XCO_SEND_DELIVERED: 2244 snd->done = true; 2245 snd->delivered = true; 2246 snd->phase = 2; 2247 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2248 case XCO_SEND_CLOSED: 2249 snd->done = true; 2250 snd->delivered = false; 2251 snd->phase = 2; 2252 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2253 case XCO_SEND_BLOCKED: 2254 snd->phase = 1; 2255 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 2256 } 2257 __builtin_unreachable(); 2258 case 1: 2259 snd->done = true; 2260 snd->delivered = snd->csw.delivered; 2261 snd->phase = 2; 2262 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2263 } 2264 __builtin_unreachable(); 2265 } 2266 2267 static void chan_tx_init(chan_tx_t *snd, xco_runtime_t *rt, xco_chan_t *c, uintptr_t value) { 2268 snd->base = (xco_mach_t){.step = chan_tx_step, .status = XCO_STEP_INIT}; 2269 snd->c = c; 2270 snd->rt = rt; 2271 snd->value = value; 2272 snd->phase = 0; 2273 snd->done = false; 2274 snd->delivered = false; 2275 } 2276 2277 static void test_chan_close_drains_senders(void) { 2278 xco_runtime_t rt; xco_rt_init(&rt); 2279 xco_chan_t c; xco_chan_init(&c); 2280 2281 chan_tx_t s[3]; 2282 for (int i = 0; i < 3; i++) { 2283 chan_tx_init(&s[i], &rt, &c, (uintptr_t)(100 + i)); 2284 xco_step(&s[i].base, 0); 2285 } 2286 assert(c.send_head == &s[0].csw.sw.base); 2287 2288 xco_chan_close(&c); 2289 assert(c.send_head == NULL); 2290 xco_rt_run(&rt, 0); 2291 for (int i = 0; i < 3; i++) { 2292 assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); 2293 assert(s[i].done); 2294 assert(!s[i].delivered); 2295 } 2296 } 2297 2298 static void test_chan_send_try_after_close(void) { 2299 xco_chan_t c; xco_chan_init(&c); 2300 xco_chan_close(&c); 2301 assert(xco_chan_send_poll(&c, 1, NULL) == XCO_SEND_CLOSED); 2302 } 2303 2304 static void test_chan_send_op_close_inline(void) { 2305 xco_chan_t c; xco_chan_init(&c); 2306 xco_chan_close(&c); 2307 2308 xco_chan_send_op_t op; 2309 xco_chan_send_op_init(&op, &c, 0xABBA); 2310 assert(op.done.set); 2311 uintptr_t v; 2312 assert(xco_event_poll(&op.done.base, &v, NULL)); 2313 assert(v == 0); /* delivered=false */ 2314 xco_chan_send_op_deinit(&op); 2315 } 2316 2317 static void test_chan_send_op_close_drain(void) { 2318 xco_chan_t c; xco_chan_init(&c); 2319 2320 xco_chan_send_op_t op; 2321 xco_chan_send_op_init(&op, &c, 0xABBA); 2322 assert(!op.done.set); 2323 2324 xco_chan_close(&c); 2325 assert(op.done.set); 2326 uintptr_t v; 2327 assert(xco_event_poll(&op.done.base, &v, NULL)); 2328 assert(v == 0); 2329 xco_chan_send_op_deinit(&op); 2330 } 2331 2332 static void test_chan_send_op_delivered_payload(void) { 2333 xco_chan_t c; xco_chan_init(&c); 2334 2335 xco_chan_send_op_t op; 2336 xco_chan_send_op_init(&op, &c, 0xFEED); 2337 assert(!op.done.set); 2338 2339 uintptr_t v; 2340 assert(xco_chan_recv(&c, &v) == XCO_RECV_GOT); 2341 assert(v == 0xFEED); 2342 assert(op.done.set); 2343 uintptr_t pv; 2344 assert(xco_event_poll(&op.done.base, &pv, NULL)); 2345 assert(pv == 1); 2346 2347 xco_chan_send_op_deinit(&op); 2348 } 2349 2350 /* ---- Queue close --------------------------------------------------- */ 2351 2352 typedef struct { 2353 xco_mach_t base; 2354 xco_queue_t *q; 2355 xco_runtime_t *rt; 2356 xco_waker_t sw; 2357 int phase; 2358 xco_recv_status_t status; 2359 uintptr_t value; 2360 } queue_rx_t; 2361 2362 static xco_step_result_t queue_rx_step(xco_mach_t *s, uintptr_t v) { 2363 queue_rx_t *r = (queue_rx_t *)s; 2364 (void)v; 2365 switch (r->phase) { 2366 case 0: { 2367 xco_recv_status_t st = xco_queue_recv(r->q, &r->value); 2368 if (st != XCO_RECV_EMPTY) { 2369 r->status = st; 2370 r->phase = 2; 2371 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2372 } 2373 xco_waker_init(&r->sw, r->rt, &r->base); 2374 xco_event_poll(xco_queue_recv_event(r->q), NULL, &r->sw.base); 2375 r->phase = 1; 2376 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 2377 } 2378 case 1: 2379 r->status = xco_queue_recv(r->q, &r->value); 2380 r->phase = 2; 2381 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2382 } 2383 __builtin_unreachable(); 2384 } 2385 2386 static void queue_rx_init(queue_rx_t *r, xco_runtime_t *rt, xco_queue_t *q) { 2387 r->base = (xco_mach_t){.step = queue_rx_step, .status = XCO_STEP_INIT}; 2388 r->q = q; 2389 r->rt = rt; 2390 r->phase = 0; 2391 r->status = XCO_RECV_EMPTY; 2392 r->value = 0; 2393 } 2394 2395 static void test_queue_close_drains_buffered_then_eof(void) { 2396 uintptr_t buf[3]; 2397 xco_queue_t q; xco_queue_init(&q, buf, 3, XCO_QUEUE_BLOCK); 2398 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 2399 assert(xco_queue_send_poll(&q, 2, NULL) == XCO_QSEND_ACCEPTED); 2400 2401 xco_queue_close(&q); 2402 assert(xco_queue_is_closed(&q)); 2403 2404 uintptr_t v; 2405 assert(xco_queue_recv(&q, &v) == XCO_RECV_GOT && v == 1); 2406 assert(xco_queue_recv(&q, &v) == XCO_RECV_GOT && v == 2); 2407 assert(xco_queue_recv(&q, &v) == XCO_RECV_CLOSED); 2408 xco_queue_close(&q); /* idempotent */ 2409 assert(xco_queue_recv(&q, &v) == XCO_RECV_CLOSED); 2410 } 2411 2412 static void test_queue_close_wakes_receiver(void) { 2413 xco_runtime_t rt; xco_rt_init(&rt); 2414 uintptr_t buf[1]; 2415 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 2416 2417 queue_rx_t r; queue_rx_init(&r, &rt, &q); 2418 xco_step(&r.base, 0); 2419 assert(xco_mach_status(&r.base) == XCO_STEP_SUSPENDED); 2420 2421 xco_queue_close(&q); 2422 xco_rt_run(&rt, 0); 2423 assert(xco_mach_status(&r.base) == XCO_STEP_DEAD); 2424 assert(r.status == XCO_RECV_CLOSED); 2425 } 2426 2427 typedef struct { 2428 xco_mach_t base; 2429 xco_queue_t *q; 2430 xco_runtime_t *rt; 2431 xco_queue_send_waiter_t qsw; 2432 uintptr_t value; 2433 int phase; 2434 bool done; 2435 bool delivered; 2436 } queue_tx_t; 2437 2438 static xco_step_result_t queue_tx_step(xco_mach_t *s, uintptr_t v) { 2439 queue_tx_t *snd = (queue_tx_t *)s; 2440 (void)v; 2441 switch (snd->phase) { 2442 case 0: 2443 xco_queue_send_waiter_init(&snd->qsw, snd->rt, &snd->base); 2444 switch (xco_queue_send_poll(snd->q, snd->value, &snd->qsw)) { 2445 case XCO_QSEND_ACCEPTED: 2446 snd->done = true; 2447 snd->delivered = true; 2448 snd->phase = 2; 2449 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2450 case XCO_QSEND_CLOSED: 2451 snd->done = true; 2452 snd->delivered = false; 2453 snd->phase = 2; 2454 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2455 case XCO_QSEND_BLOCKED: 2456 snd->phase = 1; 2457 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 2458 } 2459 __builtin_unreachable(); 2460 case 1: 2461 snd->done = true; 2462 snd->delivered = snd->qsw.delivered; 2463 snd->phase = 2; 2464 return (xco_step_result_t){0, XCO_STEP_DEAD}; 2465 } 2466 __builtin_unreachable(); 2467 } 2468 2469 static void queue_tx_init(queue_tx_t *s, xco_runtime_t *rt, xco_queue_t *q, uintptr_t v) { 2470 s->base = (xco_mach_t){.step = queue_tx_step, .status = XCO_STEP_INIT}; 2471 s->q = q; 2472 s->rt = rt; 2473 s->value = v; 2474 s->phase = 0; 2475 s->done = false; 2476 s->delivered = false; 2477 } 2478 2479 static void test_queue_close_drains_senders(void) { 2480 xco_runtime_t rt; xco_rt_init(&rt); 2481 uintptr_t buf[1]; 2482 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 2483 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 2484 2485 queue_tx_t s[2]; 2486 for (int i = 0; i < 2; i++) { 2487 queue_tx_init(&s[i], &rt, &q, (uintptr_t)(100 + i)); 2488 xco_step(&s[i].base, 0); 2489 assert(xco_mach_status(&s[i].base) == XCO_STEP_SUSPENDED); 2490 } 2491 2492 xco_queue_close(&q); 2493 xco_rt_run(&rt, 0); 2494 for (int i = 0; i < 2; i++) { 2495 assert(xco_mach_status(&s[i].base) == XCO_STEP_DEAD); 2496 assert(!s[i].delivered); 2497 } 2498 } 2499 2500 static void test_queue_send_try_after_close_block(void) { 2501 uintptr_t buf[1]; 2502 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 2503 xco_queue_close(&q); 2504 assert(xco_queue_send_poll(&q, 42, NULL) == XCO_QSEND_CLOSED); 2505 } 2506 2507 static void test_queue_send_try_after_close_drop(void) { 2508 /* Closed is closed regardless of policy: DROP_* also reports CLOSED 2509 * (behavior change from the old try API, which returned "accepted" 2510 * after close under DROP_*). */ 2511 uintptr_t buf[1]; 2512 xco_queue_t q1; xco_queue_init(&q1, buf, 1, XCO_QUEUE_DROP_NEWEST); 2513 xco_queue_close(&q1); 2514 assert(xco_queue_send_poll(&q1, 42, NULL) == XCO_QSEND_CLOSED); 2515 2516 uintptr_t buf2[1]; 2517 xco_queue_t q2; xco_queue_init(&q2, buf2, 1, XCO_QUEUE_DROP_OLDEST); 2518 xco_queue_close(&q2); 2519 assert(xco_queue_send_poll(&q2, 42, NULL) == XCO_QSEND_CLOSED); 2520 } 2521 2522 static void test_queue_send_op_close_drain(void) { 2523 uintptr_t buf[1]; 2524 xco_queue_t q; xco_queue_init(&q, buf, 1, XCO_QUEUE_BLOCK); 2525 assert(xco_queue_send_poll(&q, 1, NULL) == XCO_QSEND_ACCEPTED); 2526 2527 xco_queue_send_op_t op; 2528 xco_queue_send_op_init(&op, &q, 2); 2529 assert(!op.done.set); 2530 2531 xco_queue_close(&q); 2532 assert(op.done.set); 2533 uintptr_t v; 2534 assert(xco_event_poll(&op.done.base, &v, NULL)); 2535 assert(v == 0); 2536 xco_queue_send_op_deinit(&op); 2537 } 2538 2539 /* ---- Ticker -------------------------------------------------------- */ 2540 2541 typedef struct { 2542 xco_mach_t base; 2543 xco_ticker_t *t; 2544 xco_runtime_t *rt; 2545 xco_waker_t sw; 2546 int n_seen; 2547 int target; 2548 uint64_t last; 2549 } ticker_sub_t; 2550 2551 static xco_step_result_t ticker_sub_step(xco_mach_t *s, uintptr_t v) { 2552 ticker_sub_t *sub = (ticker_sub_t *)s; 2553 if (sub->n_seen > 0) sub->last = (uint64_t)v; 2554 if (sub->n_seen >= sub->target) return (xco_step_result_t){v, XCO_STEP_DEAD}; 2555 xco_event_poll(xco_ticker_event(sub->t), NULL, &sub->sw.base); 2556 sub->n_seen++; 2557 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 2558 } 2559 2560 static void ticker_sub_init(ticker_sub_t *s, xco_runtime_t *rt, xco_ticker_t *t, int target) { 2561 s->base = (xco_mach_t){.step = ticker_sub_step, .status = XCO_STEP_INIT}; 2562 s->t = t; 2563 s->rt = rt; 2564 s->n_seen = 0; 2565 s->target = target; 2566 s->last = 0; 2567 xco_waker_init(&s->sw, rt, &s->base); 2568 } 2569 2570 static void test_ticker_single_tick(void) { 2571 xco_runtime_t rt; xco_rt_init(&rt); 2572 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 2573 xco_rt_attach_timers(&rt, &h.base); 2574 2575 xco_ticker_t ti; 2576 xco_ticker_init(&ti, &h.base, 100, 100); 2577 2578 ticker_sub_t s; ticker_sub_init(&s, &rt, &ti, 1); 2579 xco_step(&s.base, 0); 2580 assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); 2581 2582 xco_rt_run(&rt, 100); 2583 assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); 2584 assert(s.last == 100); 2585 2586 xco_ticker_deinit(&ti); 2587 } 2588 2589 static void test_ticker_multiple_ticks(void) { 2590 xco_runtime_t rt; xco_rt_init(&rt); 2591 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 2592 xco_rt_attach_timers(&rt, &h.base); 2593 2594 xco_ticker_t ti; 2595 xco_ticker_init(&ti, &h.base, 50, 50); 2596 2597 ticker_sub_t s; ticker_sub_init(&s, &rt, &ti, 2); 2598 xco_step(&s.base, 0); 2599 2600 xco_rt_run(&rt, 50); 2601 assert(xco_mach_status(&s.base) == XCO_STEP_SUSPENDED); 2602 assert(s.last == 50); 2603 2604 xco_rt_run(&rt, 100); 2605 assert(xco_mach_status(&s.base) == XCO_STEP_DEAD); 2606 assert(s.last == 100); /* fired = 50 + period */ 2607 2608 xco_ticker_deinit(&ti); 2609 } 2610 2611 static void test_ticker_deinit_before_fire(void) { 2612 xco_runtime_t rt; xco_rt_init(&rt); 2613 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 2614 xco_rt_attach_timers(&rt, &h.base); 2615 2616 xco_ticker_t ti; 2617 xco_ticker_init(&ti, &h.base, 100, 100); 2618 assert(ti.timer.in_heap); 2619 2620 xco_ticker_deinit(&ti); 2621 assert(!ti.timer.in_heap); 2622 2623 assert(xco_timers_peek(&h.base) == UINT64_MAX); 2624 xco_rt_run(&rt, 10000); 2625 } 2626 2627 static void test_ticker_select(void) { 2628 xco_runtime_t rt; xco_rt_init(&rt); 2629 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 2630 xco_rt_attach_timers(&rt, &h.base); 2631 2632 xco_ticker_t ti; xco_ticker_init(&ti, &h.base, 100, 100); 2633 xco_cancel_t c; xco_cancel_init(&c); 2634 2635 xco_select_event_t sel; 2636 xco_select_input_t inputs[2]; 2637 xco_wait_or_cancel(&sel, inputs, xco_ticker_event(&ti), &c); 2638 2639 waiter_t w; waiter_init(&w, &rt, &sel.done.base); 2640 xco_step(&w.base, 0); 2641 2642 xco_rt_run(&rt, 100); 2643 assert(xco_mach_status(&w.base) == XCO_STEP_DEAD); 2644 assert(w.got == XCO_WAIT_OK); 2645 assert(inputs[XCO_WAIT_OK].value == 100); 2646 2647 xco_select_event_deinit(&sel); 2648 xco_ticker_deinit(&ti); 2649 } 2650 2651 static void test_ticker_try_always_false(void) { 2652 xco_pairing_heap_t h; xco_pairing_heap_init(&h); 2653 xco_ticker_t ti; xco_ticker_init(&ti, &h.base, 100, 100); 2654 assert(!xco_event_poll(xco_ticker_event(&ti), NULL, NULL)); 2655 xco_ticker_deinit(&ti); 2656 } 2657 2658 /* ---- Task group ---------------------------------------------------- */ 2659 2660 static void test_task_group_empty_join(void) { 2661 /* An empty (no-attach) group's join is not fired — fan-in semantics 2662 * require at least one attach. */ 2663 xco_task_group_t g; xco_task_group_init(&g); 2664 assert(!xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); 2665 } 2666 2667 static void test_task_group_join_after_all_finish(void) { 2668 xco_runtime_t rt; xco_rt_init(&rt); (void)rt; 2669 2670 taskbody_t a_body, b_body; 2671 xco_task_t a, b; 2672 taskbody_init(&a_body, &a, 1, 0xA); 2673 xco_task_init(&a, &a_body.base); 2674 taskbody_init(&b_body, &b, 1, 0xB); 2675 xco_task_init(&b, &b_body.base); 2676 2677 xco_task_group_t g; xco_task_group_init(&g); 2678 xco_group_attach_t sa, sb; 2679 xco_task_group_attach(&g, &a, &sa); 2680 xco_task_group_attach(&g, &b, &sb); 2681 assert(!xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); 2682 2683 while (xco_mach_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); 2684 assert(xco_task_finished(&a)); 2685 assert(!xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); 2686 2687 while (xco_mach_status(&b_body.base) != XCO_STEP_DEAD) xco_step(&b_body.base, 0); 2688 assert(xco_task_finished(&b)); 2689 uintptr_t v; 2690 assert(xco_event_poll(xco_task_group_join_event(&g), &v, NULL)); 2691 assert(v == 0); 2692 } 2693 2694 static void test_task_group_cancel_fanout(void) { 2695 xco_runtime_t rt; xco_rt_init(&rt); (void)rt; 2696 2697 taskbody_t a_body, b_body; 2698 xco_task_t a, b; 2699 taskbody_init(&a_body, &a, 100, 0xA); 2700 xco_task_init(&a, &a_body.base); 2701 taskbody_init(&b_body, &b, 100, 0xB); 2702 xco_task_init(&b, &b_body.base); 2703 2704 xco_task_group_t g; xco_task_group_init(&g); 2705 xco_group_attach_t sa, sb; 2706 xco_task_group_attach(&g, &a, &sa); 2707 xco_task_group_attach(&g, &b, &sb); 2708 2709 xco_task_group_cancel(&g); 2710 assert(xco_task_is_cancelled(&a)); 2711 assert(xco_task_is_cancelled(&b)); 2712 assert(xco_cancel_is_set(xco_task_group_cancel_handle(&g))); 2713 2714 while (xco_mach_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); 2715 while (xco_mach_status(&b_body.base) != XCO_STEP_DEAD) xco_step(&b_body.base, 0); 2716 2717 assert(xco_event_poll(xco_task_group_join_event(&g), NULL, NULL)); 2718 } 2719 2720 static void test_task_group_finished_detaches(void) { 2721 taskbody_t a_body, b_body; 2722 xco_task_t a, b; 2723 taskbody_init(&a_body, &a, 1, 0xA); 2724 xco_task_init(&a, &a_body.base); 2725 taskbody_init(&b_body, &b, 100, 0xB); 2726 xco_task_init(&b, &b_body.base); 2727 2728 xco_task_group_t g; xco_task_group_init(&g); 2729 xco_group_attach_t sa, sb; 2730 xco_task_group_attach(&g, &a, &sa); 2731 xco_task_group_attach(&g, &b, &sb); 2732 assert(g.head == &sa && g.tail == &sb); 2733 2734 while (xco_mach_status(&a_body.base) != XCO_STEP_DEAD) xco_step(&a_body.base, 0); 2735 /* a's bridge has fired → sa is detached. */ 2736 assert(g.head == &sb && g.tail == &sb); 2737 assert(sa.next == NULL && sa.prev == NULL); 2738 2739 /* Cancel only touches still-attached b. */ 2740 xco_task_group_cancel(&g); 2741 assert(xco_task_is_cancelled(&b)); 2742 } 2743 2744 /* ---- Runtime test -------------------------------------------------- */ 2745 2746 static void test_runtime_drains(void) { 2747 /* xco_rt_run keeps going until quiescent — including steps queued 2748 * during another step's resumption. Three waiters on one latch 2749 * all reach DEAD in a single xco_rt_run. */ 2750 xco_runtime_t rt; xco_rt_init(&rt); 2751 xco_latch_t l; xco_latch_init(&l); 2752 2753 waiter_t w[3]; 2754 for (int i = 0; i < 3; i++) waiter_init(&w[i], &rt, &l.base); 2755 for (int i = 0; i < 3; i++) xco_step(&w[i].base, 0); 2756 2757 xco_latch_set(&l, 0xC0DE); 2758 xco_rt_run(&rt, 0); 2759 2760 assert(rt.head == NULL); 2761 for (int i = 0; i < 3; i++) { 2762 assert(xco_mach_status(&w[i].base) == XCO_STEP_DEAD); 2763 assert(w[i].got == 0xC0DE); 2764 } 2765 } 2766 2767 int main(void) { 2768 test_latch_wake(); 2769 test_latch_already_set(); 2770 test_latch_multi_waiter(); 2771 test_latch_set_idempotent(); 2772 test_latch_unpark(); 2773 2774 test_select_winner(); 2775 test_select_fast_path(); 2776 test_select_deinit_unparks(); 2777 test_select_compose(); 2778 2779 test_allof_basic(); 2780 test_allof_fast_path_partial(); 2781 test_allof_fast_path_all(); 2782 test_allof_empty(); 2783 test_allof_deinit_partial(); 2784 test_allof_compose_with_select(); 2785 2786 test_chan_send_blocks_until_recv(); 2787 test_chan_recv_blocks_until_send(); 2788 test_chan_send_try_no_recv(); 2789 test_chan_fifo(); 2790 test_chan_recv_fifo(); 2791 test_chan_send_unpark(); 2792 test_chan_select_recv(); 2793 test_chan_select_recv_fast_path(); 2794 test_chan_send_op_inline(); 2795 test_chan_send_op_blocks(); 2796 test_chan_select_send(); 2797 test_chan_select_send_loses(); 2798 2799 test_cancel_basic(); 2800 test_wait_or_cancel_ev_wins(); 2801 test_wait_or_cancel_cancel_wins(); 2802 test_wait_or_cancel_already_cancelled(); 2803 test_wait_or_cancel_ev_precedes_cancel(); 2804 test_wait_or_cancel_chan_recv(); 2805 test_wait_or_cancel_send_op(); 2806 2807 test_timer_basic(); 2808 test_timer_peek(); 2809 test_timer_cancel(); 2810 test_timer_park_wake(); 2811 test_timer_select(); 2812 test_timer_pairing_heap_order(); 2813 test_timer_cancel_stress(); 2814 test_timer_deinit_idempotent(); 2815 2816 test_timeout_fires(); 2817 test_timeout_deinit_before_fire(); 2818 test_timeout_with_wait_or_cancel(); 2819 test_timeout_ev_wins(); 2820 2821 test_rt_run_advances_timers(); 2822 test_rt_run_no_timers(); 2823 2824 test_runtime_drains(); 2825 2826 test_semaphore_inline_acquire(); 2827 test_semaphore_park_then_release(); 2828 test_semaphore_fifo(); 2829 test_semaphore_release_overflow_to_count(); 2830 test_semaphore_select_acquire(); 2831 test_semaphore_binary_mutex(); 2832 2833 test_queue_block_buffered(); 2834 test_queue_block_sender_parks(); 2835 test_queue_drop_newest(); 2836 test_queue_drop_oldest(); 2837 test_queue_direct_handoff(); 2838 test_queue_recv_blocks_then_drains_buffered(); 2839 test_queue_select_recv(); 2840 test_queue_send_unpark(); 2841 2842 test_broadcast_try_always_false(); 2843 test_broadcast_publish_wakes_all(); 2844 test_broadcast_rearm(); 2845 test_broadcast_missed_publish(); 2846 test_broadcast_unpark(); 2847 2848 test_task_state_machine_join(); 2849 test_task_state_machine_cancel(); 2850 2851 test_countdown_basic(); 2852 test_countdown_zero_init(); 2853 test_countdown_add(); 2854 test_countdown_park_wake(); 2855 2856 test_notify_try_always_false(); 2857 test_notify_one_fifo(); 2858 test_notify_all(); 2859 test_notify_select(); 2860 test_notify_unpark(); 2861 2862 test_mutex_basic(); 2863 2864 test_queue_send_op_inline(); 2865 test_queue_send_op_blocks(); 2866 test_queue_send_op_drop_inline(); 2867 test_queue_send_op_select_loses(); 2868 2869 test_chan_close_empty(); 2870 test_chan_close_wakes_receiver(); 2871 test_chan_close_drains_senders(); 2872 test_chan_send_try_after_close(); 2873 test_chan_send_op_close_inline(); 2874 test_chan_send_op_close_drain(); 2875 test_chan_send_op_delivered_payload(); 2876 2877 test_queue_close_drains_buffered_then_eof(); 2878 test_queue_close_wakes_receiver(); 2879 test_queue_close_drains_senders(); 2880 test_queue_send_try_after_close_block(); 2881 test_queue_send_try_after_close_drop(); 2882 test_queue_send_op_close_drain(); 2883 2884 test_ticker_single_tick(); 2885 test_ticker_multiple_ticks(); 2886 test_ticker_deinit_before_fire(); 2887 test_ticker_select(); 2888 test_ticker_try_always_false(); 2889 2890 test_task_group_empty_join(); 2891 test_task_group_join_after_all_finish(); 2892 test_task_group_cancel_fanout(); 2893 test_task_group_finished_detaches(); 2894 2895 printf("ok\n"); 2896 return 0; 2897 }