Asynchronous Programming with Seastar

Nadav Har’El - nyh@ScyllaDB.com

Avi Kivity - avi@ScyllaDB.com

Back to table of contents. Previous: 11. Pipes. Next: 13. Discarding futures, redux.

12 Shutting down a service with a gate

Consider an application which has some long operation slow(), and many such operations may be started at any time. A number of slow() operations may even even be active in parallel. Now, you want to shut down this service, but want to make sure that before that, all outstanding operations are completed. Moreover, you don’t want to allow new slow() operations to start while the shut-down is in progress.

This is the purpose of a seastar::gate. A gate g maintains an internal counter of operations in progress. We call g.enter() when entering an operation (i.e., before running slow()), and call g.leave() when leaving the operation (when a call to slow() completed). The method g.close() closes the gate, which means it forbids any further calls to g.enter() (such attempts will generate an exception); Moreover g.close() returns a future which resolves when all the existing operations have completed. In other words, when g.close() resolves, we know that no more invocations of slow() can be in progress - because the ones that already started have completed, and new ones could not have started.

The construct

seastar::with_gate(g, [] { return slow(); })

can be used as a shortcut to the idiom

g.enter();
slow().finally([&g] { g.leave(); });

Here is a typical example of using a gate:

#include <seastar/core/sleep.hh>
#include <seastar/core/gate.hh>
#include <boost/iterator/counting_iterator.hpp>

seastar::future<> slow(int i) {
    std::cerr << "starting " << i << "\n";
    return seastar::sleep(std::chrono::seconds(10)).then([i] {
        std::cerr << "done " << i << "\n";
    });
}
seastar::future<> f() {
    return seastar::do_with(seastar::gate(), [] (auto& g) {
        return seastar::do_for_each(boost::counting_iterator<int>(1),
                boost::counting_iterator<int>(6),
                [&g] (int i) {
            seastar::with_gate(g, [i] { return slow(i); });
            // wait one second before starting the next iteration
            return seastar::sleep(std::chrono::seconds(1));
        }).then([&g] {
            seastar::sleep(std::chrono::seconds(1)).then([&g] {
                // This will fail, because it will be after the close()
                seastar::with_gate(g, [] { return slow(6); });
            });
            return g.close();
        });
    });
}

In this example, we have a function future<> slow() taking 10 seconds to complete. We run it in a loop 5 times, waiting 1 second between calls, and surround each call with entering and leaving the gate (using with_gate). After the 5th call, while all calls are still ongoing (because each takes 10 seconds to complete), we close the gate and wait for it before exiting the program. We also test that new calls cannot begin after closing the gate, by trying to enter the gate again one second after closing it.

The output of this program looks like this:

starting 1
starting 2
starting 3
starting 4
starting 5
WARNING: exceptional future ignored of type 'seastar::gate_closed_exception': gate closed
done 1
done 2
done 3
done 4
done 5

Here, the invocations of slow() were started at 1 second intervals. After the “starting 5” message, we closed the gate and another attempt to use it resulted in a seastar::gate_closed_exception, which we ignored and hence this message. At this point the application waits for the future returned by g.close(). This will happen once all the slow() invocations have completed: Immediately after printing “done 5”, the test program stops.

As explained so far, a gate can prevent new invocations of an operation, and wait for any in-progress operations to complete. However, these in-progress operations may take a very long time to complete. Often, a long operation would like to know that a shut-down has been requested, so it could stop its work prematurely. An operation can check whether its gate was closed by calling the gate’s check() method: If the gate is already closed, the check() method throws an exception (the same seastar::gate_closed_exception that enter() would throw at that point). The intent is that the exception will cause the operation calling it to stop at this point.

In the previous example code, we had an un-interruptible operation slow() which slept for 10 seconds. Let’s replace it by a loop of 10 one-second sleeps, calling g.check() each second:

seastar::future<> slow(int i, seastar::gate &g) {
    std::cerr << "starting " << i << "\n";
    return seastar::do_for_each(boost::counting_iterator<int>(0),
                                boost::counting_iterator<int>(10),
            [&g] (int) {
        g.check();
        return seastar::sleep(std::chrono::seconds(1));
    }).finally([i] {
        std::cerr << "done " << i << "\n";
    });
}

Now, just one second after gate is closed (after the “starting 5” message is printed), all the slow() operations notice the gate was closed, and stop. As expected, the exception stops the do_for_each() loop, and the finally() continuation is performed so we see the “done” messages for all five operations.

Back to table of contents. Previous: 11. Pipes. Next: 13. Discarding futures, redux.