test_op.c (15267B)
1 /* 2 * test_op.c — exercises the xco_op layer. 3 * 4 * The op layer is a pure-effect IO substrate: submitters push ops onto 5 * the runtime's pending list; the host pulls a batch via xco_rt_take_ops, 6 * executes them, and feeds completions back via xco_op_complete. These 7 * tests drive the state machine (PENDING → IN_FLIGHT → RESOLVED) plus 8 * cancel-at-each-state, embedder-result patterns, and composition with 9 * the standard event API (xco_await, wait_or_cancel). 10 */ 11 12 #include "xco.h" 13 14 #include <assert.h> 15 #include <stdalign.h> 16 #include <stdio.h> 17 18 #define STACK_BYTES (64 * 1024) 19 20 /* ---- Embedder example: a "read" op carrying fd/buf/len + result --- */ 21 22 enum { OP_READ = 1, OP_WRITE = 2 }; 23 24 typedef struct { 25 xco_op_t base; 26 int fd; 27 void *buf; 28 size_t len; 29 /* Host writes these on COMPLETED. */ 30 ssize_t result; 31 int err; 32 } read_op_t; 33 34 static void read_op_init(read_op_t *r, int fd, void *buf, size_t len) { 35 r->base.kind = OP_READ; 36 r->fd = fd; r->buf = buf; r->len = len; 37 r->result = -1; r->err = 0; 38 } 39 40 /* ---- Submit + take batch ----------------------------------------- */ 41 42 static void test_submit_then_take(void) { 43 xco_runtime_t rt; xco_rt_init(&rt); 44 45 read_op_t a, b, c; 46 read_op_init(&a, 3, NULL, 4); 47 read_op_init(&b, 4, NULL, 8); 48 read_op_init(&c, 5, NULL, 16); 49 50 xco_op_submit(&rt, &a.base); 51 xco_op_submit(&rt, &b.base); 52 xco_op_submit(&rt, &c.base); 53 54 /* PENDING: rt back-pointer set, done unset. */ 55 assert(xco_op_is_pending(&a.base)); 56 assert(xco_op_is_pending(&b.base)); 57 assert(xco_op_is_pending(&c.base)); 58 assert(rt.op_head == &a.base && rt.op_tail == &c.base); 59 60 xco_op_t *batch = xco_rt_take_ops(&rt, NULL); 61 assert(batch == &a.base); 62 assert(rt.op_head == NULL && rt.op_tail == NULL); 63 64 /* FIFO order in the returned list, all transitioned to IN_FLIGHT. */ 65 assert(batch == &a.base && batch->next == &b.base); 66 assert(batch->next->next == &c.base); 67 assert(batch->next->next->next == NULL); 68 assert(!xco_op_is_pending(&a.base)); 69 assert(!xco_op_is_pending(&b.base)); 70 assert(!xco_op_is_pending(&c.base)); 71 72 /* Take from an empty list returns NULL. */ 73 assert(xco_rt_take_ops(&rt, NULL) == NULL); 74 } 75 76 static void test_take_tail_out(void) { 77 /* The tail out-param lets the host splice the batch onto an 78 * in-flight list in O(1) without walking. */ 79 xco_runtime_t rt; xco_rt_init(&rt); 80 read_op_t a, b, c; 81 read_op_init(&a, 1, NULL, 1); 82 read_op_init(&b, 2, NULL, 2); 83 read_op_init(&c, 3, NULL, 3); 84 xco_op_submit(&rt, &a.base); 85 xco_op_submit(&rt, &b.base); 86 xco_op_submit(&rt, &c.base); 87 88 xco_op_t *tail = (xco_op_t *)0x1; /* sentinel to verify it gets written */ 89 xco_op_t *head = xco_rt_take_ops(&rt, &tail); 90 assert(head == &a.base); 91 assert(tail == &c.base); 92 93 /* Empty take: tail_out written to NULL. */ 94 tail = (xco_op_t *)0x1; 95 head = xco_rt_take_ops(&rt, &tail); 96 assert(head == NULL); 97 assert(tail == NULL); 98 } 99 100 /* ---- Awaiter: blocks on op->done.base, captures status --------- */ 101 102 typedef struct { 103 xco_mach_t base; 104 xco_op_t *op; 105 xco_runtime_t *rt; 106 xco_waker_t sw; 107 int phase; 108 xco_op_status_t status; 109 } op_awaiter_t; 110 111 static xco_step_result_t op_awaiter_step(xco_mach_t *s, uintptr_t v) { 112 op_awaiter_t *a = (op_awaiter_t *)s; 113 switch (a->phase) { 114 case 0: { 115 uintptr_t out; 116 if (xco_event_poll(&a->op->done.base, &out, NULL)) { 117 a->status = (xco_op_status_t)out; 118 a->phase = 2; 119 return (xco_step_result_t){out, XCO_STEP_DEAD}; 120 } 121 xco_waker_init(&a->sw, a->rt, &a->base); 122 xco_event_poll(&a->op->done.base, NULL, &a->sw.base); 123 a->phase = 1; 124 return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; 125 } 126 case 1: 127 a->status = (xco_op_status_t)v; 128 a->phase = 2; 129 return (xco_step_result_t){v, XCO_STEP_DEAD}; 130 } 131 __builtin_unreachable(); 132 } 133 134 static void op_awaiter_init(op_awaiter_t *a, xco_runtime_t *rt, xco_op_t *op) { 135 a->base = (xco_mach_t){.step = op_awaiter_step, .status = XCO_STEP_INIT}; 136 a->op = op; 137 a->rt = rt; 138 a->phase = 0; 139 a->status = XCO_OP_PENDING; 140 } 141 142 /* ---- Complete after host take (the happy path) -------------------- */ 143 144 static void test_complete_after_take(void) { 145 xco_runtime_t rt; xco_rt_init(&rt); 146 read_op_t op; read_op_init(&op, 7, NULL, 32); 147 148 xco_op_submit(&rt, &op.base); 149 op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base); 150 151 /* Awaiter parks on the op's done latch. */ 152 xco_step(&a.base, 0); 153 assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); 154 assert(op.base.done.waiters == &a.sw.base); 155 156 /* Host pulls the batch, runs the op, fills result fields. */ 157 xco_op_t *batch = xco_rt_take_ops(&rt, NULL); 158 assert(batch == &op.base); 159 assert(!xco_op_is_pending(&op.base)); 160 assert(!op.base.done.set); /* still IN_FLIGHT */ 161 162 op.result = 32; 163 op.err = 0; 164 xco_op_complete(&op.base, XCO_OP_COMPLETED); 165 166 assert(op.base.done.set); 167 xco_rt_run(&rt, 0); 168 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); 169 assert(a.status == XCO_OP_COMPLETED); 170 assert(op.result == 32); 171 } 172 173 /* ---- Cancel while PENDING: splices, resolves CANCELLED ------------ */ 174 175 static void test_cancel_while_pending(void) { 176 xco_runtime_t rt; xco_rt_init(&rt); 177 read_op_t a, b, c; 178 read_op_init(&a, 1, NULL, 1); 179 read_op_init(&b, 2, NULL, 2); 180 read_op_init(&c, 3, NULL, 3); 181 xco_op_submit(&rt, &a.base); 182 xco_op_submit(&rt, &b.base); 183 xco_op_submit(&rt, &c.base); 184 185 /* Cancel the middle: list links splice cleanly. */ 186 xco_op_cancel(&b.base); 187 assert(b.base.done.set); 188 assert(!xco_op_is_pending(&b.base)); 189 assert(b.base.prev == NULL && b.base.next == NULL); 190 191 /* Resolved as CANCELLED. */ 192 uintptr_t v; 193 assert(xco_event_poll(&b.base.done.base, &v, NULL)); 194 assert(v == XCO_OP_CANCELLED); 195 196 /* a and c are still on the list, in order. */ 197 assert(rt.op_head == &a.base && rt.op_tail == &c.base); 198 assert(a.base.next == &c.base && c.base.prev == &a.base); 199 200 /* Cancel head and tail. */ 201 xco_op_cancel(&a.base); 202 assert(rt.op_head == &c.base && rt.op_tail == &c.base); 203 assert(c.base.prev == NULL); 204 xco_op_cancel(&c.base); 205 assert(rt.op_head == NULL && rt.op_tail == NULL); 206 assert(a.base.done.set && c.base.done.set); 207 } 208 209 /* ---- Cancel while IN_FLIGHT is advisory --------------------------- */ 210 211 static void test_cancel_while_in_flight(void) { 212 xco_runtime_t rt; xco_rt_init(&rt); 213 read_op_t op; read_op_init(&op, 9, NULL, 64); 214 215 xco_op_submit(&rt, &op.base); 216 (void)xco_rt_take_ops(&rt, NULL); /* host took it; now IN_FLIGHT */ 217 assert(!xco_op_is_pending(&op.base)); 218 219 xco_op_cancel(&op.base); 220 /* Advisory only: done not fired, host sees cancel_requested. */ 221 assert(op.base.cancel_requested); 222 assert(!op.base.done.set); 223 224 /* Host honors the cancel. */ 225 xco_op_complete(&op.base, XCO_OP_CANCELLED); 226 assert(op.base.done.set); 227 uintptr_t v; 228 assert(xco_event_poll(&op.base.done.base, &v, NULL)); 229 assert(v == XCO_OP_CANCELLED); 230 } 231 232 static void test_cancel_in_flight_lost_race(void) { 233 /* Cancel after take, but the syscall already finished — host 234 * completes with COMPLETED instead. The awaiter sees whichever 235 * status the host chose; cancel_requested is just an advisory bit. */ 236 xco_runtime_t rt; xco_rt_init(&rt); 237 read_op_t op; read_op_init(&op, 9, NULL, 64); 238 239 xco_op_submit(&rt, &op.base); 240 (void)xco_rt_take_ops(&rt, NULL); 241 xco_op_cancel(&op.base); 242 assert(op.base.cancel_requested); 243 244 op.result = 64; 245 xco_op_complete(&op.base, XCO_OP_COMPLETED); 246 uintptr_t v; 247 assert(xco_event_poll(&op.base.done.base, &v, NULL)); 248 assert(v == XCO_OP_COMPLETED); 249 assert(op.result == 64); 250 } 251 252 /* ---- Idempotency -------------------------------------------------- */ 253 254 static void test_complete_idempotent(void) { 255 xco_runtime_t rt; xco_rt_init(&rt); 256 read_op_t op; read_op_init(&op, 1, NULL, 1); 257 xco_op_submit(&rt, &op.base); 258 (void)xco_rt_take_ops(&rt, NULL); 259 260 xco_op_complete(&op.base, XCO_OP_COMPLETED); 261 /* Second complete is a no-op: latch is set, status doesn't change. */ 262 xco_op_complete(&op.base, XCO_OP_CANCELLED); 263 uintptr_t v; 264 assert(xco_event_poll(&op.base.done.base, &v, NULL)); 265 assert(v == XCO_OP_COMPLETED); 266 } 267 268 static void test_cancel_after_resolved(void) { 269 xco_runtime_t rt; xco_rt_init(&rt); 270 read_op_t op; read_op_init(&op, 1, NULL, 1); 271 xco_op_submit(&rt, &op.base); 272 (void)xco_rt_take_ops(&rt, NULL); 273 xco_op_complete(&op.base, XCO_OP_COMPLETED); 274 275 /* Cancel after RESOLVED is a no-op; status stays COMPLETED. */ 276 xco_op_cancel(&op.base); 277 uintptr_t v; 278 assert(xco_event_poll(&op.base.done.base, &v, NULL)); 279 assert(v == XCO_OP_COMPLETED); 280 } 281 282 /* ---- Cancel-while-PENDING wakes the awaiter ----------------------- */ 283 284 static void test_cancel_pending_wakes_awaiter(void) { 285 xco_runtime_t rt; xco_rt_init(&rt); 286 read_op_t op; read_op_init(&op, 1, NULL, 1); 287 xco_op_submit(&rt, &op.base); 288 289 op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base); 290 xco_step(&a.base, 0); 291 assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); 292 293 xco_op_cancel(&op.base); 294 /* PENDING-cancel resolves done immediately and enqueues the waker. */ 295 assert(op.base.done.set); 296 xco_rt_run(&rt, 0); 297 assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); 298 assert(a.status == XCO_OP_CANCELLED); 299 } 300 301 /* ---- Already-resolved: awaiter takes fast path ------------------- */ 302 303 static void test_awaiter_fast_path(void) { 304 xco_runtime_t rt; xco_rt_init(&rt); 305 read_op_t op; read_op_init(&op, 1, NULL, 1); 306 xco_op_submit(&rt, &op.base); 307 (void)xco_rt_take_ops(&rt, NULL); 308 xco_op_complete(&op.base, XCO_OP_COMPLETED); 309 310 op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base); 311 xco_step_result_t r = xco_step(&a.base, 0); 312 assert(r.status == XCO_STEP_DEAD); 313 assert(a.status == XCO_OP_COMPLETED); 314 assert(rt.head == NULL); /* no parking, no enqueue */ 315 } 316 317 /* ---- Compose with wait_or_cancel: cooperative cancel pattern ----- */ 318 319 static uintptr_t coop_body(xco_task_t *self, uintptr_t arg) { 320 xco_runtime_t *rt = (xco_runtime_t *)arg; 321 read_op_t op; read_op_init(&op, 11, NULL, 128); 322 xco_op_submit(rt, &op.base); 323 324 uintptr_t status_v = 0; 325 bool ok = xco_await_or_cancel(rt, &op.base.done.base, 326 xco_task_cancel(self), &status_v); 327 if (!ok) { 328 /* Task cancel won the race; wind down the op cooperatively. */ 329 xco_op_cancel(&op.base); 330 status_v = xco_await(rt, &op.base.done.base); 331 } 332 return status_v; 333 } 334 335 static alignas(XCO_STACK_ALIGN) unsigned char coop_stack[STACK_BYTES]; 336 337 static void test_wait_or_cancel_op_wins(void) { 338 xco_runtime_t rt; xco_rt_init(&rt); 339 xco_cotask_t xt; 340 xco_cotask_spawn(&xt, coop_body, coop_stack, sizeof coop_stack, 341 (uintptr_t)&rt); 342 /* Body submitted an op and parked on a wait_or_cancel select. */ 343 assert(xco_mach_status(&xt.co.base) == XCO_STEP_SUSPENDED); 344 345 xco_op_t *batch = xco_rt_take_ops(&rt, NULL); 346 assert(batch != NULL); 347 /* Host runs the op and reports completion. */ 348 xco_op_complete(batch, XCO_OP_COMPLETED); 349 350 xco_rt_run(&rt, 0); 351 assert(xco_task_finished(&xt.task)); 352 uintptr_t v; 353 assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); 354 assert(v == XCO_OP_COMPLETED); 355 } 356 357 static alignas(XCO_STACK_ALIGN) unsigned char coop_stack2[STACK_BYTES]; 358 359 static void test_wait_or_cancel_task_cancel_pending(void) { 360 /* Task cancel fires while op is still PENDING. The body's 361 * xco_await_or_cancel returns false; the body then xco_op_cancels, 362 * which resolves done(CANCELLED) inline (PENDING path). */ 363 xco_runtime_t rt; xco_rt_init(&rt); 364 xco_cotask_t xt; 365 xco_cotask_spawn(&xt, coop_body, coop_stack2, sizeof coop_stack2, 366 (uintptr_t)&rt); 367 assert(xco_mach_status(&xt.co.base) == XCO_STEP_SUSPENDED); 368 /* Op is PENDING on the runtime. */ 369 assert(rt.op_head != NULL); 370 371 xco_cancel_set(xco_task_cancel(&xt.task)); 372 xco_rt_run(&rt, 0); 373 assert(xco_task_finished(&xt.task)); 374 uintptr_t v; 375 assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); 376 assert(v == XCO_OP_CANCELLED); 377 /* Op was spliced from the pending list by the body's xco_op_cancel. */ 378 assert(rt.op_head == NULL && rt.op_tail == NULL); 379 } 380 381 static alignas(XCO_STACK_ALIGN) unsigned char coop_stack3[STACK_BYTES]; 382 383 static void test_wait_or_cancel_task_cancel_after_take(void) { 384 /* Task cancel after host has taken the op. Body xco_op_cancels; 385 * since op is IN_FLIGHT this just sets cancel_requested. Body then 386 * waits for the host's eventual completion. */ 387 xco_runtime_t rt; xco_rt_init(&rt); 388 xco_cotask_t xt; 389 xco_cotask_spawn(&xt, coop_body, coop_stack3, sizeof coop_stack3, 390 (uintptr_t)&rt); 391 xco_op_t *batch = xco_rt_take_ops(&rt, NULL); 392 assert(batch != NULL); 393 394 /* Now cancel the task. */ 395 xco_cancel_set(xco_task_cancel(&xt.task)); 396 xco_rt_run(&rt, 0); 397 /* Body has wound back to xco_await on done; not yet finished. */ 398 assert(!xco_task_finished(&xt.task)); 399 assert(batch->cancel_requested); 400 401 /* Host honors the cancel. */ 402 xco_op_complete(batch, XCO_OP_CANCELLED); 403 xco_rt_run(&rt, 0); 404 assert(xco_task_finished(&xt.task)); 405 uintptr_t v; 406 assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); 407 assert(v == XCO_OP_CANCELLED); 408 } 409 410 /* ---- Open kind tag space ----------------------------------------- */ 411 412 static void test_open_kind_space(void) { 413 /* The runtime never inspects kind. Two op types share the pending 414 * list; the host pattern-matches and dispatches per-kind. */ 415 xco_runtime_t rt; xco_rt_init(&rt); 416 read_op_t r; read_op_init(&r, 1, NULL, 4); 417 typedef struct { xco_op_t base; const char *path; } stat_op_t; 418 stat_op_t s = { .base = { .kind = 99 }, .path = "/tmp/x" }; 419 420 xco_op_submit(&rt, &r.base); 421 xco_op_submit(&rt, &s.base); 422 423 int saw_read = 0, saw_stat = 0; 424 for (xco_op_t *o = xco_rt_take_ops(&rt, NULL); o; o = o->next) { 425 switch (o->kind) { 426 case OP_READ: saw_read++; xco_op_complete(o, XCO_OP_COMPLETED); break; 427 case 99: saw_stat++; xco_op_complete(o, XCO_OP_COMPLETED); break; 428 default: assert(0); 429 } 430 } 431 assert(saw_read == 1 && saw_stat == 1); 432 assert(r.base.done.set && s.base.done.set); 433 } 434 435 int main(void) { 436 test_submit_then_take(); 437 test_take_tail_out(); 438 test_complete_after_take(); 439 test_cancel_while_pending(); 440 test_cancel_while_in_flight(); 441 test_cancel_in_flight_lost_race(); 442 test_complete_idempotent(); 443 test_cancel_after_resolved(); 444 test_cancel_pending_wakes_awaiter(); 445 test_awaiter_fast_path(); 446 test_wait_or_cancel_op_wins(); 447 test_wait_or_cancel_task_cancel_pending(); 448 test_wait_or_cancel_task_cancel_after_take(); 449 test_open_kind_space(); 450 printf("ok\n"); 451 return 0; 452 }