xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs | README

test_xco.c (11785B)


      1 /*
      2  * test_xco.c — smoke test for the xco API.
      3  *
      4  * Exercises: status transitions, value channel in both directions,
      5  * multiple suspends, nested resumes, xco_self. Also drives a hand-
      6  * coded xco_mach_t state machine through the same generic interface as
      7  * a coroutine, to verify the unification.
      8  */
      9 
     10 #include "xco.h"
     11 
     12 #include <assert.h>
     13 #include <stdalign.h>
     14 #include <stdio.h>
     15 #include <stdint.h>
     16 
     17 #define STACK_BYTES (64 * 1024)
     18 
     19 static alignas(XCO_STACK_ALIGN) unsigned char stack_a[STACK_BYTES];
     20 static alignas(XCO_STACK_ALIGN) unsigned char stack_b[STACK_BYTES];
     21 static alignas(XCO_STACK_ALIGN) unsigned char stack_c[STACK_BYTES];
     22 static alignas(XCO_STACK_ALIGN) unsigned char stack_y1[STACK_BYTES];
     23 static alignas(XCO_STACK_ALIGN) unsigned char stack_y2[STACK_BYTES];
     24 static alignas(XCO_STACK_ALIGN) unsigned char stack_t1[STACK_BYTES];
     25 static alignas(XCO_STACK_ALIGN) unsigned char stack_t2[STACK_BYTES];
     26 static alignas(XCO_STACK_ALIGN) unsigned char stack_t3[STACK_BYTES];
     27 static alignas(XCO_STACK_ALIGN) unsigned char stack_t4[STACK_BYTES];
     28 
     29 /* Receives n via the first resume, then yields n+1, n+2, n+3, returns n+4. */
     30 static uintptr_t counter(uintptr_t n) {
     31     assert(xco_self() != NULL);
     32     uintptr_t v = xco_suspend(n + 1);
     33     assert(v == 100);
     34     v = xco_suspend(n + 2);
     35     assert(v == 200);
     36     v = xco_suspend(n + 3);
     37     assert(v == 300);
     38     return n + 4;
     39 }
     40 
     41 /* Hand-coded state-machine implementation of the same contract:
     42  * first input n, then yield n+1, n+2, n+3, return n+4. No stack
     43  * switch — suspension is just returning XCO_STEP_SUSPENDED. */
     44 typedef struct {
     45     xco_mach_t   base;
     46     int       phase;
     47     uintptr_t n;
     48 } counter_sm_t;
     49 
     50 static xco_step_result_t counter_sm_step(xco_mach_t *s, uintptr_t v) {
     51     counter_sm_t *p = (counter_sm_t *)s;
     52     switch (p->phase++) {
     53     case 0: p->n = v; return (xco_step_result_t){v + 1,    XCO_STEP_SUSPENDED};
     54     case 1:           return (xco_step_result_t){p->n + 2, XCO_STEP_SUSPENDED};
     55     case 2:           return (xco_step_result_t){p->n + 3, XCO_STEP_SUSPENDED};
     56     case 3:           return (xco_step_result_t){p->n + 4, XCO_STEP_DEAD};
     57     }
     58     __builtin_unreachable();
     59 }
     60 
     61 static void counter_sm_init(counter_sm_t *p) {
     62     p->base  = (xco_mach_t){.step = counter_sm_step, .status = XCO_STEP_INIT};
     63     p->phase = 0;
     64     p->n     = 0;
     65 }
     66 
     67 /* Generic driver: takes any xco_mach_t implementing the counter contract
     68  * and walks it to completion. Knows nothing about coroutines. */
     69 static void drive_counter(xco_mach_t *s) {
     70     assert(xco_mach_status(s) == XCO_STEP_INIT);
     71     xco_step_result_t r = xco_step(s, 10);
     72     assert(r.status == XCO_STEP_SUSPENDED && r.value == 11);
     73     r = xco_step(s, 100);
     74     assert(r.status == XCO_STEP_SUSPENDED && r.value == 12);
     75     r = xco_step(s, 200);
     76     assert(r.status == XCO_STEP_SUSPENDED && r.value == 13);
     77     r = xco_step(s, 300);
     78     assert(r.status == XCO_STEP_DEAD && r.value == 14);
     79     assert(xco_mach_status(s) == XCO_STEP_DEAD);
     80 }
     81 
     82 static xco_coro_t inner;
     83 
     84 /* Spawns `inner` and pumps it once, demonstrating nested resumes. */
     85 static uintptr_t outer(uintptr_t arg) {
     86     (void)arg;
     87     xco_step_result_t r = xco_spawn(&inner, counter, stack_b, STACK_BYTES, 10);
     88     assert(r.status == XCO_STEP_SUSPENDED && r.value == 11);
     89     /* Yield the inner coroutine's first value back to our resumer. */
     90     uintptr_t v = xco_suspend(r.value);
     91     assert(v == 42);
     92     /* Drive inner to completion via the generic xco_step — same surface
     93      * used for hand-coded state machines. */
     94     r = xco_step(&inner.base, 100);
     95     assert(r.status == XCO_STEP_SUSPENDED && r.value == 12);
     96     r = xco_step(&inner.base, 200);
     97     assert(r.status == XCO_STEP_SUSPENDED && r.value == 13);
     98     r = xco_step(&inner.base, 300);
     99     assert(r.status == XCO_STEP_DEAD && r.value == 14);
    100     return 999;
    101 }
    102 
    103 /* ---- xco_yield ----------------------------------------------------- */
    104 
    105 /* Each yielder appends its id to the trace on every step; ends after N
    106  * yields. With two yielders alternately-scheduled by rt, the trace
    107  * shows interleaving — the proof that yield gives the runtime control. */
    108 static int trace[64];
    109 static int trace_len;
    110 
    111 typedef struct {
    112     int        id;
    113     int        n;
    114     xco_runtime_t *rt;
    115 } yielder_args_t;
    116 
    117 static uintptr_t yielder(uintptr_t arg) {
    118     yielder_args_t *ya = (yielder_args_t *)arg;
    119     for (int i = 0; i < ya->n; i++) {
    120         trace[trace_len++] = ya->id;
    121         xco_yield(ya->rt);
    122     }
    123     return (uintptr_t)ya->id;
    124 }
    125 
    126 static void test_xco_yield_alternates(void) {
    127     xco_runtime_t rt; xco_rt_init(&rt);
    128     trace_len = 0;
    129 
    130     yielder_args_t a1 = {.id = 1, .n = 3, .rt = &rt};
    131     yielder_args_t a2 = {.id = 2, .n = 3, .rt = &rt};
    132 
    133     xco_coro_t c1, c2;
    134     /* Spawn each: first step records the id, then yields back to caller
    135      * via the runtime ready queue. After spawn, both are SUSPENDED and
    136      * enqueued. */
    137     xco_step_result_t r1 = xco_spawn(&c1, yielder, stack_y1, STACK_BYTES, (uintptr_t)&a1);
    138     assert(r1.status == XCO_STEP_SUSPENDED);
    139     xco_step_result_t r2 = xco_spawn(&c2, yielder, stack_y2, STACK_BYTES, (uintptr_t)&a2);
    140     assert(r2.status == XCO_STEP_SUSPENDED);
    141 
    142     /* Drain the runtime: each yielder runs another step, yields again,
    143      * until both reach DEAD. */
    144     xco_rt_run(&rt, 0);
    145     assert(xco_mach_status(&c1.base) == XCO_STEP_DEAD);
    146     assert(xco_mach_status(&c2.base) == XCO_STEP_DEAD);
    147 
    148     /* Trace alternates: 1 2 1 2 1 2 (FIFO ready-queue ordering). */
    149     assert(trace_len == 6);
    150     int expect[6] = {1, 2, 1, 2, 1, 2};
    151     for (int i = 0; i < 6; i++) assert(trace[i] == expect[i]);
    152 }
    153 
    154 /* ---- xco_cotask ------------------------------------------------------ */
    155 
    156 /* Body returns immediately with arg + 1. The cleanest possible task —
    157  * verifies xco_trampoline wires xco_task_done with the return value. */
    158 static uintptr_t task_body_simple(xco_task_t *self, uintptr_t arg) {
    159     (void)self;
    160     return arg + 1;
    161 }
    162 
    163 static void test_xco_task_join_inline(void) {
    164     /* Task runs to completion synchronously inside xco_cotask_spawn;
    165      * task.done is set by the xco_trampoline. */
    166     xco_cotask_t xt;
    167     xco_step_result_t r = xco_cotask_spawn(&xt, task_body_simple,
    168                                       stack_t1, STACK_BYTES, 41);
    169     assert(r.status == XCO_STEP_DEAD);
    170     assert(r.value  == 42);
    171     assert(xco_task_finished(&xt.task));
    172 
    173     uintptr_t v;
    174     assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL));
    175     assert(v == 42);
    176 }
    177 
    178 /* Body that suspends on a latch then returns. Uses xco_await — the
    179  * try-park-suspend dance compresses to one call. */
    180 static xco_latch_t  task_gate;
    181 static uintptr_t task_body_gated(xco_task_t *self, uintptr_t arg) {
    182     (void)self;
    183     xco_runtime_t *rt = (xco_runtime_t *)arg;
    184     return xco_await(rt, &task_gate.base) + 100;
    185 }
    186 
    187 static void test_xco_task_join_via_runtime(void) {
    188     /* Task suspends inside body; gate fires, body returns, xco_trampoline
    189      * sets task.done. Verifying via xco_task_done_event read after xco_rt_run. */
    190     xco_runtime_t rt; xco_rt_init(&rt);
    191     xco_latch_init(&task_gate);
    192 
    193     xco_cotask_t xt;
    194     xco_step_result_t r = xco_cotask_spawn(&xt, task_body_gated,
    195                                       stack_t2, STACK_BYTES, (uintptr_t)&rt);
    196     assert(r.status == XCO_STEP_SUSPENDED);
    197     assert(!xco_task_finished(&xt.task));
    198 
    199     xco_latch_set(&task_gate, 7);
    200     xco_rt_run(&rt, 0);
    201     assert(xco_task_finished(&xt.task));
    202 
    203     uintptr_t v;
    204     assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL));
    205     assert(v == 107);
    206 }
    207 
    208 /* Joiner is itself a task — its body awaits the target's done event,
    209  * captures the return value as its own return. The cleanest expression
    210  * of "wait for another task to finish, get its result." */
    211 typedef struct {
    212     xco_runtime_t  *rt;
    213     xco_cotask_t *target;
    214 } joiner_args_t;
    215 
    216 static uintptr_t task_joiner(xco_task_t *self, uintptr_t arg) {
    217     (void)self;
    218     joiner_args_t *ja = (joiner_args_t *)arg;
    219     return xco_await(ja->rt, xco_task_done_event(&ja->target->task));
    220 }
    221 
    222 static void test_xco_task_joined_by_other_task(void) {
    223     xco_runtime_t rt; xco_rt_init(&rt);
    224     xco_latch_init(&task_gate);
    225 
    226     xco_cotask_t target;
    227     xco_cotask_spawn(&target, task_body_gated,
    228                    stack_t2, STACK_BYTES, (uintptr_t)&rt);
    229     assert(!xco_task_finished(&target.task));
    230 
    231     joiner_args_t ja = {.rt = &rt, .target = &target};
    232     xco_cotask_t joiner;
    233     xco_cotask_spawn(&joiner, task_joiner,
    234                    stack_t4, STACK_BYTES, (uintptr_t)&ja);
    235     assert(!xco_task_finished(&joiner.task));
    236 
    237     xco_latch_set(&task_gate, 7);
    238     xco_rt_run(&rt, 0);
    239     assert(xco_task_finished(&target.task));
    240     assert(xco_task_finished(&joiner.task));
    241 
    242     /* Joiner's return is the target's return propagated through. */
    243     uintptr_t v;
    244     assert(xco_event_poll(xco_task_done_event(&joiner.task), &v, NULL));
    245     assert(v == 107);
    246 }
    247 
    248 /* Cancellable body: awaits a never-firing latch under the task's cancel.
    249  * Returns 1 on event, 0 on cancel — surfacing the outcome in done. */
    250 static uintptr_t task_body_cancellable(xco_task_t *self, uintptr_t arg) {
    251     xco_runtime_t *rt = (xco_runtime_t *)arg;
    252     xco_latch_t never; xco_latch_init(&never);
    253     uintptr_t v;
    254     return xco_await_or_cancel(rt, &never.base, xco_task_cancel(self), &v) ? 1 : 0;
    255 }
    256 
    257 static void test_xco_task_cancel(void) {
    258     xco_runtime_t rt; xco_rt_init(&rt);
    259 
    260     xco_cotask_t xt;
    261     xco_step_result_t r = xco_cotask_spawn(&xt, task_body_cancellable,
    262                                       stack_t3, STACK_BYTES, (uintptr_t)&rt);
    263     assert(r.status == XCO_STEP_SUSPENDED);
    264     assert(!xco_task_finished(&xt.task));
    265 
    266     xco_cancel_set(xco_task_cancel(&xt.task));
    267     xco_rt_run(&rt, 0);
    268     assert(xco_task_finished(&xt.task));
    269 
    270     uintptr_t v;
    271     assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL));
    272     assert(v == 0);                  /* cancelled */
    273 }
    274 
    275 /* Unification at the task layer: drive an xco_cotask_t through xco_step
    276  * directly, bypassing xco_cotask_spawn. The xco_trampoline still wires
    277  * xco_task_done from the body's return, because that lives inside xco_co_step,
    278  * not in any spawn helper. */
    279 static void test_xco_task_via_xstep(void) {
    280     xco_cotask_t xt;
    281     xco_cotask_init(&xt, task_body_simple, stack_t1, STACK_BYTES);
    282     assert(xco_mach_status(&xt.co.base) == XCO_STEP_INIT);
    283 
    284     xco_step_result_t r = xco_step(&xt.co.base, 41);
    285     assert(r.status == XCO_STEP_DEAD);
    286     assert(r.value  == 42);
    287     assert(xco_task_finished(&xt.task));
    288 
    289     uintptr_t v;
    290     assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL));
    291     assert(v == 42);
    292 }
    293 
    294 int main(void) {
    295     assert(xco_self() == NULL);
    296 
    297     xco_coro_t c;
    298     xco_init(&c, outer, stack_a, STACK_BYTES);
    299     assert(xco_mach_status(&c.base) == XCO_STEP_INIT);
    300 
    301     xco_step_result_t r = xco_step(&c.base, 0);
    302     assert(xco_self() == NULL);
    303     assert(r.status == XCO_STEP_SUSPENDED);
    304     assert(r.value == 11);
    305     assert(xco_mach_status(&c.base) == XCO_STEP_SUSPENDED);
    306 
    307     r = xco_step(&c.base, 42);
    308     assert(r.status == XCO_STEP_DEAD);
    309     assert(r.value == 999);
    310     assert(xco_mach_status(&c.base) == XCO_STEP_DEAD);
    311 
    312     /* Unification: the same generic driver pumps a coroutine and a
    313      * hand-coded state machine, both reaching XCO_STEP_DEAD with the
    314      * expected value sequence. */
    315     xco_coro_t co;
    316     xco_init(&co, counter, stack_c, STACK_BYTES);
    317     drive_counter(&co.base);
    318 
    319     counter_sm_t sm;
    320     counter_sm_init(&sm);
    321     drive_counter(&sm.base);
    322 
    323     test_xco_yield_alternates();
    324     test_xco_task_join_inline();
    325     test_xco_task_via_xstep();
    326     test_xco_task_join_via_runtime();
    327     test_xco_task_joined_by_other_task();
    328     test_xco_task_cancel();
    329 
    330     printf("ok\n");
    331     return 0;
    332 }