libasync

libasync is an async primitives library for C++20. It is one of the libraries powering the managarm project.

libasync is built to be portable to different platforms, hosted or freestanding.

Docs permalink: https://docs.managarm.org/libasync/

Projects using libasync

  • managarm - Pragmatic microkernel-based OS with fully asynchronous I/O. libasync is used both in user-space, and in the kernel.

Contributing

Any kind of contributions to libasync are welcome, and greatly appreciated.

Code contributions

Code contributions should follow the managarm coding style, with the exception that all names are snake_case instead. In addition, any new user-facing functionality that's added should also be appropriately documented.

Documentation contributions

If you found typos, broken code or other mistakes in the documentation, feel free to create an issue or make a pull request fixing them.

For writing new documentation, follow the following guidelines:

  • In class prototypes, only show user-facing methods.
  • Document the return types of every method shown.
  • Try to keep the Markdown lines between 80-90 characters long wherever reasonable.

It is also advised to follow this general outline for pages:

---
short-description: ...
...

# Name

Short description of the functionality.

## Prototype

Prototype for the class or function(s).

### Requirements (optional)

Constraints placed on template types.

### Arguments (optional)

List of arguments and their description.

### Return value(s) (optional)

Description of return values and types.

## Examples (optional)

Example code

Code output

Senders, receivers, and operations

libasync is in part built around the concept of senders and receivers. Using senders and receiver allows for allocation-free operation where otherwise an allocation would be necessary for the coroutine frame. For example, all of the algorithms and other asynchronous operations are written using them.

Concepts

Sender

A sender is an object that holds all the necessary arguments and knows how to start an asynchronous operation. It is moveable.

Every sender must have an appropriate connect member method or function overload that accepts a receiver and is used to form the operation.

Operation

An operation is an object that stores the necessary state and handles the actual asynchronous operation. It is immovable, and as such pointers to it will remain valid for as long as the operation exists. When the operation is finished, it notifies the receiver and optionally passes it a result value.

Every operation must either have a void start() or a bool start_inline() method that is invoked when the operation is first started. void start() is equivalent to bool start_inline() with return false; at the end.

Inline and no-inline completion

Operations that complete synchronously can signal inline completion. If an operation completes inline, it sets the value using set_value_inline, and returns true from start_inline (Operations that have a start method cannot complete inline). Inline completion allows for certain optimizations, like avoiding suspending the coroutine if the operation completed synchronously.

Receiver

A receiver is an object that knows what to do after an operation finishes (e.g. how to resume the coroutine). It optionally receives a result value from the operation. It is moveable.

Every receiver must have void set_value_inline(...) and void set_value_noinline(...) methods that are invoked by the operation when it completes.


short-description: Brief overview of a custom sender and operation ...

Writing your own sender and operation

While libasync provides a veriaty of existing types, one may wish to write their own senders and operations. The most common reason for writing a custom sender and operation is wrapping around an existing callback or event based API while avoiding extra costs induced by allocating coroutine frames. The following section goes into detail on how to implement them by implementing a simple wrapper for libuv's uv_write.

End effect

The sender and operation we'll implement here will allow us to do the following:

auto result = co_await write(my_handle, my_bufs, n_my_bufs);
if (result < 0)
	/* report error */

The implementation

The sender

As explained in , the sender is an object that stores the neceessary state to start an operation. Let's start off by looking at the prototype for uv_write:

int uv_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);

The function takes some object that stores the state (not to be confused with our operation object), a handle to the stream, buffers to write and a callback. The callback is also given a status code, which we will propagate back to the user. The state and callback will be handled by our operation, which leaves only the handle, buffers and status code.

Let's start by writing a simple class:

struct [[nodiscard]] write_sender {
	using value_type = int; // Status code

	uv_stream_t *handle;
	const uv_buf_t *bufs;
	size_t nbufs;
};

As can be seen, the sender class is quite simple. The nodiscard attribute only helps catch errors caused by accidentally ignoring the sender without awaiting it and can be omitted.

Next we add a simple function used to construct the sender:

write_sender write(uv_stream_t *handle, const uv_buf_t *bufs, size_t nbufs) {
	return write_sender{handle, bufs, nbufs};
}

In addition to that, we need the connect overload:

template <typename Receiver>
write_operation<Receiver> connect(write_sender s, Receiver r) {
	return {s, std::move(r)};
}

connect simply constructs an operation from the sender and receiver.

We also add an implementation of operator co_await for our class so that we can co_await it inside of a coroutine:

async::sender_awaiter<write_sender, write_sender::value_type>
operator co_await(write_sender s) {
	return {s};
}

async::sender_awaiter is a special type that can suspend and resume the coroutine, and internally connects a receiver to our sender.

The operation

With the sender done, what remains to be written is the operation. As noted earlier, the operation is constructed using the sender and receiver, and it stores the operation state. As such, we want each call to write to have an unique operation object. Let's start by writing a skeleton for the class:

template <typename Receiver>
struct write_operation {
	write_operation(write_sender s, Receiver r)
	: req_{}, handle_{s.handle}, bufs_{s.bufs}, nbufs_{s.nbufs}, r_{std::move(r)} { }

	write_operation(const write_operation &) = delete;
	write_operation &operator=(const write_operation &) = delete;
	write_operation(write_operation &&) = delete;
	write_operation &operator=(write_operation &&) = delete;

private:
	uv_write_t req_;
	uv_stream_t *handle_;
	const uv_buf_t *bufs_;
	size_t nbufs_;

	Receiver r_;
};

The operation stores all the necessary state, and is templated on the receiver type in order to support any receiver type.

The operation is also made immovable and non-copyable so that pointers to it can safely be taken without worrying that they may become invalid at some point.

Next, we add a start_inline method:

