libasync
libasync is an async primitives library for C++20. It is one of the libraries powering the managarm project.
libasync is built to be portable to different platforms, hosted or freestanding.
Docs permalink: https://docs.managarm.org/libasync/
Projects using libasync
- managarm - Pragmatic microkernel-based OS with fully asynchronous I/O. libasync is used both in user-space, and in the kernel.
Contributing
Any kind of contributions to libasync are welcome, and greatly appreciated.
Code contributions
Code contributions should follow the managarm coding style, with the exception that all names are snake_case instead. In addition, any new user-facing functionality that's added should also be appropriately documented.
Documentation contributions
If you found typos, broken code or other mistakes in the documentation, feel free to create an issue or make a pull request fixing them.
When writing new documentation, follow these guidelines:
- In class prototypes, only show user-facing methods.
- Document the return types of every method shown.
- Try to keep the Markdown lines between 80 and 90 characters long wherever reasonable.
It is also advised to follow this general outline for pages:
---
short-description: ...
...
# Name
Short description of the functionality.
## Prototype
Prototype for the class or function(s).
### Requirements (optional)
Constraints placed on template types.
### Arguments (optional)
List of arguments and their description.
### Return value(s) (optional)
Description of return values and types.
## Examples (optional)
Example code
Code output
Senders, receivers, and operations
libasync is in part built around the concept of senders and receivers. Using senders and receivers allows for allocation-free operation where an allocation would otherwise be necessary for the coroutine frame. For example, all of the algorithms and other asynchronous operations are written using them.
Concepts
Sender
A sender is an object that holds all the necessary arguments and knows how to start an asynchronous operation. It is moveable.
Every sender must have an appropriate connect member method or function overload that accepts a receiver and is used to form the operation.
Operation
An operation is an object that stores the necessary state and handles the actual asynchronous operation. It is immovable, and as such pointers to it will remain valid for as long as the operation exists. When the operation is finished, it notifies the receiver and optionally passes it a result value.
Every operation must have either a void start() or a bool start_inline() method that is invoked when the operation is first started. void start() is equivalent to bool start_inline() with return false; at the end.
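The equivalence between the two start methods can be sketched with a small dispatch helper. This is only an illustration written in plain C++17: `start_either`, `deferred_op` and `immediate_op` are hypothetical names, and libasync's own `async::execution::start_inline` may be implemented differently.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Detects whether Op has a start_inline() member (C++17 detection idiom).
template <typename Op, typename = void>
struct has_start_inline : std::false_type { };

template <typename Op>
struct has_start_inline<Op,
		std::void_t<decltype(std::declval<Op &>().start_inline())>>
	: std::true_type { };

// Hypothetical helper, not libasync's API: starts either kind of operation
// and reports whether it completed inline.
template <typename Op>
bool start_either(Op &op) {
	if constexpr (has_start_inline<Op>::value) {
		return op.start_inline();
	} else {
		op.start();
		return false; // A plain start() never reports inline completion.
	}
}

// Toy operation that only provides void start().
struct deferred_op {
	bool started = false;
	void start() { started = true; }
};

// Toy operation that provides bool start_inline() and finishes synchronously.
struct immediate_op {
	bool start_inline() { return true; }
};

inline void demo_start_either() {
	deferred_op a;
	immediate_op b;
	assert(!start_either(a) && a.started);
	assert(start_either(b));
}
```

A caller written against `start_either` does not need to know which of the two methods a given operation provides.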
Inline and no-inline completion
Operations that complete synchronously can signal inline completion. If an operation completes inline, it sets the value using set_value_inline and returns true from start_inline (operations that have a start method cannot complete inline). Inline completion allows for certain optimizations, like avoiding suspending the coroutine if the operation completed synchronously.
Receiver
A receiver is an object that knows what to do after an operation finishes (e.g. how to resume the coroutine). It optionally receives a result value from the operation. It is moveable.
Every receiver must have void set_value_inline(...) and void set_value_noinline(...) methods that are invoked by the operation when it completes.
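To make the three concepts concrete, here is a minimal sketch of a sender, an operation and a receiver that fit the descriptions above. It does not use libasync at all; `just_sender`, `just_operation`, `recording_receiver` and `run_just` are hypothetical names chosen for this illustration, and the operation always completes inline.

```cpp
#include <cassert>
#include <utility>

// Toy receiver: records the value and which completion path was used.
struct recording_receiver {
	int *value;
	bool *was_inline;

	void set_value_inline(int v) { *value = v; *was_inline = true; }
	void set_value_noinline(int v) { *value = v; *was_inline = false; }
};

// Toy operation: stores the state and completes synchronously.
template <typename Receiver>
struct just_operation {
	just_operation(int value, Receiver r) : value_{value}, r_{std::move(r)} { }

	// Immovable, so pointers to the operation stay valid.
	just_operation(const just_operation &) = delete;
	just_operation &operator=(const just_operation &) = delete;

	bool start_inline() {
		r_.set_value_inline(value_);
		return true; // Completed inline.
	}

private:
	int value_;
	Receiver r_;
};

// Toy sender: holds the arguments and knows how to form the operation.
struct just_sender {
	using value_type = int;
	int value;

	template <typename Receiver>
	just_operation<Receiver> connect(Receiver r) {
		return {value, std::move(r)};
	}
};

// Connects and starts the toy sender, returning the value it produced.
inline int run_just(int v) {
	int value = 0;
	bool was_inline = false;
	auto op = just_sender{v}.connect(recording_receiver{&value, &was_inline});
	bool done = op.start_inline();
	assert(done && was_inline);
	(void)done;
	return value;
}
```

Because the operation is constructed directly in its final location, it can stay immovable while still being returned from `connect`.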
Writing your own sender and operation
While libasync provides a variety of existing types, one may wish to write their own senders and operations. The most common reason for writing a custom sender and operation is wrapping an existing callback-based or event-based API while avoiding the extra cost of allocating coroutine frames. The following section goes into detail on how to implement them by implementing a simple wrapper for libuv's uv_write.
End effect
The sender and operation we'll implement here will allow us to do the following:
auto result = co_await write(my_handle, my_bufs, n_my_bufs);
if (result < 0)
/* report error */
The implementation
The sender
As explained in Senders, receivers, and operations, the sender is an object that stores the necessary state to start an operation. Let's start off by looking at the prototype for uv_write:
int uv_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);
The function takes some object that stores the state (not to be confused with our operation object), a handle to the stream, buffers to write and a callback. The callback is also given a status code, which we will propagate back to the user. The state and callback will be handled by our operation, which leaves only the handle, buffers and status code.
Let's start by writing a simple class:
struct [[nodiscard]] write_sender {
using value_type = int; // Status code
uv_stream_t *handle;
const uv_buf_t *bufs;
size_t nbufs;
};
As can be seen, the sender class is quite simple. The nodiscard attribute only helps catch errors caused by accidentally ignoring the sender without awaiting it and can be omitted.
Next we add a simple function used to construct the sender:
write_sender write(uv_stream_t *handle, const uv_buf_t *bufs, size_t nbufs) {
return write_sender{handle, bufs, nbufs};
}
In addition to that, we need the connect overload:
template <typename Receiver>
write_operation<Receiver> connect(write_sender s, Receiver r) {
return {s, std::move(r)};
}
connect simply constructs an operation from the sender and receiver.
We also add an implementation of operator co_await for our class so that we can co_await it inside of a coroutine:
async::sender_awaiter<write_sender, write_sender::value_type>
operator co_await(write_sender s) {
return {s};
}
async::sender_awaiter is a special type that can suspend and resume the coroutine, and internally connects a receiver to our sender.
The operation
With the sender done, what remains to be written is the operation. As noted earlier, the operation is constructed using the sender and receiver, and it stores the operation state. As such, we want each call to write to have a unique operation object. Let's start by writing a skeleton for the class:
template <typename Receiver>
struct write_operation {
write_operation(write_sender s, Receiver r)
: req_{}, handle_{s.handle}, bufs_{s.bufs}, nbufs_{s.nbufs}, r_{std::move(r)} { }
write_operation(const write_operation &) = delete;
write_operation &operator=(const write_operation &) = delete;
write_operation(write_operation &&) = delete;
write_operation &operator=(write_operation &&) = delete;
private:
uv_write_t req_;
uv_stream_t *handle_;
const uv_buf_t *bufs_;
size_t nbufs_;
Receiver r_;
};
The operation stores all the necessary state, and is templated on the receiver type in order to support any receiver type.
The operation is also made immovable and non-copyable so that pointers to it can safely be taken without worrying that they may become invalid at some point.
Next, we add a start_inline method:
bool start_inline() {
auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) {
/* TODO */
});
if (result < 0) {
async::execution::set_value_inline(r_, result);
return true; // Completed inline
}
return false; // Did not complete inline
}
We use start_inline here in order to notify the user of any immediate errors synchronously. We use functions defined inside of async::execution to set the value, because they properly detect which method should be called on the receiver.
Now, let's implement the actual asynchronous completion:
...
handle_->data = this;
auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) {
auto op = static_cast<write_operation *>(req->handle->data);
op->complete(status);
});
...
We use the handle's user data field to store a pointer to this instance of the operation in order to be able to access it later. This is necessary as otherwise we'd have no way of knowing which operation caused our callback to be entered. Do note that this way of implementing it means that only one write operation may be in progress at once. One way to solve this would be to have a wrapper object that manages the handle and has a map of req to write_operation, but that's beyond the scope of this example.
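Another option, swapped in here as an alternative to the map, is to keep the pointer per request instead of per handle. libuv request structures such as uv_write_t carry their own user `data` pointer, so each operation could store `this` in its own request. The sketch below shows the idea against a mock: `fake_req`, `fake_write` and `fake_run_once` are hypothetical stand-ins that only mimic the shape of a callback API and are not libuv.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical stand-ins for a callback-based C API (NOT libuv).
struct fake_req { void *data; };
using fake_cb = void (*)(fake_req *req, int status);

struct pending_write { fake_req *req; fake_cb cb; };

inline std::vector<pending_write> &pending_writes() {
	static std::vector<pending_write> list;
	return list;
}

// Queues the request; it is completed later by fake_run_once().
inline int fake_write(fake_req *req, fake_cb cb) {
	pending_writes().push_back({req, cb});
	return 0;
}

// Stand-in for one event loop iteration: completes request i with status i.
inline void fake_run_once() {
	auto batch = std::move(pending_writes());
	pending_writes().clear();
	int status = 0;
	for (auto &p : batch)
		p.cb(p.req, status++);
}

struct toy_write_op {
	explicit toy_write_op(int *out) : req_{}, out_{out} { }
	toy_write_op(const toy_write_op &) = delete;
	toy_write_op &operator=(const toy_write_op &) = delete;

	bool start_inline() {
		req_.data = this; // Per-request pointer instead of a per-handle one.
		int result = fake_write(&req_, [] (fake_req *req, int status) {
			static_cast<toy_write_op *>(req->data)->complete(status);
		});
		if (result < 0) {
			*out_ = result;
			return true; // Completed inline
		}
		return false; // Did not complete inline
	}

private:
	void complete(int status) { *out_ = status; }

	fake_req req_;
	int *out_;
};

// Two operations in flight at once; each receives its own status.
inline bool demo_two_writes() {
	int a = -1, b = -1;
	toy_write_op first{&a}, second{&b};
	first.start_inline();
	second.start_inline();
	fake_run_once();
	return a == 0 && b == 1;
}
```

Since every operation owns its request, the callback can always find the right operation, no matter how many writes are pending on the same handle.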
Note: Due to how libuv operates (it hides the actual event loop and instead dispatches callbacks directly), the IO service will not have any logic apart from invoking uv_run to do one iteration of the event loop.
Finally, we add our complete method:
private:
void complete(int status) {
async::execution::set_value_noinline(r_, status);
}
On complete, we use async::execution::set_value_noinline to set the result value and notify the receiver that the operation is complete (so that it can, for example, resume the suspended coroutine, like the async::sender_awaiter receiver).
Full code
All of this put together gives us the following code:
// ----------------------------------------------
// Sender
// ----------------------------------------------
struct [[nodiscard]] write_sender {
using value_type = int; // Status code
uv_stream_t *handle;
const uv_buf_t *bufs;
size_t nbufs;
};
write_sender write(uv_stream_t *handle, const uv_buf_t *bufs, size_t nbufs) {
return write_sender{handle, bufs, nbufs};
}
// Forward declaration; the operation is defined further below.
template <typename Receiver>
struct write_operation;
template <typename Receiver>
write_operation<Receiver> connect(write_sender s, Receiver r) {
return {s, std::move(r)};
}
async::sender_awaiter<write_sender, write_sender::value_type>
operator co_await(write_sender s) {
return {s};
}
// ----------------------------------------------
// Operation
// ----------------------------------------------
template <typename Receiver>
struct write_operation {
write_operation(write_sender s, Receiver r)
: req_{}, handle_{s.handle}, bufs_{s.bufs}, nbufs_{s.nbufs}, r_{std::move(r)} { }
write_operation(const write_operation &) = delete;
write_operation &operator=(const write_operation &) = delete;
write_operation(write_operation &&) = delete;
write_operation &operator=(write_operation &&) = delete;
bool start_inline() {
handle_->data = this;
auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) {
auto op = static_cast<write_operation *>(req->handle->data);
op->complete(status);
});
if (result < 0) {
async::execution::set_value_inline(r_, result);
return true; // Completed inline
}
return false; // Did not complete inline
}
private:
void complete(int status) {
async::execution::set_value_noinline(r_, status);
}
uv_write_t req_;
uv_stream_t *handle_;
const uv_buf_t *bufs_;
size_t nbufs_;
Receiver r_;
};
IO service
The IO service is a user-provided class which manages waiting for events and waking up coroutines/operations based on them.
The IO service must provide one method: void wait(). This method is called when there is no more work to do currently. It waits for any event to happen, and wakes up the appropriate coroutine/operation which awaited the event.
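As a rough illustration of that contract, the following sketch models an IO service whose "events" are callbacks queued by hand instead of coming from the operating system. It is self-contained and does not use libasync; `toy_io_service`, `post` and `demo_io_service` are hypothetical names, and only `wait()` corresponds to the method described above.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

struct toy_io_service {
	// Hypothetical helper, not required by libasync: registers work that
	// should run once its event "happens".
	void post(std::function<void()> fn) { ready_.push_back(std::move(fn)); }

	// The one required method: wait for an event and wake up whoever
	// awaited it. Here the next queued callback stands in for the event.
	void wait() {
		assert(!ready_.empty() && "nothing left that could make progress");
		auto fn = std::move(ready_.front());
		ready_.pop_front();
		fn();
	}

private:
	std::deque<std::function<void()>> ready_;
};

// Sketch of a driver loop: keep calling wait() until the awaited work
// reports completion, then return its result.
inline int demo_io_service() {
	toy_io_service ios;
	bool done = false;
	int result = 0;
	ios.post([&] { result += 1; });               // Unrelated event.
	ios.post([&] { result += 41; done = true; }); // Event that finishes the work.
	while (!done)
		ios.wait();
	return result;
}
```

A real IO service would block in `wait()` on something like an event loop iteration or a kernel wait primitive instead of popping from a queue.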
Note: async::run and async::run_forever (see the basic header) take the IO service by value, not by reference.
Example
The following example shows the approximate call graph executing an event-loop-driven coroutine would take:
- async::run(my_sender, my_io_service)
  - my_operation = async::execution::connect(my_sender, internal_receiver)
  - async::execution::start_inline(my_operation)
    - my_operation starts running...
      - co_await some_ev
        - some_ev operation is started
          - my_io_service.add_waiter(this)
    - (async::execution::start_inline returns false)
  - my_io_service.wait()
    - IO service waits for event to happen...
    - waiters_.front()->complete()
      - some_ev operation completes
        - my_operation resumes
          - co_return 2
            - async::execution::set_value_noinline(internal_receiver, 2)
  - return internal_receiver.value
- (async::run returns 2)
Headers
The following sections contain documentation for libasync's header files. Each section corresponds to one header file.
algorithm
#include <async/algorithm.hpp>
This header provides various algorithms that can be used as building blocks for creating more complex functionality without incurring the cost of memory allocation.
invocable
invocable is an operation that executes the given functor and completes inline with the value returned by the functor.
Prototype
template <typename F>
sender invocable(F f);
Requirements
F is a functor that takes no arguments.
Arguments
- f - the functor to execute.
Return value
This function returns a sender of unspecified type. This sender returns the value returned by the functor.
Examples
auto fn = [] { return 1; };
std::cout << async::run(async::invocable(fn)) << std::endl;
Output:
1
transform
transform is an operation that starts the given sender, and upon completion applies the functor to its return value.
Prototype
template <typename Sender, typename F>
sender transform(Sender ds, F f);
Requirements
Sender is a sender and F is a functor that accepts the return value of the sender as an argument.
Arguments
- ds - the sender whose result value should be transformed.
- f - the functor to use.
Return value
This function returns a sender of unspecified type. This sender returns the return value of the functor.
Examples
auto coro = [] () -> async::result<int> {
co_return 5;
};
std::cout << async::run(async::transform(coro(), [] (int i) { return i * 2; })) << std::endl;
Output:
10
ite
ite is an operation that checks the given condition, and starts the "then" or "else" sender depending on the result.
Prototype
template <typename C, typename ST, typename SE>
sender ite(C cond, ST then_s, SE else_s);
Requirements
C is a functor that returns a truthy or falsy value. ST and SE are senders. ST and SE must have the same return type.
Arguments
- cond - the condition to check.
- then_s - the sender to start if condition is true.
- else_s - the sender to start if condition is false.
Return value
This function returns a sender of unspecified type. The sender returns the value of the sender that was started depending on the condition.
Examples
auto then_coro = [] () -> async::result<int> {
co_return 1;
};
auto else_coro = [] () -> async::result<int> {
co_return 2;
};
std::cout << async::run(async::ite([] { return true; }, then_coro(), else_coro())) << std::endl;
std::cout << async::run(async::ite([] { return false; }, then_coro(), else_coro())) << std::endl;
Output:
1
2
repeat_while
repeat_while is an operation that continuously checks the given condition, and as long as it's true, it invokes the given functor to obtain a sender, and starts it.
Prototype
template <typename C, typename SF>
sender repeat_while(C cond, SF factory);
Requirements
C is a functor that returns a truthy or falsy value. SF is a functor that returns a sender.
Arguments
- cond - the condition to check.
- factory - the functor to call to obtain the sender on every iteration.
Return value
This function returns a sender of unspecified type. The sender does not return any value.
Examples
int i = 0;
async::run(async::repeat_while([&] { return i++ < 5; },
[] () -> async::result<void> {
std::cout << "Hi" << std::endl;
co_return;
}));
Output:
Hi
Hi
Hi
Hi
Hi
race_and_cancel
race_and_cancel is an operation that obtains senders using the given functors, starts all of them concurrently, and cancels the remaining ones when one finishes.
Note: race_and_cancel does not guarantee that only one sender completes without cancellation.
Prototype
template <typename... Functors>
sender race_and_cancel(Functors... fs);
Requirements
Every type of Functors is invocable with an argument of type async::cancellation_token and produces a sender.
Arguments
- fs - the functors to invoke to obtain the senders.
Return value
This function returns a sender of unspecified type. The sender does not return any value.
Examples
async::run(async::race_and_cancel(
[] (async::cancellation_token) -> async::result<void> {
std::cout << "Hi 1" << std::endl;
co_return;
},
[] (async::cancellation_token) -> async::result<void> {
std::cout << "Hi 2" << std::endl;
co_return;
}
));
Possible output:
Hi 1
Hi 2
let
let is an operation that obtains a value using the given functor, stores it in a variable, and obtains a sender to start using the second functor, passing a reference to the variable as an argument.
Prototype
template <typename Pred, typename Func>
sender let(Pred pred, Func func);
Requirements
Pred is a functor that returns a value. Func is a functor that returns a sender and accepts a reference to the value returned by Pred as an argument.
Arguments
- pred - the functor to execute to obtain the value.
- func - the function to call to obtain the sender.
Return value
This function returns a sender of unspecified type. The sender returns the value returned by the obtained sender.
Examples
std::cout << async::run(async::let([] { return 3; },
[] (int &i) -> async::result<int> {
co_return i * 2;
})) << std::endl;
Output:
6
sequence
sequence is an operation that sequentially starts the given senders.
Prototype
template <typename ...Senders>
sender sequence(Senders ...senders);
Requirements
Senders is not empty, and every type in it is a sender. All but the last sender must have a return type of void.
Arguments
- senders - the senders to run.
Return value
This function returns a sender of unspecified type. The sender returns the value returned by the last sender.
Examples
int steps[4] = {0, 0, 0, 0};
int v = async::run([&]() -> async::result<int> {
int i = 0;
co_return co_await async::sequence(
[&]() -> async::result<void> {
steps[0] = i;
i++;
co_return;
}(),
[&]() -> async::result<void> {
steps[1] = i;
i++;
co_return;
}(),
[&]() -> async::result<void> {
steps[2] = i;
i++;
co_return;
}(),
[&]() -> async::result<int> {
steps[3] = i;
i++;
co_return i * 10;
}()
);
}());
std::cout << v << " " << steps[0] << steps[1] << steps[2] << steps[3] << std::endl;
Output:
40 0123
when_all
when_all is an operation that starts all the given senders concurrently, and only completes when all of them complete.
Prototype
template <typename... Senders>
sender when_all(Senders... senders);
Requirements
Every type of Senders is a sender that doesn't return any value.
Arguments
- senders - the senders to start.
Return value
This function returns a sender of unspecified type. The sender does not return any value.
Examples
async::run(async::when_all(
[] () -> async::result<void> {
std::cout << "Hi 1" << std::endl;
co_return;
}(),
[] () -> async::result<void> {
std::cout << "Hi 2" << std::endl;
co_return;
}()
));
std::cout << "Done" << std::endl;
Possible output:
Hi 1
Hi 2
Done
basic
#include <async/basic.hpp>
This header provides basic functionality.
sender_awaiter and make_awaiter
sender_awaiter is a coroutine promise type that allows for awaiting a sender. It connects an internal receiver to the given sender, and starts the resulting operation when the result is awaited.
make_awaiter is a function that obtains the awaiter associated with a sender. It does that by getting the result of operator co_await.
Prototype
template <typename S, typename T = void>
struct [[nodiscard]] sender_awaiter {
sender_awaiter(S sender); // (1)
bool await_ready(); // (2)
bool await_suspend(std::coroutine_handle<>); // (2)
T await_resume(); // (2)
};
template<typename S>
auto make_awaiter(S &&s); // (3)
- Constructs the object with the given sender.
- Coroutine promise methods.
- Get the awaiter for the given sender.
Requirements
S is a sender and T is S::value_type or a type convertible to it.
Arguments
- sender - the sender to await.
Return values
- N/A
- These methods return implementation-specific values.
- This function returns the result object of co_awaiting the given object (without performing the actual asynchronous await operation).
any_receiver
any_receiver is a wrapper type that wraps around any receiver type and handles calling set_value on it.
Prototype
template <typename T>
struct any_receiver {
template <typename R>
any_receiver(R receiver); // (1)
void set_value(T); // (2)
void set_value_noinline(T); // (2)
};
- Constructs the object with the given receiver.
- Forwards the value to the given receiver.
Requirements
T is any type, R is a receiver that accepts values of type T. R is trivially copyable, and its size and alignment do not exceed those of a void *.
Arguments
- receiver - the receiver to wrap.
Return values
- N/A
- These methods don't return any value.
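The size, alignment and trivial-copyability requirements suggest a small-buffer form of type erasure. The sketch below shows one way such a wrapper could be built for int values; it is an assumption about the general technique, not libasync's actual implementation, and `erased_int_receiver`, `store_receiver` and `demo_erased` are hypothetical names.

```cpp
#include <cassert>
#include <new>
#include <type_traits>

// Toy type-erased receiver for int values.
struct erased_int_receiver {
	template <typename R>
	erased_int_receiver(R receiver) {
		static_assert(std::is_trivially_copyable_v<R>,
				"R must be trivially copyable");
		static_assert(sizeof(R) <= sizeof(void *), "R must fit into a void *");
		static_assert(alignof(R) <= alignof(void *), "R must not be over-aligned");

		// Store the receiver in the pointer-sized buffer.
		new (storage_) R(receiver);
		// Remember how to call it; the lambda knows the concrete type R.
		set_value_ = [] (void *p, int value) {
			std::launder(static_cast<R *>(p))->set_value_noinline(value);
		};
	}

	void set_value_noinline(int value) { set_value_(storage_, value); }

private:
	alignas(void *) unsigned char storage_[sizeof(void *)];
	void (*set_value_)(void *, int);
};

// Toy receiver that is just one pointer wide.
struct store_receiver {
	int *out;
	void set_value_noinline(int value) { *out = value; }
};

inline int demo_erased() {
	int out = 0;
	erased_int_receiver r{store_receiver{&out}};
	r.set_value_noinline(5);
	return out;
}
```

Keeping the receiver within one pointer's worth of storage means the wrapper never has to allocate, which fits the allocation-free goal of the library.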
run and run_forever
run and run_forever are top-level functions used for running coroutines. run runs a coroutine until it completes, while run_forever runs coroutines indefinitely via the IO service.
Prototype
template<typename IoService>
void run_forever(IoService ios); // (1)
template<typename Sender>
Sender::value_type run(Sender s); // (2)
template<typename Sender, typename IoService>
Sender::value_type run(Sender s, IoService ios); // (3)
- Run the IO service indefinitely
- Start the sender and wait until it completes. The sender must complete inline as there's no way to wait for it to complete.
- Same as (2) but the sender can complete not-inline.
Requirements
IoService is an IO service, and Sender is a sender.
Arguments
- ios - the IO service to use to wait for completion.
- s - the sender to start.
Return value
- This function does not return.
- This function returns the result value obtained from the sender.
- Same as (2).
Examples
int i = async::run([] () -> async::result<int> {
co_return 1;
}());
std::cout << i << std::endl;
Output:
1
detached, detach and detach_with_allocator
detached is a coroutine type used for detached coroutines. Detached coroutines cannot be awaited and they do not suspend the coroutine that started them.
detach and detach_with_allocator are functions that take a sender and run it as if it were a detached coroutine. detach is a wrapper around detach_with_allocator that uses operator new/operator delete for allocating memory. The allocator is used to allocate a structure that holds the operation and continuation until the operation completes.
Prototype
template<typename Allocator, typename S, typename Cont>
void detach_with_allocator(Allocator allocator, S sender, Cont continuation); // (1)
template<typename Allocator, typename S>
void detach_with_allocator(Allocator allocator, S sender); // (2)
template<typename S>
void detach(S sender); // (3)
template<typename S, typename Cont>
void detach(S sender, Cont continuation); // (4)
- Detach a sender using an allocator, and call the continuation after it completes.
- Same as (1) but without the continuation.
- Same as (2) but without an allocator.
- Same as (1) but without an allocator.
Requirements
Allocator is an allocator, S is a sender, Cont is a functor that takes no arguments.
Arguments
- allocator - the allocator to use.
- sender - the sender to start.
- continuation - the functor to call on completion.
Return value
These functions don't return any value.
Examples
async::oneshot_event ev;
[] (async::oneshot_event &ev) -> async::detached {
std::cout << "Coroutine 1" << std::endl;
co_await ev.wait();
std::cout << "Coroutine 2" << std::endl;
}(ev);
std::cout << "Before event raise" << std::endl;
ev.raise();
std::cout << "After event raise" << std::endl;
Output:
Coroutine 1
Before event raise
Coroutine 2
After event raise
spawn_with_allocator
spawn_with_allocator is a function that takes a sender and a receiver, connects them and detaches in a way similar to detach_with_allocator.
Prototype
template<typename Allocator, typename S, typename R>
void spawn_with_allocator(Allocator allocator, S sender, R receiver);
Requirements
Allocator is an allocator, S is a sender, R is a receiver.
Arguments
- allocator - the allocator to use.
- sender - the sender to start.
- receiver - the receiver to use.
Return value
This function doesn't return any value.
Examples
struct my_receiver {
void set_value_inline(int value) {
std::cout << "Value: " << value << std::endl;
}
void set_value_noinline(int value) {
std::cout << "Value: " << value << std::endl;
}
};
async::oneshot_event ev;
async::spawn_with_allocator(frg::stl_allocator{},
[] (async::oneshot_event &ev) -> async::result<int> {
std::cout << "Start sender" << std::endl;
co_await ev.wait();
co_return 1;
}(ev), my_receiver{});
std::cout << "Before event raise" << std::endl;
ev.raise();
std::cout << "After event raise" << std::endl;
Output:
Start sender
Before event raise
Value: 1
After event raise
result
#include <async/result.hpp>
result is a generic coroutine promise and sender type. It is used for coroutines whose result you need to await.
Prototype
template <typename T>
struct result;
Requirements
T
is the type of the value returned by the coroutine.
Examples
async::result<int> coro1(int i) {
co_return i + 1;
}
async::result<int> coro2(int i) {
co_return i * co_await coro1(i);
}
int main() {
std::cout << async::run(coro2(5)) << std::endl;
}
Output:
30
oneshot_event
#include <async/oneshot-event.hpp>
oneshot_event is an event type that can only be raised once, and supports multiple waiters. After the initial raise, any further waits will complete immediately, and further raises will be a no-op.
Prototype
struct oneshot_event {
void raise(); // (1)
sender wait(cancellation_token ct); // (2)
sender wait(); // (3)
};
- Raises an event.
- Returns a sender for the wait operation. The operation waits for the event to be raised.
- Same as (2) but it cannot be cancelled.
Arguments
- ct - the cancellation token to use to listen for cancellation.
Return values
- This method doesn't return any value.
- This method returns a sender of unspecified type. The sender completes with either true to indicate success, or false to indicate that the wait was cancelled.
- Same as (2) except the sender completes without a value.
Examples
async::oneshot_event ev;
auto coro = [] (async::oneshot_event &ev) -> async::detached {
std::cout << "Before wait" << std::endl;
co_await ev.wait();
std::cout << "After wait" << std::endl;
};
coro(ev);
std::cout << "Before raise" << std::endl;
ev.raise();
std::cout << "After raise" << std::endl;
coro(ev);
Output:
Before wait
Before raise
After wait
After raise
Before wait
After wait
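The raise-once behaviour can be modelled without coroutines by letting waiters be plain callbacks. The following sketch is such a model; it ignores cancellation and is not how libasync implements the type. `toy_oneshot_event` and `demo_oneshot` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Callback-based toy model of a one-shot event.
struct toy_oneshot_event {
	void raise() {
		if (raised_)
			return; // Further raises are a no-op.
		raised_ = true;
		auto waiters = std::move(waiters_);
		waiters_.clear();
		for (auto &w : waiters)
			w();
	}

	void wait(std::function<void()> on_raise) {
		if (raised_)
			on_raise(); // Already raised: complete immediately.
		else
			waiters_.push_back(std::move(on_raise));
	}

private:
	bool raised_ = false;
	std::vector<std::function<void()>> waiters_;
};

// Mirrors the shape of the example above: two early waiters, one late one.
inline std::string demo_oneshot() {
	std::string log;
	toy_oneshot_event ev;
	ev.wait([&] { log += "a"; });
	ev.wait([&] { log += "b"; });
	log += "|";
	ev.raise();
	ev.raise();
	ev.wait([&] { log += "c"; });
	return log;
}
```

In the model, waiters registered before the raise run during `raise()`, while a waiter registered afterwards runs straight away, which matches the ordering of the output above.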
async/wait-group.hpp
wait_group
#include <async/wait-group.hpp>
wait_group is a synchronization primitive that waits for a counter (that can be incremented) to reach zero. It conceptually maps to a group of related work being done in parallel, and a few consumers waiting for that work to be done. The amount of work is increased by calls to add() and decreased by calls to done().
This struct also implements BasicLockable so that it can be used with std::unique_lock.
Prototype
struct wait_group {
void done(); // (1)
void add(int n); // (2)
sender wait(cancellation_token ct); // (3)
sender wait(); // (4)
void lock(); // (5)
void unlock(); // (6)
};
- "Finishes" a unit of work (decrements the work count).
- "Adds" more work (increments the work count by n).
- Returns a sender for the wait operation. The operation waits for the counter to drop to zero.
- Same as (3) but it cannot be cancelled.
- Equivalent to add(1).
- Equivalent to done().
Arguments
- n - amount of work to "add" to this work group.
- ct - the cancellation token to use to listen for cancellation.
Return values
- This method doesn't return any value.
- This method doesn't return any value.
- This method returns a sender of unspecified type. The sender completes with either true to indicate success, or false to indicate that the wait was cancelled.
- Same as (3) except the sender completes without a value.
Examples
async::wait_group wg { 3 };
([&wg] () -> async::detached {
std::cout << "before wait" << std::endl;
co_await wg.wait();
std::cout << "after wait" << std::endl;
})();
auto done = [&wg] () {
std::cout << "before done" << std::endl;
wg.done();
std::cout << "after done" << std::endl;
};
done();
done();
std::cout << "before add" << std::endl;
wg.add(2);
std::cout << "after add" << std::endl;
done();
done();
done();
Output:
before wait
before done
after done
before done
after done
before add
after add
before done
after done
before done
after done
before done
after wait
after done
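The ordering in this output, where "after wait" appears between the last "before done" and "after done", follows from waiters being resumed from inside the final done() call. A callback-based model makes that visible. This is only a sketch under that assumption, without cancellation or locking, and `toy_wait_group` and `demo_wait_group` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Callback-based toy model of a wait group.
struct toy_wait_group {
	explicit toy_wait_group(int n) : count_{n} { }

	void add(int n) { count_ += n; }

	void done() {
		assert(count_ > 0);
		if (--count_ == 0) {
			// The last done() wakes every waiter before it returns.
			auto waiters = std::move(waiters_);
			waiters_.clear();
			for (auto &w : waiters)
				w();
		}
	}

	void wait(std::function<void()> on_zero) {
		if (count_ == 0)
			on_zero(); // Assumption of this model: nothing to wait for.
		else
			waiters_.push_back(std::move(on_zero));
	}

private:
	int count_;
	std::vector<std::function<void()>> waiters_;
};

// Same sequence of calls as the example above, logged compactly.
inline std::string demo_wait_group() {
	std::string log;
	toy_wait_group wg{3};
	wg.wait([&] { log += "W"; });
	wg.done(); log += "1";
	wg.done(); log += "2";
	wg.add(2);
	wg.done(); log += "3";
	wg.done(); log += "4";
	wg.done(); log += "5";
	return log;
}
```

Here "W" lands before "5" because the waiter runs while the fifth `done()` is still executing.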
wait_in_group
wait_in_group(wg, S) takes a sender S and adds it to the work group wg (calls wg.add(1)) immediately before it's started and marks it as done (calls wg.done()) immediately after it completes.
Prototype
template<typename S>
sender wait_in_group(wait_group &wg, S sender);
Requirements
S is a sender.
Arguments
- wg - wait group to wait in.
- sender - sender to wrap in the wait group.
Return value
The value produced by the sender.
Examples
bool should_run() {
/* ... */
}
async::result<void> handle_conn(tcp_socket conn) {
/* ... */
}
/* ... */
tcp_socket server;
server.bind(":80");
server.listen(32);
async::wait_group handlers { 0 };
while (should_run()) {
auto conn = server.accept();
async::detach(async::wait_in_group(handlers, handle_conn(std::move(conn))));
}
/* wait for all connections to terminate */
handlers.wait();
recurring_event
#include <async/recurring-event.hpp>
recurring_event is an event type that can be raised multiple times, and supports multiple waiters. On raise, all waiters are woken up sequentially.
Prototype
struct recurring_event {
void raise(); // (1)
template <typename C>
sender async_wait_if(C cond, cancellation_token ct = {}); // (2)
sender async_wait(cancellation_token ct = {}); // (3)
};
- Raises an event.
- Returns a sender for the wait operation. The operation checks the condition, and if it's true, waits for the event to be raised.
- Same as (2) but without the condition.
Requirements
C is a functor that accepts no arguments and returns a truthy or falsy value.
Arguments
- ct - the cancellation token to use to listen for cancellation.
Return values
- This method doesn't return any value.
- This method returns a sender of unspecified type. The sender completes with either true to indicate success, or false to indicate that the wait was cancelled, or that the condition was false.
- Same as (2).
Examples
async::recurring_event ev;
auto coro = [] (int i, async::recurring_event &ev) -> async::detached {
std::cout << i << ": Before wait" << std::endl;
co_await ev.async_wait();
std::cout << i << ": After wait" << std::endl;
};
coro(1, ev);
coro(2, ev);
std::cout << "Before raise" << std::endl;
ev.raise();
std::cout << "After raise" << std::endl;
coro(3, ev);
Possible output:
1: Before wait
2: Before wait
Before raise
1: After wait
2: After wait
After raise
3: Before wait
sequenced_event
#include <async/sequenced-event.hpp>
sequenced_event is an event type that can be raised multiple times, and supports multiple waiters. On raise, all waiters are woken up sequentially. The event maintains a sequence counter to detect missed raises.
Prototype
struct sequenced_event {
void raise(); // (1)
uint64_t next_sequence(); // (2)
sender async_wait(uint64_t in_seq, async::cancellation_token ct = {}); // (3)
};
- Raises an event.
- Returns the next sequence number for the event (the sequence number after the event is raised).
- Returns a sender for the wait operation. The operation checks whether the input sequence is equal to the event sequence, and if it is, waits for the event to be raised.
Arguments
- in_seq - input sequence number to compare against the event sequence number.
- ct - the cancellation token to use to listen for cancellation.
Return values
- This method doesn't return any value.
- This method returns the next sequence number for the event (the sequence number after the event is raised).
- This method returns a sender of unspecified type. The sender completes with the current event sequence number.
Examples
async::sequenced_event ev;
std::cout << ev.next_sequence() << std::endl;
ev.raise();
std::cout << ev.next_sequence() << std::endl;
auto seq = async::run(
[] (async::sequenced_event &ev) -> async::result<uint64_t> {
// Current sequence is 1, so waiting at sequence 0 will immediately complete.
co_return co_await ev.async_wait(0);
}(ev));
std::cout << seq << std::endl;
Output:
1
2
1
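The sequence counter logic can be sketched with callbacks in place of senders. The model below is inferred from the description and the example output above (the counter starts at zero, and next_sequence() reports the value it will have after the next raise); it is not libasync's implementation and leaves out cancellation. `toy_sequenced_event` and `demo_sequenced` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Callback-based toy model of a sequenced event.
struct toy_sequenced_event {
	void raise() {
		++seq_;
		auto waiters = std::move(waiters_);
		waiters_.clear();
		for (auto &w : waiters)
			w(seq_);
	}

	std::uint64_t next_sequence() const { return seq_ + 1; }

	void async_wait(std::uint64_t in_seq,
			std::function<void(std::uint64_t)> done) {
		if (in_seq == seq_)
			waiters_.push_back(std::move(done)); // Up to date: wait for a raise.
		else
			done(seq_); // A raise was missed: complete immediately.
	}

private:
	std::uint64_t seq_ = 0;
	std::vector<std::function<void(std::uint64_t)>> waiters_;
};

// Same steps as the example above; returns what the stale wait observed.
inline std::uint64_t demo_sequenced() {
	toy_sequenced_event ev;
	assert(ev.next_sequence() == 1);
	ev.raise();
	assert(ev.next_sequence() == 2);
	std::uint64_t seen = 0;
	ev.async_wait(0, [&] (std::uint64_t s) { seen = s; });
	return seen;
}
```

A typical consumer would feed the returned sequence number back into its next wait, so that a raise happening between two waits is never lost.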
cancellation
#include <async/cancellation.hpp>
This header provides facilities to request and handle cancellation of operations.
cancellation_event and cancellation_token
cancellation_event
is a type that is used to request cancellation of an asynchronous operation.
cancellation_token
is a type constructible from a cancellation_event that's used by operations
to check for cancellation.
Prototype
struct cancellation_event {
cancellation_event(const cancellation_event &) = delete;
cancellation_event(cancellation_event &&) = delete;
cancellation_event &operator=(const cancellation_event &) = delete;
cancellation_event &operator=(cancellation_event &&) = delete;
void cancel(); // (1)
void reset(); // (2)
};
struct cancellation_token {
cancellation_token(); // (3)
cancellation_token(cancellation_event &ev); // (4)
bool is_cancellation_requested(); // (5)
};
- Requests cancellation.
- Resets the object to prepare it for reuse.
- Default constructs a cancellation token.
- Constructs a cancellation token from a cancellation event.
- Checks whether cancellation is requested.
Arguments
ev
- the cancellation event to use.
Return values
- This method doesn't return any value.
- Same as (1).
- N/A
- N/A
- Returns true if cancellation was requested, false otherwise.
Example
async::run([] () -> async::result<void> {
async::queue<int, frg::stl_allocator> q;
async::cancellation_event ce;
ce.cancel(); // Request cancellation immediately
auto v = co_await q.async_get(ce); // Await the sender to obtain the optional result
if (!v)
std::cout << "Cancelled" << std::endl;
else
std::cout << "Completed: " << *v << std::endl;
}());
Output:
Cancelled
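The relationship between the event and the token can also be illustrated with a polling loop. The sketch below uses hypothetical stand-in types (`cancel_event_model`, `cancel_token_model`) rather than the libasync classes, and it assumes that a default-constructed token never reports cancellation.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for cancellation_event: owns the "requested" flag.
struct cancel_event_model {
    void cancel() { requested = true; }
    void reset() { requested = false; }
    bool requested = false;
};

// Hypothetical stand-in for cancellation_token: a non-owning view of an event.
struct cancel_token_model {
    cancel_token_model() = default;
    cancel_token_model(cancel_event_model &ev) : ev_{&ev} {}

    // Assumption: a token without an event is never cancelled.
    bool is_cancellation_requested() const { return ev_ && ev_->requested; }

private:
    cancel_event_model *ev_ = nullptr;
};

// Processes items until the token reports cancellation.
// Returns the number of items that were processed.
inline std::size_t process(const std::vector<int> &items, cancel_token_model ct) {
    std::size_t done = 0;
    for (int item : items) {
        if (ct.is_cancellation_requested())
            break;
        (void)item; // real work on the item would go here
        ++done;
    }
    return done;
}
```

The operation only ever sees the token, so the code that owns the event decides when to cancel and when to reset it for reuse.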
cancellation_callback and cancellation_observer
cancellation_callback
registers a callback that will be invoked when
cancellation occurs.
cancellation_observer
checks for cancellation, potentially invoking the handler,
and returns whether cancellation occurred.
Prototype
template <typename F>
struct cancellation_callback {
cancellation_callback(cancellation_token ct, F functor = {}); // (1)
cancellation_callback(const cancellation_callback &) = delete;
cancellation_callback(cancellation_callback &&) = delete;
cancellation_callback &operator=(const cancellation_callback &) = delete;
cancellation_callback &operator=(cancellation_callback &&) = delete;
void unbind(); // (2)
};
template <typename F>
struct cancellation_observer {
cancellation_observer(F functor = {}); // (3)
cancellation_observer(const cancellation_observer &) = delete;
cancellation_observer(cancellation_observer &&) = delete;
cancellation_observer &operator=(const cancellation_observer &) = delete;
cancellation_observer &operator=(cancellation_observer &&) = delete;
bool try_set(cancellation_token ct); // (4)
bool try_reset(); // (5)
};
- Constructs and attaches the cancellation callback to the given token.
- Detaches the cancellation callback.
- Constructs a cancellation observer.
- Attaches the cancellation observer to the given token and checks for cancellation without invoking the functor.
- Checks for cancellation, potentially invoking the functor, and detaches the cancellation observer.
Requirements
F
is a functor that takes no arguments.
Arguments
- ct - the cancellation token to attach to.
- functor - the functor to call on cancellation.
Return values
- N/A
- This method doesn't return any value.
- N/A
- Returns false if cancellation was already requested and the observer hasn't been attached, true otherwise.
- Same as (4).
suspend_indefinitely
suspend_indefinitely
is an operation that suspends until it is cancelled.
Prototype
sender suspend_indefinitely(cancellation_token cancellation);
Arguments
cancellation
- the cancellation token to use.
Return value
This function returns a sender of unspecified type. The sender doesn't return any value, and completes when cancellation is requested.
Examples
async::cancellation_event ce;
auto coro = [] (async::cancellation_token ct) -> async::detached {
std::cout << "Before await" << std::endl;
co_await async::suspend_indefinitely(ct);
std::cout << "After await" << std::endl;
};
coro(ce);
std::cout << "Before cancel" << std::endl;
ce.cancel();
std::cout << "After cancel" << std::endl;
Output:
Before await
Before cancel
After await
After cancel
execution
#include <async/execution.hpp>
The following objects and type definitions are in the async::execution
namespace.
This header contains customization point objects (CPOs) for the following methods/functions:
- connect (as a member or function),
- start (as a member or function),
- start_inline (as a member),
- set_value (as a member),
- set_value_inline (as a member),
- set_value_noinline (as a member).
In addition to that, it provides a convenience type definition for working with operations:
template<typename S, typename R>
using operation_t = std::invoke_result_t<connect_cpo, S, R>;
Examples
auto op = async::execution::connect(my_sender, my_receiver);
bool finished_inline = async::execution::start_inline(op);
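To show the member shapes that these customization points dispatch to, here is a toy sender, receiver, and operation written with the standard library only. All type names (`just_sender`, `just_operation`, `recording_receiver`, `toy_operation_t`) are hypothetical, and `toy_operation_t` is only a stand-in that mimics the idea of `operation_t`; it does not use `connect_cpo`.

```cpp
#include <type_traits>
#include <utility>

// Hypothetical receiver: records the value it is completed with.
struct recording_receiver {
    int *out;
    void set_value(int v) { *out = v; }
};

// Hypothetical operation: produced by connect(), does its work in start().
template <typename R>
struct just_operation {
    int value;
    R receiver;
    void start() { receiver.set_value(value); }
};

// Hypothetical sender: holds the argument and knows how to build the operation.
struct just_sender {
    int value;

    template <typename R>
    just_operation<R> connect(R receiver) {
        return {value, std::move(receiver)};
    }
};

// Stand-in for operation_t: the type obtained by connecting S to R.
template <typename S, typename R>
using toy_operation_t = decltype(std::declval<S>().connect(std::declval<R>()));

// Connects the sender to a receiver, starts the operation, returns the result.
inline int run_just(int v) {
    int result = 0;
    toy_operation_t<just_sender, recording_receiver> op =
        just_sender{v}.connect(recording_receiver{&result});
    op.start();
    return result;
}
```

Calling `run_just(42)` returns 42. The operation object lives on the caller's stack, which is how the sender/receiver model avoids a heap allocation.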
queue
#include <async/queue.hpp>
queue
is a type which provides a queue on which you can asynchronously wait
for items to appear.
Prototype
template <typename T, typename Allocator>
struct queue {
queue(Allocator allocator = {}); // (1)
void put(T item); // (2)
template <typename ...Ts>
void emplace(Ts &&...ts); // (3)
sender async_get(cancellation_token ct = {}); // (4)
frg::optional<T> maybe_get(); // (5)
};
- Constructs a queue with the given allocator.
- Inserts an item into the queue.
- Emplaces an item into the queue.
- Returns a sender for the get operation. The operation waits for an item to be inserted and returns it.
- Pops and returns the top item if it exists, or
frg::null_opt
otherwise.
Requirements
T is moveable. Allocator is an allocator.
T is constructible with Ts.
Arguments
- allocator - the allocator to use.
- item - the item to insert into the queue.
- ts - the arguments to pass to the constructor of T when inserting it into the queue.
- ct - the cancellation token to use.
Return values
- N/A
- This method doesn't return any value.
- Same as (2).
- This method returns a sender of unspecified type. The sender returns a frg::optional<T> and completes with the value, or frg::null_opt if the operation was cancelled.
- This method returns a value of type frg::optional<T>. It returns a value from the queue, or frg::null_opt if the queue is empty.
Examples
auto coro = [] (async::queue<int, frg::stl_allocator> &q) -> async::detached {
std::cout << "Got " << *(co_await q.async_get()) << std::endl;
std::cout << "Got " << *(co_await q.async_get()) << std::endl;
};
async::queue<int, frg::stl_allocator> q;
coro(q);
q.put(1);
q.put(2);
Output:
Got 1
Got 2
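The synchronous half of the interface (put, emplace, and maybe_get) can be modelled with standard containers. This is a sketch under assumptions, not the libasync implementation: `queue_model` is a hypothetical name, `std::optional` stands in for `frg::optional`, and first-in-first-out order is inferred from the example above.

```cpp
#include <deque>
#include <optional>
#include <utility>

// Hypothetical model of the non-waiting queue operations.
template <typename T>
struct queue_model {
    // Mirrors put(): moves the item to the back of the queue.
    void put(T item) { items_.push_back(std::move(item)); }

    // Mirrors emplace(): constructs the item in place from ts.
    template <typename... Ts>
    void emplace(Ts &&...ts) { items_.emplace_back(std::forward<Ts>(ts)...); }

    // Mirrors maybe_get(): pops the oldest item, or returns an empty
    // optional if the queue holds nothing.
    std::optional<T> maybe_get() {
        if (items_.empty())
            return std::nullopt;
        std::optional<T> item{std::move(items_.front())};
        items_.pop_front();
        return item;
    }

private:
    std::deque<T> items_;
};
```

Unlike async_get, maybe_get never waits, so it suits callers that want to drain whatever is currently queued and then move on.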
mutex
#include <async/mutex.hpp>
This header provides asynchronous mutex types.
mutex
mutex
is a mutex which supports asynchronous acquisition. When the mutex is
contended, the operation asynchronously blocks until the current holder
releases it.
Prototype
struct mutex {
sender async_lock(); // (1)
bool try_lock(); // (2)
void unlock(); // (3)
};
- Asynchronously acquire the mutex.
- Synchronously try to acquire the mutex.
- Release the mutex.
Return values
- This method returns a sender of unspecified type. The sender does not return any value, and completes once the mutex is acquired.
- This method returns true if the mutex was successfully acquired, false otherwise.
- This method doesn't return any value.
Examples
async::mutex mtx;
async::queue<int, frg::stl_allocator> q;
auto coro = [] (int i, auto &mtx, auto &q) -> async::detached {
std::cout << i << ": taking" << std::endl;
co_await mtx.async_lock();
std::cout << i << ": " << mtx.try_lock() << std::endl;
co_await q.async_get();
std::cout << i << ": releasing" << std::endl;
mtx.unlock();
};
coro(1, mtx, q);
coro(2, mtx, q);
q.put(1);
q.put(2);
Output:
1: taking
1: 0
2: taking
1: releasing
2: 0
2: releasing
shared_mutex
shared_mutex
is a mutex which supports asynchronous acquisition. It can be acquired in
either shared mode or exclusive mode. Acquiring in exclusive mode blocks until
all shared owners, or the current exclusive owner, release the mutex.
Acquiring in shared mode only blocks if the mutex is currently owned exclusively.
When the mutex is contended, the operation asynchronously blocks until the current
holder releases it. These are also known as read-write mutexes, where shared mode
is a read, and exclusive mode is a write.
Prototype
struct shared_mutex {
sender async_lock(); // (1)
sender async_lock_shared(); // (2)
void unlock(); // (3)
void unlock_shared(); // (4)
};
- Asynchronously acquire the mutex in exclusive mode.
- Asynchronously acquire the mutex in shared mode.
- Release the mutex (mutex must be in exclusive mode).
- Release the mutex (mutex must be in shared mode).
Return values
- This method returns a sender of unspecified type. The sender does not return any value, and completes once the mutex is acquired.
- Same as (1).
- This method doesn't return any value.
- Same as (3).
Examples
async::shared_mutex mtx;
async::queue<int, frg::stl_allocator> q;
auto coro_shared = [] (int i, auto &mtx, auto &q) -> async::detached {
std::cout << i << ": taking shared" << std::endl;
co_await mtx.async_lock_shared();
std::cout << i << ": acquired shared" << std::endl;
co_await q.async_get();
std::cout << i << ": releasing shared" << std::endl;
mtx.unlock_shared();
};
auto coro_exclusive = [] (int i, auto &mtx, auto &q) -> async::detached {
std::cout << i << ": taking exclusive" << std::endl;
co_await mtx.async_lock();
std::cout << i << ": acquired exclusive" << std::endl;
co_await q.async_get();
std::cout << i << ": releasing exclusive" << std::endl;
mtx.unlock();
};
coro_shared(1, mtx, q);
coro_shared(2, mtx, q);
coro_exclusive(3, mtx, q);
coro_exclusive(4, mtx, q);
q.put(1);
q.put(2);
q.put(3);
q.put(4);
Output:
1: taking shared
1: acquired shared
2: taking shared
2: acquired shared
3: taking exclusive
4: taking exclusive
1: releasing shared
2: releasing shared
3: acquired exclusive
3: releasing exclusive
4: acquired exclusive
4: releasing exclusive
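The compatibility rules between the two modes can be written down as a small state model. The sketch below is hypothetical (`rw_state_model` is not a libasync type, and the documented shared_mutex has no try_lock methods). It only encodes the rules stated above: exclusive needs no owners at all, while shared only needs the absence of an exclusive owner.

```cpp
#include <cassert>

// Hypothetical, single-threaded model of shared/exclusive ownership.
// A false return marks the cases where the asynchronous version would wait.
struct rw_state_model {
    // Exclusive mode: only possible when nobody owns the mutex.
    bool try_lock() {
        if (exclusive_ || shared_ > 0)
            return false;
        exclusive_ = true;
        return true;
    }

    // Shared mode: only refused while an exclusive owner exists.
    bool try_lock_shared() {
        if (exclusive_)
            return false;
        ++shared_;
        return true;
    }

    // Mutex must be in exclusive mode.
    void unlock() {
        assert(exclusive_);
        exclusive_ = false;
    }

    // Mutex must be in shared mode.
    void unlock_shared() {
        assert(shared_ > 0);
        --shared_;
    }

private:
    bool exclusive_ = false;
    unsigned shared_ = 0;
};
```

This model ignores waiter ordering. A real implementation also has to decide who is woken first when the mutex is released, which the model does not attempt to capture.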
promise
#include <async/promise.hpp>
This header provides promise and future types.
Examples
async::future<int, frg::stl_allocator> future;
{
async::promise<int, frg::stl_allocator> promise;
future = promise.get_future();
promise.set_value(3);
}
std::cout << *async::run(future.get()) << std::endl;
Output:
3
promise
promise
is a type that can be used to set a value that can be awaited via a future.
The stored data is only destroyed when the promise and all associated futures
go out of scope. A promise is movable and non-copyable.
Prototype
template <typename T, typename Allocator>
struct promise {
future<T, Allocator> get_future(); // (1)
template <typename U>
void set_value(U &&v); // (2)
void set_value(); // (3)
};
- Obtains the future associated with this promise. This can be called multiple times to get multiple futures.
- Emplaces a value into the promise (overload for T other than void).
- Same as (2), except T must be void.
Requirements
T is any type. Allocator is an allocator. T is constructible from U.
Arguments
v
- the value to emplace.
Return values
- This method returns a future object associated with the promise.
- This method doesn't return any value.
- Same as (2).
future
future
is a type that can be used to await a value from the promise. The
stored data is only destroyed when the promise, and all associated futures
go out of scope.
Prototype
template <typename T, typename Allocator>
struct future {
friend void swap(future &a, future &b); // (1)
future(Allocator allocator = {}); // (2)
future(const future &); // (3)
future(future &&); // (4)
future &operator=(const future &); // (3)
future &operator=(future &&); // (4)
sender get(cancellation_token ct); // (5)
sender get(); // (6)
bool valid(); // (7)
operator bool (); // (7)
};
- Swaps two futures.
- Constructs an empty future.
- Copies a future. The copy of the future refers to the same state as the original object in a manner similar to a shared_ptr.
- Moves a future.
- Asynchronously obtains the value.
- Same as (5).
- Checks whether the future has valid state.
Requirements
T
is any type. Allocator
is an allocator.
Arguments
ct
- the cancellation token to use.
Return values
- This function doesn't return any value.
- N/A
- N/A for constructor, reference to this object for assignment operator.
- Same as (3).
- This method returns a sender of unspecified type. If T is not void, the sender returns a value of type frg::optional<T *>. It returns the pointer if the value was obtained successfully, frg::null_opt if the operation was cancelled. If T is void, the sender returns true if the value was obtained successfully, false if the operation was cancelled.
- This method returns a sender of unspecified type. If T is not void, the sender returns a pointer to the stored value. If T is void, it doesn't return anything.
- These methods return true if the future has valid state, false otherwise.
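The lifetime rule shared by promise and future (the stored data lives until the promise and every associated future are gone) resembles reference-counted shared state. The following sketch models that with `std::shared_ptr`; `promise_model`, `future_model`, and the non-blocking `peek` method are hypothetical and are not part of libasync, whose futures are awaited through `get()` instead.

```cpp
#include <memory>
#include <optional>
#include <utility>

// Hypothetical future: a copyable handle to state shared with the promise.
template <typename T>
struct future_model {
    // Non-blocking look at the value: nullptr while no value has been set.
    T *peek() const {
        return state_ && state_->has_value() ? &**state_ : nullptr;
    }

    // True if this future refers to some shared state.
    bool valid() const { return static_cast<bool>(state_); }

    std::shared_ptr<std::optional<T>> state_;
};

// Hypothetical promise: creates the shared state and fills it in.
template <typename T>
struct promise_model {
    // May be called several times; every future shares the same state.
    future_model<T> get_future() { return {state_}; }

    template <typename U>
    void set_value(U &&v) { state_->emplace(std::forward<U>(v)); }

private:
    std::shared_ptr<std::optional<T>> state_ =
        std::make_shared<std::optional<T>>();
};
```

Because the state is reference counted, a future obtained inside a scope can still read the value after the promise has been destroyed, as in the example at the start of this section.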
post-ack
#include <async/post-ack.hpp>
This header provides classes for a producer-consumer data structure that requires every consumer to acknowledge the value before the producer can continue.
Examples
async::post_ack_mechanism<int> mech;
auto producer = [] (async::post_ack_mechanism<int> &mech) -> async::detached {
std::cout << "Posting 1" << std::endl;
co_await mech.post(1);
std::cout << "Posting 2" << std::endl;
co_await mech.post(2);
};
auto consumer = [] (async::post_ack_mechanism<int> &mech) -> async::detached {
async::post_ack_agent<int> agent;
agent.attach(&mech);
std::cout << "Awaiting first value" << std::endl;
auto handle = co_await agent.poll();
std::cout << *handle << ", acking first value" << std::endl;
handle.ack();
std::cout << "Awaiting second value" << std::endl;
handle = co_await agent.poll();
std::cout << *handle << ", acking second value" << std::endl;
handle.ack();
agent.detach();
};
consumer(mech);
consumer(mech);
consumer(mech);
producer(mech);
Output:
Awaiting first value
Awaiting first value
Awaiting first value
Posting 1
1, acking first value
Awaiting second value
1, acking first value
Awaiting second value
1, acking first value
Posting 2
2, acking second value
2, acking second value
Awaiting second value
2, acking second value
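The rule that a post only completes once every attached consumer has acknowledged the value comes down to counting outstanding acknowledgements. The sketch below is a hypothetical, synchronous model (`post_ack_model` is not a libasync type) that fixes the value type to int and ignores agents that attach or detach while a post is in flight.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of the acknowledgement counting behind post-ack.
struct post_ack_model {
    // An agent (consumer) starts observing the mechanism.
    void attach() { ++agents_; }

    // An agent stops observing the mechanism.
    void detach() {
        assert(agents_ > 0);
        --agents_;
    }

    // Starts a post: every attached agent now owes one acknowledgement.
    void post(int value) {
        value_ = value;
        pending_ = agents_;
    }

    // One agent acknowledges the posted value.
    void ack() {
        assert(pending_ > 0);
        --pending_;
    }

    // True once all agents have acknowledged; this is the point at which
    // the asynchronous post operation would complete.
    bool post_complete() const { return pending_ == 0; }

    int value() const { return value_; }

private:
    std::size_t agents_ = 0;
    std::size_t pending_ = 0;
    int value_ = 0;
};
```

With three attached agents, as in the example above, the first post stays incomplete until the third acknowledgement arrives, which is why "Posting 2" only appears after all three consumers have acknowledged the first value.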
post_ack_mechanism
post_ack_mechanism
is the producer class of post-ack. It can asynchronously
post a value, which will only complete once all consumers acknowledge the value.
Prototype
template <typename T>
struct post_ack_mechanism {
sender post(T object); // (1)
};
- Asynchronously post a value.
Requirements
T
is moveable.
Arguments
object
- the value to post.
Return values
- This method returns a sender of unspecified type. The sender doesn't return any value.
post_ack_agent
post_ack_agent
is the consumer class of post-ack. It can asynchronously poll
for a value produced by the observed post_ack_mechanism.
Prototype
template <typename T>
struct post_ack_agent {
void attach(post_ack_mechanism<T> *mech); // (1)
void detach(); // (2)
sender poll(cancellation_token ct = {}); // (3)
};
- Attach the agent (consumer) to a mechanism (producer).
- Detach the agent from the mechanism.
- Asynchronously poll for a value.
Requirements
T
is moveable.
Arguments
- mech - the mechanism to attach to.
- ct - the cancellation token to use.
Return values
- This method doesn't return any value.
- This method doesn't return any value.
- This method returns a sender of unspecified type. The sender returns a value
of type
post_ack_handle<T>
.
post_ack_handle
post_ack_handle
is the value handle class of post-ack. It has methods to obtain
a pointer to the value and to acknowledge the value.
Prototype
template <typename T>
struct post_ack_handle {
friend void swap(post_ack_handle &a, post_ack_handle &b); // (1)
explicit post_ack_handle() = default; // (2)
post_ack_handle(const post_ack_handle &) = delete;
post_ack_handle(post_ack_handle &&); // (3)
post_ack_handle &operator=(const post_ack_handle &) = delete;
post_ack_handle &operator=(post_ack_handle &&); // (3)
void ack(); // (4)
explicit operator bool (); // (5)
T *operator-> (); // (6)
T &operator* (); // (6)
};
- Swaps two handles.
- Constructs an empty handle.
- Moves a handle.
- Acknowledges the value.
- Checks if the handle is valid.
- Gets the stored object.
Requirements
T
is moveable.
Return values
- This function doesn't return any value.
- N/A
- N/A for constructor, reference to the handle for assignment operator.
- This method doesn't return any value.
- This method returns true if the handle isn't empty, false otherwise.
- These methods return a pointer/reference to the stored object.