xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs | README

test_op.c (15267B)


      1 /*
      2  * test_op.c — exercises the xco_op layer.
      3  *
      4  * The op layer is a pure-effect IO substrate: submitters push ops onto
      5  * the runtime's pending list; the host pulls a batch via xco_rt_take_ops,
      6  * executes them, and feeds completions back via xco_op_complete. These
      7  * tests drive the state machine (PENDING → IN_FLIGHT → RESOLVED) plus
      8  * cancel-at-each-state, embedder-result patterns, and composition with
      9  * the standard event API (xco_await, wait_or_cancel).
     10  */
     11 
     12 #include "xco.h"
     13 
     14 #include <assert.h>
     15 #include <stdalign.h>
     16 #include <stdio.h>
     17 
     18 #define STACK_BYTES (64 * 1024)
     19 
     20 /* ---- Embedder example: a "read" op carrying fd/buf/len + result --- */
     21 
     22 enum { OP_READ = 1, OP_WRITE = 2 };
     23 
     24 typedef struct {
     25     xco_op_t base;
     26     int      fd;
     27     void    *buf;
     28     size_t   len;
     29     /* Host writes these on COMPLETED. */
     30     ssize_t  result;
     31     int      err;
     32 } read_op_t;
     33 
     34 static void read_op_init(read_op_t *r, int fd, void *buf, size_t len) {
     35     r->base.kind = OP_READ;
     36     r->fd = fd; r->buf = buf; r->len = len;
     37     r->result = -1; r->err = 0;
     38 }
     39 
     40 /* ---- Submit + take batch ----------------------------------------- */
     41 
     42 static void test_submit_then_take(void) {
     43     xco_runtime_t rt; xco_rt_init(&rt);
     44 
     45     read_op_t a, b, c;
     46     read_op_init(&a, 3, NULL, 4);
     47     read_op_init(&b, 4, NULL, 8);
     48     read_op_init(&c, 5, NULL, 16);
     49 
     50     xco_op_submit(&rt, &a.base);
     51     xco_op_submit(&rt, &b.base);
     52     xco_op_submit(&rt, &c.base);
     53 
     54     /* PENDING: rt back-pointer set, done unset. */
     55     assert(xco_op_is_pending(&a.base));
     56     assert(xco_op_is_pending(&b.base));
     57     assert(xco_op_is_pending(&c.base));
     58     assert(rt.op_head == &a.base && rt.op_tail == &c.base);
     59 
     60     xco_op_t *batch = xco_rt_take_ops(&rt, NULL);
     61     assert(batch == &a.base);
     62     assert(rt.op_head == NULL && rt.op_tail == NULL);
     63 
     64     /* FIFO order in the returned list, all transitioned to IN_FLIGHT. */
     65     assert(batch == &a.base && batch->next == &b.base);
     66     assert(batch->next->next == &c.base);
     67     assert(batch->next->next->next == NULL);
     68     assert(!xco_op_is_pending(&a.base));
     69     assert(!xco_op_is_pending(&b.base));
     70     assert(!xco_op_is_pending(&c.base));
     71 
     72     /* Take from an empty list returns NULL. */
     73     assert(xco_rt_take_ops(&rt, NULL) == NULL);
     74 }
     75 
     76 static void test_take_tail_out(void) {
     77     /* The tail out-param lets the host splice the batch onto an
     78      * in-flight list in O(1) without walking. */
     79     xco_runtime_t rt; xco_rt_init(&rt);
     80     read_op_t a, b, c;
     81     read_op_init(&a, 1, NULL, 1);
     82     read_op_init(&b, 2, NULL, 2);
     83     read_op_init(&c, 3, NULL, 3);
     84     xco_op_submit(&rt, &a.base);
     85     xco_op_submit(&rt, &b.base);
     86     xco_op_submit(&rt, &c.base);
     87 
     88     xco_op_t *tail = (xco_op_t *)0x1;     /* sentinel to verify it gets written */
     89     xco_op_t *head = xco_rt_take_ops(&rt, &tail);
     90     assert(head == &a.base);
     91     assert(tail == &c.base);
     92 
     93     /* Empty take: tail_out written to NULL. */
     94     tail = (xco_op_t *)0x1;
     95     head = xco_rt_take_ops(&rt, &tail);
     96     assert(head == NULL);
     97     assert(tail == NULL);
     98 }
     99 
    100 /* ---- Awaiter: blocks on op->done.base, captures status --------- */
    101 
    102 typedef struct {
    103     xco_mach_t       base;
    104     xco_op_t        *op;
    105     xco_runtime_t   *rt;
    106     xco_waker_t      sw;
    107     int              phase;
    108     xco_op_status_t  status;
    109 } op_awaiter_t;
    110 
    111 static xco_step_result_t op_awaiter_step(xco_mach_t *s, uintptr_t v) {
    112     op_awaiter_t *a = (op_awaiter_t *)s;
    113     switch (a->phase) {
    114     case 0: {
    115         uintptr_t out;
    116         if (xco_event_poll(&a->op->done.base, &out, NULL)) {
    117             a->status = (xco_op_status_t)out;
    118             a->phase  = 2;
    119             return (xco_step_result_t){out, XCO_STEP_DEAD};
    120         }
    121         xco_waker_init(&a->sw, a->rt, &a->base);
    122         xco_event_poll(&a->op->done.base, NULL, &a->sw.base);
    123         a->phase = 1;
    124         return (xco_step_result_t){0, XCO_STEP_SUSPENDED};
    125     }
    126     case 1:
    127         a->status = (xco_op_status_t)v;
    128         a->phase  = 2;
    129         return (xco_step_result_t){v, XCO_STEP_DEAD};
    130     }
    131     __builtin_unreachable();
    132 }
    133 
    134 static void op_awaiter_init(op_awaiter_t *a, xco_runtime_t *rt, xco_op_t *op) {
    135     a->base   = (xco_mach_t){.step = op_awaiter_step, .status = XCO_STEP_INIT};
    136     a->op     = op;
    137     a->rt     = rt;
    138     a->phase  = 0;
    139     a->status = XCO_OP_PENDING;
    140 }
    141 
    142 /* ---- Complete after host take (the happy path) -------------------- */
    143 
    144 static void test_complete_after_take(void) {
    145     xco_runtime_t rt; xco_rt_init(&rt);
    146     read_op_t op; read_op_init(&op, 7, NULL, 32);
    147 
    148     xco_op_submit(&rt, &op.base);
    149     op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base);
    150 
    151     /* Awaiter parks on the op's done latch. */
    152     xco_step(&a.base, 0);
    153     assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED);
    154     assert(op.base.done.waiters == &a.sw.base);
    155 
    156     /* Host pulls the batch, runs the op, fills result fields. */
    157     xco_op_t *batch = xco_rt_take_ops(&rt, NULL);
    158     assert(batch == &op.base);
    159     assert(!xco_op_is_pending(&op.base));
    160     assert(!op.base.done.set);   /* still IN_FLIGHT */
    161 
    162     op.result = 32;
    163     op.err    = 0;
    164     xco_op_complete(&op.base, XCO_OP_COMPLETED);
    165 
    166     assert(op.base.done.set);
    167     xco_rt_run(&rt, 0);
    168     assert(xco_mach_status(&a.base) == XCO_STEP_DEAD);
    169     assert(a.status == XCO_OP_COMPLETED);
    170     assert(op.result == 32);
    171 }
    172 
    173 /* ---- Cancel while PENDING: splices, resolves CANCELLED ------------ */
    174 
    175 static void test_cancel_while_pending(void) {
    176     xco_runtime_t rt; xco_rt_init(&rt);
    177     read_op_t a, b, c;
    178     read_op_init(&a, 1, NULL, 1);
    179     read_op_init(&b, 2, NULL, 2);
    180     read_op_init(&c, 3, NULL, 3);
    181     xco_op_submit(&rt, &a.base);
    182     xco_op_submit(&rt, &b.base);
    183     xco_op_submit(&rt, &c.base);
    184 
    185     /* Cancel the middle: list links splice cleanly. */
    186     xco_op_cancel(&b.base);
    187     assert(b.base.done.set);
    188     assert(!xco_op_is_pending(&b.base));
    189     assert(b.base.prev == NULL && b.base.next == NULL);
    190 
    191     /* Resolved as CANCELLED. */
    192     uintptr_t v;
    193     assert(xco_event_poll(&b.base.done.base, &v, NULL));
    194     assert(v == XCO_OP_CANCELLED);
    195 
    196     /* a and c are still on the list, in order. */
    197     assert(rt.op_head == &a.base && rt.op_tail == &c.base);
    198     assert(a.base.next == &c.base && c.base.prev == &a.base);
    199 
    200     /* Cancel head and tail. */
    201     xco_op_cancel(&a.base);
    202     assert(rt.op_head == &c.base && rt.op_tail == &c.base);
    203     assert(c.base.prev == NULL);
    204     xco_op_cancel(&c.base);
    205     assert(rt.op_head == NULL && rt.op_tail == NULL);
    206     assert(a.base.done.set && c.base.done.set);
    207 }
    208 
    209 /* ---- Cancel while IN_FLIGHT is advisory --------------------------- */
    210 
    211 static void test_cancel_while_in_flight(void) {
    212     xco_runtime_t rt; xco_rt_init(&rt);
    213     read_op_t op; read_op_init(&op, 9, NULL, 64);
    214 
    215     xco_op_submit(&rt, &op.base);
    216     (void)xco_rt_take_ops(&rt, NULL);     /* host took it; now IN_FLIGHT */
    217     assert(!xco_op_is_pending(&op.base));
    218 
    219     xco_op_cancel(&op.base);
    220     /* Advisory only: done not fired, host sees cancel_requested. */
    221     assert(op.base.cancel_requested);
    222     assert(!op.base.done.set);
    223 
    224     /* Host honors the cancel. */
    225     xco_op_complete(&op.base, XCO_OP_CANCELLED);
    226     assert(op.base.done.set);
    227     uintptr_t v;
    228     assert(xco_event_poll(&op.base.done.base, &v, NULL));
    229     assert(v == XCO_OP_CANCELLED);
    230 }
    231 
    232 static void test_cancel_in_flight_lost_race(void) {
    233     /* Cancel after take, but the syscall already finished — host
    234      * completes with COMPLETED instead. The awaiter sees whichever
    235      * status the host chose; cancel_requested is just an advisory bit. */
    236     xco_runtime_t rt; xco_rt_init(&rt);
    237     read_op_t op; read_op_init(&op, 9, NULL, 64);
    238 
    239     xco_op_submit(&rt, &op.base);
    240     (void)xco_rt_take_ops(&rt, NULL);
    241     xco_op_cancel(&op.base);
    242     assert(op.base.cancel_requested);
    243 
    244     op.result = 64;
    245     xco_op_complete(&op.base, XCO_OP_COMPLETED);
    246     uintptr_t v;
    247     assert(xco_event_poll(&op.base.done.base, &v, NULL));
    248     assert(v == XCO_OP_COMPLETED);
    249     assert(op.result == 64);
    250 }
    251 
    252 /* ---- Idempotency -------------------------------------------------- */
    253 
    254 static void test_complete_idempotent(void) {
    255     xco_runtime_t rt; xco_rt_init(&rt);
    256     read_op_t op; read_op_init(&op, 1, NULL, 1);
    257     xco_op_submit(&rt, &op.base);
    258     (void)xco_rt_take_ops(&rt, NULL);
    259 
    260     xco_op_complete(&op.base, XCO_OP_COMPLETED);
    261     /* Second complete is a no-op: latch is set, status doesn't change. */
    262     xco_op_complete(&op.base, XCO_OP_CANCELLED);
    263     uintptr_t v;
    264     assert(xco_event_poll(&op.base.done.base, &v, NULL));
    265     assert(v == XCO_OP_COMPLETED);
    266 }
    267 
    268 static void test_cancel_after_resolved(void) {
    269     xco_runtime_t rt; xco_rt_init(&rt);
    270     read_op_t op; read_op_init(&op, 1, NULL, 1);
    271     xco_op_submit(&rt, &op.base);
    272     (void)xco_rt_take_ops(&rt, NULL);
    273     xco_op_complete(&op.base, XCO_OP_COMPLETED);
    274 
    275     /* Cancel after RESOLVED is a no-op; status stays COMPLETED. */
    276     xco_op_cancel(&op.base);
    277     uintptr_t v;
    278     assert(xco_event_poll(&op.base.done.base, &v, NULL));
    279     assert(v == XCO_OP_COMPLETED);
    280 }
    281 
    282 /* ---- Cancel-while-PENDING wakes the awaiter ----------------------- */
    283 
    284 static void test_cancel_pending_wakes_awaiter(void) {
    285     xco_runtime_t rt; xco_rt_init(&rt);
    286     read_op_t op; read_op_init(&op, 1, NULL, 1);
    287     xco_op_submit(&rt, &op.base);
    288 
    289     op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base);
    290     xco_step(&a.base, 0);
    291     assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED);
    292 
    293     xco_op_cancel(&op.base);
    294     /* PENDING-cancel resolves done immediately and enqueues the waker. */
    295     assert(op.base.done.set);
    296     xco_rt_run(&rt, 0);
    297     assert(xco_mach_status(&a.base) == XCO_STEP_DEAD);
    298     assert(a.status == XCO_OP_CANCELLED);
    299 }
    300 
    301 /* ---- Already-resolved: awaiter takes fast path ------------------- */
    302 
    303 static void test_awaiter_fast_path(void) {
    304     xco_runtime_t rt; xco_rt_init(&rt);
    305     read_op_t op; read_op_init(&op, 1, NULL, 1);
    306     xco_op_submit(&rt, &op.base);
    307     (void)xco_rt_take_ops(&rt, NULL);
    308     xco_op_complete(&op.base, XCO_OP_COMPLETED);
    309 
    310     op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base);
    311     xco_step_result_t r = xco_step(&a.base, 0);
    312     assert(r.status == XCO_STEP_DEAD);
    313     assert(a.status == XCO_OP_COMPLETED);
    314     assert(rt.head == NULL);          /* no parking, no enqueue */
    315 }
    316 
    317 /* ---- Compose with wait_or_cancel: cooperative cancel pattern ----- */
    318 
    319 static uintptr_t coop_body(xco_task_t *self, uintptr_t arg) {
    320     xco_runtime_t *rt = (xco_runtime_t *)arg;
    321     read_op_t op; read_op_init(&op, 11, NULL, 128);
    322     xco_op_submit(rt, &op.base);
    323 
    324     uintptr_t status_v = 0;
    325     bool ok = xco_await_or_cancel(rt, &op.base.done.base,
    326                                   xco_task_cancel(self), &status_v);
    327     if (!ok) {
    328         /* Task cancel won the race; wind down the op cooperatively. */
    329         xco_op_cancel(&op.base);
    330         status_v = xco_await(rt, &op.base.done.base);
    331     }
    332     return status_v;
    333 }
    334 
    335 static alignas(XCO_STACK_ALIGN) unsigned char coop_stack[STACK_BYTES];
    336 
    337 static void test_wait_or_cancel_op_wins(void) {
    338     xco_runtime_t rt; xco_rt_init(&rt);
    339     xco_cotask_t xt;
    340     xco_cotask_spawn(&xt, coop_body, coop_stack, sizeof coop_stack,
    341                      (uintptr_t)&rt);
    342     /* Body submitted an op and parked on a wait_or_cancel select. */
    343     assert(xco_mach_status(&xt.co.base) == XCO_STEP_SUSPENDED);
    344 
    345     xco_op_t *batch = xco_rt_take_ops(&rt, NULL);
    346     assert(batch != NULL);
    347     /* Host runs the op and reports completion. */
    348     xco_op_complete(batch, XCO_OP_COMPLETED);
    349 
    350     xco_rt_run(&rt, 0);
    351     assert(xco_task_finished(&xt.task));
    352     uintptr_t v;
    353     assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL));
    354     assert(v == XCO_OP_COMPLETED);
    355 }
    356 
    357 static alignas(XCO_STACK_ALIGN) unsigned char coop_stack2[STACK_BYTES];
    358 
    359 static void test_wait_or_cancel_task_cancel_pending(void) {
    360     /* Task cancel fires while op is still PENDING. The body's
    361      * xco_await_or_cancel returns false; the body then xco_op_cancels,
    362      * which resolves done(CANCELLED) inline (PENDING path). */
    363     xco_runtime_t rt; xco_rt_init(&rt);
    364     xco_cotask_t xt;
    365     xco_cotask_spawn(&xt, coop_body, coop_stack2, sizeof coop_stack2,
    366                      (uintptr_t)&rt);
    367     assert(xco_mach_status(&xt.co.base) == XCO_STEP_SUSPENDED);
    368     /* Op is PENDING on the runtime. */
    369     assert(rt.op_head != NULL);
    370 
    371     xco_cancel_set(xco_task_cancel(&xt.task));
    372     xco_rt_run(&rt, 0);
    373     assert(xco_task_finished(&xt.task));
    374     uintptr_t v;
    375     assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL));
    376     assert(v == XCO_OP_CANCELLED);
    377     /* Op was spliced from the pending list by the body's xco_op_cancel. */
    378     assert(rt.op_head == NULL && rt.op_tail == NULL);
    379 }
    380 
    381 static alignas(XCO_STACK_ALIGN) unsigned char coop_stack3[STACK_BYTES];
    382 
    383 static void test_wait_or_cancel_task_cancel_after_take(void) {
    384     /* Task cancel after host has taken the op. Body xco_op_cancels;
    385      * since op is IN_FLIGHT this just sets cancel_requested. Body then
    386      * waits for the host's eventual completion. */
    387     xco_runtime_t rt; xco_rt_init(&rt);
    388     xco_cotask_t xt;
    389     xco_cotask_spawn(&xt, coop_body, coop_stack3, sizeof coop_stack3,
    390                      (uintptr_t)&rt);
    391     xco_op_t *batch = xco_rt_take_ops(&rt, NULL);
    392     assert(batch != NULL);
    393 
    394     /* Now cancel the task. */
    395     xco_cancel_set(xco_task_cancel(&xt.task));
    396     xco_rt_run(&rt, 0);
    397     /* Body has wound back to xco_await on done; not yet finished. */
    398     assert(!xco_task_finished(&xt.task));
    399     assert(batch->cancel_requested);
    400 
    401     /* Host honors the cancel. */
    402     xco_op_complete(batch, XCO_OP_CANCELLED);
    403     xco_rt_run(&rt, 0);
    404     assert(xco_task_finished(&xt.task));
    405     uintptr_t v;
    406     assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL));
    407     assert(v == XCO_OP_CANCELLED);
    408 }
    409 
    410 /* ---- Open kind tag space ----------------------------------------- */
    411 
    412 static void test_open_kind_space(void) {
    413     /* The runtime never inspects kind. Two op types share the pending
    414      * list; the host pattern-matches and dispatches per-kind. */
    415     xco_runtime_t rt; xco_rt_init(&rt);
    416     read_op_t r;  read_op_init(&r, 1, NULL, 4);
    417     typedef struct { xco_op_t base; const char *path; } stat_op_t;
    418     stat_op_t s = { .base = { .kind = 99 }, .path = "/tmp/x" };
    419 
    420     xco_op_submit(&rt, &r.base);
    421     xco_op_submit(&rt, &s.base);
    422 
    423     int saw_read = 0, saw_stat = 0;
    424     for (xco_op_t *o = xco_rt_take_ops(&rt, NULL); o; o = o->next) {
    425         switch (o->kind) {
    426         case OP_READ: saw_read++; xco_op_complete(o, XCO_OP_COMPLETED); break;
    427         case 99:      saw_stat++; xco_op_complete(o, XCO_OP_COMPLETED); break;
    428         default: assert(0);
    429         }
    430     }
    431     assert(saw_read == 1 && saw_stat == 1);
    432     assert(r.base.done.set && s.base.done.set);
    433 }
    434 
    435 int main(void) {
    436     test_submit_then_take();
    437     test_take_tail_out();
    438     test_complete_after_take();
    439     test_cancel_while_pending();
    440     test_cancel_while_in_flight();
    441     test_cancel_in_flight_lost_race();
    442     test_complete_idempotent();
    443     test_cancel_after_resolved();
    444     test_cancel_pending_wakes_awaiter();
    445     test_awaiter_fast_path();
    446     test_wait_or_cancel_op_wins();
    447     test_wait_or_cancel_task_cancel_pending();
    448     test_wait_or_cancel_task_cancel_after_take();
    449     test_open_kind_space();
    450     printf("ok\n");
    451     return 0;
    452 }