xco

Concurrency for C
git clone https://git.ryansepassi.com/git/xco.git
Log | Files | Refs

commit a3a4dfec6af6dce976d823d7fedbf2f99e532a43
parent ac11fcddc0cf1f7eaf1610374e5e975b3168c16d
Author: Ryan Sepassi <rsepassi@gmail.com>
Date:   Tue,  5 May 2026 15:37:32 -0700

Add xco_op layer for pure-effect IO

Coroutines submit ops onto a runtime-owned pending list; the host pulls
the batch via xco_rt_take_ops, executes them externally, and feeds
completions back via xco_op_complete. The xco library reads no clocks
and makes no syscalls — that property extends to IO via this layer.

PENDING/IN_FLIGHT distinction uses a generation counter on the runtime
plus an epoch field on each op, so submit, take, and is_pending are all
O(1) — no per-op walk on take.

Diffstat:
MMakefile | 2+-
Atests/test_op.c | 452+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mxco.c | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mxco.h | 93+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
4 files changed, 606 insertions(+), 3 deletions(-)

diff --git a/Makefile b/Makefile @@ -21,7 +21,7 @@ SRCS := xco.c $(PLATFORMDIR)/xco_platform.c OBJS := $(SRCS:%.c=$(BUILD)/%.o) LIB := $(BUILD)/libxco.a -TEST_SRCS := tests/test_xco.c tests/test_event.c +TEST_SRCS := tests/test_xco.c tests/test_event.c tests/test_op.c TEST_BINS := $(TEST_SRCS:tests/%.c=$(BUILD)/%) all: $(LIB) diff --git a/tests/test_op.c b/tests/test_op.c @@ -0,0 +1,452 @@ +/* + * test_op.c — exercises the xco_op layer. + * + * The op layer is a pure-effect IO substrate: submitters push ops onto + * the runtime's pending list; the host pulls a batch via xco_rt_take_ops, + * executes them, and feeds completions back via xco_op_complete. These + * tests drive the state machine (PENDING → IN_FLIGHT → RESOLVED) plus + * cancel-at-each-state, embedder-result patterns, and composition with + * the standard event API (xco_await, wait_or_cancel). + */ + +#include "xco.h" + +#include <assert.h> +#include <stdalign.h> +#include <stdio.h> + +#define STACK_BYTES (64 * 1024) + +/* ---- Embedder example: a "read" op carrying fd/buf/len + result --- */ + +enum { OP_READ = 1, OP_WRITE = 2 }; + +typedef struct { + xco_op_t base; + int fd; + void *buf; + size_t len; + /* Host writes these on COMPLETED. */ + ssize_t result; + int err; +} read_op_t; + +static void read_op_init(read_op_t *r, int fd, void *buf, size_t len) { + r->base.kind = OP_READ; + r->fd = fd; r->buf = buf; r->len = len; + r->result = -1; r->err = 0; +} + +/* ---- Submit + take batch ----------------------------------------- */ + +static void test_submit_then_take(void) { + xco_runtime_t rt; xco_rt_init(&rt); + + read_op_t a, b, c; + read_op_init(&a, 3, NULL, 4); + read_op_init(&b, 4, NULL, 8); + read_op_init(&c, 5, NULL, 16); + + xco_op_submit(&rt, &a.base); + xco_op_submit(&rt, &b.base); + xco_op_submit(&rt, &c.base); + + /* PENDING: rt back-pointer set, done unset. */ + assert(xco_op_is_pending(&a.base)); + assert(xco_op_is_pending(&b.base)); + assert(xco_op_is_pending(&c.base)); + assert(rt.op_head == &a.base && rt.op_tail == &c.base); + + xco_op_t *batch = xco_rt_take_ops(&rt, NULL); + assert(batch == &a.base); + assert(rt.op_head == NULL && rt.op_tail == NULL); + + /* FIFO order in the returned list, all transitioned to IN_FLIGHT. */ + assert(batch == &a.base && batch->next == &b.base); + assert(batch->next->next == &c.base); + assert(batch->next->next->next == NULL); + assert(!xco_op_is_pending(&a.base)); + assert(!xco_op_is_pending(&b.base)); + assert(!xco_op_is_pending(&c.base)); + + /* Take from an empty list returns NULL. */ + assert(xco_rt_take_ops(&rt, NULL) == NULL); +} + +static void test_take_tail_out(void) { + /* The tail out-param lets the host splice the batch onto an + * in-flight list in O(1) without walking. */ + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t a, b, c; + read_op_init(&a, 1, NULL, 1); + read_op_init(&b, 2, NULL, 2); + read_op_init(&c, 3, NULL, 3); + xco_op_submit(&rt, &a.base); + xco_op_submit(&rt, &b.base); + xco_op_submit(&rt, &c.base); + + xco_op_t *tail = (xco_op_t *)0x1; /* sentinel to verify it gets written */ + xco_op_t *head = xco_rt_take_ops(&rt, &tail); + assert(head == &a.base); + assert(tail == &c.base); + + /* Empty take: tail_out written to NULL. */ + tail = (xco_op_t *)0x1; + head = xco_rt_take_ops(&rt, &tail); + assert(head == NULL); + assert(tail == NULL); +} + +/* ---- Awaiter: blocks on op->done.base, captures status --------- */ + +typedef struct { + xco_mach_t base; + xco_op_t *op; + xco_runtime_t *rt; + xco_waker_t sw; + int phase; + xco_op_status_t status; +} op_awaiter_t; + +static xco_step_result_t op_awaiter_step(xco_mach_t *s, uintptr_t v) { + op_awaiter_t *a = (op_awaiter_t *)s; + switch (a->phase) { + case 0: { + uintptr_t out; + if (xco_event_poll(&a->op->done.base, &out, NULL)) { + a->status = (xco_op_status_t)out; + a->phase = 2; + return (xco_step_result_t){out, XCO_STEP_DEAD}; + } + xco_waker_init(&a->sw, a->rt, &a->base); + xco_event_poll(&a->op->done.base, NULL, &a->sw.base); + a->phase = 1; + return (xco_step_result_t){0, XCO_STEP_SUSPENDED}; + } + case 1: + a->status = (xco_op_status_t)v; + a->phase = 2; + return (xco_step_result_t){v, XCO_STEP_DEAD}; + } + __builtin_unreachable(); +} + +static void op_awaiter_init(op_awaiter_t *a, xco_runtime_t *rt, xco_op_t *op) { + a->base = (xco_mach_t){.step = op_awaiter_step, .status = XCO_STEP_INIT}; + a->op = op; + a->rt = rt; + a->phase = 0; + a->status = XCO_OP_PENDING; +} + +/* ---- Complete after host take (the happy path) -------------------- */ + +static void test_complete_after_take(void) { + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t op; read_op_init(&op, 7, NULL, 32); + + xco_op_submit(&rt, &op.base); + op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base); + + /* Awaiter parks on the op's done latch. */ + xco_step(&a.base, 0); + assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); + assert(op.base.done.waiters == &a.sw.base); + + /* Host pulls the batch, runs the op, fills result fields. */ + xco_op_t *batch = xco_rt_take_ops(&rt, NULL); + assert(batch == &op.base); + assert(!xco_op_is_pending(&op.base)); + assert(!op.base.done.set); /* still IN_FLIGHT */ + + op.result = 32; + op.err = 0; + xco_op_complete(&op.base, XCO_OP_COMPLETED); + + assert(op.base.done.set); + xco_rt_run(&rt, 0); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); + assert(a.status == XCO_OP_COMPLETED); + assert(op.result == 32); +} + +/* ---- Cancel while PENDING: splices, resolves CANCELLED ------------ */ + +static void test_cancel_while_pending(void) { + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t a, b, c; + read_op_init(&a, 1, NULL, 1); + read_op_init(&b, 2, NULL, 2); + read_op_init(&c, 3, NULL, 3); + xco_op_submit(&rt, &a.base); + xco_op_submit(&rt, &b.base); + xco_op_submit(&rt, &c.base); + + /* Cancel the middle: list links splice cleanly. */ + xco_op_cancel(&b.base); + assert(b.base.done.set); + assert(!xco_op_is_pending(&b.base)); + assert(b.base.prev == NULL && b.base.next == NULL); + + /* Resolved as CANCELLED. */ + uintptr_t v; + assert(xco_event_poll(&b.base.done.base, &v, NULL)); + assert(v == XCO_OP_CANCELLED); + + /* a and c are still on the list, in order. */ + assert(rt.op_head == &a.base && rt.op_tail == &c.base); + assert(a.base.next == &c.base && c.base.prev == &a.base); + + /* Cancel head and tail. */ + xco_op_cancel(&a.base); + assert(rt.op_head == &c.base && rt.op_tail == &c.base); + assert(c.base.prev == NULL); + xco_op_cancel(&c.base); + assert(rt.op_head == NULL && rt.op_tail == NULL); + assert(a.base.done.set && c.base.done.set); +} + +/* ---- Cancel while IN_FLIGHT is advisory --------------------------- */ + +static void test_cancel_while_in_flight(void) { + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t op; read_op_init(&op, 9, NULL, 64); + + xco_op_submit(&rt, &op.base); + (void)xco_rt_take_ops(&rt, NULL); /* host took it; now IN_FLIGHT */ + assert(!xco_op_is_pending(&op.base)); + + xco_op_cancel(&op.base); + /* Advisory only: done not fired, host sees cancel_requested. */ + assert(op.base.cancel_requested); + assert(!op.base.done.set); + + /* Host honors the cancel. */ + xco_op_complete(&op.base, XCO_OP_CANCELLED); + assert(op.base.done.set); + uintptr_t v; + assert(xco_event_poll(&op.base.done.base, &v, NULL)); + assert(v == XCO_OP_CANCELLED); +} + +static void test_cancel_in_flight_lost_race(void) { + /* Cancel after take, but the syscall already finished — host + * completes with COMPLETED instead. The awaiter sees whichever + * status the host chose; cancel_requested is just an advisory bit. */ + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t op; read_op_init(&op, 9, NULL, 64); + + xco_op_submit(&rt, &op.base); + (void)xco_rt_take_ops(&rt, NULL); + xco_op_cancel(&op.base); + assert(op.base.cancel_requested); + + op.result = 64; + xco_op_complete(&op.base, XCO_OP_COMPLETED); + uintptr_t v; + assert(xco_event_poll(&op.base.done.base, &v, NULL)); + assert(v == XCO_OP_COMPLETED); + assert(op.result == 64); +} + +/* ---- Idempotency -------------------------------------------------- */ + +static void test_complete_idempotent(void) { + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t op; read_op_init(&op, 1, NULL, 1); + xco_op_submit(&rt, &op.base); + (void)xco_rt_take_ops(&rt, NULL); + + xco_op_complete(&op.base, XCO_OP_COMPLETED); + /* Second complete is a no-op: latch is set, status doesn't change. */ + xco_op_complete(&op.base, XCO_OP_CANCELLED); + uintptr_t v; + assert(xco_event_poll(&op.base.done.base, &v, NULL)); + assert(v == XCO_OP_COMPLETED); +} + +static void test_cancel_after_resolved(void) { + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t op; read_op_init(&op, 1, NULL, 1); + xco_op_submit(&rt, &op.base); + (void)xco_rt_take_ops(&rt, NULL); + xco_op_complete(&op.base, XCO_OP_COMPLETED); + + /* Cancel after RESOLVED is a no-op; status stays COMPLETED. */ + xco_op_cancel(&op.base); + uintptr_t v; + assert(xco_event_poll(&op.base.done.base, &v, NULL)); + assert(v == XCO_OP_COMPLETED); +} + +/* ---- Cancel-while-PENDING wakes the awaiter ----------------------- */ + +static void test_cancel_pending_wakes_awaiter(void) { + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t op; read_op_init(&op, 1, NULL, 1); + xco_op_submit(&rt, &op.base); + + op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base); + xco_step(&a.base, 0); + assert(xco_mach_status(&a.base) == XCO_STEP_SUSPENDED); + + xco_op_cancel(&op.base); + /* PENDING-cancel resolves done immediately and enqueues the waker. */ + assert(op.base.done.set); + xco_rt_run(&rt, 0); + assert(xco_mach_status(&a.base) == XCO_STEP_DEAD); + assert(a.status == XCO_OP_CANCELLED); +} + +/* ---- Already-resolved: awaiter takes fast path ------------------- */ + +static void test_awaiter_fast_path(void) { + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t op; read_op_init(&op, 1, NULL, 1); + xco_op_submit(&rt, &op.base); + (void)xco_rt_take_ops(&rt, NULL); + xco_op_complete(&op.base, XCO_OP_COMPLETED); + + op_awaiter_t a; op_awaiter_init(&a, &rt, &op.base); + xco_step_result_t r = xco_step(&a.base, 0); + assert(r.status == XCO_STEP_DEAD); + assert(a.status == XCO_OP_COMPLETED); + assert(rt.head == NULL); /* no parking, no enqueue */ +} + +/* ---- Compose with wait_or_cancel: cooperative cancel pattern ----- */ + +static uintptr_t coop_body(xco_task_t *self, uintptr_t arg) { + xco_runtime_t *rt = (xco_runtime_t *)arg; + read_op_t op; read_op_init(&op, 11, NULL, 128); + xco_op_submit(rt, &op.base); + + uintptr_t status_v = 0; + bool ok = xco_await_or_cancel(rt, &op.base.done.base, + xco_task_cancel(self), &status_v); + if (!ok) { + /* Task cancel won the race; wind down the op cooperatively. */ + xco_op_cancel(&op.base); + status_v = xco_await(rt, &op.base.done.base); + } + return status_v; +} + +static alignas(XCO_STACK_ALIGN) unsigned char coop_stack[STACK_BYTES]; + +static void test_wait_or_cancel_op_wins(void) { + xco_runtime_t rt; xco_rt_init(&rt); + xco_cotask_t xt; + xco_cotask_spawn(&xt, coop_body, coop_stack, sizeof coop_stack, + (uintptr_t)&rt); + /* Body submitted an op and parked on a wait_or_cancel select. */ + assert(xco_mach_status(&xt.co.base) == XCO_STEP_SUSPENDED); + + xco_op_t *batch = xco_rt_take_ops(&rt, NULL); + assert(batch != NULL); + /* Host runs the op and reports completion. */ + xco_op_complete(batch, XCO_OP_COMPLETED); + + xco_rt_run(&rt, 0); + assert(xco_task_finished(&xt.task)); + uintptr_t v; + assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); + assert(v == XCO_OP_COMPLETED); +} + +static alignas(XCO_STACK_ALIGN) unsigned char coop_stack2[STACK_BYTES]; + +static void test_wait_or_cancel_task_cancel_pending(void) { + /* Task cancel fires while op is still PENDING. The body's + * xco_await_or_cancel returns false; the body then xco_op_cancels, + * which resolves done(CANCELLED) inline (PENDING path). */ + xco_runtime_t rt; xco_rt_init(&rt); + xco_cotask_t xt; + xco_cotask_spawn(&xt, coop_body, coop_stack2, sizeof coop_stack2, + (uintptr_t)&rt); + assert(xco_mach_status(&xt.co.base) == XCO_STEP_SUSPENDED); + /* Op is PENDING on the runtime. */ + assert(rt.op_head != NULL); + + xco_cancel_set(xco_task_cancel(&xt.task)); + xco_rt_run(&rt, 0); + assert(xco_task_finished(&xt.task)); + uintptr_t v; + assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); + assert(v == XCO_OP_CANCELLED); + /* Op was spliced from the pending list by the body's xco_op_cancel. */ + assert(rt.op_head == NULL && rt.op_tail == NULL); +} + +static alignas(XCO_STACK_ALIGN) unsigned char coop_stack3[STACK_BYTES]; + +static void test_wait_or_cancel_task_cancel_after_take(void) { + /* Task cancel after host has taken the op. Body xco_op_cancels; + * since op is IN_FLIGHT this just sets cancel_requested. Body then + * waits for the host's eventual completion. */ + xco_runtime_t rt; xco_rt_init(&rt); + xco_cotask_t xt; + xco_cotask_spawn(&xt, coop_body, coop_stack3, sizeof coop_stack3, + (uintptr_t)&rt); + xco_op_t *batch = xco_rt_take_ops(&rt, NULL); + assert(batch != NULL); + + /* Now cancel the task. */ + xco_cancel_set(xco_task_cancel(&xt.task)); + xco_rt_run(&rt, 0); + /* Body has wound back to xco_await on done; not yet finished. */ + assert(!xco_task_finished(&xt.task)); + assert(batch->cancel_requested); + + /* Host honors the cancel. */ + xco_op_complete(batch, XCO_OP_CANCELLED); + xco_rt_run(&rt, 0); + assert(xco_task_finished(&xt.task)); + uintptr_t v; + assert(xco_event_poll(xco_task_done_event(&xt.task), &v, NULL)); + assert(v == XCO_OP_CANCELLED); +} + +/* ---- Open kind tag space ----------------------------------------- */ + +static void test_open_kind_space(void) { + /* The runtime never inspects kind. Two op types share the pending + * list; the host pattern-matches and dispatches per-kind. */ + xco_runtime_t rt; xco_rt_init(&rt); + read_op_t r; read_op_init(&r, 1, NULL, 4); + typedef struct { xco_op_t base; const char *path; } stat_op_t; + stat_op_t s = { .base = { .kind = 99 }, .path = "/tmp/x" }; + + xco_op_submit(&rt, &r.base); + xco_op_submit(&rt, &s.base); + + int saw_read = 0, saw_stat = 0; + for (xco_op_t *o = xco_rt_take_ops(&rt, NULL); o; o = o->next) { + switch (o->kind) { + case OP_READ: saw_read++; xco_op_complete(o, XCO_OP_COMPLETED); break; + case 99: saw_stat++; xco_op_complete(o, XCO_OP_COMPLETED); break; + default: assert(0); + } + } + assert(saw_read == 1 && saw_stat == 1); + assert(r.base.done.set && s.base.done.set); +} + +int main(void) { + test_submit_then_take(); + test_take_tail_out(); + test_complete_after_take(); + test_cancel_while_pending(); + test_cancel_while_in_flight(); + test_cancel_in_flight_lost_race(); + test_complete_idempotent(); + test_cancel_after_resolved(); + test_cancel_pending_wakes_awaiter(); + test_awaiter_fast_path(); + test_wait_or_cancel_op_wins(); + test_wait_or_cancel_task_cancel_pending(); + test_wait_or_cancel_task_cancel_after_take(); + test_open_kind_space(); + printf("ok\n"); + return 0; +} diff --git a/xco.c b/xco.c @@ -1108,3 +1108,65 @@ void xco_cotask_init(xco_cotask_t *xt, xco_cotask_fn fn, xt->fn = fn; xco_init(&xt->co, xco_cotask_trampoline, stack_base, stack_len); } + +/* ==================================================================== + * xco_op — generic effect/IO layer + * + * The runtime owns a doubly-linked pending-ops list (op_head/op_tail). + * Submit appends; cancel-while-PENDING splices in O(1); take detaches + * the whole list in O(1) and lets the host iterate via the next pointers + * (which are then the host's to reuse for in-flight tracking). + * + * Resolution always goes through xco_latch_set on op->done; awaiters + * compose with the standard event API. Status is delivered as the latch + * payload. + * ==================================================================== */ + +void xco_op_submit(xco_runtime_t *rt, xco_op_t *op) { + xco_latch_init(&op->done); + op->cancel_requested = false; + op->rt = rt; + op->epoch = rt->op_epoch; /* matches → PENDING */ + op->next = NULL; + op->prev = rt->op_tail; + if (rt->op_tail) rt->op_tail->next = op; + else rt->op_head = op; + rt->op_tail = op; +} + +void xco_op_cancel(xco_op_t *op) { + if (op->done.set) return; /* RESOLVED — no-op */ + if (xco_op_is_pending(op)) { + /* PENDING: splice from rt's pending list and resolve as cancelled. */ + xco_runtime_t *rt = op->rt; + if (op->prev) op->prev->next = op->next; + else rt->op_head = op->next; + if (op->next) op->next->prev = op->prev; + else rt->op_tail = op->prev; + op->prev = op->next = NULL; + xco_latch_set(&op->done, (uintptr_t)XCO_OP_CANCELLED); + return; + } + /* IN_FLIGHT: advisory. Host sees cancel_requested and decides whether + * to honor it; xco_op_complete is still the final word either way. */ + op->cancel_requested = true; +} + +void xco_op_complete(xco_op_t *op, xco_op_status_t status) { + /* xco_latch_set is idempotent; second/late completion is a no-op. + * Per the contract, host calls this only after take, so op is not + * on rt's pending list. */ + xco_latch_set(&op->done, (uintptr_t)status); +} + +xco_op_t *xco_rt_take_ops(xco_runtime_t *rt, xco_op_t **tail_out) { + xco_op_t *head = rt->op_head; + if (tail_out) *tail_out = rt->op_tail; + rt->op_head = rt->op_tail = NULL; + /* Bump the epoch: every op currently on the (now-detached) batch had + * its epoch field set to the old value at submit, so they all flip to + * IN_FLIGHT in O(1). Submits after this point land in the new + * generation. */ + rt->op_epoch++; + return head; +} diff --git a/xco.h b/xco.h @@ -221,14 +221,24 @@ static inline void xco_event_unpark(xco_event_t *e, xco_waiter_t *w) { e->vt->un * Defined in the timer section below. */ typedef struct xco_timers xco_timers_t; +typedef struct xco_op xco_op_t; + typedef struct xco_runtime { xco_waiter_t *head, *tail; xco_timers_t *timers; /* optional; advanced inside xco_rt_run */ + /* Pending ops list (xco_op layer). The runtime never inspects ops; it + * only owns this list-head pair so the host can pull a batch via + * xco_rt_take_ops. op_epoch is bumped each take and recorded on each + * submit; PENDING vs IN_FLIGHT is a generation match (see xco_op). */ + xco_op_t *op_head, *op_tail; + uint64_t op_epoch; } xco_runtime_t; static inline void xco_rt_init(xco_runtime_t *rt) { - rt->head = rt->tail = NULL; - rt->timers = NULL; + rt->head = rt->tail = NULL; + rt->timers = NULL; + rt->op_head = rt->op_tail = NULL; + rt->op_epoch = 0; } /* Attach (or detach with NULL) a timer source. While attached, xco_rt_run @@ -1171,4 +1181,83 @@ static inline xco_step_result_t xco_cotask_spawn(xco_cotask_t *xt, xco_cotask_fn return xco_step(&xt->co.base, arg); } +/* ==================================================================== + * xco_op — generic effect/IO layer. + * + * Coroutines submit ops describing work to do; the runtime accumulates + * them on a pending list; after xco_rt_run quiesces, the host pulls the + * batch via xco_rt_take_ops, executes them however it likes (io_uring, + * threads, mocks, replay), and injects completions back via + * xco_op_complete. The xco library itself reads no clocks and makes no + * syscalls — that property extends to IO via this layer. + * + * An op is just an event with a side-channel describing what to do. + * Embed xco_op_t as the first member of a kind-specific payload struct; + * the host pattern-matches on `kind` (an open tag space — the runtime + * never inspects it). + * + * State machine: + * PENDING op->epoch == op->rt->op_epoch, done unset (on rt's list) + * IN_FLIGHT op->epoch != op->rt->op_epoch, done unset (host has taken) + * RESOLVED done.set (terminal) + * + * The epoch counter on the runtime is bumped each xco_rt_take_ops, so + * "still on the pending list" is a generation match — submit + take are + * both O(1) and the runtime never walks the batch. + * + * Threading: single-threaded, same as the rest of xco. submit, cancel, + * and complete must all be called on the runtime's thread. + * ==================================================================== */ + +typedef enum { + XCO_OP_PENDING, /* not used as a fire payload, but conceptual */ + XCO_OP_COMPLETED, /* host finished; result lives in embedder fields */ + XCO_OP_CANCELLED, /* resolved without a real result */ +} xco_op_status_t; + +struct xco_op { + xco_latch_t done; /* fires once, payload = status */ + xco_op_t *next, *prev; /* intrusive on rt's pending list */ + xco_runtime_t *rt; /* set on submit; stays set after take */ + uint64_t epoch; /* matches rt->op_epoch iff PENDING */ + uint32_t kind; /* open tag space; host pattern-matches */ + bool cancel_requested; /* advisory to host post-take */ +}; + +/* True iff op is still on rt's current pending batch (i.e. PENDING). + * False for IN_FLIGHT or RESOLVED. O(1). */ +static inline bool xco_op_is_pending(const xco_op_t *op) { + return op->rt && op->epoch == op->rt->op_epoch && !op->done.set; +} + +/* Submit op to rt's pending list. Initializes done, clears + * cancel_requested, and sets op->rt = rt. The caller pre-populates op->kind + * and any embedder fields (fd, buf, len, ...) before submit. After submit + * the awaiter typically waits on &op->done.base — composes with select, + * xco_wait_or_cancel, etc. */ +void xco_op_submit (xco_runtime_t *rt, xco_op_t *op); + +/* Request cancellation. Behavior depends on state: + * PENDING splice from rt list, fire done(CANCELLED). + * IN_FLIGHT set cancel_requested=true (advisory; host decides). + * RESOLVED no-op. + * Idempotent. */ +void xco_op_cancel (xco_op_t *op); + +/* Host's final word for an op it took via xco_rt_take_ops. Fires done + * with the given status. Idempotent (the latch is). */ +void xco_op_complete(xco_op_t *op, xco_op_status_t status); + +/* Detach the runtime's pending ops list and return its head. Bumps the + * runtime's op_epoch, transitioning the whole batch to IN_FLIGHT in O(1) + * (no walk — each op's stored epoch now no longer matches). The host + * owns the returned list — the next/prev fields are the host's to reuse + * however it wants for in-flight tracking. Returns NULL if the list is + * empty. + * + * If tail_out is non-NULL, *tail_out receives the list's tail (or NULL + * for an empty list) — handy for splicing the batch onto a host-side + * in-flight list in O(1) without walking. */ +xco_op_t *xco_rt_take_ops(xco_runtime_t *rt, xco_op_t **tail_out); + #endif /* XCO_H */