bool start_inline() {
	auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) {
		/* TODO */
	});

	if (result < 0) {
		async::execution::set_value_inline(r_, result);
		return true; // Completed inline
	}

	return false; // Did not complete inline
}

We use start_inline here in order to notify the user of any immediate errors synchronously. We use functions defined inside of async::execution to set the value, because they properly detect which method should be called on the receiver.

Now, let's implement the actual asynchronous completion:

...
	handle_->data = this;
	auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) {
		auto op = static_cast<write_operation *>(req->handle->data);
		op->complete(status);
	});
...

We use the handle's user data field to store a pointer to this instance of the operation in order to be able to access it later. This is necessary as otherwise we'd have no way of knowing which operation caused our callback to be entered. Do note that this way of implementing it means that only one write operation may be in progress at once. One way to solve this would be to have a wrapper object that manages the handle and has a map of req to write_operation, but that's beyond the scope of this example.

Note: Due to how libuv operates (it hides the actual event loop and instead dispatches callbacks directly), the will not have any logic apart from invoking uv_run to do one iteration of the event loop.

Finally, we add our complete method:

private:
	void complete(int status) {
		async::execution::set_value_noinline(r_, status);
	}

On complete, we use async::execution::set_value_noinline to set the result value and notify the receiver that the operation is complete (so that it can for example resume the suspended coroutine, like the async::sender_awaiter receiver).

Full code

All of this put together gives us the following code:

// ----------------------------------------------
// Sender
// ----------------------------------------------

struct [[nodiscard]] write_sender {
	using value_type = int; // Status code

	uv_stream_t *handle;
	const uv_buf_t *bufs;
	size_t nbufs;
};

write_sender write(uv_stream_t *handle, const uv_buf_t *bufs, size_t nbufs) {
	return write_sender{handle, bufs, nbufs};
}

template <typename Receiver>
write_operation<Receiver> conneect(write_sender s, Receiver r) {
	return {s, std::move(r)};
}

async::sender_awaiter<write_sender, write_sender::value_type>
operator co_await(write_sender s) {
	return {s};
}

// ----------------------------------------------
// Operation
// ----------------------------------------------

template <typename Receiver>
struct write_operation {
	write_operation(write_sender s, Receiver r)
	: req_{}, handle_{s.handle}, bufs_{s.bufs}, nbufs_{s.nbufs}, r_{std::move(r)} { }

	write_operation(const write_operation &) = delete;
	write_operation &operator=(const write_operation &) = delete;
	write_operation(write_operation &&) = delete;
	write_operation &operator=(write_operation &&) = delete;

	bool start_inline() {
		handle_->data = this;
		auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) {
			auto op = static_cast<write_operation *>(req->handle->data);
			op->complete(status);
		});

		if (result < 0) {
			async::execution::set_value_inline(r_, result);
			return true; // Completed inline
		}

		return false; // Did not complete inline
	}

private:
	void complete(int status) {
		async::execution::set_value_noinline(r_, status);
	}

	uv_write_t req_;
	uv_stream_t *handle_;
	const uv_buf_t *bufs_;
	size_t nbufs_;

	Receiver r_;
};

IO service

The IO service is an user-provided class which manages waiting for events and waking up coroutines/operations based on them.

The IO service must provide one method: void wait(). This method is called when there is no more work to do currently. It waits for any event to happen, and wakes up the appropriate coroutine/operation which awaited the event.

Note: async::run and async::run_forever (see here) take the IO service by value, not by reference.

Example

The following example shows the approximate call graph executing an event-loop-driven coroutine would take:

  • async::run(my_sender, my_io_service)
    • my_operation = async::execution::connect(my_sender, internal_receiver)
    • async::execution::start_inline(my_operation)
      • my_operation starts running...
        • co_await some_ev
          • some_ev operation is started
            • my_io_service.add_waiter(this)
    • (async::execution::start_inline returns false)
    • my_io_service.wait()
      • IO service waits for event to happen...
      • waiters_.front()->complete()
        • some_ev operation completes
          • my_operation resumes
            • co_return 2
              • async::execution::set_value_noinline(internal_receiver, 2)
    • return internal_receiver.value
  • (async::run returns 2)

Headers

The following sections contain documentation for libasync's header files. Each section corresponds to one header file.

algorithm

#include <async/algorithm.hpp>

This header provides various algorithms that can be used as building blocks for creating more complex functionality without incuring any costs of memory allocation.

invocable

invocable is an operation that executes the given functor and completes inline with the value returned by the functor.

Prototype

template <typename F>
sender invocable(F f);

Requirements

F is a functor that takes no arguments.

Arguments

  • f - the functor to execute.

Return value

This function returns a sender of unspecified type. This sender returns the value returned by the functor.

Examples

auto fn = [] { return 1; };
std::cout << async::run(async::invocable(fn)) << std::endl;

Output:

1

transform

transform is an operation that starts the given sender, and upon completion applies the functor to it's return value.

Prototype

template <typename Sender, typename F>
sender transform(Sender ds, F f);

Requirements

Sender is a sender and F is a functor that accepts the return value of the sender as an argument.

Arguments

  • ds - the sender whose result value should be transformed.
  • f - the functor to use.

Return value

This function returns a sender of unspecified type. This sender returns the return value of the functor.

Examples

auto coro = [] () -> async::result<int> {
	co_return 5;
};

