Asynchronous Programming with Seastar

Nadav Har’El - nyh@ScyllaDB.com

Avi Kivity - avi@ScyllaDB.com

Back to table of contents. Previous: 8. Advanced futures. Next: 10. Semaphores.

9 Fibers

Seastar continuations are normally short, but often chained to one another, so that one continuation does a bit of work and then schedules another continuation for later. Such chains can be long, and often even involve loopings - see the following section, “Loops”. We call such chains “fibers” of execution.

These fibers are not threads - each is just a string of continuations - but they share some common requirements with traditional threads. For example, we want to avoid one fiber getting starved while a second fiber continuously runs its continuations one after another. As another example, fibers may want to communicate - e.g., one fiber produces data that a second fiber consumes, and we wish to ensure that both fibers get a chance to run, and that if one stops prematurely, the other doesn’t hang forever.

TODO: Mention fiber-related sections like loops, semaphores, gates, pipes, etc. # Loops TODO: do_until, repear and friends; parallel_for_each and friends; Use boost::counting_iterator for integers. map_reduce, as a shortcut (?) for parallel_for_each which needs to produce some results (e.g., logical_or of boolean results), so we don’t need to create a lw_shared_ptr explicitly (or do_with).

TODO: See seastar commit “input_stream: Fix possible infinite recursion in consume()” for an example on why recursion is a possible, but bad, replacement for repeat(). See also my comment on https://groups.google.com/d/msg/seastar-dev/CUkLVBwva3Y/3DKGw-9aAQAJ on why Seastar’s iteration primitives should be used over tail call optimization. # when_all: Waiting for multiple futures Above we’ve seen parallel_for_each(), which starts a number of asynchronous operations, and then waits for all to complete. Seastar has another idiom, when_all(), for waiting for several already-existing futures to complete.

The first variant of when_all() is variadic, i.e., the futures are given as separate parameters, the exact number of which is known at compile time. The individual futures may have different types. For example,

#include <seastar/core/sleep.hh>

future<> f() {
    using namespace std::chrono_literals;
    future<int> slow_two = sleep(2s).then([] { return 2; });
    return when_all(sleep(1s), std::move(slow_two), 
                    make_ready_future<double>(3.5)
           ).discard_result();
}

This starts three futures - one which sleeps for one second (and doesn’t return anything), one which sleeps for two seconds and returns the integer 2, and one which returns the double 3.5 immediately - and then waits for them. The when_all() function returns a future which resolves as soon as all three futures resolves, i.e., after two seconds. This future also has a value, which we shall explain below, but in this example, we simply waited for the future to resolve and discarded its value.

Note that when_all() accept only rvalues, which can be temporaries (like the return value of an asynchronous function or make_ready_future) or an std::move()’ed variable holding a future.

The future returned by when_all() resolves to a tuple of futures which are already resolved, and contain the results of the three input futures. Continuing the above example,

future<> f() {
    using namespace std::chrono_literals;
    future<int> slow_two = sleep(2s).then([] { return 2; });
    return when_all(sleep(1s), std::move(slow_two),
                    make_ready_future<double>(3.5)
           ).then([] (auto tup) {
            std::cout << std::get<0>(tup).available() << "\n";
            std::cout << std::get<1>(tup).get0() << "\n";
            std::cout << std::get<2>(tup).get0() << "\n";
    });
}

The output of this program (which comes after two seconds) is 1, 2, 3.5: the first future in the tuple is available (but has no value), the second has the integer value 2, and the third a double value 3.5 - as expected.

One or more of the waited futures might resolve in an exception, but this does not change how when_all() works: It still waits for all the futures to resolve, each with either a value or an exception, and in the returned tuple some of the futures may contain an exception instead of a value. For example,

future<> f() {
    using namespace std::chrono_literals;
    future<> slow_success = sleep(1s);
    future<> slow_exception = sleep(2s).then([] { throw 1; });
    return when_all(std::move(slow_success), std::move(slow_exception)
           ).then([] (auto tup) {
            std::cout << std::get<0>(tup).available() << "\n";
            std::cout << std::get<1>(tup).failed() << "\n";
            std::get<1>(tup).ignore_ready_future();
    });
}

Both futures are available() (resolved), but the second has failed() (resulted in an exception instead of a value). Note how we called ignore_ready_future() on this failed future, because silently ignoring a failed future is considered a bug, and will result in an “Exceptional future ignored” error message. More typically, an application will log the failed future instead of ignoring it.

The above example demonstrate that when_all() is inconvenient and verbose to use properly. The results are wrapped in a tuple, leading to verbose tuple syntax, and uses ready futures which must all be inspected individually for an exception to avoid error messages.

So Seastar also provides an easier to use when_all_succeed() function. This function too returns a future which resolves when all the given futures have resolved. If all of them succeeded, it passes the resulting values to continuation, without wrapping them in futures or a tuple. If, however, one or more of the futures failed, when_all_succeed() resolves to a failed future, containing the exception from one of the failed futures. If more than one of the given future failed, one of those will be passed on (it is unspecified which one is chosen), and the rest will be silently ignored. For example,

using namespace seastar;
future<> f() {
    using namespace std::chrono_literals;
    return when_all_succeed(sleep(1s), make_ready_future<int>(2),
                    make_ready_future<double>(3.5)
            ).then([] (int i, double d) {
        std::cout << i << " " << d << "\n";
    });
}

Note how the integer and double values held by the futures are conveniently passed, individually (without a tuple) to the continuation. Since sleep() does not contain a value, it is waited for, but no third value is passed to the continuation. That also means that if we when_all_succeed() on several future<> (without a value), the result is also a future<>:

using namespace seastar;
future<> f() {
    using namespace std::chrono_literals;
    return when_all_succeed(sleep(1s), sleep(2s), sleep(3s));
}

This example simply waits for 3 seconds (the maximum of 1, 2 and 3 seconds).

An example of when_all_succeed() with an exception:

using namespace seastar;
future<> f() {
    using namespace std::chrono_literals;
    return when_all_succeed(make_ready_future<int>(2),
                    make_exception_future<double>("oops")
            ).then([] (int i, double d) {
        std::cout << i << " " << d << "\n";
    }).handle_exception([] (std::exception_ptr e) {
        std::cout << "exception: " << e << "\n";
    });
}

In this example, one of the futures fails, so the result of when_all_succeed is a failed future, so the normal continuation is not run, and the handle_exception() continuation is done.

TODO: also explain when_all and when_all_succeed for vectors.

Back to table of contents. Previous: 8. Advanced futures. Next: 10. Semaphores.