From f2809e72f6d7f878e70a1b173da584e4dcbd4ac8 Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Thu, 2 Jul 2026 12:54:14 -0600 Subject: [PATCH] docs: state Capy positioning, execution invariant, and interop escape hatch Addresses #341 and #349: the docs never stated capy's value proposition, its core execution invariant, or its foreign-awaitable escape hatch. Documentation: - index.adoc: add "What Capy Is / Is Not" positioning (protocol + reference implementation; the IoAwaitable interop dimension) and an up-front callout of the same-executor invariant. - 4c.executors.adoc: new "The Same-Executor Invariant" section stating the rule, its rationale, and how affinity/run maintain it. - 4d.io-awaitable.adoc: frame IoAwaitable as an interop vocabulary and add "Bridging a Foreign Awaitable" documenting the escape hatch (and why there is no universal auto-bridge). - Two example pages (8o sender bridge, 8p Asio use_capy) plus nav entries. - Clean up stale xrefs to the removed dynamic-buffer pages. - Update the IoAwaitable paper reference from draft D4003 to published P4003. Examples (fix bit-rot from the continuation refactor so the referenced interop examples build and run again): - use_capy.hpp / uni_stream.hpp / capy_streams.cpp: post a continuation through the executor instead of a raw coroutine_handle. - use_capy_example.cpp / any_stream.cpp: writer/reader return io_task<> to satisfy when_all's io_result requirement. --- doc/modules/ROOT/nav.adoc | 2 + .../ROOT/pages/4.coroutines/4c.executors.adoc | 32 ++++++ .../pages/4.coroutines/4d.io-awaitable.adoc | 69 +++++++++++ .../ROOT/pages/5.buffers/5e.algorithms.adoc | 2 +- .../pages/8.examples/8g.parallel-fetch.adoc | 4 - .../8.examples/8i.echo-server-corosio.adoc | 1 - .../pages/8.examples/8o.sender-bridge.adoc | 107 ++++++++++++++++++ .../pages/8.examples/8p.asio-use-capy.adoc | 91 +++++++++++++++ .../ROOT/pages/9.design/9l.RunApi.adoc | 2 +- .../ROOT/pages/9.design/9m.WhyNotCobalt.adoc | 2 +- doc/modules/ROOT/pages/index.adoc | 22 ++++ example/asio/any_stream.cpp | 18 ++- example/asio/api/capy_streams.cpp | 4 +- example/asio/api/uni_stream.hpp | 4 +- example/asio/api/use_capy.hpp | 13 ++- example/asio/use_capy_example.cpp | 24 ++-- 16 files changed, 369 insertions(+), 28 deletions(-) create mode 100644 doc/modules/ROOT/pages/8.examples/8o.sender-bridge.adoc create mode 100644 doc/modules/ROOT/pages/8.examples/8p.asio-use-capy.adoc diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index 4437e2451..573ebb261 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -53,6 +53,8 @@ ** xref:8.examples/8l.async-mutex.adoc[Async Mutex] ** xref:8.examples/8m.parallel-tasks.adoc[Parallel Tasks] ** xref:8.examples/8n.custom-executor.adoc[Custom Executor] +** xref:8.examples/8o.sender-bridge.adoc[Bridging a P2300 Sender] +** xref:8.examples/8p.asio-use-capy.adoc[Calling Asio from a Capy Coroutine] * xref:9.design/9.intro.adoc[Design] ** xref:9.design/9a.CapyLayering.adoc[Layered Abstractions] ** xref:9.design/9b.Separation.adoc[Why Capy Is Separate] diff --git a/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc b/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc index d84aea100..ccbce2879 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc @@ -7,6 +7,38 @@ This section explains executors and execution contexts—the mechanisms that con * Completed xref:4.coroutines/4b.launching.adoc[Launching Coroutines] * Understanding of `run_async` and `run` +[#the-same-executor-invariant] +== The Same-Executor Invariant + +Capy enforces one rule above all others: + +*A coroutine always resumes on the executor it was launched with.* + +This rule is what keeps shared state safe by default. Consider a connection handler launched on a strand: + +[source,cpp] +---- +task handle_client(connection& conn) +{ + auto req = co_await conn.read(); + auto resp = process(req); + co_await conn.write(resp); + conn.stats.requests++; +} +---- + +Launch this on a strand, and every resumption—after `conn.read()` and after `conn.write()`—happens on that strand. The update to `conn.stats.requests` is therefore free of data races without any mutex. + +Without the invariant, the coroutine could resume after `co_await conn.read()` on an `io_uring` completion thread, a pool thread, or wherever the I/O subsystem completed the operation. Correct code would then need either a mutex around every access to shared state or an explicit _resume-on-this-strand_ step after every `co_await`. A mutex defeats the purpose of the strand, and a single forgotten resume step reintroduces the data race. + +Because the invariant holds, the safe behavior is automatic and the unsafe behavior does not compile: awaiting a _plain_ awaitable—one that could resume the coroutine on any thread—is rejected. See xref:4.coroutines/4d.io-awaitable.adoc#bridging-a-foreign-awaitable[Bridging a Foreign Awaitable] for why, and for the explicit escape hatch. + +=== How the Invariant Is Maintained + +Affinity propagates forward. Launching a task with `run_async(ex)` binds it to `ex`; a child `co_await`-ed from that task inherits the same executor automatically, and so on down the chain. When a child completes, control returns to its caller _through the caller's executor_. When both share the same executor—the common case—that return is a direct symmetric transfer with no queuing; only a deliberate executor change requires a dispatch. + +That deliberate change is what `run` provides: it runs a subtree on a different executor and restores the caller's executor when the subtree completes (see xref:4.coroutines/4b.launching.adoc[Launching Coroutines]). This is also why I/O objects with executor-bound invariants—a socket tied to one `io_context`, a Windows IOCP handle, executor-specific timer state—remain safe: a coroutine holding them never resumes on the wrong executor mid-body. + == The Executor Concept An *executor* is an object that can schedule work for execution. An executor must be nothrow copy- and move-constructible and provide the following interface: diff --git a/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc b/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc index cc49ce6e5..c306f425e 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc @@ -117,6 +117,12 @@ Forward propagation offers several advantages: This design enables Capy's type-erased wrappers (`any_stream`, etc.) to work without knowing the concrete executor type. +== A Vocabulary for Coroutine Interop + +Because the protocol is just a two-argument `await_suspend`, `IoAwaitable` is more than an internal mechanism—it is a _vocabulary type_ for interoperation. Any coroutine library that speaks the protocol can propagate execution environment across a `co_await` boundary without knowing the other side's concrete task type. + +This is the interop problem framed on the xref:index.adoc[home page]: a shared protocol replaces the set of pairwise adapters that separate coroutine libraries would otherwise need to talk to one another. Capy is the reference implementation of that protocol. + == Implementing Custom IoAwaitables To create a custom IoAwaitable: @@ -222,6 +228,69 @@ See xref:4.coroutines/4e.cancellation.adoc#stoppable-awaitables[Implementing Sto For a production implementation of this exact pattern, read the source of `delay_awaitable` (xref:reference:boost/capy/delay_awaitable.adoc[`delay_awaitable`]): it schedules a timer, registers a stop callback that posts the resume through the executor, and arbitrates between the timer and cancellation with a single atomic claim. +[#bridging-a-foreign-awaitable] +== Bridging a Foreign Awaitable + +What happens when you `co_await` an awaitable that does _not_ implement the protocol—a "plain" awaitable from another library, with the standard one-argument `await_suspend`? + +Capy rejects it at compile time: + +[source,cpp] +---- +// In task.hpp, when the awaited type is not an IoAwaitable: +static_assert(sizeof(A) == 0, "requires IoAwaitable"); +---- + +This is intentional. A plain awaitable receives only the coroutine handle; it can resume the coroutine on any thread by calling `handle.resume()` directly. That silently breaks the xref:4.coroutines/4c.executors.adoc#the-same-executor-invariant[same-executor invariant]—the coroutine could wake on a foreign completion thread, leaving shared state you believed was strand-protected exposed to races. Rejecting such an awaitable at compile time prevents that. The constraint does not lock you in; it requires environment propagation to be explicit rather than silently dropped. + +The escape hatch is to wrap the foreign awaitable (or callback, or future) in a small `IoAwaitable` that captures the executor and re-posts the resumption through it: + +[source,cpp] +---- +// Bridge a foreign async operation into a Capy coroutine. +struct foreign_bridge +{ + io_env const* env_ = nullptr; + continuation cont_; + result_type result_; + + bool await_ready() const noexcept { return false; } + + std::coroutine_handle<> await_suspend( + std::coroutine_handle<> h, io_env const* env) + { + env_ = env; + cont_.h = h; // stable address; the executor links continuations intrusively + + // Start the foreign operation. Its completion callback may run on + // ANY thread, so it must not resume h directly. Instead it posts + // the continuation back through the caller's executor, restoring + // the same-executor invariant. + start_foreign_op([this]() noexcept { + store_result(); + env_->executor.post(cont_); + }); + + return std::noop_coroutine(); + } + + result_type await_resume() { return std::move(result_); } +}; +---- + +The single rule that makes any bridge correct: *on completion, post through `env->executor` instead of resuming the handle inline.* + +=== There Is No Universal Bridge + +Capy does not provide a generic `co_await foreign_awaitable(x)` that adapts _arbitrary_ awaitables automatically. Such an adapter cannot work in the general case: it has no way to know how a foreign runtime schedules its completions, so it cannot guarantee the invariant. A shared protocol addresses this where a hidden adapter cannot. A small, explicit bridge per foreign runtime keeps the guarantee intact and the cost visible. + +=== Worked Bridges + +Two complete, buildable bridges live in the examples: + +* xref:8.examples/8o.sender-bridge.adoc[Bridging a P2300 Sender] — `await_sender` adapts a `std::execution` sender, mapping its completion channels onto `io_result` and posting the resumption through the executor. +* xref:8.examples/8p.asio-use-capy.adoc[Calling Asio from a Capy Coroutine] — the `use_capy` completion token turns any Asio async operation into an `IoAwaitable`. + == Reference [cols="1,3"] diff --git a/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc b/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc index 96d0a1f25..dc52b70f6 100644 --- a/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc +++ b/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc @@ -260,4 +260,4 @@ public: | Copy algorithm |=== -You have now learned how to measure and copy buffer sequences. Continue to xref:5.buffers/5f.dynamic.adoc[Dynamic Buffers] to learn about growable buffer storage. +You have now learned how to measure and copy buffer sequences. Continue to xref:6.streams/6.intro.adoc[Stream Concepts] to learn how coroutines transfer data through streams. diff --git a/doc/modules/ROOT/pages/8.examples/8g.parallel-fetch.adoc b/doc/modules/ROOT/pages/8.examples/8g.parallel-fetch.adoc index 9efdc0492..98513889a 100644 --- a/doc/modules/ROOT/pages/8.examples/8g.parallel-fetch.adoc +++ b/doc/modules/ROOT/pages/8.examples/8g.parallel-fetch.adoc @@ -255,7 +255,3 @@ Caught error: B failed! 1. Add timing to see the parallel speedup vs sequential execution 2. Implement a "fan-out/fan-in" pattern that processes a list of items in parallel 3. Add cancellation support so remaining tasks can exit early on error - -== Next Steps - -* xref:8.examples/8h.custom-dynamic-buffer.adoc[Custom Dynamic Buffer] — Implementing your own buffer diff --git a/doc/modules/ROOT/pages/8.examples/8i.echo-server-corosio.adoc b/doc/modules/ROOT/pages/8.examples/8i.echo-server-corosio.adoc index e4c073d27..a6ca865fd 100644 --- a/doc/modules/ROOT/pages/8.examples/8i.echo-server-corosio.adoc +++ b/doc/modules/ROOT/pages/8.examples/8i.echo-server-corosio.adoc @@ -10,7 +10,6 @@ A complete echo server using Corosio for real network I/O. == Prerequisites -* Completed xref:8.examples/8h.custom-dynamic-buffer.adoc[Custom Dynamic Buffer] * Corosio library installed * Understanding of TCP networking basics diff --git a/doc/modules/ROOT/pages/8.examples/8o.sender-bridge.adoc b/doc/modules/ROOT/pages/8.examples/8o.sender-bridge.adoc new file mode 100644 index 000000000..3f42129a8 --- /dev/null +++ b/doc/modules/ROOT/pages/8.examples/8o.sender-bridge.adoc @@ -0,0 +1,107 @@ += Bridging a P2300 Sender + +Awaiting a `std::execution` (P2300) sender from inside a Capy coroutine. + +== What You Will Learn + +* How to `co_await` a foreign awaitable that does not implement the xref:4.coroutines/4d.io-awaitable.adoc[IoAwaitable protocol] +* How a bridge restores the xref:4.coroutines/4c.executors.adoc#the-same-executor-invariant[same-executor invariant] by posting the resumption through the caller's executor +* How a sender's value and error completion channels map onto `io_result` + +== Prerequisites + +* Completed xref:4.coroutines/4d.io-awaitable.adoc[The IoAwaitable Protocol] +* A `std::execution` implementation (this example uses https://github.com/bemanproject/execution[beman.execution]) + +[NOTE] +==== +This example is built only when `BOOST_CAPY_BUILD_P2300_EXAMPLES=ON` (it requires C++23 and fetches `beman.execution`). The full source is in `example/sender-bridge/`. +==== + +== The Bridge + +A P2300 sender is a foreign awaitable: it has no two-argument `await_suspend`, so a Capy `task` cannot `co_await` it directly. `await_sender` wraps it in an `IoAwaitable`. The key move is in the bridge's _receiver_: when the sender completes—on whatever thread its scheduler chose—it does not resume the coroutine inline. It stores the result and posts the continuation back through the caller's executor. + +[source,cpp] +---- +// From example/sender-bridge/sender_awaitable.hpp (abridged): +template +void set_value(Args&&... args) && noexcept +{ + result_->template emplace<1>(std::forward(args)...); + env_->executor.post(cont_); // resume on the caller's executor +} +---- + +`await_sender` inspects the sender's completion signatures. If the sender can complete with `set_error(std::error_code)`, the bridge yields `io_result` so the error stays a value rather than an exception; otherwise it yields the value directly. + +== Source Code + +[source,cpp] +---- +#include "sender_awaitable.hpp" + +#include +#include + +#include +#include +#include + +namespace capy = boost::capy; +namespace ex = beman::execution; + +capy::task compute(auto sched) +{ + int result = co_await capy::await_sender( + ex::schedule(sched) + | ex::then([] { + std::cout << " sender running on thread " + << std::this_thread::get_id() << "\n"; + return 42 * 42; + })); + + std::cout << " coroutine resumed on thread " + << std::this_thread::get_id() << "\n"; + + co_return result; +} + +int main() +{ + std::cout << "main thread: " << std::this_thread::get_id() << "\n"; + + capy::thread_pool pool; // Capy execution context + + ex::run_loop loop; // Beman context on its own thread + std::jthread loop_thread([&loop] { loop.run(); }); + auto sched = loop.get_scheduler(); + + std::latch done(1); + int answer = 0; + auto on_complete = [&](int v) { answer = v; done.count_down(); }; + auto on_error = [&](std::exception_ptr) { done.count_down(); }; + + capy::run_async(pool.get_executor(), on_complete, on_error)(compute(sched)); + + done.wait(); + loop.finish(); + std::cout << "result: " << answer << "\n"; +} +---- + +== Output + +The sender runs on the `run_loop` thread, but the coroutine resumes on the `thread_pool` executor it was launched with—the invariant holds across the bridge: + +---- +main thread: 139667952822976 + sender running on thread 139667946014400 + coroutine resumed on thread 139667920406208 +result: 1764 +---- + +== See Also + +* xref:4.coroutines/4d.io-awaitable.adoc#bridging-a-foreign-awaitable[Bridging a Foreign Awaitable] — the general pattern and why there is no universal bridge +* xref:8.examples/8p.asio-use-capy.adoc[Calling Asio from a Capy Coroutine] — the same technique for Asio operations diff --git a/doc/modules/ROOT/pages/8.examples/8p.asio-use-capy.adoc b/doc/modules/ROOT/pages/8.examples/8p.asio-use-capy.adoc new file mode 100644 index 000000000..fda96ecd4 --- /dev/null +++ b/doc/modules/ROOT/pages/8.examples/8p.asio-use-capy.adoc @@ -0,0 +1,91 @@ += Calling Asio from a Capy Coroutine + +Using Boost.Asio async operations directly inside a Capy coroutine through a `use_capy` completion token. + +== What You Will Learn + +* How a completion token adapts _any_ Asio async operation into an xref:4.coroutines/4d.io-awaitable.adoc[IoAwaitable] +* How the token bridges `std::stop_token` to Asio's cancellation slot +* How the bridge preserves the xref:4.coroutines/4c.executors.adoc#the-same-executor-invariant[same-executor invariant] + +== Prerequisites + +* Completed xref:4.coroutines/4d.io-awaitable.adoc[The IoAwaitable Protocol] +* Boost.Asio available to the build + +[NOTE] +==== +This example is built when a `Boost::asio` target is available. The full source is in `example/asio/` (`use_capy_example.cpp` and the reusable token in `api/use_capy.hpp`). +==== + +== The Completion Token + +Asio async operations accept a _completion token_ that decides what the call returns. `use_capy` is a token whose `async_result` returns an `IoAwaitable`. When co-awaited, its `await_suspend` starts the Asio operation and arranges the completion handler to post the resumption through the caller's executor—so the coroutine resumes on the executor it was launched with, never on whatever thread Asio completed on: + +[source,cpp] +---- +// From example/asio/api/use_capy.hpp (abridged): +std::coroutine_handle<> await_suspend( + std::coroutine_handle<> h, capy::io_env const* env) +{ + cancel_ = std::make_shared(env->stop_token); + + cont_.h = h; // stable address; posted through the executor on completion + auto handler = [this, ex = env->executor](Args... args) mutable + { + store_result(std::move(args)...); + ex.post(cont_); + }; + + std::move(op_)(net::bind_cancellation_slot( + cancel_->signal.slot(), std::move(handler))); + + return std::noop_coroutine(); +} +---- + +== Using the Token + +Pass `use_capy` where an Asio operation expects a completion token. The `co_await` yields an `io_result` carrying the error code and the operation's result: + +[source,cpp] +---- +capy::io_task<> +writer(net::ip::tcp::socket& socket, std::size_t total) +{ + char buf[128]; + std::memset(buf, 'X', sizeof(buf)); + + std::size_t written = 0; + while (written < total) + { + std::size_t chunk = (std::min)(sizeof(buf), total - written); + + auto [ec, n] = co_await socket.async_write_some( + net::buffer(buf, chunk), use_capy); + + if (ec) + co_return capy::io_result<>{ec}; + written += n; + } + co_return capy::io_result<>{}; +} +---- + +The reader is symmetric, using `async_read_some`. `run_example` drives both concurrently over a connected socket pair with `when_all`, then reports completion. + +== Output + +---- +writer: wrote 128 bytes (total 128) +reader: read 128 bytes (total 128) +... +writer: done, wrote 1024 bytes +reader: done, read 1024 bytes +example complete! +---- + +== See Also + +* xref:4.coroutines/4d.io-awaitable.adoc#bridging-a-foreign-awaitable[Bridging a Foreign Awaitable] — the general pattern +* xref:8.examples/8o.sender-bridge.adoc[Bridging a P2300 Sender] — the same technique for `std::execution` senders diff --git a/doc/modules/ROOT/pages/9.design/9l.RunApi.adoc b/doc/modules/ROOT/pages/9.design/9l.RunApi.adoc index 17ab64484..e18a805f5 100644 --- a/doc/modules/ROOT/pages/9.design/9l.RunApi.adoc +++ b/doc/modules/ROOT/pages/9.design/9l.RunApi.adoc @@ -232,7 +232,7 @@ Coroutine frame allocation happens _before_ the coroutine body executes. When th Any mechanism that injects the allocator _after_ the call -- receiver queries, `await_transform`, explicit method calls -- arrives too late. The frame is already allocated. -This is the fundamental tension identified in D4003 section 3.3: +This is the fundamental tension identified in P4003 section 3.3: [quote] ____ diff --git a/doc/modules/ROOT/pages/9.design/9m.WhyNotCobalt.adoc b/doc/modules/ROOT/pages/9.design/9m.WhyNotCobalt.adoc index d563c086b..2c8e6d194 100644 --- a/doc/modules/ROOT/pages/9.design/9m.WhyNotCobalt.adoc +++ b/doc/modules/ROOT/pages/9.design/9m.WhyNotCobalt.adoc @@ -342,7 +342,7 @@ Capy supports multi-threaded execution. `thread_pool` distributes work across th Cobalt stores executor context in thread-local variables. Coroutines access it via `this_coro::executor`. This works on a single thread with a single executor. This design is scoped to single-threaded, single-executor configurations. -Capy introduces the https://github.com/cppalliance/wg21-papers/blob/master/source/d4003-io-awaitables.md[IoAwaitable protocol] and uses it for context propagation. When you `co_await`, the caller passes its execution environment to the child structurally: +Capy introduces the https://wg21.link/P4003[IoAwaitable protocol] and uses it for context propagation. When you `co_await`, the caller passes its execution environment to the child structurally: [source,cpp] ---- diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index a2559acb5..1faed2244 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -2,6 +2,28 @@ Capy abstracts away sockets, files, and asynchrony with type-erased streams and buffer sequences—code compiles fast because the implementation is hidden. It provides the framework for concurrent algorithms that transact in buffers of memory: networking, serial ports, console, timers, and any platform I/O. This is only possible because Capy is coroutine-only, enabling optimizations and ergonomics that hybrid approaches must sacrifice. +== What Capy Is + +Capy is two things at once: + +* *A protocol.* `IoAwaitable` is a protocol for propagating a coroutine's _execution environment_—its executor, stop token, and allocator—forward through `co_await` chains. This is the vocabulary that lets awaitable-based coroutine libraries interoperate. +* *A reference implementation.* A concrete library—thread pool, task types, byte streams, buffer sequences, synchronization primitives—that proves the protocol works in practice. + +The protocol is the smaller, more general library living inside Capy. Without a shared protocol, _N_ coroutine libraries need _N_×(_N_−1) adapters to interoperate; with one shared protocol for environment propagation, a single bridge covers everyone. This is the role `IoAwaitable` plays. + +[IMPORTANT] +==== +*The core invariant: a coroutine always resumes on the executor it was launched with.* + +Launch a coroutine on a strand, and every resumption—after every `co_await`—happens on that strand. Shared state touched between suspension points is free of data races without a mutex. A _plain_ awaitable can resume a coroutine on any thread and would break this guarantee, so Capy rejects it at compile time and provides an explicit way to bridge such awaitables when you need one. + +See xref:4.coroutines/4c.executors.adoc#the-same-executor-invariant[the same-executor invariant] for the rationale and xref:4.coroutines/4d.io-awaitable.adoc#bridging-a-foreign-awaitable[bridging a foreign awaitable] for the escape hatch. +==== + +== What Capy Is Not + +Capy is _not_ an all-purpose coroutine framework, and it is _not_ an implementation detail of Corosio. It is the execution model and byte-stream layer—usable standalone for logic that operates on streams without any platform I/O (HTTP parsing, protocol state machines, serialization), and usable as the foundation for Corosio's networking layer. CERN's traccc project uses Capy without Corosio for GPU reconstruction pipelines; the Boost.HTTP parser is built entirely on Capy's byte streams. + == What This Library Does * *Lazy coroutine tasks* — `task` with forward-propagating stop tokens and automatic cancellation diff --git a/example/asio/any_stream.cpp b/example/asio/any_stream.cpp index 7d2c84f99..3d28bb94c 100644 --- a/example/asio/any_stream.cpp +++ b/example/asio/any_stream.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -22,7 +23,7 @@ constexpr std::size_t total_bytes = 1024; -capy::task<> +capy::io_task<> writer( capy::any_stream& stream, std::size_t total) @@ -38,15 +39,16 @@ writer( if(ec) { std::printf("writer error: %s\n", ec.message().c_str()); - co_return; + co_return capy::io_result<>{ec}; } written += n; std::printf("writer: wrote %zu bytes (total %zu)\n", n, written); } std::printf("writer: done, wrote %zu bytes\n", written); + co_return capy::io_result<>{}; } -capy::task<> +capy::io_task<> reader( capy::any_stream& stream, std::size_t total) @@ -60,12 +62,13 @@ reader( if(ec) { std::printf("reader error: %s\n", ec.message().c_str()); - co_return; + co_return capy::io_result<>{ec}; } read_total += n; std::printf("reader: read %zu bytes (total %zu)\n", n, read_total); } std::printf("reader: done, read %zu bytes\n", read_total); + co_return capy::io_result<>{}; } capy::task<> @@ -73,11 +76,14 @@ run_example( capy::any_stream& client_stream, capy::any_stream& server_stream) { - co_await capy::when_all( + auto r = co_await capy::when_all( writer(client_stream, total_bytes), reader(server_stream, total_bytes)); - std::printf("example complete!\n"); + if(r.ec) + std::printf("example error: %s\n", r.ec.message().c_str()); + else + std::printf("example complete!\n"); } int main() diff --git a/example/asio/api/capy_streams.cpp b/example/asio/api/capy_streams.cpp index 712cb7c95..af35f7808 100644 --- a/example/asio/api/capy_streams.cpp +++ b/example/asio/api/capy_streams.cpp @@ -238,14 +238,14 @@ class asio_executor void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(continuation& c) const + std::coroutine_handle<> dispatch(capy::continuation& c) const { auto h = c.h; net::post(ex_, [h]{ h.resume(); }); return std::noop_coroutine(); } - void post(continuation& c) const + void post(capy::continuation& c) const { auto h = c.h; net::post(ex_, [h]{ h.resume(); }); diff --git a/example/asio/api/uni_stream.hpp b/example/asio/api/uni_stream.hpp index 93964cde1..80202821c 100644 --- a/example/asio/api/uni_stream.hpp +++ b/example/asio/api/uni_stream.hpp @@ -55,14 +55,14 @@ class asio_executor_wrapper void on_work_started() const noexcept {} void on_work_finished() const noexcept {} - std::coroutine_handle<> dispatch(continuation& c) const + std::coroutine_handle<> dispatch(capy::continuation& c) const { auto h = c.h; net::post(ex_, [h]{ h.resume(); }); return std::noop_coroutine(); } - void post(continuation& c) const + void post(capy::continuation& c) const { auto h = c.h; net::post(ex_, [h]{ h.resume(); }); diff --git a/example/asio/api/use_capy.hpp b/example/asio/api/use_capy.hpp index bc227a767..a2b3f19f0 100644 --- a/example/asio/api/use_capy.hpp +++ b/example/asio/api/use_capy.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -110,6 +111,7 @@ class capy_awaitable DeferredOp op_; result_type result_; std::shared_ptr cancel_; + capy::continuation cont_; public: explicit capy_awaitable(DeferredOp op) @@ -128,10 +130,17 @@ class capy_awaitable { cancel_ = std::make_shared(env->stop_token); - auto handler = [this, h, ex = env->executor](Args... args) mutable + // Resume the caller on its executor when the Asio operation + // completes. The completion handler may run on any thread, so we + // post the continuation through the executor rather than resuming + // inline; this restores the same-executor invariant. cont_ is a + // member, giving the continuation a stable address until the + // executor dequeues and resumes it. + cont_.h = h; + auto handler = [this, ex = env->executor](Args... args) mutable { store_result(std::move(args)...); - ex.post(h); + ex.post(cont_); }; std::move(op_)( diff --git a/example/asio/use_capy_example.cpp b/example/asio/use_capy_example.cpp index d861da282..93af499ac 100644 --- a/example/asio/use_capy_example.cpp +++ b/example/asio/use_capy_example.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -24,8 +25,9 @@ constexpr std::size_t total_bytes = 1024; -// Writer coroutine using use_capy with raw Asio socket -capy::task<> +// Writer coroutine using use_capy with raw Asio socket. +// Returns io_task<> so it can be composed with when_all. +capy::io_task<> writer( net::ip::tcp::socket& socket, std::size_t total) @@ -45,16 +47,18 @@ writer( if (ec) { std::printf("writer error: %s\n", ec.message().c_str()); - co_return; + co_return capy::io_result<>{ec}; } written += n; std::printf("writer: wrote %zu bytes (total %zu)\n", n, written); } std::printf("writer: done, wrote %zu bytes\n", written); + co_return capy::io_result<>{}; } -// Reader coroutine using use_capy with raw Asio socket -capy::task<> +// Reader coroutine using use_capy with raw Asio socket. +// Returns io_task<> so it can be composed with when_all. +capy::io_task<> reader( net::ip::tcp::socket& socket, std::size_t total) @@ -71,12 +75,13 @@ reader( if (ec) { std::printf("reader error: %s\n", ec.message().c_str()); - co_return; + co_return capy::io_result<>{ec}; } read_total += n; std::printf("reader: read %zu bytes (total %zu)\n", n, read_total); } std::printf("reader: done, read %zu bytes\n", read_total); + co_return capy::io_result<>{}; } capy::task<> @@ -84,11 +89,14 @@ run_example( net::ip::tcp::socket& client, net::ip::tcp::socket& server) { - co_await capy::when_all( + auto r = co_await capy::when_all( writer(client, total_bytes), reader(server, total_bytes)); - std::printf("example complete!\n"); + if (r.ec) + std::printf("example error: %s\n", r.ec.message().c_str()); + else + std::printf("example complete!\n"); } int main()