std::cout << async::run(async::transform(coro(), [] (int i) { return i * 2; }) << std::endl;

Output:

10

ite

ite is an operation that checks the given condition, and starts the "then" or "else" sender depending on the result.

Prototype

template <typename C, typename ST, typename SE>
sender ite(C cond, ST then_s, SE else_s);

Requirements

C is a functor that returns a truthy or falsy value. ST and SE are senders. ST and SE must have the same return type.

Arguments

  • cond - the condition to check.
  • then_s - the sender to start if condition is true.
  • else_s - the sender to start if condition is false.

Return value

This function returns a sender of unspecified type. The sender returns the value of the sender that was started depending on the condition.

Examples

auto then_coro = [] () -> async::result<int> {
	co_return 1;
};

auto else_coro = [] () -> async::result<int> {
	co_return 2;
};

std::cout << async::run(async::ite([] { return true; }, then_coro(), else_coro())) << std::endl;
std::cout << async::run(async::ite([] { return false; }, then_coro(), else_coro())) << std::endl;

Output:

1
2

repeat_while

repeat_while is an operation that continuously checks the given condition, and as long as it's true, it invokes the given functor to obtain a sender, and starts it.

Prototype

template <typename C, typename SF>
sender repeat_while(C cond, SF factory);

Requirements

C is a functor that returns a truthy or falsy value. SF is a functor that returns a sender.

Arguments

  • cond - the condition to check.
  • factory - the functor to call to obtain the sender on every iteration.

Return value

This function returns a sender of unspecified type. The sender does not return any value.

Examples

int i = 0;
async::run(async::repeat_while([&] { return i++ < 5; },
		[] () -> async::result<void> {
			std::cout << "Hi" << std::endl;
			co_return;
		}));

Output:

Hi
Hi
Hi
Hi
Hi

race_and_cancel

race_and_cancel is an operation that obtains senders using the given functors, starts all of them concurrently, and cancels the remaining ones when one finishes.

Note: race_and_cancel does not guarantee that only one sender completes without cancellation.

Prototype

template <typename... Functors>
sender race_and_cancel(Functors... fs);

Requirements

Every type of Functors is invocable with an argument of type async::cancellation_token and produces a sender.

Arguments

  • fs - the functors to invoke to obtain the senders.

Return value

This function returns a sender of unspecified type. The sender does not return any value.

Examples

async::run(async::race_and_cancel(
	[] (async::cancellation_token) -> async::result<void> {
		std::cout << "Hi 1" << std::endl;
		co_return;
	},
	[] (async::cancellation_token) -> async::result<void> {
		std::cout << "Hi 2" << std::endl;
		co_return;
	}
));

Possible output:

Hi 1
Hi 2

let

let is an operation that obtains a value using the given functor, stores it in a variable, and obtains a sender to start using the second functor, passing a reference to the variable as an argument.

Prototype

template <typename Pred, typename Func>
sender let(Pred pred, Func func);

Requirements

Pred is a functor that returns a value. Func is a functor that returns a sender and accepts a reference to the value returned by Pred as an argument.

Arguments

  • pred - the functor to execute to obtain the value.
  • func - the function to call to obtain the sender.

Return value

This function returns a sender of unspecified type. The sender returns the value returned by the obtained sender.

Examples

std::cout << async::run(async::let([] { return 3; },
			[] (int &i) -> async::result<int> {
				return i * 2;
			})) << std::endl;

Output:

6

sequence

sequence is an operation that sequentially starts the given senders.

Prototype

template <typename ...Senders>
sender sequence(Senders ...senders);

Requirements

Senders is not empty, and every type in it is a sender. All but the last sender must have a return type of void.

Arguments

  • senders - the senders to run.

Return value

This function returns a sender of unspecified type. The sender returns the value returned by the last sender.

Examples

int steps[4] = {0, 0, 0, 0};
int v = async::run([&]() -> async::result<int> {
	int i = 0;
	co_return co_await async::sequence(
		[&]() -> async::result<void> {
			steps[0] = i;
			i++;
			co_return;
		}(),
		[&]() -> async::result<void> {
			steps[1] = i;
			i++;
			co_return;
		}(),
		[&]() -> async::result<void> {
			steps[2] = i;
			i++;
			co_return;
		}(),
		[&]() -> async::result<int> {
			steps[3] = i;
			i++;
			co_return i * 10;
		}()
	);
}());

std::cout << v << " " << steps[0] << steps[1] << steps[2] << steps[3] << std::endl;

Output:

40 0123

when_all

when_all is an operation that starts all the given senders concurrently, and only completes when all of them complete.

Prototype

template <typename... Senders>
sender when_all(Senders... senders);

Requirements

Every type of Senders is a sender that doesn't return any value.

Arguments

  • senders - the senders to start.

Return value

This function returns a sender of unspecified type. The sender does not return any value.

Examples

async::run(async::when_all(
	[] () -> async::result<void> {
		std::cout << "Hi 1" << std::endl;
		co_return;
	}(),
	[] () -> async::result<void> {
		std::cout << "Hi 2" << std::endl;
		co_return;
	}()
));
std::cout << "Done" << std::endl;

Possible output:

Hi 1
Hi 2
Done

basic

#include <async/basic.hpp>

This header provides basic functionality.

sender_awaiter and make_awaiter

sender_awaiter is a coroutine promise type that allows for awaiting a sender. It connects an internal receiver to the given sender, and starts the resulting operation when the result is awaited.

make_awaiter is a function that obtains the awaiter associated with a sender. It does that by getting the result of operator co_await.

Prototype

template <typename S, typename T = void>
struct [[nodiscard]] sender_awaiter {
	sender_awaiter(S sender); // (1)

	bool await_ready(); // (2)
	bool await_suspend(std::coroutine_handle<>); // (2)
	T await_resume(); // (2)
}

template<typename S>
auto make_awaiter(S &&s); // (3)
  1. Constructs the object with the given sender.
  2. Coroutine promise methods.
  3. Get the awaiter for the given sender.

Requirements

S is a sender and T is S::value_type or a type convertible to it.

Arguments

  • sender - the sender to await.

Return values

  1. N/A
  2. These methods return implementation-specific values.
  3. This function returns the result object of co_awaiting the given object (without performing the actual asynchronous await operation).

any_receiver

any_receiver is a wrapper type that wraps around any receiver type and handles calling set_value on it.

Prototype

template <typename T>
struct any_receiver {
	template <typename R>
	any_receiver(R receiver); // (1)

	void set_value(T); // (2)
	void set_value_noinline(T); // (2)
}
  1. Constructs the object with the given receiver.
  2. Forwards the value to the given receiver.

Requirements

T is any type, R is a receiver that accepts values of type T. R is trivially copyable, and is smaller or of the same size and alignment as a void *.

Arguments

  • receiver - the receiver to wrap.

Return values

  1. N/A
  2. These methods don't return any value.

run and run_forever

run and run_forever are top-level functions used for running coroutines. run runs a coroutine until it completes, while run_forever runs coroutines indefinitely via the IO service.

Prototype

template<typename IoService>
void run_forever(IoService ios); // (1) 

template<typename Sender>
Sender::value_type run(Sender s); // (2)

template<typename Sender, typename IoService>
Sender::value_type run(Sender s, IoService ios); // (3)
  1. Run the IO service indefinitely
  2. Start the sender and wait until it completes. The sender must complete inline as there's no way to wait for it to complete.
  3. Same as (2) but the sender can complete not-inline.

Requirements

IoService is an IO service, and Sender is a sender.

Arguments

  • IoService - the IO service to use to wait for completion.
  • Sender - the sender to start.

Return value

  1. This function does not return.
  2. This function returns the result value obtained from the sender.
  3. Same as (2).

Examples

int i = async::run([] () -> async::result<int> {
			co_return 1;
		}());
std::cout << i << std::endl;

Output:

1

detached, detach and detach_with_allocator

detached is a coroutine type used for detached coroutines. Detached coroutines cannot be awaited and they do not suspend the coroutine that started them.

detach and detach_with_allocator are functions that take a sender and run them as if they were a detached coroutine. detach is a wrapper around detach_with_allocator that uses operator new/operator delete for allocating memory. The allocator is used to allocate a structure that holds the operation and continuation until the operation completes.

Prototype

template<typename Allocator, typename S, typename Cont>
void detach_with_allocator(Allocator allocator, S sender, Cont continuation); // (1)

template<typename Allocator, typename S>
void detach_with_allocator(Allocator allocator, S sender); // (2)

template<typename S>
void detach(S sender); // (3)

template<typename S, typename Cont>
void detach(S sender, Cont continuation); // (4)
  1. Detach a sender using an allocator, and call the continuation after it completes.
  2. Same as (1) but without the continuation.
  3. Same as (2) but without an allocator.
  4. Same as (1) but without an allocator.

Requirements

Allocator is an allocator, S is a sender, Cont is a functor that takes no arguments.

Arguments

  • allocator - the allocator to use.
  • sender - the sender to start.
  • continuation - the functor to call on completion.

Return value

These functions don't return any value.

Examples

async::oneshot_event ev;

async::run([] (async::oneshot_event &ev) -> async::detached {
		std::cout << "Coroutine 1" << std::endl;
		co_await ev.wait();
		std::cout << "Coroutine 2" << std::endl;
	}(ev));

std::cout << "Before event raise" << std::endl;
ev.raise();
std::cout << "After event raise" << std::endl;

Output:

Coroutine 1
Before event raise
Coroutine 2
After event raise

spawn_with_allocator

spawn_with_allocator is a function that takes a sender and a receiver, connects them and detaches in a way similar to detach_with_allocator.

Prototype

template<typename Allocator, typename S, typename R>
void spawn_with_allocator(Allocator allocator, S sender, R receiver);

Requirements

Allocator is an allocator, S is a sender, R is a receiver.

Arguments

  • allocator - the allocator to use.
  • sender - the sender to start.
  • receiver - the receiver to use.

Return value

This function doesn't return any value.

Examples

struct my_receiver {
	void set_value_inline(int value) {
		std::cout << "Value: " << value << std::endl;
	}

	void set_value_noinline(int value) {
		std::cout << "Value: " << value << std::endl;
	}
};

async::oneshot_event ev;

async::spawn_with_allocator(frg::stl_allocator{},
		[] (async::oneshot_event &ev) -> async::result<int> {
			std::cout << "Start sender" << std::endl;
			co_await ev.wait();
			co_return 1;
		}(ev), my_receiver{});

std::cout << "Before event raise" << std::endl;
ev.raise();
std::cout << "After event raise" << std::endl;

Output:

Start sender
Before event raise
Value: 1
After event raise

result

#include <async/result.hpp>

result is a generic coroutine promise and sender type. It it used for coroutines for which you need to await the result of.

Prototype

template <typename T>
struct result;

Requirements

T is the type of the value returned by the coroutine.

Examples

async::result<int> coro1(int i) {
	co_return i + 1;
}

async::result<int> coro2(int i) {
	co_return i * co_await coro1(i);
}

int main() {
	std::cout << async::run(coro2(5)) << std::endl;
}

Output:

30

oneshot_event

#include <async/oneshot-event.hpp>

oneshot_event is an event type that can be only raised once, and supports multiple waiters. After the initial raise, any further waits will complete immediately, and further raises will be a no-op.

Prototype

struct oneshot_event {
	void raise(); // (1)

	sender wait(cancellation_token ct); // (2)
	sender wait(); // (3)
};
  1. Raises an event.
  2. Returns a sender for the wait operation. The operation waits for the event to be raised.
  3. Same as (2) but it cannot be cancelled.

Arguments

  • ct - the cancellation token to use to listen for cancellation.

Return values

  1. This method doesn't return any value.
  2. This method returns a sender of unspecified type. The sender completes with either true to indicate success, or false to indicate that the wait was cancelled.
  3. Same as (2) except the sender completes without a value.

Examples

async::oneshot_event ev;

auto coro = [] (async::oneshot_event &ev) -> async::detached {
	std::cout << "Before wait" << std::endl;
	co_await ev.wait();
	std::cout << "After wait" << std::endl;
};

coro(ev);
std::cout << "Before raise" << std::endl;
ev.raise();
std::cout << "After raise" << std::endl;
coro(ev);

Output:

Before wait
Before raise
After wait
After raise
Before wait
After wait

async/wait-group.hpp

wait_group

#include <async/wait-group.hpp>

wait_group is a synchronization primitive that waits for a counter (that can be incremented) to reach zero. It conceptually maps to a group of related work being done in parallel, and a few consumers waiting for that work to be done. The amount of work is increased by calls to add() and decreased by calls to done().

This struct also implements BasicLockable so that it can be used with std::unique_lock.

Prototype

struct wait_group {
	void done(); // (1)
	void add(int n); // (2)

	sender wait(cancellation_token ct); // (3)
	sender wait(); // (4)

	void lock(); // (5)
	void unlock(); // (6)
};
  1. "Finishes" a work (decrements the work count).
  2. "Adds" more work (increments the work count by n).
  3. Returns a sender for the wait operation. The operation waits for the counter to drop to zero.
  4. Same as (3) but it cannot be cancelled.
  5. Equivalent to add(1).
  6. Equivalent to done().

Arguments

  • n - amount of work to "add" to this work group
  • ct - the cancellation token to use to listen for cancellation.

Return values

  1. This method doesn't return any value.
  2. This method doesn't return any value.
  3. This method returns a sender of unspecified type. The sender completes with either true to indicate success, or false to indicate that the wait was cancelled.
  4. Same as (3) except the sender completes without a value.

Examples

async::wait_group wg { 3 };

([&wg] () -> async::detached {
	std::cout << "before wait" << std::endl;
	co_await wg.wait();
	std::cout << "after wait" << std::endl;
})();

auto done = [&wg] () {
	std::cout << "before done" << std::endl;
	wg.done();
	std::cout << "after done" << std::endl;
};

done();
done();
std::cout << "before add" << std::endl;
wg.add(2);
std::cout << "after add" << std::endl;
done();
done();
done();

Output:

before wait
before done
after done
before done
after done
before add
after add
before done
after done
before done
after done
before done
after wait
after done

wait_in_group

wait_in_group(wg, S) takes a sender S and adds it to the work group wg (calls wg.add(1)) immediately before it's started and marks it as done (calls wg.done()) immediately after.

Prototype

template<typename S>
sender wait_in_group(wait_group &wg, S sender);

Requirements

S is a sender.

Arguments

  • wg - wait group to wait in
  • sender - sender to wrap in the wait group

Return value

The value produced by the sender.

Examples

bool should_run() {
	/* ... */
}
async::result<void> handle_conn(tcp_socket conn) {
	/* ... */
}

/* ... */

tcp_socket server;
server.bind(":80");
server.listen(32);
async::wait_group handlers { 0 };
while (should_run()) {
	auto conn = socket.accept();
	async::detach(async::wait_in_group(handlers, handle_conn(std::move(conn))));
}

/* wait for all connections to terminate */
handlers.wait();

recurring_event

#include <async/recurring-event.hpp>

recurring_event is an event type that can be raised multiple times, and supports multiple waiters. On raise, all waiters are woken up sequentially.

Prototype

struct recurring_event {
	void raise(); // (1)

	template <typename C>
	sender async_wait_if(C cond, cancellation_token ct = {}); // (2)
	sender async_wait(cancellation_token ct = {}); // (3)
};
  1. Raises an event.
  2. Returns a sender for the wait operation. The operation checks the condition, and if it's true, waits for the event to be raised.
  3. Same as (2) but without the condition.

Requirements

C is a functor that accepts no arguments and returns a truthy or falsy value.

Arguments

  • ct - the cancellation token to use to listen for cancellation.

Return values

  1. This method doesn't return any value.
  2. This method returns a sender of unspecified type. The sender completes with either true to indicate success, or false to indicate that the wait was cancelled, or that the condition was false.
  3. Same as (2)

Examples

async::recurring_event ev;

auto coro = [] (int i, async::recurring_event &ev) -> async::detached {
	std::cout << i << ": Before wait" << std::endl;
	co_await ev.async_wait();
	std::cout << i << ": After wait" << std::endl;
};

coro(1, ev);
coro(2, ev);
std::cout << "Before raise" << std::endl;
ev.raise();
std::cout << "After raise" << std::endl;
coro(3, ev);

Possible output:

1: Before wait
2: Before wait
Before raise
1: After wait
2: After wait
After raise
3: Before wait

sequenced_event

#include <async/sequenced-event.hpp>

sequenced_event is an event type that can be raised multiple times, and supports multiple waiters. On raise, all waiters are woken up sequentially. The event maintains a sequence counter to detect missed raises.

Prototype

struct sequenced_event {
	void raise(); // (1)

	uint64_t next_sequence(); // (2)

	sender async_wait(uint64_t in_seq, async::cancellation_token ct = {}); // (3)
};
  1. Raises an event.
  2. Returns the next sequence number for the event (the sequence number after the event is raised).
  3. Returns a sender for the wait operation. The operation checks whether the input sequence is equal to the event sequeence, and if it's true, waits for the event to be raised.

Arguments

  • in_seq - input sequence number to compare against the event sequence number.
  • ct - the cancellation token to use to listen for cancellation.

Return values

  1. This method doesn't return any value.
  2. This method returns the next sequence number for the event (the sequence number after the event is raised).
  3. This method returns a sender of unspecified type. The sender completes with the current event sequence number.

Examples

async::sequenced_event ev;

std::cout << ev.next_sequence() << std::endl;
ev.raise();
std::cout << ev.next_sequence() << std::endl;

auto seq = async::run(
		[] (async::sequenced_event &ev) -> async::result<uint64_t> {
			// Current sequence is 1, so waiting at sequence 0 will immediately complete.
			co_return co_await ev.async_wait(0);
		}(ev));

std::cout << seq << std::endl;

Output:

1
2
1

cancellation

#include <async/cancellation.hpp>

This header provides facilities to request and handle cancellation of operations.

cancellation_event and cancellation_token

cancellation_event is a type that is used to request cancellation of an asynchronous operation.

cancellation_token is a type constructible from a cancellation_event that's used by operations to check for cancellation.

Prototype

struct cancellation_event {
	cancellation_event(const cancellation_event &) = delete;
	cancellation_event(cancellation_event &&) = delete;

	cancellation_event &operator=(const cancellation_event &) = delete;
	cancellation_event &operator=(cancellation_event &&) = delete;

	void cancel(); // (1)
	void reset(); // (2)
};

struct cancellation_token {
	cancellation_token(); // (3)
	cancellation_token(cancellation_event &ev); // (4) 

	bool is_cancellation_requested(); // (5)
};
  1. Requests cancellation.
  2. Resets the object to prepare it for reuse.
  3. Default constructs a cancellation token.
  4. Constructs a cancellation token from a cancellation event.
  5. Checks whether cancellation is requested.

Arguments

  • ev - the cancellation event to use.

Return values

  1. This method doesn't return any value.
  2. Same as (1).
  3. N/A
  4. N/A
  5. Returns true if cancellation was requested, false otherwise.

Example

async::run([] () -> async::result<void> {
	async::queue<int, frg::stl_allocator> q;

	async::cancellation_event ce;
	ce.cancel(); // Request cancellation immediately

	auto v = q.async_get(ce);
	if (!v)
		std::cout << "Cancelled" << std::endl;
	else
		std::cout << "Completed: " << *v << std::endl;
}());

Output:

Cancelled

cancellation_callback and cancellation_observer

cancellation_callback registers a callback that will be invoked when cancellation occurs.

cancellation_observer checks for cancellation, potentially invoking the handler and returns whether cancellation occured.

Prototype

template <typename F>
struct cancellation_callback {
	cancellation_callback(cancellation_token ct, F functor = {}); // (1)

	cancellation_callback(const cancellation_callback &) = delete;
	cancellation_callback(cancellation_callback &&) = delete;

	cancellation_callback &operator=(const cancellation_callback &) = delete;
	cancellation_callback &operator=(cancellation_callback &&) = delete;

	void unbind(); // (2)
};

template <typename F>
struct cancellation_observer {
	cancellation_observer(F functor = {}); // (3)

	cancellation_observer(const cancellation_observer &) = delete;
	cancellation_observer(cancellation_observer &&) = delete;

	cancellation_observer &operator=(const cancellation_observer &) = delete;
	cancellation_observer &operator=(cancellation_observer &&) = delete;

	bool try_set(cancellation_token ct); // (4)

	bool try_reset(); // (5)
};
  1. Constructs and attaches the cancellation callback to the given token.
  2. Detaches the cancellation callback.
  3. Constructs a cancellation observer.
  4. Attaches the cancellation observer to the given token and checks for cancellation without invoking the functor.
  5. Checks for cancellation, potentially invoking the functor, and detaches the cancellation observer.

Requirements

F is a functor that takes no arguments.

Arguments

  • ct - the cancellation token to attach to.
  • functor - the functor to call on cancellation.

Return values

  1. N/A
  2. This method doesn't return any value.
  3. N/A
  4. Returns false if cancellation was already requested and the observer hasn't been attached, true otherwise.
  5. Same as (4).

suspend_indefinitely

suspend_indefinitely is an operation that suspends until it is cancelled.

Prototype

sender suspend_indefinitely(cancellation_token cancellation);

Arguments

  • cancellation - the cancellation token to use.

Return value

This function returns a sender of unspecified type. The sender doesn't return any value, and completes when cancellation is requested.

Examples

async::cancellation_event ce;

auto coro = [] (async::cancellation_token ct) -> async::detached {
	std::cout << "Before await" << std::endl;
	co_await async::suspend_indefinitely(ct);
	std::cout << "After await" << std::endl;
};

coro(ce);
std::cout << "Before cancel" << std::endl;
ce.cancel();
std::cout << "After cancel" << std::endl;

Output:

Before await
Before cancel
After await
After cancel

execution

#include <async/execution.hpp>

The following objects and type definitions are in the async::execution namespace.

This header contains customization point objects (CPOs) for the following methods/functions:

  • connect (as a member or function),
  • start (as a member or function),
  • start_inline (as a member),
  • set_value (as a member),
  • set_value_inline (as a member),
  • set_value_noinline (as a member).

In addition to that, it provides a convenience type definition for working with operations:

template<typename S, typename R>
using operation_t = std::invoke_result_t<connect_cpo, S, R>;

Examples

auto op = async::execution::connect(my_sender, my_receiver);
bool finished_inline = async::execution::start_inline(op);

queue

#include <async/queue.hpp>

queue is a type which provides a queue on which you can asynchronously wait for items to appear.

Prototype

template <typename T, typename Allocator>
struct queue {
	queue(Allocator allocator = {}); // (1)

	void put(T item); // (2)

	template <typename ...Ts>
	void emplace(Ts &&...ts); // (3)

	sender async_get(cancellation_token ct = {}); // (4)

	frg::optional<T> maybe_get() // (5)
};
  1. Constructs a queue with the given allocator.
  2. Inserts an item into the queue.
  3. Emplaces an item into the queue.
  4. Returns a sender for the get operation. The operation waits for an item to be inserted and returns it.
  5. Pops and returns the top item if it exists, or frg::null_opt otherwise.

Requirements

T is moveable. Allocator is an allocator.

  1. T is constructible with Ts.

Arguments

  • allocator - the allocator to use.
  • item - the item to insert into the queue.
  • ts - the arguments to pass to the constructor of T when inserting it into the queue.
  • ct - the cancellation token to use.

Return values

  1. N/A
  2. This method doesn't return any value.
  3. Same as (2).
  4. This method returns a sender of unspecified type. The sender returns a frg::optional<T> and completes with the value, or frg::null_opt if the operation was cancelled.
  5. This method returns a value of type frg::optional<T>. It returns a value from the queue, or frg::null_opt if the queue is empty.

Examples

auto coro = [] (async::queue<int, frg::stl_allocator> &q) -> async::detached {
	std::cout << "Got " << *(co_await q.async_get()) << std::endl;
	std::cout << "Got " << *(co_await q.async_get()) << std::endl;
};

async::queue<int, frg::stl_allocator> q;

coro(q);

q.put(1);
q.put(2);

Output:

1
2

mutex

#include <async/mutex.hpp>

This header provides asynchronous mutex types.

mutex

mutex is a mutex which supports asynchronous acquisition. When the mutex is contended, the operation asynchronously blocks until the current holder releases it.

Prototype

struct mutex {
	sender async_lock(); // (1)

	bool try_lock(); // (2)

	void unlock(); // (3)
};
  1. Asynchronously acquire the mutex.
  2. Synchronously try to acquire the mutex.
  3. Release the mutex.

Return values

  1. This method returns a sender of unspecified type. The sender does not return any value, and completes once the mutex is acquired.
  2. This method returns true if the mutex was successfully acquired, false otherwise.
  3. This method doesn't return any value.

Examples

async::mutex mtx;
async::queue<int, frg::stl_allocator> q;

auto coro = [] (int i, auto &mtx, auto &q) -> async::detached {
	std::cout << i << ": taking" << std::endl;
	co_await mtx.async_lock();
	std::cout << i << ": " << mtx.try_lock() << std::endl;
	co_await q.async_get();
	std::cout << i << ": releasing" << std::endl;
	mtx.unlock();
};

coro(1, mtx, q);
coro(2, mtx, q);

q.put(1);
q.put(2);

Output:

1: taking
1: 0
2: taking
1: releasing
2: 0
2: releasing

shared_mutex

shared_mutex is a mutex which supports asynchronous acquisition. It can be acquired in either shared mode, or exclusive mode. Acquiring in exclusive mode blocks until all shared owners, or the current exclusive owner release the mutex, and acquiring in shared mode only blocks if it's currently owned exclusively. When the mutex is contended, the operation asynchronously blocks until the current holder releases it. These are also known as read-write mutexes, where shared mode is a read, and exclusive mode is a write.

Prototype

struct shared_mutex {
	sender async_lock(); // (1)

	sender async_lock_shared(); // (2)

	void unlock(); // (3)

	void unlock_shared(); // (4)
};
  1. Asynchronously acquire the mutex in exclusive mode.
  2. Asynchronously acquire the mutex in shared mode.
  3. Release the mutex (mutex must be in exclusive mode).
  4. Release the mutex (mutex must be in shared mode).

Return values

  1. This method returns a sender of unspecified type. The sender does not return any value, and completes once the mutex is acquired.
  2. Same as (1).
  3. This method doesn't return any value.
  4. Same as (3).

Examples

async::shared_mutex mtx;
async::queue<int, frg::stl_allocator> q;

auto coro_shared = [] (int i, auto &mtx, auto &q) -> async::detached {
	std::cout << i << ": taking shared" << std::endl;
	co_await mtx.async_lock_shared();
	std::cout << i << ": acquired shared" << std::endl;
	co_await q.async_get();
	std::cout << i << ": releasing shared" << std::endl;
	mtx.unlock_shared();
};

auto coro_exclusive = [] (int i, auto &mtx, auto &q) -> async::detached {
	std::cout << i << ": taking exclusive" << std::endl;
	co_await mtx.async_lock();
	std::cout << i << ": acquired exclusive" << std::endl;
	co_await q.async_get();
	std::cout << i << ": releasing exclusive" << std::endl;
	mtx.unlock();
};

coro_shared(1, mtx, q);
coro_shared(2, mtx, q);
coro_exclusive(3, mtx, q);
coro_exclusive(4, mtx, q);

q.put(1);
q.put(2);
q.put(3);
q.put(4);

Output:

1: taking shared
1: acquired shared
2: taking shared
2: acquired shared
3: taking exclusive
4: taking exclusive
1: releasing shared
2: releasing shared
3: acquired exclusive
3: releasing exclusive
4: acquired exclusive
4: releasing exclusive

promise

#include <async/promise.hpp>

This header provides promise and future types.

Examples

async::future<int, frg::stl_allocator> future;
{
	async::promise<int, frg::stl_allocator> promise;
	future = promise.get_future();

	promise.set_value(3);
}

std::cout << *async::run(future.get()) << std::endl;

Output:

3

promise

promise is a type that can be used to set a value that can be awaited via a future. The stored data is only destroyed when the promise, and all associated futures go out of scope. A promise is movable and non-copyable.

Prototype

template <typename T, typename Allocator>
struct promise {
	future<T, Allocator> get_future(); // (1)

	template <typename U>
	void set_value(U &&v); // (2)
	void set_value(); // (3)
};
  1. Obtains the future associated with this promise. This can be called multiple times to get multiple futures.
  2. Emplaces a value into the promise (overload for T other than void).
  3. Same as (2), except T must be void.

Requirements

T is any type. Allocator is an allocator. T is constructible from U.

Arguments

  • v - the value to emplace.

Return values

  1. This method returns a future object associated with the promise.
  2. This method doesn't return any value.
  3. Same as (2).

future

future is a type that can be used to await a value from the promise. The stored data is only destroyed when the promise, and all associated futures go out of scope.

Prototype

template <typename T, typename Allocator>
struct future {
	friend void swap(future &a, future &b); // (1)

	future(Allocator allocator = {}); // (2)

	future(const future &); // (3)
	future(future &&); // (4)

	future &operator=(const future &); // (3)
	future &operator=(future &&); // (4)

	sender get(cancellation_token ct); // (5)
	sender get(); // (6)

	bool valid(); // (7)
	operator bool (); // (7)
};
  1. Swaps two futures.
  2. Constructs an empty future.
  3. Copies a future. The copy of the future refers to the same state as the original object in a manner similar to a shared_ptr.
  4. Moves a future.
  5. Asynchronously obtains the value.
  6. Same as (5).
  7. Checks whether the future has valid state.

Requirements

T is any type. Allocator is an allocator.

Arguments

  • ct - the cancellation token to use.

Return values

  1. This function doesn't return any value.
  2. N/A
  3. N/A for constructor, reference to this object for assignment operator.
  4. Same as (3).
  5. This method returns a sender of unspecified type. If T is not void, the sender returns a value of type frg::optional<T *>. It returns the pointer if the value was obtained successfully, frg::null_opt if the operation was cancelled. If T is void, the sender returns true if the value was obtained successfully, false if the operation was cancelled.
  6. This method returns a sender of unspecified type. If T is not void, the sender returns a pointer to the stored value. If T is void, it doesn't return anything.
  7. These methods return true if the future has valid state, false otherwise.

post-ack

#include <async/post-ack.hpp>

This header provides classes for a producer-consumer data structure that requires every consumer to acknowledge the value before the producer can continue.

Examples

async::post_ack_mechanism<int> mech;

auto producer = [] (async::post_ack_mechanism<int> &mech) -> async::detached {
	std::cout << "Posting 1" << std::endl;
	co_await mech.post(1);
	std::cout << "Posting 2" << std::endl;
	co_await mech.post(2);
};

auto consumer = [] (async::post_ack_mechanism<int> &mech) -> async::detached {
	async::post_ack_agent<int> agent;
	agent.attach(&mech);

	std::cout << "Awaiting first value" << std::endl;
	auto handle = co_await agent.poll();
	std::cout << *handle << ", acking first value" << std::endl;
	handle.ack();

	std::cout << "Awaiting second value" << std::endl;
	handle = co_await agent.poll();
	std::cout << *handle << ", acking second value" << std::endl;
	handle.ack();

	agent.detach();
};

consumer(mech);
consumer(mech);
consumer(mech);

producer(mech);

Output:

Awaiting first value
Awaiting first value
Awaiting first value
Posting 1
1, acking first value
Awaiting second value
1, acking first value
Awaiting second value
1, acking first value
Posting 2
2, acking second value
2, acking second value
Awaiting second value
2, acking second value

post_ack_mechanism

post_ack_mechanism is the producer class of post-ack. It can asynchronously post a value, which will only complete once all consumers acknowledge the value.

Prototype

template <typename T>
struct post_ack_mechanism {
	sender post(T object); // (1)
};
  1. Asynchronously post a value.

Requirements

T is moveable.

Arguments

  • object - the value to post.

Return values

  1. This method returns a sender of unspecified type. The sender doesn't return any value.

post_ack_agent

post_ack_agent is the consumer class of post-ack. It can asynchronously poll for a value produced by the observed post_ack_mechanism.

Prototype

template <typename T>
struct post_ack_agent {
	void attach(post_ack_mechanism<T> *mech); // (1)

	void detach(); // (2)

	sender poll(cancellation_token ct = {}); // (3)
};
  1. Attach the agent (consumer) to a mechanism (producer).
  2. Detach the agent from the mechanism.
  3. Asynchronously poll for a value.

Requirements

T is moveable.

Arguments

  • mech - the mechanism to attach to.
  • ct - the cancellation token to use.

Return values

  1. This method doesn't return any value.
  2. This method doesn't return any value.
  3. This method returns a sender of unspecified type. The sender returns a value of type post_ack_handle<T>.

post_ack_handle

post_ack_handle is the value handle class of post-ack. It has methods to obtain a pointer to the value and to acknowledge the value.

Prototype

template <typename T>
struct post_ack_handle {
	friend void swap(post_ack_handle &a, post_ack_handle &b); // (1)

	explicit post_ack_handle() = default; // (2)

	post_ack_handle(const post_ack_handle &) = delete;
	post_ack_handle(post_ack_handle &&); // (3)

	post_ack_handle &operator=(const post_ack_handle &) = delete;
	post_ack_handle &operator=(post_ack_handle &&); // (3)

	void ack(); // (4)

	explicit operator bool (); // (5)

	T *operator-> (); // (6)

	T &operator* (); // (6)
};
  1. Swaps two handles.
  2. Constructs an empty handle.
  3. Moves a handle.
  4. Acknowledges the value.
  5. Checks if the handle is valid.
  6. Gets the stored object.

Requirements

T is moveable.

Return values

  1. This function doesn't return any value.
  2. N/A
  3. N/A for constructor, reference to the handle for assignment operator.
  4. This method doesn't return any value.
  5. This method returns true if the handle isn't empty, false otherwise.
  6. These methods return a pointer/reference to the handle.