Range comprehensions with C++ lazy generators

In the previous post we had a look at a recent proposal N4286 to add stackless coroutines to the C++ language and described the implementation prototype that ships with Visual Studio 2015 CTP.

We saw that coroutines can be used to implement lazy sequences. Lazy evaluation is a powerful tool and a pillar of functional programming: it makes it possible to construct potentially infinite data structures, and it can improve performance by avoiding needless computation. Many languages provide constructs that compute a series of elements one at a time, only when they are requested. These are called sequences (https://msdn.microsoft.com/en-us/library/dd233209.aspx) in F#, lazy lists in Haskell, generators in Python and iterator blocks in C#.

In the N4286 proposal a generator is a function that returns an object of type generator<T> and that contains at least one yield statement. For example, this is a generator for the Fibonacci sequence (the Hello World of generators) written with the notation of the VS2015 CTP:

generator<int> Fibonacci(int max)
{
    int a = 0;
    int b = 1;
    while (a <= max)
    {
        __yield_value a;
        int next = a + b;
        a = b;
        b = next;
    }
}

Query comprehensions and LINQ

Functional languages like Haskell have the concept of list comprehensions: syntactic constructs for creating a list by applying operators to other existing lists. Thanks to its laziness, Haskell handles infinite lists naturally, but even non-functional languages like Python and C# provide their own version of query comprehensions.

In C#, of course, we have LINQ, whose design was heavily influenced by Haskell (Erik Meijer was one of the authors). The LINQ syntax is designed specifically to make it easy to apply a sequence of operations on the sequence monad IEnumerable<T>, in the form of queries.

It would be nice to have something similar in an eager language like C++. There are very interesting articles on this topic and I’d heartily suggest reading Bartosz Milewski’s “Getting Lazy with C++” and Eric Niebler’s “Range comprehensions”.

Let’s take an example from these two articles and write a small Haskell program that prints the sequence of Pythagorean triples. A Pythagorean triple is a tuple of three positive integers x, y and z that satisfy the relation x*x + y*y = z*z.

main = print (take 10 triples)
triples = [(x, y, z) | z <- [1..]
                     , x <- [1..z]
                     , y <- [x..z]
                     , x^2 + y^2 == z^2]

In Haskell this code is very intuitive; we start with (possibly infinite) sequences of integers, and operate on them to generate a sequence of tuples, filtering only the tuples that satisfy the Pythagorean condition. Note that the act of taking only the first elements and the act of printing them are orthogonal to the act of generating the sequence.

The equivalent LINQ code for Pythagorean triples is not very different from the equivalent code in Haskell:

static IEnumerable<Tuple<int,int,int>> PythagoreanTriples(int max)
{
    return (from z in Enumerable.Range(1, max)
            from x in Enumerable.Range(1, z - 1)
            from y in Enumerable.Range(x, z - x)
            where x * x + y * y == z * z
            select new Tuple<int, int, int>(x, y, z));
}

This declarative query syntax is just syntactic sugar for calls to the standard query operators (https://msdn.microsoft.com/en-us/library/bb397947.aspx), like Where, Select and GroupBy, which are defined as extension methods in the static Enumerable class (Range, used above, is a plain static method of the same class).

The same function can also be written as a simple iterator block, with a yield return statement:

static IEnumerable<Tuple<int, int, int>> PythagoreanTriples(int max)
{
    foreach (var z in Enumerable.Range(1, max))
    {
        foreach (var x in Enumerable.Range(1, z - 1))
        {
            foreach (var y in Enumerable.Range(x, z - x))
            {
                if (x * x + y * y == z * z)
                {
                    yield return new Tuple<int, int, int>(x, y, z);
                }
            }
        }
    }
}

Query comprehensions in C++

Can we have something like LINQ in C++? For some time this has been a small holy grail for me; a long time ago I tried to implement something with fibers, but I didn’t really know what I was doing and the result was very kludgy.

But now the lazy, resumable generators proposed by N4286 seem perfect for this purpose: once coroutines are fully supported by the language, it would be interesting to design a new library of query comprehensions that work on these generators.

We can use the VS2015 CTP prototype to experiment with this idea. What should a LINQ-like library for C++ look like? How would it work?

Like in LINQ, we want to write a set of operators that transform generator<T> sequences. Let’s start with a first one, a filtering operator Where that takes a generator and a predicate and returns the sequence of elements that satisfy the predicate. This can be done with a simple static function:

template<typename T, typename Pred>
static generator<T> Where(generator<T> gen, Pred pred)
{
    for (auto val : gen) {
        if (pred(val)) {
            __yield_value val;
        }
    }
}

We could easily write other operators in the same way; for example, the Select operator, analogous to std::transform but working on generators:

template<typename T, typename Fun>
static generator<typename std::result_of<Fun(T)>::type>
Select(generator<T> gen, Fun fun)
{
    for (auto val : gen) {
        __yield_value fun(val);
    }
}

This is also simple to write thanks to std::result_of, which deduces the return type of Fun at compile time. Note that we cannot use automatic return type deduction here: at least in this first VS2015 CTP prototype, a resumable function must explicitly declare its return type as generator<...>. This would not compile:

static auto Select(generator<T> gen, Fun fun) {}
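The type computation itself can be tried out without coroutines. In this small sketch, select_result_t is a hypothetical alias that computes the element type Select would yield for a callable Fun applied to a T, using decltype and std::declval; for ordinary callables it gives the same type as std::result_of<Fun(T)>::type. (std::result_of was deprecated in C++17 in favor of std::invoke_result and removed in C++20, so this form also keeps compiling on newer standards.)

```cpp
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical alias: the type produced by calling a Fun with a T.
// For plain callables it matches std::result_of<Fun(T)>::type.
template<typename T, typename Fun>
using select_result_t = decltype(std::declval<Fun>()(std::declval<T>()));

// Projections of the kind one might pass to Select.
inline std::string int_to_string(int n) { return std::to_string(n); }
inline double half(int n) { return n / 2.0; }
const auto toLength = [](const std::string& s) { return s.size(); };

// The element type is known at compile time, before anything is iterated.
static_assert(std::is_same<select_result_t<int, decltype(&int_to_string)>,
                           std::string>::value, "int -> std::string");
static_assert(std::is_same<select_result_t<int, decltype(&half)>,
                           double>::value, "int -> double");
static_assert(std::is_same<select_result_t<std::string, decltype(toLength)>,
                           std::string::size_type>::value, "string -> size");
```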

Composability

Writing query operators is not very difficult. The problem is: how do we compose these operators? As an example, let’s say we want to take the sequence of all integers, filter the even ones and then transform them into a sequence of their string representations, as shown in the figure:

[Figure: Operators pipeline]

We could do this in stages, starting with a generator that yields the sequence of integers:

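generator<int> Integers()
{
    // Yields 0, 1, ..., INT_MAX (INT_MAX is defined in <climits>).
    // A condition like i <= INT_MAX would always be true and i++ would
    // then overflow, so the loop stops explicitly after INT_MAX.
    for (int i = 0; ; i++) {
        __yield_value i;
        if (i == INT_MAX) break;
    }
}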

We could either (1) apply the Where and Select operators one after the other:

    auto evenInts = Where(Integers(), [](int n) { return (n % 2) == 0; });
    // generator<T> is move-only, so the intermediate result must be moved.
    auto evenIntStrs = Select(std::move(evenInts), [](int n) { return std::to_string(n); });

or (2) we could try to put them together in a single expression:

    auto result = Select(
        Where(Integers(), [](int n) { return (n % 2) == 0; }),
        [](int n){ return std::to_string(n); });

But neither choice is great; the second one in particular quickly makes the code hard to read and maintain. Ideally, we would like these operators to be composable into a pipeline, as they are in LINQ:

var result = Range(1, Max)
    .Where(n => (n % 2) == 0)
    .Select(n => n.ToString());

If we were able to modify the code for generator<T>, we could do this by adding a predetermined set of operators directly in that class, in the standard library:

template <typename T>
class generator
{
    ...
    template <typename Pred>
    generator<T> Where(Pred pred)
    {
        for (auto val : *this) {
            if (pred(val)) {
                __yield_value val;
            }
        }
    }
    ...
};

But even so, the set of operators would not be easy to extend. What we really want is to have the ability to define and add new operators in our code, and have them compose seamlessly.

In C# the query operators are implemented as extension methods of the Enumerable class, which makes it very easy to plug in new operators simply by adding new extension methods. In C++ we don’t have extension methods. As an alternative, we can follow the lead of the Boost.Range library, which uses the pipe syntax (the bitwise OR operator |) to compose range adaptors.
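The pipe notation itself does not depend on coroutines. As a minimal, eager sketch (std::vector stands in for generator<T>, and where_t, select_t, Where and Select are hypothetical names used only here), an overloaded operator| can take a container on its left and a small adaptor object on its right, so that a chain of | calls is evaluated left to right:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Adaptor objects: they just remember the callable until a range is piped in.
template<typename Pred> struct where_t { Pred pred; };
template<typename Fun> struct select_t { Fun fun; };

template<typename Pred> where_t<Pred> Where(Pred pred) { return { std::move(pred) }; }
template<typename Fun> select_t<Fun> Select(Fun fun) { return { std::move(fun) }; }

// vector | Where(pred): keeps the elements that satisfy pred.
template<typename T, typename Pred>
std::vector<T> operator|(const std::vector<T>& src, const where_t<Pred>& w)
{
    std::vector<T> out;
    for (const T& v : src)
        if (w.pred(v)) out.push_back(v);
    return out;
}

// vector | Select(fun): applies fun to every element.
template<typename T, typename Fun>
auto operator|(const std::vector<T>& src, const select_t<Fun>& s)
    -> std::vector<decltype(s.fun(std::declval<const T&>()))>
{
    std::vector<decltype(s.fun(std::declval<const T&>()))> out;
    for (const T& v : src)
        out.push_back(s.fun(v));
    return out;
}
```

Each stage here materializes a whole new vector, which is exactly the cost that lazy generators avoid; the sketch only shows how | turns nested calls into a readable pipeline.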

Our goal, therefore, is to be able to write query comprehensions as chains of pipe operators, in this form:

auto evenNumStrs = Range(1, Max)
    | Where([](int n) { return n % 2 == 0; })
    | Select([](int n) { return std::to_string(n); });

And now things get interesting: how to implement this? 🙂

A possible implementation

To be honest, at first I had no idea. But, as Picasso once said, “good artists copy, great artists steal”: so I stole from a real artist. More precisely, I looked at the code of Eric Niebler’s range library, which was a very interesting read. Here I reuse some of his ideas, especially to implement a pipeline of operators. The resulting code is much simpler in my case, of course, because I only have to deal with the narrow scope of resumable generators.

So, let’s start from the beginning: as we said our goal is to be able to create pipelines of operations with the pipe operator |:

generator<int> evenNumbers = Integers() |
                             Where([](int n) { return n%2 == 0; });

We clearly need to overload the bitwise OR operator |. When its left operand is a generator, the right operand must be something that is “pipeable” into a generator. Therefore, a call to a function like Where() must return “something pipeable”, whatever that means.

We can say that a type is pipeable if it implements a static method pipe(), with two arguments: a generator<T> and an instance of the pipeable type itself:

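template<typename T, typename Pipeable>
auto operator|(generator<T>&& src, Pipeable&& pipe)
{
    // Pipeable may be deduced as an lvalue reference type,
    // so strip it before naming the static member pipe().
    return std::decay_t<Pipeable>::pipe(std::move(src),
                                        std::forward<Pipeable>(pipe));
}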

The next step could be to implement Where as a pipeable type. Rather than just a function, Where now becomes an invokable type, a class with a call operator that implements the logic to filter a sequence according to a predicate. And class Where should also have a static method pipe() that evaluates a pipe by invoking the call operator:

template<typename T>
struct Where
{
    template<typename Pred>
    generator<T> operator()(generator<T> gen, Pred pred)
    {
        for (auto val : gen) {
            if (pred(val)) {
                __yield_value val;
            }
        }
    }

    template<typename Pipe>
    static auto pipe(generator<T>&& src, Pipe&& pipe)
    {
        return operator()(src, pred); // pred ???
    }
};

But now the problem is: how do we pass the predicate to the pipe? Or, more generally, since operators can have any number of arguments: how do we pass all the arguments to the pipe, given an expression like this?

auto outputGen = inputGen | Op(arg1, arg2, ...);

Pipeable types

We need to complicate the design a little, leveraging the magic of variadic templates and std::bind. We introduce a type for pipeable objects (objects that can be at the right side of a pipe operator |), implemented as follows:

template<typename Binder>
struct pipeable
{
    // A forwarding call wrapper generated by std::bind.
    Binder _binder;

    pipeable(Binder pipeableBinder)
        : _binder(std::move(pipeableBinder))
    {
    }

    // Executes the call operator of a bound method, passing a generator
    // as placeholder argument.
    template<typename T, typename Pipe>
    static auto pipe(generator<T>&& gen, Pipe&& pipe)
    {
        return pipe._binder(std::forward<generator<T>>(gen));
    } 
};

Let’s look at this in more detail. Type pipeable<> has a template parameter Binder, which is an invokable type, the type of the objects returned when we call std::bind() to bind the call operator (of a type like Where) to a set of arguments.

Now, having an object op of class Where, a generator object and a predicate, we can call std::bind(op, sourceGenerator, predicate) and use the result to construct an instance of a pipeable object, so that pipeable::pipe() will call a method already bound to its arguments.

Class Where can now be simplified to have just a call operator:

template<typename T>
struct Where
{
    template<typename Pred>
    generator<T> operator()(generator<T> gen, Pred pred)
    {
        [...]
    }
};

Not all the arguments are early-bound, though. When we construct the pipeable object, the operator and the predicate are known, but we don’t have a sourceGenerator to bind yet. This is the generator to which the operator is applied, i.e. the object at the left side of the pipe, which is passed as argument to pipeable<Binder>::pipe(). So we need to do a little partial application (a close cousin of currying) and use a std::bind placeholder in its stead:

std::bind(op, std::placeholders::_1, predicate);
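The effect of the placeholder can be seen in isolation with an ordinary function object. In this sketch, where_fn is a hypothetical, eager stand-in for a query operator working on std::vector<int>: std::bind fixes the predicate now and leaves the source sequence as _1, to be supplied later, which is the role the left side of the pipe plays.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical eager stand-in for a query operator: filters src with pred.
struct where_fn
{
    template<typename Pred>
    std::vector<int> operator()(const std::vector<int>& src, Pred pred) const
    {
        std::vector<int> out;
        for (int v : src)
            if (pred(v)) out.push_back(v);
        return out;
    }
};

// Early-binds the predicate; the source sequence stays open as _1.
inline auto make_even_filter()
{
    auto isEven = [](int n) { return n % 2 == 0; };
    return std::bind(where_fn{}, std::placeholders::_1, isEven);
}
```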

The resulting workflow is the following:

// Given an input sequence,
auto gen = Integers();

// a predicate,
auto pred = [](int n){ return (n % 2) == 0; };

// and a query operator:
Where<int> opWhere;

// Binds the predicate to the operator
auto opWhereBoundToPred = std::bind(opWhere, std::placeholders::_1,
    std::move(pred));

// Creates a pipeable object
pipeable<decltype(opWhereBoundToPred)> pipeableWhereBoundToPred(opWhereBoundToPred);

// Now a pipe expression executes the operator, replacing the placeholder
// with the input sequence:
//   std::move(gen) | Where(pred)
//   => std::move(gen) | pipeableWhereBoundToPred
//   => pipeable<...>::pipe(std::move(gen), pipeableWhereBoundToPred)
//   => opWhereBoundToPred(std::move(gen))

Pipeable factories

The next step is to encapsulate all this logic into a class that acts as a factory for our pipeable types.

template<typename Op>
struct pipeable_factory
{
    // an invokable object of an operator class.
    Op _op;

    // Binds _op to its arguments, using a placeholder for the first argument.
    template<typename...Args>
    auto operator()(Args&&... args) const
    {
        return make_pipeable(std::bind(_op, std::placeholders::_1,
               std::forward<Args>(args)...));
    }

private:
    template<typename Binder>
    static pipeable<Binder> make_pipeable(Binder pipeableBinder)
    {
        return { std::move(pipeableBinder) };
    }
};

Class pipeable_factory is templated on a query operator type (a type like class Where above) and it is also an invokable type. Its call operator uses the magic of variadic templates to accept a variable number of arguments and forward them to the std::bind function described before, then uses the result to instantiate and return a pipeable object.
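To see the whole mechanism run without the CTP, the following sketch reproduces pipeable, pipeable_factory and operator| in standard C++14, with std::vector standing in for generator<T>, so every operator here is eager rather than lazy. Two details are deliberate choices of this sketch: the factory forwards its arguments with std::forward, and operator| applies std::decay_t before naming ::pipe, since Pipeable may be deduced as a reference type.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Holds the binder returned by std::bind(op, _1, args...).
template<typename Binder>
struct pipeable
{
    Binder _binder;

    // Invoked by operator|: the source sequence fills placeholder _1.
    template<typename Src, typename Pipe>
    static auto pipe(Src&& src, Pipe&& p)
    {
        return p._binder(std::forward<Src>(src));
    }
};

// Callable object that turns Op(args...) into a pipeable.
template<typename Op>
struct pipeable_factory
{
    Op _op;

    template<typename... Args>
    auto operator()(Args&&... args) const
    {
        return make_pipeable(std::bind(_op, std::placeholders::_1,
                                       std::forward<Args>(args)...));
    }

private:
    template<typename Binder>
    static pipeable<Binder> make_pipeable(Binder binder)
    {
        return { std::move(binder) };
    }
};

// vector | pipeable: std::vector plays the role of generator<T> here.
template<typename T, typename Pipeable>
auto operator|(std::vector<T> src, Pipeable&& p)
{
    return std::decay_t<Pipeable>::pipe(std::move(src), std::forward<Pipeable>(p));
}

struct where_op
{
    template<typename T, typename Pred>
    std::vector<T> operator()(std::vector<T> src, Pred pred) const
    {
        std::vector<T> out;
        for (auto& v : src)
            if (pred(v)) out.push_back(std::move(v));
        return out;
    }
};

struct select_op
{
    template<typename T, typename Fun>
    auto operator()(std::vector<T> src, Fun fun) const
    {
        std::vector<decltype(fun(std::declval<T&>()))> out;
        for (auto& v : src) out.push_back(fun(v));
        return out;
    }
};

constexpr pipeable_factory<where_op> Where{};
constexpr pipeable_factory<select_op> Select{};
```

With these definitions, `std::vector<int>{1, 2, 3, 4, 5, 6} | Where(isEven) | Select(toStr)` goes through the same steps described above: the factory binds the callable, and operator| supplies the left operand as _1.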

So now in order to be able to execute an operator in a pipe expression like:

auto evenIntegers = Integers() | Where( [](int n) { return (n%2)==0; });

we just need to turn Where into a pipeable_factory of the Where operator, so that Where(pred) returns a pipeable object. We can do this very simply, by renaming the operator class (where_op) and constructing a single constant instance of pipeable_factory<where_op> named Where:

struct where_op
{
    template<typename T, typename Pred>
    generator<T> operator()(generator<T> gen, Pred pred)
    {
        for (auto n : gen) {
            if (pred(n)) {
                __yield_value n;
            }
        }
    }
};
constexpr pipeable_factory<where_op> Where{};

Now “gen | Where(pred)” finally works as expected: on the right side there is a factory that binds the predicate to where_op::operator() and constructs a pipeable object. Evaluating the pipe means invoking pipeable<>::pipe(), which calls the bound method, replacing the placeholder with the generator object at the left side of the pipe. In the end, the pipe expression is evaluated as a call to where_op::operator()(gen, pred).

And since the result of this pipe expression can itself be a generator<>, and the bitwise OR operator | is left-associative, we also get composability.

In other words, it is possible to chain a sequence of operators, exactly like LINQ does:

auto result = Integers() | Where(isEven) | Select(toString);

and we can iterate over the result with a simple range loop:

for (const auto& s : result) {
    std::cout << s << std::endl;
}

Here we have introduced a second operator, Select, which projects each element of a sequence into a new form by applying a transform function:

struct select_op
{
    template<typename T, typename Fun>
    generator<typename std::result_of<Fun(T)>::type>
    operator()(generator<T> gen, Fun fun)
    {
        for (auto n : gen) {
            __yield_value fun(n);
        }
    }
};
constexpr pipeable_factory<select_op> Select{};

More operators

Operators like Select and Where only take two arguments, a generator and a callable object. But some cases are a little more challenging. Consider, for example, the operator Zip, which “joins” two sequences by applying a specified function to the corresponding i-th elements of each sequence and producing a sequence of the results.
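Setting the pipe machinery aside, the semantics of Zip are easy to pin down with an eager sketch on std::vector. The function below is hypothetical (it is not the pipeable Zip discussed next) and uses the same loop as zip_op: it advances both inputs together and stops at the end of the shorter one.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Eager sketch of Zip: combines the i-th elements of both inputs with
// resultSelector and stops at the end of the shorter input.
template<typename A, typename B, typename Fun>
auto Zip(const std::vector<A>& first, const std::vector<B>& second, Fun resultSelector)
    -> std::vector<decltype(resultSelector(first[0], second[0]))>
{
    std::vector<decltype(resultSelector(first[0], second[0]))> out;
    auto it1 = first.begin();
    auto it2 = second.begin();
    while (it1 != first.end() && it2 != second.end()) {
        out.push_back(resultSelector(*it1, *it2));
        ++it1;
        ++it2;
    }
    return out;
}
```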

We could think of writing Zip in the same way, simply adding an additional argument for the second sequence:

class zip_op
{
public:
    template<typename TFirst, typename TSecond, typename Fun>
    generator<typename std::result_of<Fun(TFirst, TSecond)>::type>
      operator()(generator<TFirst> first,
                 generator<TSecond> second,
                 Fun resultSelector) const
    {
        auto it1 = first.begin();
        auto it2 = second.begin();
        while (it1 != first.end() && it2 != second.end()) {
            __yield_value resultSelector(*it1, *it2);
            ++it1;
            ++it2;
        }
    }
};
constexpr pipeable_factory<zip_op> Zip{};

But this would not compile (with VS2015 CTP) and I am not totally sure if it should. It fails to bind all the arguments to the variadic pipeable_factory::operator() seen before. I suspect that this could work if I could use return type deduction in zip_op::operator() but unfortunately this is not an option, since we need to explicitly return a generator<T> type.

What we can do instead is to write operator() as a variadic template function, which can take any number of arguments in its parameter pack. This function can use return type deduction as long as it does not really *implement* a resumable function but only *calls* a resumable function. To generalize the solution, we can put the variadic call operator in a base class generator_operator from which all our query operators should inherit. Each operator class then lazily does its work in a resumable function exec() that returns a generator<T>.

Putting this idea into code, we have now:

// A generator operator is an invokable object templatized on
// an operator class. The template type Op represents a LINQ-like
// operator that works on lazy generators.
template <typename Op>
struct generator_operator
{
    template<typename Gen, typename... Rest>
    auto operator()(Gen&& gen, Rest&&... rest) const
    {
        return Op::exec(std::move(gen), std::move(rest)...);
    }
};

class select_op : public generator_operator<select_op>
{
    friend struct generator_operator<select_op>;

    template<typename T, typename Fun>
    static generator<typename std::result_of<Fun(T)>::type>
    exec(generator<T> gen, Fun fun)
    {
        for (auto n : gen) {
            __yield_value fun(n);
        }
    }
};
constexpr pipeable_factory<select_op> Select{};

class where_op : public generator_operator<where_op>
{
    friend struct generator_operator<where_op>;

    template<typename T, typename Fun>
    static generator<T> exec(generator<T> gen, Fun fun)
    {
        for (auto n : gen) {
            if (fun(n)) {
                __yield_value n;
            }
        }
    }
};
constexpr pipeable_factory<where_op> Where{};

Note that since the operator classes are stateless, exec() can be static and all the state is captured in the closures. exec() can even be private, since it is always called through the base class generator_operator, which is a friend of the operator class.

The result is quite clean, in my opinion: if we now want to add a new resumable operator all we have to do is to:

  1. write a new generator_operator-derived class with a method exec that returns a generator<T>, and
  2. declare a pipeable_factory<> for this new type.

More about generator<T>

So far nothing seems to prevent us from making these lazy operators work with any range type, even eager ranges, or with any type that defines begin() and end() methods that return input iterators.

However, we are using resumable functions to implement operators for lazy algorithms, and we have seen that a resumable generator function must declare its return type as generator<T>, not auto. Class generator<T> has some other peculiarities that must be taken into account.

If we look at its code (in the header file C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\include\experimental\generator in the VS CTP) we see that class generator is moveable but not copyable.

template <typename _Ty, typename _Alloc = allocator<char> >
struct generator
{
    struct promise_type { ... };

    struct iterator { ... };

    iterator begin() {
        ...
        return {_Coro};
    }
    iterator end() {
        return {nullptr};
    }

    generator() = default;
    ~generator() { ... }

    generator(generator const&) = delete;
    generator& operator = (generator const&) = delete;

    generator(generator && _Right) : _Coro(_Right._Coro) {
        _Right._Coro = nullptr;
    }
    generator& operator = (generator && _Right) {
        if (&_Right != this) {
            _Coro = _Right._Coro;
            _Right._Coro = nullptr;
        }
        return *this;
    }

private:
    resumable_handle<promise_type> _Coro;
};

The copy constructor and copy assignment operators are deleted. To understand why, it is useful to look at a class diagram for the generator’s code:

[Figure: class diagram of the generator code]

Generators are based on stackless coroutines, which work (as described in the previous post) by allocating an activation frame, encapsulated by the class resumable_handle. In this design a resumable_handle provides the functionality to suspend and resume the execution of a function. Every instance of generator<T> creates and owns a resumable_handle object, and shares a reference to this object with its iterators. This is why generator<T> cannot be copyable: we must never have two instances of a generator that refer to the same coroutine.

But the type can be movable, so the move constructor and move assignment operator are declared and work by transferring ownership of the resumable_handle from one object to another.

The non-copyability must be taken into account in our implementation. Every time we pass a generator<T> in a function call, we need to pass an rvalue reference (generator<T>&&). The only, very important exception is in the generator_operator<> call operator. As we have seen, it is written like this:

template <typename Op>
struct generator_operator
{
    template<typename Gen, typename... Rest>
    auto operator()(Gen&& gen, Rest&&... rest) const
    {
        return Op::exec(std::move(gen), std::move(rest)...);
    }
};

We explicitly pass an rvalue reference to the call operator, but we must make sure that in the call to exec() the move constructor is invoked, to transfer ownership of the resumable handle to a new instance of generator<T> allocated in the scope of the exec() resumable function:

struct where_op
{
    template<typename T, typename Pred>
    static generator<T> exec(generator<T> gen, Pred pred) {...}
};

Taking the generator by rvalue reference instead would not work:

struct where_op
{
    template<typename T, typename Pred>
    static generator<T> exec(generator<T>&& gen, Pred pred) { ... }
};

This error is quite tricky and insidious to debug. The problem is that with resumable functions one must always make sure that the lifespan of the generator object surpasses the lifespan of the resumable function that uses that generator. Note that when we evaluate a pipe, like:

auto evenNumbers = Integers() | Where([](int n) { return n % 2 == 0; });

the temporary generator returned by Integers() is destroyed at the end of the full expression, well before we can start iterating over the result. So, in the where_op::exec() coroutine we would hold a dangling reference to a destroyed object. But if we use the move constructor instead, a new generator<T> object lives in the scope of the exec() function and takes ownership of the resumable_handle. And since exec() is actually a resumable function, this new generator<T> object is allocated in the coroutine activation frame and not on the stack. So the lifespan of this generator<T> object coincides with the lifespan of the resumable function, and everything works.
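The ownership rule can be illustrated without coroutines. In the hypothetical sketch below, MoveOnlySeq stands in for generator<T> (movable, not copyable), and FilteredSeq stands in for the activation frame of where_op::exec(): because it takes the source by value and moves it into a member, the adaptor owns the source, so it stays valid after the temporary on the left side of the pipe has been destroyed. Storing a MoveOnlySeq&& member instead would leave a dangling reference, which is the bug described above.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Move-only sequence, standing in for generator<T>: it owns its storage
// through a unique_ptr, as generator<T> owns its coroutine handle.
class MoveOnlySeq
{
public:
    explicit MoveOnlySeq(std::vector<int> values)
        : _data(std::make_unique<std::vector<int>>(std::move(values))) {}

    MoveOnlySeq(MoveOnlySeq&&) = default;
    MoveOnlySeq& operator=(MoveOnlySeq&&) = default;
    MoveOnlySeq(const MoveOnlySeq&) = delete;
    MoveOnlySeq& operator=(const MoveOnlySeq&) = delete;

    const std::vector<int>& values() const { return *_data; }

private:
    std::unique_ptr<std::vector<int>> _data;
};

// Filter that takes ownership of its source, like exec(generator<T> gen, ...).
template<typename Pred>
class FilteredSeq
{
public:
    FilteredSeq(MoveOnlySeq src, Pred pred)
        : _src(std::move(src)), _pred(std::move(pred)) {}

    // The filtering work happens only when the results are requested.
    std::vector<int> evaluate() const
    {
        std::vector<int> out;
        for (int v : _src.values())
            if (_pred(v)) out.push_back(v);
        return out;
    }

private:
    MoveOnlySeq _src;   // owned: its lifetime matches the adaptor's
    Pred _pred;
};

template<typename Pred>
FilteredSeq<Pred> Filter(MoveOnlySeq src, Pred pred)
{
    return FilteredSeq<Pred>(std::move(src), std::move(pred));
}
```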

For all these reasons I designed the lazy operators to work only on generator<T>s, not on any kind of ranges (like STL containers). But it is very easy to write an adapter function that converts an eager range into a lazy generator:

// Note: src is captured by reference, so it must outlive the generator.
template<typename Src, typename T = typename Src::value_type>
static generator<T> make_generator(const Src& src)
{
    for (const T& t : src) {
        __yield_value t;
    }
}

Examples

Do you still remember the problem we started from? We can now finally write our own solution to the problem of generating the sequence of Pythagorean triples. All we are missing is an implementation of Range, to generate a sequence of integers:

generator<int> Range(int from, int count)
{
    for (int i = 0; i < count; i++)
    {
        __yield_value (from + i);
    }
}

And finally:

generator<tuple<int, int, int>> Triples(int max)
{
    for (auto z : Range(1, max - 1)) {
        for (auto x : Range(1, z - 1)) {
            for (auto y :
                Range(x, z - x) |
                Where([x, z](int y) { return x*x + y*y == z*z; }))
            {
                __yield_value tuple<int, int, int>(x, y, z);
            }
        }
    }
}
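For comparison, the same nested iteration can be written eagerly in plain C++. FirstTriples is a hypothetical helper that mirrors the Haskell `take 10 triples` rather than Triples(max): z is unbounded, and the loops stop as soon as count triples have been found, which is the job `take` does lazily in Haskell.

```cpp
#include <cassert>
#include <cstddef>
#include <tuple>
#include <vector>

// Eager analogue of `take count triples`: same loop order (z, then x,
// then y >= x) and same filter, stopping once `count` triples are found.
inline std::vector<std::tuple<int, int, int>> FirstTriples(std::size_t count)
{
    std::vector<std::tuple<int, int, int>> result;
    for (int z = 1; result.size() < count; ++z) {
        for (int x = 1; x < z && result.size() < count; ++x) {
            for (int y = x; y < z && result.size() < count; ++y) {
                if (x * x + y * y == z * z)
                    result.emplace_back(x, y, z);
            }
        }
    }
    return result;
}
```

With count = 10 this produces the same ten triples the Haskell program prints, starting with (3, 4, 5), (6, 8, 10), (5, 12, 13) and ending with (20, 21, 29).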

As a final example, we can write a (slow) prime number generator. We need an additional operator for this, All, which returns true if all the elements in a sequence satisfy a given predicate:

class all_op : public generator_operator<all_op>
{
    friend struct generator_operator<all_op>;

    template<typename T, typename Fun>
    static bool exec(generator<T> gen, Fun fun)
    {
        for (auto n : gen) {
            if (!fun(n)) {
                return false;
            }
        }
        return true;
    }
};
constexpr pipeable_factory<all_op> All{};

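All shows that an operator does not necessarily need to return a generator: it can return any type. (All itself is not lazy, though: it consumes the sequence until it finds an element that does not satisfy the predicate.) With All we can finally write: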

generator<int> Primes(int max)
{
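    return Range(2, max - 1) |
        Where([](int i) {
            // Candidate divisors 2..(int)sqrt(i); for i = 2 and i = 3
            // the range is empty, so All() correctly returns true.
            return Range(2, (int)sqrt(i) - 1) |
                   All([i](int j) { return (i % j) != 0; });
        });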
}
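The primality test inside Primes is plain trial division. As an eager sketch with a hypothetical helper name, the same check can be written with two loops; using j * j <= i as the bound is an integer-only alternative to computing (int)sqrt(i), and it also handles 2 and 3, whose divisor range is empty.

```cpp
#include <cassert>
#include <vector>

// Eager trial division: i is prime if no j with j * j <= i divides it.
// The inner loop is what Range(...) | All(...) expresses lazily above.
inline std::vector<int> PrimesUpTo(int max)
{
    std::vector<int> primes;
    for (int i = 2; i <= max; ++i) {
        bool isPrime = true;
        for (int j = 2; j * j <= i; ++j) {
            if (i % j == 0) { isPrime = false; break; }
        }
        if (isPrime) primes.push_back(i);
    }
    return primes;
}
```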

But these are just a few basic examples. If you are interested, you can find here:

the sources of this small library, with the implementation of a number of LINQ-like operators, like Aggregate, All, Any, Concat, Contains, Count, ElementAt, Empty, Except, First, Last, Max, Min, Range, Repeat, Reverse, Select, Skip, SkipWhile, Sum, Take, TakeWhile, Where, Zip.
The code is definitely not production-quality: error and exception handling are mostly absent, and the unit tests are minimal. It is just an example of what we will be able to do with lazy generators, once they become part of C++.

Stackless coroutines with Visual Studio 2015

Let’s talk about… coroutines! Once again, for a change! 🙂

In my last posts I talked about a Microsoft proposal to add support for resumable functions and the await keyword to the language, and described how this feature had been implemented in one of the first previews of VS 2015. Almost a year has passed since then, and there are a few noteworthy updates, so it is time for a new post. In fact, there is a new proposal for resumable functions (N4286) that addresses some of the limitations of the previous one, and that has been prototyped in the latest CTP of Visual Studio 2015. But let’s start from the beginning.

C++17 Concurrency TS

For some time now I have been looking at the problem of implementing coroutines/resumable functions, in order to have something similar to the await and yield statements of C# in C++ too.

It turns out that, unbeknown to me 🙂, this is quite a hot topic in the C++ community. The next major version of the standard is expected in 2017 and, in preparation for it, the work of the standard committee has been organized into 8 groups working on 8 technical specifications, for things like a File System API, a Networking API, Concepts-Lite, Transactional Memory, Parallelism and Concurrency. (See http://isocpp.org/std/status for more details.)

In particular in the Concurrency TS people are discussing how to improve std::futures to make them more composable, and how to introduce executors (task schedulers) and resumable functions. And while the work on extended futures and executors should only involve updates to the standard library, resumable functions could require changes to the language itself. To be honest, it seems unlikely that these changes will be approved in time for 2017, because there is still no agreement in the committee, but even so, this seems a very interesting topic (well, it is for me, at least…).

Stackful?

In the old proposal (N3858) Gustafsson presented the advantages of having language support for resumable functions, proposed the introduction of the await and yield keywords and described two possible implementations, stackful and stackless. The first, based on resumable side stacks (fibers), works by giving each coroutine its own stack, which is associated with a “real” OS thread only while the coroutine is actually running. The second, based on heap-allocated activation frames, requires the compiler to do significantly more work: it must allocate a data structure on the heap to hold the arguments and local variables of the coroutine, and generate the code for a state machine that manages the suspension and resumption points.

The Visual Studio 2013 CTP came with a stackful, fiber-based implementation of the await feature, which I described in a previous post. But stackful coroutines have a big disadvantage: fibers are VERY expensive. By default, a fiber is allocated with the default stack size of a thread, which is 1MB on Windows. Clearly this makes it impossible to have too many coroutines running concurrently; just a few thousand coroutines would exhaust the virtual address space of a 32-bit process.
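A rough check of that estimate, assuming the default 2 GB of user-mode address space of a 32-bit Windows process and ignoring everything else the process needs:

```latex
\frac{2\,\text{GB}}{1\,\text{MB per fiber stack}}
  = \frac{2^{31}\,\text{bytes}}{2^{20}\,\text{bytes}}
  = 2^{11} = 2048 \text{ fiber stacks}
```

so the ceiling is in the low thousands even before any heap, code or thread stacks are accounted for.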

Stackless!

The new proposal (N4286) instead focuses mostly on a stackless implementation, and promises to be scalable to billions of concurrent coroutines. The goal, again, is to be able to have functions that can be suspended, waiting for some asynchronous operation to complete, and then automatically resumed after the operation has completed. For users, this means being able to explicitly await an asynchronous completion with await statements, as is done in C#.

As an example of an asynchronous operation let’s imagine that we have a function that sleeps asynchronously for a given time interval:

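#include <chrono>
#include <future>
#include <thread>

std::future<void> delay(int ms)
{
    // std::launch::async runs the lambda on another thread; with the
    // default policy it could be deferred until get() or wait().
    return std::async(std::launch::async, [ms]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    });
}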

The proposal would make it possible to wait on the asynchronous operation using await:

std::future<void> testAwait()
{
    await delay(2000);
}

Resumable functions also make it possible to have lazy generators (similar to C# iterator blocks) with the yield statement:

std::generator<int> fibonacci(int max)
{
    int a = 0;
    int b = 1;
    while (a <= max) {
        yield a;
        int next = a + b; 
        a = b; 
        b = next;
    }
}

The syntax and semantics of coroutines remain more or less the same as in the old proposal; the changes are mostly in the way coroutines should be implemented, using activation frames. And since I think I have already talked more than enough about resumable functions and generators in the past, in this post I’d like to focus on the details of this new implementation (which are much better explained in the proposal itself, anyway).

In fact, implementing a resumable function with a fiber, while expensive, is not very complicated: after all, a fiber is already a system-level implementation of a coroutine. Implementing resumable functions with just an activation frame requires much more work from the compiler, which needs to alter the structure of the code, turning it into a sequence of chained continuations.

Visual Studio 2015 preview

To see how the stackless coroutines of N4286 are supposed to work, we can play with a first prototype that is already available in the CTP version of Visual Studio 2015, which is freely available for download.

The only meaningful difference between this prototype and the standard proposal is that in the prototype we’ll have to deal with PPL tasks rather than std::futures. This is because N4286 builds on an improved std::future, extended to support the chaining of continuations with std::future::then(), as is done in boost::future. But, as we know, schedulers and continuation chaining are not yet part of the standard; therefore the VS prototype replaces futures with PPL tasks and uses the Concurrency Runtime to schedule them. This is just an implementation detail, though: in what follows, the term “task” should be read simply as an alias for the “std::future” of the future.
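To make the idea of continuation chaining concrete, here is a deliberately naive sketch of what a then() could look like on top of today’s std::future. The helper then() and the delayed_value() operation are hypothetical names, not part of any library. Unlike a real std::future::then(), this version spends a whole thread blocking on the original future, which is exactly the cost that a scheduler-aware implementation (like PPL tasks) avoids.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper, not part of the standard library: attaches a
// continuation to a std::future<T>. It launches an asynchronous task that
// blocks on the original future and then calls the continuation with the
// value, so each pending continuation occupies a whole thread.
template <typename T, typename F>
auto then(std::future<T> fut, F cont)
    -> std::future<decltype(cont(std::declval<T>()))>
{
    return std::async(std::launch::async,
        [](std::future<T> f, F c) { return c(f.get()); },
        std::move(fut), std::move(cont));
}

// Hypothetical asynchronous operation used to try out then().
inline std::future<int> delayed_value(int ms, int value)
{
    return std::async(std::launch::async, [ms, value] {
        std::this_thread::sleep_for(std::chrono::milliseconds(ms));
        return value;
    });
}
```

For example, then(delayed_value(10, 20), [](int x) { return x + 1; }) yields a future whose get() returns 21, and calls can be nested to build longer chains.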

Let’s start with await. We can rewrite the delay function defined above with the PPL:

#include <chrono>
#include <thread>
#include <ppltasks.h>

Concurrency::task<void> delay(int ms)
{
    return Concurrency::create_task([ms]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(ms));
    });
}

At this point we can use the new keyword __await (with a double underscore, because it is not yet standard):

Concurrency::task<void> testAwait()
{
    __await delay(2000); // suspension point: testAwait suspends until the task returned by delay() completes.
}

To compile this code with the VS2015 CTP, we first need to enable a compiler extension by adding the /await flag to the compiler options of our project. To use this new feature we also need to include the header file <pplawait.h>.

If we run this under the debugger, we can see that the execution really suspends at the __await point and control goes back to the caller of our testAwait function, which can then go on doing other stuff. After two seconds the delay task completes and the execution of testAwait resumes from where it had suspended. Normally it will resume on a different thread, one of the worker threads used by the PPL scheduler.

The current CTP implementation has a few other limitations: only the x64 platform is supported, not Win32, and stack-frame run-time checks (/RTC) must be disabled because they are not yet compatible with /await. But these are minor issues; after all, this is just a prototype, not meant for production code.

Coroutine handles

But how does all this work? In a previous post, I investigated some of the details of the stackful prototype that came last year with the VS2013 CTP. Let’s do the same with this new implementation in the latest CTP.

Most of the library logic for this feature ships in two header files (resumable and generator) located in the \Microsoft Visual Studio 14.0\VC\include\experimental\ folder.

As we said, in a stackless implementation a coroutine is associated with an activation frame that is allocated on the heap and that contains the state of the coroutine together with all the data required to manage its suspension and resumption. In N4286 the frame is accessed through an instance of the library class template coroutine_handle.

In the VS2015 CTP an activation frame is encapsulated by an instance of the resumable_handle type. (This was the name of coroutine_handle in a previous version of the standard document.)
A resumable handle owns the block of heap memory that represents the activation frame of a function. In general, data in a frame is organized as in the figure: first there is the space for a special coroutine promise object, followed by the space for the “suspend context” and the local state (parameters and local variables).

[Figure: resumable_handle]

Coroutine promises

The promise class is a library class that implements the semantics of a particular type of coroutine (like await or generators). It must implement all the logic to pass return values and exceptions back from asynchronous computations, with methods like:

void set_result(T val) 
T get_return_object(resumable_handle rh)
void set_exception(E e)
AwaitType yield_value(T val)
bool cancellation_requested()
...

We’ll see later how the C++ compiler makes use of these functions in the code that it generates for a suspension/resumption point.
In the VS2015 CTP, the logic of a promise for await statements is in the class resumable_traits<RetType, Args...>::promise_type. This traits class (renamed coroutine_traits<> in the latest version of the proposal) is templated on the return and argument types of the resumable function and aggregates all the type-dependent types and methods, like the activation frame allocator.
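As a rough illustration of the role a promise plays, this sketch wraps std::promise in a hypothetical simple_promise<T> that exposes set_result(), set_exception() and get_return_object(), in the spirit of the methods listed above. It is not the CTP’s resumable_traits<...>::promise_type; the run_with_promise() driver, also hypothetical, stands in for the glue code the compiler would generate around a function body, and here it simply runs the body synchronously.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <utility>

// Hypothetical minimal promise type, loosely modeled on the methods listed
// above. It is the channel through which a coroutine body hands its result,
// or its exception, to whoever holds the object returned to the caller.
template <typename T>
class simple_promise
{
public:
    // Called before the body runs, to build the object returned to the caller.
    std::future<T> get_return_object() { return _promise.get_future(); }

    // Called when the body completes normally.
    void set_result(T value) { _promise.set_value(std::move(value)); }

    // Called when the body exits with an exception.
    void set_exception(std::exception_ptr e) { _promise.set_exception(e); }

private:
    std::promise<T> _promise;
};

// Hypothetical stand-in for compiler-generated glue: runs the body
// (synchronously, in this sketch) and routes its outcome through the promise.
template <typename T, typename Body>
std::future<T> run_with_promise(Body body)
{
    simple_promise<T> promise;
    std::future<T> result = promise.get_return_object();
    try {
        promise.set_result(body());
    } catch (...) {
        promise.set_exception(std::current_exception());
    }
    return result;
}
```

The caller only ever sees the future: a normal return shows up as its value, and an exception thrown by the body is rethrown by get().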

Awaitable types

Together with resumable handles and promises, the final piece of the picture is awaitable types. An awaitable type is a type for which a library provides support for await statements, by implementing the following three functions:

// Returns true if the result of the asynchronous operation is already available.
bool await_ready(T& t)

// Arranges for the coroutine to be resumed, through rh, when the asynchronous operation completes.
void await_suspend(T& t, resumable_handle<> rh)

// Called when the execution resumes; returns the result of the operation (R may be void).
R await_resume(T& t)

For example, in the sample code above, when we call:

    __await delay(2000);

the type we want to await is a PPL task. So, with our await statement we are asking the compiler to generate code that suspends on a task and resumes after the task has completed. The await compiles because the new library provides these three functions for PPL tasks:

template <typename T>
bool await_ready(task<T>& t) {
    return t.is_done();
}
template <typename T>
void await_suspend(task<T>& t, resumable_handle<> rh) {
    t.then([rh](task<T>&) { rh(); });
}
template <typename T>
T await_resume(task<T>& t) {
    return t.get();
}

From this, it should be clear that the mechanism is easily extensible to new types. We can await on any type for which we can provide the three await functions.
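For example, here is a sketch of the three functions for a made-up awaitable type, manual_event (hypothetical: an event completed explicitly by calling set()), together with a hand-written expansion of a single await on it. A plain std::function<void()> stands in for resumable_handle, and the doubling_frame struct plays the role of the activation frame; none of these names come from the CTP.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical awaitable type: an event completed explicitly by set(v).
// It only illustrates the shape of the three functions the compiler needs.
struct manual_event
{
    bool done = false;
    int value = 0;
    std::function<void()> continuation;

    void set(int v)
    {
        done = true;
        value = v;
        if (continuation) continuation();  // resume whoever was awaiting us
    }
};

bool await_ready(manual_event& e) { return e.done; }

void await_suspend(manual_event& e, std::function<void()> resume)
{
    e.continuation = std::move(resume);
}

int await_resume(manual_event& e) { return e.value; }

// Hand-written expansion of a resumable body equivalent to
//     result = 2 * (__await ev);
// The struct plays the role of the activation frame; start() is the code up
// to the suspension point and resume() the code after it.
struct doubling_frame
{
    manual_event& ev;
    int result = 0;
    bool finished = false;

    explicit doubling_frame(manual_event& e) : ev(e) {}

    void start()
    {
        if (!await_ready(ev)) {
            await_suspend(ev, [this] { resume(); });
            return;  // suspended: control goes back to our caller
        }
        resume();    // already complete: no suspension at all
    }

    void resume()
    {
        result = 2 * await_resume(ev);
        finished = true;
    }
};
```

If the event is still pending, start() returns immediately and the frame finishes only when someone calls set(); if the event has already completed, the await costs nothing more than a function call.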

Behind the scenes…

But what exactly happens when we compile a resumable function like this, with one or more await statements? Let’s take this function again as an example:

task<R> testAwait(T1 param1, T2 param2, …)
{
    // *** code before the __await statement

    __await delay(2000); // suspension point: testAwait suspends until the task returned by delay() completes.

    // *** code after the __await statement
}

Given the presence of suspension points, the compiler transforms the code quite radically and leverages the library code for coroutine handles, promises and awaitable types described before. In practice, the function above is transformed into this (simplified) pseudo-code:

task<R> testAwait(T1 param1, T2 param2, …)
{
    // Allocates coroutine activation frame.
    auto resHandle = _Resumable_helper_traits::_Alloc(frameSize, resumeAddress);
    
    // Creates a task to be returned to the caller.
    // This task is associated to the task completion handle of the promise.
    task<R> taskToReturn = resHandle.promise().get_return_object();

    // *** Here goes all the code before the __await statement

    // Calls the awaitable function and receives the task on which we want
    // to asynchronously block.
    auto taskToAwait = delay(2000);
  
    // If the task has not completed yet, suspend; otherwise fall through to Resume.
    if (! Concurrency::await_ready(taskToAwait))
    {
        // Suspends the coroutine and adds ‘resHandle’ as continuation to be called
        // when the task ‘taskToAwait’ completes.
        Concurrency::await_suspend(taskToAwait, resHandle);
        return taskToReturn;
    }

Resume:
    // we get here when the await has completed
    auto result = Concurrency::await_resume(taskToAwait);

    // *** code after the __await statement

    _Resumable_helper_traits::_Free(resHandle);
    return taskToReturn;
}

Let’s look at this in detail. First, a single resumable handle for the whole function is allocated. This is an instance of a template class, specialized on the types of all the function parameters and on the return type. The frame size is calculated at compile time so that the frame can also hold the local variables declared in the function. One of the arguments to _Alloc() is resumeAddress, a pointer to the callback function that will be called when the asynchronous operation completes.

In this prototype a function that uses __await must return a task, so the next step is to prepare the promise and the task that will be returned to the caller.

After this preamble, the code of the function executes normally, synchronously, until it reaches an await (or yield) statement. In our case we want to await the task returned by a call to delay(timeInMs) and, as we saw, there is library support for awaiting tasks. The compiler adds a call to await_ready to check whether the task has already completed. If so, we can skip the asynchronous part and just get the task result. But if the task has not yet completed, we need to suspend the execution. This is done by returning our return-value task to the caller, so that the caller can carry on doing other work, or perhaps await our task asynchronously in turn.

Suspending a function is relatively simple, but how will the function resume its execution? Before returning the result task to the caller, the generated code calls await_suspend to chain a continuation to the task we are awaiting. One of the parameters here is the resumable handle itself, which represents the coroutine state of this resumable function:

template <class _Ty, typename _Handle>
void await_suspend(task<_Ty>& _Task, _Handle _ResumeCb)
{
    _Task.then([_ResumeCb](task<_Ty>&) {
        _ResumeCb();
    });
}

So now the method testAwait has suspended and returned a task to the caller. The caller is free to do other things, or it can suspend in turn, waiting for the task we returned to complete. Meanwhile, life goes on, the scheduler keeps scheduling, time goes by, and at some point the two seconds of the delay task elapse and a call to our continuation can be scheduled to run.

The class resumable_handle implements the call operator (), which is the callback that will be called after the awaited task completes. This is where task continuations meet coroutines: the call operator simply forwards the call to the resumeAddress function that the compiler passed as an argument when allocating the resumable frame. In this way, the execution can resume from where it had left off. The details are a little more complicated: since a function can have multiple suspension/resumption points, the VS compiler seems to implement a small state machine to restart the execution at the correct location. But, simplifying, for the pseudocode above it is as if there were a

goto Resume;

in order to jump to the Resume: label and complete the await operation, retrieving the result of the completed task and returning it to the caller as a result of the returned task.
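The small state machine mentioned above can be sketched by hand. What follows is not the code VS generates; it is a hypothetical frame for a function with two suspension points, where a state field records which point to resume from and a switch jumps there. A single-threaded fake_scheduler (also hypothetical) replaces the PPL thread pool, so each continuation simply runs when the queue is drained.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical single-threaded scheduler: a queue of continuations that
// stands in for the PPL worker threads.
struct fake_scheduler
{
    std::deque<std::function<void()>> ready;

    void post(std::function<void()> f) { ready.push_back(std::move(f)); }

    void run()
    {
        while (!ready.empty()) {
            std::function<void()> f = std::move(ready.front());
            ready.pop_front();
            f();
        }
    }
};

// Hand-written frame for a body with two suspension points:
//     trace(1); __await x; trace(2); __await y; trace(3);
// 'state' records where to continue; resume() jumps there with a switch.
// Each "await" here just queues the continuation, as if the awaited
// operation completed immediately on the scheduler.
struct two_awaits_frame
{
    fake_scheduler& sched;
    std::vector<int>& trace;
    int state = 0;

    two_awaits_frame(fake_scheduler& s, std::vector<int>& t) : sched(s), trace(t) {}

    void resume()
    {
        switch (state) {
        case 0:
            trace.push_back(1);
            state = 1;
            sched.post([this] { resume(); });  // first suspension point
            return;
        case 1:
            trace.push_back(2);
            state = 2;
            sched.post([this] { resume(); });  // second suspension point
            return;
        case 2:
            trace.push_back(3);
            state = 3;                          // the body has completed
            return;
        }
    }
};
```

The same resume() entry point serves every suspension point, which is why a single callback address passed at allocation time is enough.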

Generators

Coroutines also make lazy generators possible: functions that produce a (possibly infinite) sequence of values that can be pulled one at a time.

These are the equivalent of C# iterator blocks or Python generators; the function suspends after having “yielded” each value, and resumes execution when the next value is requested, until the sequence completes.

Going back to the usual example, this is a generator for the Fibonacci sequence, written with the syntax of VS2015 (where the keyword __yield_value is used in place of yield).

generator<int> fib(int max) {
    int a = 0;
    int b = 1;
    while (a <= max) {
        __yield_value a;
        int next = a + b;
        a = b;
        b = next;
    }
}

The function returns an object of type generator<int>, which behaves like any STL sequence whose range is defined by begin() and end() methods. Therefore the client code that enumerates the values in the sequence can be as simple as a range-based for loop:

for (auto n : fib(1000)) {
    std::cout << n << std::endl;
}

In VS2015, the code for generators can be found in the file \Microsoft Visual Studio 14.0\VC\include\experimental\generator. I could dig into this code and try to describe all the details, but I would risk becoming repetitive and sounding like a broken record. In fact, the implementation of the prototype for generators turns out to be very similar to the idea I sketched in one of my last posts. (Not, of course, because my idea was particularly brilliant, but only because there are not many other ways to write generators with coroutines :-).)

Just to have a quick overview, this is a very simplified version of the implementation:

template<typename T>
class generator
{
public:
    class iterator
    {
    public:
        iterator(resumable_handle rh) : _coro(rh) {}
        iterator(std::nullptr_t) {} // represents the end of the sequence

        iterator& operator ++ () {
            _coro(); // resumes execution until the next yield (or the end)
            return *this;
        }
        bool operator == (const iterator& rhs) const {
            return _coro == rhs._coro;
        }
        bool operator != (const iterator& rhs) const {
            return !(*this == rhs);
        }
        const T& operator * () const {
            return _coro.promise()._CurrentValue;
        }

    private:
        resumable_handle _coro;
    };

    generator(resumable_handle rh) : _coro(rh) {}

    iterator begin() {
        _coro(); // starts the coroutine and executes it until it terminates or yields.
        return iterator(_coro);
    }
    iterator end() { return iterator(nullptr); }

private:
    resumable_handle _coro;
};

We see that in order to compile a function like Fibonacci above, the compiler must allocate a resumable_handle and use it to construct an object of type generator<T>.

The role of the generator class is just to provide the semantics of range, with input iterators returned by begin() and end(); all the logic for the lazy generation is in the iterator class.
An iterator holds the resumable_handle that was passed to the generator, and uses it to resume the coroutine. Here too, a resumable handle works as a wrapper for the state of a coroutine. There are only small differences with respect to await: here a different type of promise is used (of class generator::promise_type), designed to support yield_value and to store the last yielded value so that it can later be pulled by dereferencing the iterator.
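This division of labor between the generator and its iterators can be tried out without compiler support. In the following sketch (all names hypothetical), fake_coroutine stands in for both the resumable_handle and the generator promise: calling it runs the body up to its next “yield”, storing the value, and marks it done when the body finishes. The body of fib() is rewritten by hand as a step function whose captured variables play the role of the activation frame.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for resumable_handle plus the generator promise:
// calling it runs the body up to its next "yield", which stores the value in
// 'current', or marks the coroutine as done when the body finishes.
template <typename T>
struct fake_coroutine
{
    std::function<bool(T&)> resume;  // returns false when the body has finished
    T current{};
    bool done = false;

    void operator()() { done = !resume(current); }
};

template <typename T>
class generator
{
public:
    class iterator
    {
    public:
        explicit iterator(fake_coroutine<T>* coro) : _coro(coro) {}

        iterator& operator++()
        {
            (*_coro)();                        // resumes the body
            if (_coro->done) _coro = nullptr;  // finished: now equal to end()
            return *this;
        }
        bool operator==(const iterator& rhs) const { return _coro == rhs._coro; }
        bool operator!=(const iterator& rhs) const { return !(*this == rhs); }
        const T& operator*() const { return _coro->current; }

    private:
        fake_coroutine<T>* _coro;              // nullptr represents the end
    };

    explicit generator(std::function<bool(T&)> body)
        : _coro(std::make_shared<fake_coroutine<T>>())
    {
        _coro->resume = std::move(body);
    }

    // Like the CTP sketch, begin() starts the body; call it only once.
    iterator begin()
    {
        (*_coro)();
        return iterator(_coro->done ? nullptr : _coro.get());
    }
    iterator end() { return iterator(nullptr); }

private:
    std::shared_ptr<fake_coroutine<T>> _coro;
};

// fib(max) rewritten by hand as a step function: the captured a and b play
// the role of the activation frame. Updating them before returning the value
// is equivalent to running that code after resumption.
inline generator<int> fib(int max)
{
    int a = 0;
    int b = 1;
    return generator<int>([a, b, max](int& out) mutable -> bool {
        if (a > max) return false;  // loop condition fails: the body ends
        out = a;                    // __yield_value a;
        int next = a + b;
        a = b;
        b = next;
        return true;
    });
}
```

With this, for (int n : fib(20)) visits 0, 1, 1, 2, 3, 5, 8, 13, pulling each value only when the loop asks for it.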

The result is quite neat, in my opinion: C#-like generators will be very useful, especially if they can be composed with LINQ-like operators.

But again, for more details, I’d refer to this post, or (even better) to the standard document.

Conclusion

To summarize this brief excursus into the new proposal for resumable functions, there are a few points worth noting:

  1. Resumable functions can be used to add support for await and yield in C++.
  2. They can be implemented with stackless coroutines in order to be more scalable (at the price of greater complexity in the compiler).
  3. Stackless coroutines work by allocating a single activation frame for each invocation of a resumable function.
  4. The implementation requires an improved std::future (composable with then()), which is also being standardized.

Hopefully, something like that will be part of the standard soon.

(Note: slightly edited on 2015/3/8 to clarify a few points).

Generator functions in C++

In the previous post we had a look at the proposal of introducing resumable functions into the C++ standard to support writing asynchronous code modeled on the C# async/await pattern.

We saw that it is already possible to experiment with the future resumable and await keywords in Visual Studio, by installing the latest November 2013 CTP. But the concept of resumable functions is not limited to asynchrony; in this post we’ll see how it can be expanded to support generator functions.

Generator functions and lazy evaluation

In several languages, like C# and Python, generator functions provide the ability to lazily produce the values in a sequence only when they are needed. In C# a generator (or iterator) is a method that contains at least one yield statement and returns an IEnumerable<T>.

For example, the following C# code produces the sequence of Fibonacci numbers (1, 1, 2, 3, 5, 8, 13, 21, …):

IEnumerable<int> Fibonacci()
{
    int a = 0;
    int b = 1;
    while (true) {
        yield return b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

A generator acts in two phases. When it is called, it just sets up a resumable function, preparing for its execution, and returns an enumerable object (in the case of C#, an IEnumerable<T>). The actual execution is deferred until the values are enumerated and pulled from the sequence, for example with a foreach statement:

foreach (var num in Fibonacci())
{
    Console.WriteLine("{0}", num);
}

Note that the returned sequence is potentially infinite; its enumeration could go on indefinitely (if we ignore the integer overflows).

Of course there is nothing particularly special about doing the same thing in C++. While STL collections are usually eagerly evaluated (all their values are produced upfront), it is not difficult to write a collection whose iterators calculate their current value on the spot, based on some state or heuristic.
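For instance, a minimal sketch of such a collection for the Fibonacci numbers up to a given maximum could look like this (fibonacci_range is a hypothetical name). Each iterator stores only the two numbers it needs and computes the next one when incremented; what this approach lacks, compared with a generator, is the ability to express the logic as a straight loop.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical lazily evaluated collection of the Fibonacci numbers up to
// 'max'. Nothing is stored: each iterator keeps just the two numbers it
// needs and computes the next one when incremented (overflow is ignored).
class fibonacci_range
{
public:
    class iterator
    {
    public:
        typedef std::input_iterator_tag iterator_category;
        typedef long long value_type;
        typedef std::ptrdiff_t difference_type;
        typedef const long long* pointer;
        typedef const long long& reference;

        iterator(long long a, long long b, long long max) : _a(a), _b(b), _max(max) {}

        const long long& operator*() const { return _b; }

        iterator& operator++()
        {
            long long next = _a + _b;  // computed only when the next value is requested
            _a = _b;
            _b = next;
            return *this;
        }

        // Every iterator that has gone past 'max' compares equal to end().
        bool operator==(const iterator& rhs) const
        {
            return done() == rhs.done() && (done() || (_a == rhs._a && _b == rhs._b));
        }
        bool operator!=(const iterator& rhs) const { return !(*this == rhs); }

    private:
        bool done() const { return _b > _max; }
        long long _a, _b, _max;
    };

    explicit fibonacci_range(long long max) : _max(max) {}

    iterator begin() const { return iterator(0, 1, _max); }
    iterator end() const { return iterator(0, _max + 1, _max); }

private:
    long long _max;
};
```

Iterating over fibonacci_range(30) produces 1, 1, 2, 3, 5, 8, 13, 21, the same values as the C# iterator above, but the state machine is spread over operator++ and operator== instead of being a readable loop.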

What gives a particular expressive power to generators is the ability to pause the execution each time a new value is generated, yielding control back to the caller, and then to resume the execution exactly from the point where it had suspended. A generator is therefore a special form of coroutine, limited in the sense that it may only yield back to its caller.

The yield statement hides all the complexity inherent in the suspension and resumption of the function; the developer can express the logic of the sequence plainly, without having to set up callbacks or continuations.

From resumable functions to generators (and beyond)

It would be nice to bring the expressive power of generators to our good old C++, and naturally there is already some work going on in this direction. In this proposal Gustafsson et al. explain how generator functions could be supported by the language as an extension of resumable functions, making it possible to write code like:

sequence<int> fibonacci() resumable
{
    int a = 0;
    int b = 1;
    while (true)
    {
        yield b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

Here, the proposal introduces two new concepts, the type sequence<T> and the yield keyword.

– sequence<T> is an (STL-like) collection that only supports iteration and only provides an input iterator.

– The yield statement suspends the execution of the function and returns one item from the sequence to the caller.

In C# terms, sequence<T> and its iterator are respectively the equivalent of an IEnumerable<T> and IEnumerator<T>. But while the C# generators are implemented with a state machine, in C++ the suspension and resumption would be implemented, as we’ll see, with stackful coroutines.
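To see what “implemented with a state machine” means, here is a hand-written C++ sketch of roughly the kind of class the C# compiler produces for the Fibonacci iterator above. It is not actual compiler output: the local variables become fields, and a _state field tells move_next() (mirroring IEnumerator<T>.MoveNext()) where to continue.

```cpp
#include <cassert>
#include <vector>

// Hand-written sketch of the state machine behind the C# Fibonacci iterator
// (not actual compiler output). Locals become fields, and _state records
// where move_next() must continue; move_next()/current() mirror
// IEnumerator<T>.MoveNext() and IEnumerator<T>.Current.
class fibonacci_enumerator
{
public:
    bool move_next()
    {
        switch (_state) {
        case 0:                  // first call: run the code before the loop
            _a = 0;
            _b = 1;
            break;
        case 1: {                // resumed right after "yield return b"
            int tmp = _a + _b;
            _a = _b;
            _b = tmp;
            break;
        }
        }
        _current = _b;           // yield return b;
        _state = 1;              // next time, resume after the yield
        return true;             // the loop is infinite: always another value
    }

    int current() const { return _current; }

private:
    int _state = 0;
    int _a = 0;
    int _b = 0;
    int _current = 0;
};
```

No separate stack is involved: everything that must survive between two calls lives in the object, which is what makes this approach cheap compared with fibers.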

Once we have a lazily evaluated sequence<T>, we can write client code that pulls a sequence of values, which are generated one at a time, and only when requested:

sequence<int> fibs = fibonacci();
for (auto it = fibs.begin(); it != fibs.end(); ++it)
{
    std::cout << *it << std::endl;
}

In C++11 we could also simplify the iteration with a range-based for loop:

sequence<int> fibs = fibonacci();
for (auto n : fibs)
{
    std::cout << n << std::endl;
}

More interestingly, we could define other resumable functions that manipulate the elements of a sequence, lazily producing another sequence. This example, taken from Gustafsson’s proposal, shows a lazy version of std::transform():

template<typename Iter>
sequence<int> lazy_tform(Iter beg, Iter end, std::function<int(int)> func) resumable
{
    for (auto iter = beg; iter != end; ++iter)
    {
        yield func(*iter);
    }
}

Moving further with this idea, we could take another page from the C# playbook and enrich the sequence class with a whole set of composable, deferred query operators, à la LINQ:

template <typename T>
class sequence
{
public:
    template <typename Predicate> bool all(Predicate predicate);
    [...]
    static sequence<int> range(int from, int to);
    template <typename TResult> sequence<TResult> select(std::function<TResult(T)> selector);
    sequence<T> take(int count);
    sequence<T> where(std::function<bool(T)> predicate);
};
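A minimal sketch of how such deferred operators could be built without coroutines, on top of a pull function std::function<bool(T&)> that produces the next element and returns false when the sequence is exhausted. The class lazy_seq and its members are hypothetical and implement only range(), where(), select(), take() and a to_vector() helper. Each operator wraps the upstream pull function, so nothing is computed until elements are pulled, and take() makes it safe to query an infinite source.

```cpp
#include <cassert>
#include <cmath>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical pull-based lazy sequence: 'next' writes the next element into
// its argument and returns false once the sequence is exhausted. Operators
// wrap a copy of the upstream pull function, so work is deferred until pull.
template <typename T>
class lazy_seq
{
public:
    typedef std::function<bool(T&)> pull_fn;

    explicit lazy_seq(pull_fn next) : _next(std::move(next)) {}

    // The integers in [from, to).
    static lazy_seq<int> range(int from, int to)
    {
        int cur = from;
        return lazy_seq<int>([cur, to](int& out) mutable -> bool {
            if (cur >= to) return false;
            out = cur++;
            return true;
        });
    }

    lazy_seq<T> where(std::function<bool(const T&)> pred) const
    {
        pull_fn up = _next;
        return lazy_seq<T>([up, pred](T& out) -> bool {
            while (up(out))
                if (pred(out)) return true;  // skip elements that fail pred
            return false;
        });
    }

    template <typename TResult>
    lazy_seq<TResult> select(std::function<TResult(const T&)> selector) const
    {
        pull_fn up = _next;
        return lazy_seq<TResult>([up, selector](TResult& out) -> bool {
            T item{};
            if (!up(item)) return false;
            out = selector(item);
            return true;
        });
    }

    lazy_seq<T> take(int count) const
    {
        pull_fn up = _next;
        return lazy_seq<T>([up, count](T& out) mutable -> bool {
            if (count <= 0) return false;    // never pulls past 'count'
            --count;
            return up(out);
        });
    }

    std::vector<T> to_vector() const
    {
        pull_fn next = _next;                // pull from a copy: *this stays reusable
        std::vector<T> result;
        T item{};
        while (next(item)) result.push_back(item);
        return result;
    }

private:
    pull_fn _next;
};

// A LINQ-style query: square roots of the odd numbers in [0, 10).
inline std::vector<double> odd_square_roots()
{
    return lazy_seq<int>::range(0, 10)
        .where([](const int& n) { return n % 2 == 1; })
        .select<double>([](const int& n) { return std::sqrt(static_cast<double>(n)); })
        .to_vector();
}
```

Because each operator copies the pull function it wraps, copying a lazy_seq also copies its progress; this sidesteps the lifetime questions of shared sequences at the price of some copying.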

Lazy sequences

Certainly, resumable generators would be a very interesting addition to the standard. But how would they work? We saw that the Visual Studio CTP comes with a first implementation of resumable functions built over the PPL task library, but in this case the CTP is of little help, since it does not support generator functions yet. Maybe they will be part of a future release… but why wait? We can implement them ourselves! 🙂

In the rest of this post I’ll describe a possible simple implementation of C++ lazy generators.

Let’s begin with the lazy sequence<T> class. This is an STL-like collection which only needs to support input iterators, with a begin() and an end() method.

Every instance of this class must somehow be initialized with a functor that represents the generator function that will generate the values of the sequence. We’ll see later what a good signature for it could be.

As we said, the evaluation of this function must be deferred to the moment when the values are retrieved, one by one, via the iterator. All the logic for executing, suspending and resuming the generator will actually be implemented by the iterator class, which therefore needs to have a reference to the same functor.

So, our first cut at the sequence class could be something like this:

template<typename T>
class sequence_iterator
{
    // TO DO
};
template<typename T>
class sequence
{
public:
    typedef sequence_iterator<T> iterator;
    typedef ??? functor;

    sequence(functor func) : _func(func) { }
    iterator begin() {
        return iterator(_func);
    }
    iterator end() {
        return iterator();
    }

private:
    functor _func;
};

Step by step

The sequence<T> class should not do much more than create iterators. The interesting code is all in the sequence iterator, which is the object that has the ability to actually generate the values.

Let’s go back to our Fibonacci generator and write some code that iterates through it:

sequence<int> fibonacci() resumable
{
    int a = 0;
    int b = 1;
    while (true)
    {
        yield b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

auto fibs = fibonacci();
for (auto n : fibs)
{
    std::cout << n << std::endl;
}

How should this code really work? Let’s follow its execution step by step.

  1. First, we call the function fibonacci(), which returns an object of type sequence<int>. Note that at this point the execution of the function has not even started yet. We just need to return a sequence object somehow associated to the body of the generator, which will be executed later.
  2. The returned sequence is copied into the variable fibs. We need to define what it means to copy a sequence: should we allow copy operations? Should we enforce move semantics?
  3. Given the sequence fibs, we call the begin() method, which returns an iterator “pointing” to the first element of the sequence. The resumable function should start running the moment the iterator is created and execute until a first value is yielded (or until it completes, in the case of empty sequences).
  4. When the end() method is called, the sequence returns an iterator that represents the fact that the generator has completed and there are no more values to enumerate.
  5. The operator == () should behave as expected, returning true if both iterators are at the same position of the same sequence, or both pointing at the end of the sequence.
  6. The operator *() will return the value generated by the last yield statement (i.e., the current value of the sequence).
  7. At each step of the iteration, when operator ++() is called, the execution of the generator function will be resumed, and will continue until either the next yield statement updates the current value or until the function returns.

Putting it all together, we can begin to write some code for the sequence_iterator class:

template<typename T>
class sequence_iterator
{
public:
    typedef ??? functor;

    sequence_iterator(functor func) {
        // initializes the iterator from the generator functor and executes it
        // until it terminates or yields.
    }
    sequence_iterator() {
        // must represent the end of the sequence
    }
    bool operator == (const sequence_iterator& rhs) const {
        // true if the iterators are at the same position.
    }
    bool operator != (const sequence_iterator& rhs) const {
        return !(*this == rhs);
    }
    const T& operator * () const {
        return _currentVal;
    }
    sequence_iterator& operator ++ () {
        // resume execution
        return *this;
    }

private:
    T _currentVal;
};

The behavior of the iterator is fairly straightforward, but there are a few interesting things to note. The first is that a generator function evidently does not do what it says: looking at the code of the fibonacci() function, there is no statement that actually returns a sequence<T>; all the code does is yield the sequence elements, one at a time.

So who creates the sequence<T> object? Clearly, the implementation of generators cannot be purely library-based. We can put the code for sequence<T> and its iterators in a library, and we can also put there the platform-dependent code that manages the suspension and resumption of generators. But it will be up to the compiler to generate the appropriate code that creates a sequence<T> object for a generator function. More on this later.

Also, note that there is no asynchrony or concurrency involved in this process: the function can resume on the same thread on which it was suspended.

Generators as coroutines

The next step is to implement the logic to seamlessly pause and resume a generator. A generator can be seen as an asymmetric coroutine, where the asymmetry lies in the fact that control can only be yielded back to the caller, unlike symmetric coroutines, which can yield control to any other coroutine at any time.

Unfortunately, coroutines cannot be implemented in a platform-independent way. On Windows we can use Win32 fibers (as I described in this very old post), while on POSIX we can use the makecontext()/swapcontext() API. There is also a very nice Boost library (Boost.Coroutine) that we could leverage for this purpose.

But let’s ignore the problems of portability, for the moment, and assume that we have a reliable way to implement coroutines. How should we use them in an iterator? We can encapsulate the non-portable code in a class __resumable_func that exposes this interface:

template <typename TRet>
class __resumable_func
{
    typedef std::function<void(__resumable_func&)> TFunc;

public:
    __resumable_func(TFunc func);

    void yieldReturn(const TRet& value);
    void yieldBreak();
    void resume();

    const TRet& getCurrent() const;
    bool isEos() const;
};

The class is templatized on the type of the values produced by the generator and provides methods to yield one value (yieldReturn()), to retrieve the current value, i.e. the latest value yielded (getCurrent()), and to resume the execution and move to the next value (resume()).

It should also provide methods to terminate the enumeration (yieldBreak()) and to tell if we have arrived at the end of the sequence (isEos()).

The function object passed to the constructor represents the generator function itself that we want to run. More precisely, it is the function that will be executed as a coroutine, and its prototype tells us that this function, in order to be able to suspend execution, needs a reference to the __resumable_func object that is running the coroutine itself.
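Assuming a POSIX system, a minimal sketch of the interface above on top of makecontext()/swapcontext() might look as follows. It is only one possible implementation, not necessarily the one this post ends up using; the class is called resumable_func here because identifiers containing a double underscore are reserved for the implementation. The sketch assumes single-threaded use, a fixed 64 KB stack per coroutine and no exceptions escaping the generator body; destroying the object while the body is suspended frees the stack without running the destructors of the body’s locals.

```cpp
#include <cassert>
#include <functional>
#include <ucontext.h>
#include <utility>
#include <vector>

// Sketch of the __resumable_func interface on POSIX ucontext (hypothetical
// implementation). Assumes single-threaded use and a 64 KB coroutine stack.
template <typename TRet>
class resumable_func
{
public:
    typedef std::function<void(resumable_func&)> TFunc;

    explicit resumable_func(TFunc func)
        : _func(std::move(func)), _stack(64 * 1024)
    {
        getcontext(&_callee);
        _callee.uc_stack.ss_sp = _stack.data();
        _callee.uc_stack.ss_size = _stack.size();
        _callee.uc_link = &_caller;      // where to continue when the body returns
        makecontext(&_callee, &resumable_func::trampoline, 0);
        resume();                        // run until the first yield or the end
    }

    // The contexts point into this object, so it must not be copied or moved.
    resumable_func(const resumable_func&) = delete;
    resumable_func& operator=(const resumable_func&) = delete;

    // Called from inside the body: publish a value and suspend.
    void yieldReturn(const TRet& value)
    {
        _current = value;
        swapcontext(&_callee, &_caller);
    }

    // Called from inside the body: end the sequence; the body is never resumed.
    void yieldBreak()
    {
        _eos = true;
        swapcontext(&_callee, &_caller);
    }

    // Called by the consumer: continue the body until its next yield or its end.
    void resume()
    {
        if (_eos) return;
        starting() = this;
        swapcontext(&_caller, &_callee);
    }

    const TRet& getCurrent() const { return _current; }
    bool isEos() const { return _eos; }

private:
    // makecontext() can only forward int arguments, so the object being
    // started is handed to the trampoline through a thread-local pointer.
    static resumable_func*& starting()
    {
        static thread_local resumable_func* self = nullptr;
        return self;
    }

    static void trampoline()
    {
        resumable_func* self = starting();
        self->_func(*self);
        self->_eos = true;               // returning switches to uc_link (_caller)
    }

    TFunc _func;
    std::vector<char> _stack;
    ucontext_t _caller;
    ucontext_t _callee;
    TRet _current{};
    bool _eos = false;
};
```

The lambda passed to the constructor has exactly the shape described above: it receives the resumable_func object and calls yieldReturn() wherever the generator would use yield.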

In fact the compiler should transform the code of a generator into the (almost identical) code of a lambda that uses the __resumable_func object to yield control and emit a new value.

For example, going back again to our fibonacci() generator, we could expect the C++ compiler to transform the code we wrote:

sequence<int> fibonacci() resumable
{
    int a = 0;
    int b = 1;
    while (true)
    {
        yield b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

into this lambda expression:

auto __fibonacci_func([](__resumable_func<int>& resFn) {
    int a = 0;
    int b = 1;
    while (true)
    {
        resFn.yieldReturn(b);
        int tmp = a + b;
        a = b;
        b = tmp;
    }
});

where the yield statement has been transformed into a call to __resumable_func::yieldReturn().

Likewise, client code that invokes this function, like:

sequence<int> fibs = fibonacci();

should be transformed by the compiler into a call to the sequence constructor, passing this lambda as argument:

sequence<int> fibs(__fibonacci_func);

Sequence iterators

We can ignore the details of the implementation of __resumable_func<T> coroutines for the moment and, assuming that we have them working, we can now complete the implementation of the sequence_iterator class:

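template <typename T>
class sequence_iterator
{
    template <typename U> friend class sequence; // sequence<T>::begin()/end() use the private constructors

    std::unique_ptr<__resumable_func<T>> _resumableFunc;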

    sequence_iterator() :
        _resumableFunc(nullptr)
    {
    }

    sequence_iterator(const std::function<void(__resumable_func<T>&)> func) :
        _resumableFunc(new __resumable_func<T>(func))
    {
    }

    sequence_iterator(const sequence_iterator& rhs) = delete;
    sequence_iterator& operator = (const sequence_iterator& rhs) = delete;
    sequence_iterator& operator = (sequence_iterator&& rhs) = delete;

public:
    sequence_iterator(sequence_iterator&& rhs) :
        _resumableFunc(std::move(rhs._resumableFunc))
    {
    }

    sequence_iterator& operator++()
    {
        _ASSERT(_resumableFunc != nullptr);
        _resumableFunc->resume();
        return *this;
    }

    bool operator==(const sequence_iterator& _Right) const
    {
        if (_resumableFunc == _Right._resumableFunc) {
            return true;
        }

        if (_resumableFunc == nullptr) {
            return _Right._resumableFunc->isEos();
        }

        if (_Right._resumableFunc == nullptr) {
            return _resumableFunc->isEos();
        }

        return (_resumableFunc->isEos() == _Right._resumableFunc->isEos());
    }

    bool operator!=(const sequence_iterator& _Right) const
    {
        return (!(*this == _Right));
    }

    const T& operator*() const
    {
        _ASSERT(_resumableFunc != nullptr);
        return (_resumableFunc->getCurrent());
    }
};

The logic here is very simple. Internally, a sequence_iterator contains a __resumable_func object, to run the generator as a coroutine. The default constructor creates an iterator that points at the end of the sequence. Another constructor takes as argument the generator function that we want to run and starts executing it in a coroutine; the function runs until it either yields a value or terminates, giving control back to the constructor. In this way we create an iterator that points at the beginning of the sequence.

If a value was yielded, we can call the dereference-operator to retrieve it from the __resumable_func object. If the function terminated, instead, the iterator will already point at the end of the sequence. The equality operator takes care of equating an iterator whose function has terminated to the end()-iterators created with the default constructor. Incrementing the iterator means resuming the execution of the coroutine, from the point it had suspended, giving it the opportunity to produce another value.

Note that, since the class owns the coroutine object, we disable the copy constructor and the copy assignment operator and only declare the move constructor, which transfers ownership of the coroutine.

Composable sequence operators

Almost there! We have completed our design, but there are still a few details to work out. The most interesting are related to the lifetime and copyability of sequence objects. What should happen with code like this?

sequence<int> fibs1 = fibonacci();
sequence<int> fibs2 = fibs1;
for (auto it1 : fibs1) {
    for (auto it2 : fibs2) {
        ...
    }
}

If we look at how we defined class sequence<T>, apparently there is no reason why we should prevent copying sequence objects. In fact, sequence<T> is an immutable class: its only data member is the std::function object that wraps the functor we want to run.

However, even though we don’t modify this functor object, we do execute it. This object could have been constructed from a lambda expression that captured some variables, either by value or by reference. Since one of the captured variables could be a reference to the same sequence<T> object that created that iterator, we need to ensure that the sequence object will always outlive its functors, and allowing copy-semantics suddenly becomes complicated.

This brings us to LINQ and to the composability of sequences. Anyone who has worked with C# knows that what makes enumerable types truly powerful and elegant is the ability to apply chains of simple operators that transform the elements of a sequence into another sequence. LINQ to Objects is built on the concept of a data pipeline: we start with a data source which implements IEnumerable<T>, and we can compose together a number of query operators, defined as extension methods in the static Enumerable class.

For example, this very, very useless query in C# generates the sequence of all square roots of odd integers between 0 and 10:

var result = Enumerable.Range(0, 10)
    .Where(n => n%2 == 1)
    .Select(n => Math.Sqrt(n));

Similarly, to make the C++ sequence<T> type really powerful we should make it composable and enrich it with a good range of LINQ-like operators to generate, filter, aggregate, group, sort and generally transform sequences.

These are just a few of the operators that we could define in the sequence<T> class:

template <typename T>
class sequence
{
public:
    [...]
    static sequence<int> range(int from, int to);
    template <typename TResult> sequence<TResult> select(std::function<TResult(T)> selector);
    sequence<T> where(std::function<bool(T)> predicate);
};

to finally be able to write the same (useless) query:

sequence<double> result = sequence<int>::range(0, 10)
    .where([](int n) { return n % 2 == 1; })
    .select<double>([](int n) { return sqrt(n); });   // TResult cannot be deduced from a lambda

Let’s try to implement select(), as an experiment. It is conceptually identical to the lazy_tform() method we saw before, but now defined in the sequence class. A very naïve implementation could be as follows:

// Projects each element of a sequence into a new form. (NOT WORKING!)
template <typename TResult>
sequence<TResult> select(std::function<TResult(T)> selector)
{
    auto func = [this, selector](__resumable_func<TResult>& rf) {
        for (T t : *this)
        {
            auto val = selector(t);
            rf.yieldReturn(val);
        }
    };
    return sequence<TResult>(func);
}

It should now be clear how it works: first we create a generator functor, in this case with a lambda expression, and then we return a new sequence constructed from this functor. The point is that the lambda needs to capture the “parent” sequence object to be able to iterate through its values.

Unfortunately this code is very brittle. What happens when we compose more operators, using the result of one as the input of the next one in the chain? When we write:

sequence<double> result = sequence<int>::range(0, 10)
    .where([](int n) { return n % 2 == 1; })
    .select<double>([](int n) { return sqrt(n); });

there are (at least) three temporary objects of type sequence<T> created here, and their lifetime is tied to that of the full-expression: they are destroyed as soon as the statement completes, long before result is enumerated.

[Figure: A chain of sequences]

The situation is like in the figure: the functor of each sequence in the chain is a lambda that has captured a pointer to the previous sequence object. The problem is in the deferred execution: nothing really happens until we enumerate the resulting sequence through its iterator, but as soon as we do so each sequence starts pulling values from its predecessor, which has already been deleted.
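The timing of the problem can be made observable without invoking anything dangling. In this hypothetical sketch, tracked_stage::defer() plays the role of the naive select(): it returns a closure that captures a raw this. A flag cleared by the destructor shows that the temporary is already gone when the full-expression ends, while the closure it produced is still around.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stage type: like the naive select(), defer() returns a closure
// that captures a raw `this`, to be run later.
struct tracked_stage
{
    bool* alive;      // points at a flag owned by the caller
    int value = 42;

    explicit tracked_stage(bool* flag) : alive(flag) { *alive = true; }
    ~tracked_stage() { *alive = false; }   // records the end of the lifetime

    std::function<int()> defer() const
    {
        return [this]() { return value; }; // reads through `this` when invoked
    }
};
```

Calling such a closure after the flag has been cleared would be undefined behavior, which is exactly what happens when the chained sequences are finally enumerated.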

Temporary objects and deferred execution really do not get along. On the one hand, to compose sequences we have to deal with temporaries that can be captured in a closure and then destroyed long before being used. On the other hand, the sequence iterators, and their underlying coroutines, should not be copied and can outlive the instance of the sequence that generated them.

We can enforce move semantics on the sequence<T> class, but then what do we capture in a generator like select() that acts on a sequence?

As often happens, a possible solution requires adding another level of indirection. We introduce a new class, sequence_impl<T>, which represents a particular application of a generator function closure:

template <typename T>
class sequence_impl
{
public:
    typedef std::function<void(__resumable_func<T>&)> functor;
    typedef sequence_iterator<T> iterator;

private:
    const functor _func;

    sequence_impl(const sequence_impl& rhs) = delete;
    sequence_impl(sequence_impl&& rhs) = delete;
    sequence_impl& operator = (const sequence_impl& rhs) = delete;
    sequence_impl& operator = (sequence_impl&& rhs) = delete;

public:
    sequence_impl(functor func) : _func(std::move(func)) {}

    sequence_iterator<T> begin() const
    {
        // return iterator for beginning of sequence
        return iterator(_func);
    }
    sequence_iterator<T> end() const
    {
        // return iterator for end of sequence
        return iterator();
    }
};

A sequence_impl<T> is neither copyable nor movable and only provides methods to iterate through it.

The sequence<T> class now keeps only a shared pointer to the unique instance of a sequence_impl<T> that represents that particular application of the generator function. Now we can support chained sequences by allowing move semantics on the sequence<T> class.

template <typename T>
class sequence
{
    std::shared_ptr<sequence_impl<T>> _impl;

    sequence(const sequence& rhs) = delete;
    sequence& operator = (const sequence& rhs) = delete;

public:
    typedef typename sequence_impl<T>::iterator iterator;
    typedef typename sequence_impl<T>::functor functor;

    sequence(functor func) :
        _impl(std::make_shared<sequence_impl<T>>(std::move(func)))
    {
    }
    sequence(sequence&& rhs) :
        _impl(std::move(rhs._impl))
    {
    }
    sequence& operator = (sequence&& rhs) {
        _impl = std::move(rhs._impl);
        return *this;
    }

    iterator begin() const {
        return _impl->begin();
    }
    iterator end() const {
        return _impl->end();
    }
};
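The same ownership scheme can be tried out portably. The sketch below is hypothetical (lazy_impl, lazy_seq, puller and int_range are illustrative names, and a pull function stands in for the coroutine): the handle is move-only, the implementation is shared through std::shared_ptr, and where() captures the shared pointer rather than this, so a chain built from temporaries stays valid after the full-expression and can be enumerated more than once.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical pull function: returns std::nullopt at the end of the sequence.
template <typename T>
using puller = std::function<std::optional<T>()>;

// Immutable, non-copyable implementation: knows how to start a new enumeration.
template <typename T>
class lazy_impl
{
    std::function<puller<T>()> _start;
public:
    explicit lazy_impl(std::function<puller<T>()> start) : _start(std::move(start)) {}
    lazy_impl(const lazy_impl&) = delete;
    lazy_impl& operator=(const lazy_impl&) = delete;
    puller<T> start() const { return _start(); }
};

// Move-only handle that shares ownership of its implementation.
template <typename T>
class lazy_seq
{
    std::shared_ptr<const lazy_impl<T>> _impl;
public:
    explicit lazy_seq(std::function<puller<T>()> start)
        : _impl(std::make_shared<lazy_impl<T>>(std::move(start))) {}
    lazy_seq(lazy_seq&&) = default;
    lazy_seq& operator=(lazy_seq&&) = default;
    lazy_seq(const lazy_seq&) = delete;
    lazy_seq& operator=(const lazy_seq&) = delete;

    // Like where(): the closure captures the shared_ptr, not `this`, so the
    // upstream implementation outlives a temporary handle.
    lazy_seq where(std::function<bool(const T&)> pred) const
    {
        std::shared_ptr<const lazy_impl<T>> impl = _impl;
        return lazy_seq([impl, pred]() -> puller<T> {
            puller<T> src = impl->start();
            return [src, pred]() -> std::optional<T> {
                while (std::optional<T> v = src())
                    if (pred(*v)) return v;
                return std::nullopt;
            };
        });
    }

    // Enumerates the whole (finite) sequence eagerly.
    std::vector<T> to_vector() const
    {
        std::vector<T> out;
        puller<T> next = _impl->start();
        while (std::optional<T> v = next()) out.push_back(*v);
        return out;
    }
};

// Hypothetical source: the integers in [from, to).
inline lazy_seq<int> int_range(int from, int to)
{
    return lazy_seq<int>([from, to]() -> puller<int> {
        return [i = from, to]() mutable -> std::optional<int> {
            if (i < to) return i++;
            return std::nullopt;
        };
    });
}
```

Each enumeration asks the implementation for a fresh pull function, which is why the same lazy_seq can be walked twice, mirroring how each call to begin() starts a new coroutine in the article's design.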

The diagram below illustrates the relationships between the classes involved in the implementation of lazy sequences:

[Figure: relationships between the classes that implement lazy sequences]

LINQ operators

OK, now we are really almost done. The only thing left to do, if we want, is to write a few sequence-manipulation operators, modeled on LINQ to Objects. I’ll list just a few, as examples:

// Determines whether all elements of a sequence satisfy a condition.
bool all(std::function<bool(T)> predicate)
{
    if (nullptr == predicate) {
        throw std::exception();
    }

    for (auto t : *_impl)
    {
        if (!predicate(t)) {
            return false;
        }
    }
    return true;
}

// Returns an empty sequence
static sequence<T> empty()
{
    auto fn = [](__resumable_func<T>& rf) {
        rf.yieldBreak();
    };
    return sequence<T>(fn);
}

// Generates a sequence of integral numbers within a specified range [from, to).
static sequence<int> range(int from, int to)
{
    if (to < from) {
        throw std::exception();
    }

    auto fn = [from, to](__resumable_func<int>& rf) {
        for (int i = from; i < to; i++) {
            rf.yieldReturn(i);
        }
    };
    return sequence<int>(fn);
}

// Projects each element of a sequence into a new form.
template <typename TResult>
sequence<TResult> select(std::function<TResult(T)> selector)
{
    if (nullptr == selector) {
        throw std::exception();
    }

    std::shared_ptr<sequence_impl<T>> impl = _impl;
    auto fn = [impl, selector](__resumable_func<TResult>& rf) {
        for (T t : *impl)
        {
            auto val = selector(t);
            rf.yieldReturn(val);
        }
    };
    return sequence<TResult>(fn);
}

// Returns a specified number of contiguous elements from the start of a sequence.
sequence<T> take(int count)
{
    if (count < 0) {
        throw std::exception();
    }

    std::shared_ptr<sequence_impl<T>> impl = _impl;
    auto fn = [impl, count](__resumable_func<T>& rf) {
        auto it = impl->begin();
        for (int i = 0; i < count && it != impl->end(); i++, ++it) {
            rf.yieldReturn(*it);
        }
    };
    return sequence<T>(fn);
}

// Filters a sequence of values based on a predicate.
sequence<T> where(std::function<bool(T)> predicate)
{
    if (nullptr == predicate) {
        throw std::exception();
    }

    std::shared_ptr<sequence_impl<T>> impl = _impl;
    auto fn = [impl, predicate](__resumable_func<T>& rf) {
        for (auto item : *impl)
        {
            if (predicate(item)) {
                rf.yieldReturn(item);
            }
        }
    };
    return sequence<T>(fn);
}
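Deferred execution is what makes operators like take() usable on infinite sequences. As a hedged illustration, the following sketch writes select() and take() as free functions in a hypothetical lazy namespace, over a pull function rather than over sequence<T>, and counts how many values the infinite source actually produced.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

namespace lazy {

// Hypothetical pull-based sequence: returns std::nullopt at the end.
template <typename T>
using puller = std::function<std::optional<T>()>;

// Infinite source 0, 1, 2, ...; *produced counts the values actually pulled.
inline puller<int> naturals(int* produced)
{
    return [i = 0, produced]() mutable -> std::optional<int> {
        ++*produced;
        return i++;
    };
}

// Projects each element into a new form, one element per pull.
template <typename T, typename F>
auto select(puller<T> src, F selector) -> puller<decltype(selector(std::declval<T>()))>
{
    using R = decltype(selector(std::declval<T>()));
    return [src, selector]() -> std::optional<R> {
        if (std::optional<T> v = src()) return selector(*v);
        return std::nullopt;
    };
}

// Returns at most `count` elements and stops pulling from src afterwards.
template <typename T>
puller<T> take(puller<T> src, int count)
{
    return [src, remaining = count]() mutable -> std::optional<T> {
        if (remaining <= 0) return std::nullopt;
        --remaining;
        return src();
    };
}

} // namespace lazy
```

Taking four squares pulls exactly four values from the infinite source, because take() stops asking once its count is exhausted.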

We could write many more, but I think these should convey the idea.

Example: a prime numbers generator

As a final example, the following query lazily provides the sequence of prime numbers smaller than max, using a very simple, brute-force algorithm. It is definitely not the fastest generator of prime numbers, and it’s perhaps a little cryptic, but it’s undoubtedly quite compact!

sequence<int> primes(int max)
{
    return sequence<int>::range(2, max)
        .where([](int i) {
            return sequence<int>::range(2, (int)sqrt(i) + 1)
                .all([i](int j) { return (i % j) != 0; });
            });
}
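The trial-division bound in primes() is easy to get wrong by one. The eager sketch below (is_prime_trial and primes_below are hypothetical names) uses the same test: i is prime if no j in [2, (int)sqrt(i) + 1) divides it. With a bound one larger, such as (int)sqrt(i) + 2, the candidate range for i = 2 would be [2, 3), and 2 % 2 == 0 would wrongly reject 2.

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Eager equivalent of the inner all() query: no j in [2, floor(sqrt(i)) + 1)
// divides i.
inline bool is_prime_trial(int i)
{
    int limit = static_cast<int>(std::sqrt(i)) + 1; // exclusive upper bound
    for (int j = 2; j < limit; ++j)
        if (i % j == 0) return false;
    return true;
}

// Eager equivalent of range(2, max).where(...).
inline std::vector<int> primes_below(int max)
{
    std::vector<int> out;
    for (int i = 2; i < max; ++i)
        if (is_prime_trial(i)) out.push_back(i);
    return out;
}
```

For i = 2 and i = 3 the candidate range is empty, so both are accepted, while perfect squares such as 4, 9 and 25 are caught by their square root.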

Conclusion

In this article I rambled about generators in C++, describing a new sequence<T> type that models lazy enumerators and that could be implemented as an extension of resumable functions, as specified in N3858. I have described a possible implementation based on coroutines and introduced the possibility of extending the sequence class with a set of composable operators.

If you are curious and want to play with my sample implementation, you can find a copy of the sources here. Nothing too fancy, just the code that I showed in this post.

Appendix – Coroutines in Win32

Having completed my long ramble on the “platform independent” aspects of C++ generators, it’s time to go back to the point we left open: how to implement, on Windows, the coroutines that we encapsulated in the __resumable_func class?

We saw that the Visual Studio CTP comes with a first implementation of resumable functions, built over the PPL task library and using Win32 fibers as side-stacks. Even though the CTP does not support generator functions yet, my first idea was to just extend the <pplawait.h> library to implement them. However, the code there is specialized for resumable functions that suspend while awaiting some task, and it turns out that we can reuse only part of it because, even though we are still dealing with resumable functions, the logic of await and yield is quite different.

In the case of await, functions can be suspended (possibly multiple times) waiting for some other task to complete. This means switching to a fiber associated with the task after having set up a continuation that will be invoked when the task completes, to switch control back to the current fiber. When the function terminates, control goes back to the calling fiber, returning the single return value of the async resumable function.

In the case of yield, we never suspend to call external async methods. Instead, we can suspend multiple times going back to the calling fiber, each time by returning one of the values that compose the sequence. So, while the implementation of the await keyword needs to leverage the support of PPL tasks, the concept of generator functions does not imply any concurrency or multithreading and using the PPL is not necessary.

Actually, there are ways to implement yield with await, but I could not find a simple way to use the new __await keyword without spawning new threads (maybe this could be possible with a custom PPL scheduler?).

So I chose to write the code for coroutines myself; the idea here is not very different from the one I described in a very old post (it looks like I keep rewriting the same post :-)) but now I can take advantage of the fiber-based code from the CTP’s <pplawait.h> library.

Win32 Fibers

Let’s delve into the details of the implementation. First of all, let me summarize once again the Win32 Fiber API.

Fibers were added to Windows NT to support cooperative multitasking. They can be thought of as lightweight threads that must be manually scheduled by the application. In other words, fibers are a perfect tool to implement coroutine sequencing.

When a fiber is created, with CreateFiber, it is passed a fiber-start function. The OS then assigns it a separate stack and sets up execution to begin at this fiber-start function. To schedule a fiber we need to “switch” to it manually with SwitchToFiber and once it is running, a fiber can then suspend itself only by explicitly yielding execution to another fiber, also by calling SwitchToFiber.

SwitchToFiber only works from one fiber to another, so the first thing to do is to convert the current thread into a fiber, with ConvertThreadToFiber. Finally, when we are done using fibers, we can convert the main fiber back to a normal thread with ConvertFiberToThread.
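On platforms without Win32 fibers, the same pattern (a caller context plus a generator context with its own stack, switched explicitly) can be sketched with the POSIX ucontext functions getcontext, makecontext and swapcontext. This is a swapped-in technique and an assumption about the target (a POSIX system such as Linux with glibc); ctx_generator is a hypothetical class that mirrors the roles of yieldReturn, resume and isEos in __resumable_func, for int values only. Because makecontext cannot portably pass a pointer argument, the sketch hands this over through a thread_local variable.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <ucontext.h>
#include <utility>
#include <vector>

// Assumption: a POSIX system providing <ucontext.h> (e.g. Linux with glibc).
// Hypothetical analogue of __resumable_func<int>: _callerCtx plays the
// "caller fiber", _genCtx the generator fiber.
class ctx_generator
{
    std::function<void(ctx_generator&)> _func;
    std::vector<char> _stack;       // private stack, like the one CreateFiber allocates
    ucontext_t _callerCtx{};
    ucontext_t _genCtx{};
    int _current = 0;
    bool _done = false;

    // makecontext cannot portably pass a pointer, so `this` travels through here.
    static thread_local ctx_generator* s_starting;

    static void entry()
    {
        ctx_generator* self = s_starting;
        self->_func(*self);        // run the generator body on its own stack
        self->_done = true;        // the final 'real' return: end of sequence
        // Returning continues in uc_link, i.e. back inside resume().
    }

public:
    explicit ctx_generator(std::function<void(ctx_generator&)> f,
                           std::size_t stackSize = 128 * 1024)
        : _func(std::move(f)), _stack(stackSize)
    {
        getcontext(&_genCtx);
        _genCtx.uc_stack.ss_sp = _stack.data();
        _genCtx.uc_stack.ss_size = _stack.size();
        _genCtx.uc_link = &_callerCtx;   // where to go when entry() returns
        makecontext(&_genCtx, &ctx_generator::entry, 0);
        s_starting = this;
        resume();                        // run until the first yield or the end
    }

    // The contexts hold this object's address, so it must not be copied or moved.
    ctx_generator(const ctx_generator&) = delete;
    ctx_generator& operator=(const ctx_generator&) = delete;

    // Called from the generator body: publish a value and switch to the caller.
    void yieldReturn(int value)
    {
        _current = value;
        swapcontext(&_genCtx, &_callerCtx);
    }

    // Called by the consumer: switch to the generator until it yields or ends.
    void resume()
    {
        if (!_done) swapcontext(&_callerCtx, &_genCtx);
    }

    bool isEos() const { return _done; }
    int getCurrent() const { return _current; }
};

thread_local ctx_generator* ctx_generator::s_starting = nullptr;
```

Note that destroying a ctx_generator that has not finished simply frees its stack without unwinding the locals of the generator body.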

The __resumable_func class

We want to put all the logic to handle the suspension and resumption of a function in the __resumable_func<T> class, as described before.

In our case we don’t need symmetric coroutines; we just need the ability to return control to the calling fiber. So our class will contain a handle to the “caller” fiber and a handle to the fiber we want to run.

#include <functional>
#include <pplawait.h>

template <typename TRet>
class __resumable_func : __resumable_func_base
{
    typedef std::function<void(__resumable_func&)> TFunc;

    TFunc _func;
    TRet _currentValue;
    LPVOID _pFiber;
    LPVOID _pCallerFiber;
    Concurrency::details::__resumable_func_fiber_data* _pFuncData;

public:
    __resumable_func(TFunc func);
    ~__resumable_func();

    void yieldReturn(TRet value);
    void yieldBreak();
    void resume();

    const TRet& getCurrent() const { return _currentValue; }
    bool isEos() const { return _pFiber == nullptr; }

private:
    static void yield();
    static VOID CALLBACK ResumableFuncFiberProc(PVOID lpParameter);
};

The constructor stores a copy of the generator function to run, creates a new fiber object specifying ResumableFuncFiberProc as the function to execute, and immediately switches the execution to this fiber:

    __resumable_func(TFunc func) :
        _func(func),
        _currentValue(TRet()),
        _pFiber(nullptr),
        _pCallerFiber(nullptr),
        _pFuncData(nullptr)
    {
        // Convert the current thread to a fiber. This is needed because the thread needs to "be"
        // a fiber in order to be able to switch to another fiber.
        ConvertCurrentThreadToFiber();
        _pCallerFiber = GetCurrentFiber();

        // Create a new fiber (or re-use an existing one from the pool)
        _pFiber = Concurrency::details::POOL CreateFiberEx(Concurrency::details::fiberPool.commitSize,
            Concurrency::details::fiberPool.allocSize, FIBER_FLAG_FLOAT_SWITCH, &ResumableFuncFiberProc, this);
        if (!_pFiber) {
            throw std::bad_alloc();
        }

        // Switch to the newly created fiber. When this "returns" the functor has either returned,
        // or issued an 'yield' statement.
        ::SwitchToFiber(_pFiber);

        _pFuncData->suspending = false;
        _pFuncData->Release();
    }

The fiber will start from the fiber procedure, which has the only task of running the generator function in the context of the fiber:

    // Entry proc for the Resumable Function Fiber.
    static VOID CALLBACK ResumableFuncFiberProc(PVOID lpParameter)
    {
        LPVOID threadFiber;

        // This function does not formally return, due to the SwitchToFiber call at the bottom.
        // This scope block is needed for the destructors of the locals in this block to fire
        // before we do the SwitchToFiber.
        {
            Concurrency::details::__resumable_func_fiber_data funcDataOnFiberStack;
            __resumable_func* pThis = (__resumable_func*)lpParameter;

            // The callee needs to setup some more stuff after we return (which would be either on
            // yield or an ordinary return). Hence the callee needs the pointer to the func_data
            // on our stack. This is not unsafe since the callee has a refcount on this structure
            // which means the fiber will continue to live.
            pThis->_pFuncData = &funcDataOnFiberStack;

            Concurrency::details::POOL SetFiberData(&funcDataOnFiberStack);

            funcDataOnFiberStack.threadFiber = pThis->_pCallerFiber;
            funcDataOnFiberStack.resumableFuncFiber = GetCurrentFiber();

            // Finally calls the function in the context of the fiber. The execution can be
            // suspended by calling yield
            pThis->_func(*pThis);

            // Here the function has completed. We set return to true meaning this is the
            // final 'real' return and not one of the 'yield' returns.
            funcDataOnFiberStack.returned = true;
            pThis->_pFiber = nullptr;
            threadFiber = funcDataOnFiberStack.threadFiber;
        }

        // Return to the calling fiber.
        ::SwitchToFiber(threadFiber);

        // On a normal fiber this function won't exit after this point. However, if the fiber is
        // in a fiber-pool and re-used we can get control back. So just exit this function, which
        // will cause the fiber pool to spin around and re-enter.
    }

There are two ways to suspend the execution of the generator function running in the fiber and to yield control back to the caller. The first is to yield a value, which will be stored in a data member:

    void yieldReturn(TRet value)
    {
        _currentValue = value;
        yield();
    }

The second is to immediately terminate the sequence, for example with a return statement or reaching the end of the function. The compiler should translate a return into a call to the yieldBreak method:

void yieldBreak()
{
    _pFiber = nullptr;
    yield();
}

To yield the control we just need to switch back to the calling fiber:

    static void yield()
    {
        _ASSERT(IsThreadAFiber());
        Concurrency::details::__resumable_func_fiber_data* funcDataOnFiberStack =
            Concurrency::details::__resumable_func_fiber_data::GetCurrentResumableFuncData();

        // The fiber data must exist before we touch it.
        _ASSERT(funcDataOnFiberStack);

        // Add-ref's the fiber. Even though there can only be one thread active in the fiber
        // context, there can be multiple threads accessing the fiber data.
        funcDataOnFiberStack->AddRef();

        funcDataOnFiberStack->verify();

        // Mark as busy suspending. We cannot run the code in the 'then' statement
        // concurrently with the await doing the setting up of the fiber.
        _ASSERT(!funcDataOnFiberStack->suspending);
        funcDataOnFiberStack->suspending = true;

        // Make note of the thread that we're being called from (Note that we'll always resume
        // on the same thread).
        funcDataOnFiberStack->awaitingThreadId = GetCurrentThreadId();

        _ASSERT(funcDataOnFiberStack->resumableFuncFiber == GetCurrentFiber());

        // Return to the calling fiber.
        ::SwitchToFiber(funcDataOnFiberStack->threadFiber);
    }

Once we have suspended, incrementing the iterator will resume the execution by calling resume, which will switch to this object’s fiber:

    void resume()
    {
        _ASSERT(IsThreadAFiber());
        _ASSERT(_pFiber != nullptr);
        _ASSERT(_pFuncData != nullptr);
        _ASSERT(!_pFuncData->suspending);
        _ASSERT(_pFuncData->awaitingThreadId == GetCurrentThreadId());

        // Switch to the fiber. When this "returns" the functor has either returned, or issued
        // an 'yield' statement.
        ::SwitchToFiber(_pFiber);

        _ASSERT(_pFuncData->returned || _pFuncData->suspending);
        _pFuncData->suspending = false;
        if (_pFuncData->returned) {
            _pFiber = nullptr;
        }
        _pFuncData->Release();
    }

The destructor just needs to convert the current fiber back to a normal thread, but only when there are no more fibers running in the thread. For this reason we need to keep a per-thread fiber count, which is incremented every time we create a __resumable_func and decremented every time we destroy it.

~__resumable_func()
{
    if (_pCallerFiber != nullptr) {
        ConvertFiberBackToThread();
    }
}

class __resumable_func_base
{
    __declspec(thread) static int ts_count;

protected:
    // Convert the thread to a fiber.
    static void ConvertCurrentThreadToFiber()
    {
        if (!IsThreadAFiber())
        {
            // Convert the thread to a fiber. Use FIBER_FLAG_FLOAT_SWITCH on x86.
            LPVOID threadFiber = ConvertThreadToFiberEx(nullptr, FIBER_FLAG_FLOAT_SWITCH);
            if (threadFiber == NULL) {
                throw std::bad_alloc();
            }
            ts_count = 1;
        }
        else
        {
            ts_count++;
        }
    }

    // Convert the fiber back to a thread.
    static void ConvertFiberBackToThread()
    {
        if (--ts_count == 0)
        {
            if (ConvertFiberToThread() == FALSE) {
                throw std::bad_alloc();
            }
        }
    }
};
__declspec(thread) int __resumable_func_base::ts_count = 0;
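The per-thread counting can also be expressed with the standard thread_local keyword instead of __declspec(thread). In the hypothetical sketch below, fiber_conversion_counter and its stubbed flag only record what ConvertThreadToFiberEx and ConvertFiberToThread would do, so the reference-counting logic can be exercised on any platform.

```cpp
#include <cassert>

// Hypothetical, platform-independent sketch of __resumable_func_base's
// bookkeeping. ts_isFiber is a stub for IsThreadAFiber(); where the real code
// calls ConvertThreadToFiberEx / ConvertFiberToThread, this one flips the flag.
class fiber_conversion_counter
{
    static thread_local int ts_count;     // live generators on this thread
    static thread_local bool ts_isFiber;  // stub for "the thread is a fiber"

public:
    // Called when a generator is created on the current thread.
    static void acquire()
    {
        if (!ts_isFiber) {
            ts_isFiber = true;   // would be ConvertThreadToFiberEx(...)
            ts_count = 1;
        } else {
            ++ts_count;
        }
    }

    // Called when a generator is destroyed; the last one converts back.
    static void release()
    {
        if (--ts_count == 0) {
            ts_isFiber = false;  // would be ConvertFiberToThread()
        }
    }

    static bool isFiber() { return ts_isFiber; }
    static int count() { return ts_count; }
};

thread_local int fiber_conversion_counter::ts_count = 0;
thread_local bool fiber_conversion_counter::ts_isFiber = false;
```

Because both variables are thread_local, generators created on different threads do not interfere with each other's conversion state.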

And this is all we need to have resumable generators in C++, on Windows. The complete source code can be found here.

Async-Await in C++

In the previous post we had a quick look at the problem of writing concurrent, asynchronous code with C++. We saw that the <future> library, introduced with C++11, offers a partial solution to this problem by separating the act of initiating an operation from the act of waiting for its result. But we saw that the act of waiting for a result is still synchronous, and that std::futures are not easily composable.

We talked of the platform-specific improvements offered by Microsoft’s PPL tasks, which have become the model for a few interesting changes proposed to the C++ standard library.

But are these improvements enough?

The problems with Tasks

Having composable futures would certainly be a big improvement for C++. But the experience with C# tells us that it’s not a perfect solution. A purely library-based solution is bound to have limitations. In particular, .NET’s TPL has the defect of making the code overly complex.

It is difficult to write robust, readable and debuggable code with TPL. It is too easy to end up with spaghetti code: a large number of Task<T>s interconnected in inextricable chains of continuations.

I found this out the hard way when my last employer gave me a .NET component to maintain that had been architected as an incredibly convoluted graph of interconnected TPL Tasks, often joined with conditional ContinueWith() calls.

Loops and conditionals

It is particularly difficult to write iterations and branches with tasks.

In my previous post I used as example a copyFile function that asynchronously read all the file content into a single string and copied it into another file. Of course such an implementation would not be practical with very large files, so let’s rewrite it so that it will read and write a file chunk by chunk.

Here I am using the PPL, but very similar code could be written with std::future (especially once they support task continuations with future::then).

We can easily write an iterative function that uses tasks to read and write every chunk of a file asynchronously, but that still blocks waiting for these tasks to complete:

Concurrency::task<string> readFileChunk(ifstream& file, int chunkLength)
{
    return Concurrency::create_task([&file, chunkLength](){
        vector<char> buffer(chunkLength);
        file.read(&buffer[0], chunkLength);
        return string(&buffer[0], (unsigned int)file.gcount());
    });
}

Concurrency::task<void> writeFileChunk(ofstream& file, const string& chunk)
{
    return Concurrency::create_task([&file, chunk](){
        file.write(chunk.c_str(), chunk.length());
    });
}

void copyFile_get(const string& inFilePath, const string& outFilePath)
{
    ifstream inFile(inFilePath, ios::binary | ios::ate);
    inFile.seekg(0, inFile.beg);
    ofstream outFile(outFilePath, ios::binary);

    string chunk;
    while (chunk = readFileChunk(inFile, 4096).get(), !chunk.empty()) {
        writeFileChunk(outFile, chunk).get();
    }
}

The problem is that this code is not much better than a purely synchronous loop of blocking calls to read and write the file; if our thread schedules asynchronous operations but then immediately blocks waiting for them to complete, we cannot really do much else concurrently.
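For comparison, the same blocking pattern can be written portably with std::async. This sketch assumes std::launch::async and stream references instead of file paths (readChunkAsync, writeChunkAsync and copyStream_get are hypothetical names). Because every get() is called immediately, the chunks are copied strictly one after the other and the calling thread does nothing else while it waits.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Reads up to `len` bytes on another thread (std::launch::async).
inline std::future<std::string> readChunkAsync(std::istream& in, std::size_t len)
{
    return std::async(std::launch::async, [&in, len]() {
        std::vector<char> buffer(len);
        in.read(buffer.data(), static_cast<std::streamsize>(len));
        return std::string(buffer.data(), static_cast<std::size_t>(in.gcount()));
    });
}

// Writes a chunk on another thread; the lambda owns its copy of the data.
inline std::future<void> writeChunkAsync(std::ostream& out, std::string chunk)
{
    return std::async(std::launch::async, [&out, chunk = std::move(chunk)]() {
        out.write(chunk.data(), static_cast<std::streamsize>(chunk.size()));
    });
}

// Same shape as copyFile_get: every get() blocks the calling thread at once.
inline void copyStream_get(std::istream& in, std::ostream& out, std::size_t chunkLen = 4096)
{
    std::string chunk;
    while (chunk = readChunkAsync(in, chunkLen).get(), !chunk.empty()) {
        writeChunkAsync(out, chunk).get();
    }
}
```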

It would be better to chain a sequence of tasks that read and write the file chunk by chunk, so that they can be scheduled one after the other leaving our thread free to do other work while these tasks are blocked in some I/O operation.

It turns out that writing a “task loop” correctly is not trivial, especially if there are many possible error cases or exceptions to handle. It is actually so tricky that there are MSDN pages that explain what to do and what the possible pitfalls are, and the PPL Asynchronous Sample Pack even provides sample code for this.

The code I put together is neither clean nor concise:

task<shared_ptr<string>> readFileChunk(shared_ptr<ifstream> file, int chunkLength)
{
    return create_task([file, chunkLength]() {
        vector<char> buffer(chunkLength);
        file->read(&buffer[0], chunkLength);
        return make_shared<string>(&buffer[0], (unsigned int)file->gcount());
    });
}

task<bool> writeFileChunk(shared_ptr<ofstream> file, shared_ptr<string> chunk)
{
    return create_task([file, chunk](){
        file->write(chunk->c_str(), chunk->length());
        return chunk->length() == 0;
    });
}

task<void> copyFile_repeat(shared_ptr<ifstream> inFile, shared_ptr<ofstream> outFile)
{
    return readFileChunk(inFile, 4096)
        .then([=](shared_ptr<string> chunk){
            return writeFileChunk(outFile, chunk);
        })
        .then([=](bool eof){
            if (!eof) {
                return copyFile_repeat(inFile, outFile);
            }
            else {
                return task_from_result();
            }
        });
}

task<void> copyFile_then(const string& inFilePath, const string& outFilePath)
{
    auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
    inFile->seekg(0, inFile->beg);
    auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

    return copyFile_repeat(inFile, outFile);
}

It is probably easy to do better, to write cleaner code that executes in a loop a simple continuation chain like this one, but it is also quite evident that when the algorithm becomes more complicated the asynchronous code can become very convoluted.
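The shape of such a task loop can be sketched without the PPL. Below, a hypothetical single-threaded run_loop stands in for the task scheduler: copyChunks_then posts a read, the read posts its write as a continuation, and the write posts the next iteration, so unrelated work queued on the same loop interleaves with the copy. This illustrates continuation chaining under those assumptions; it is not a replacement for PPL tasks.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <istream>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded "scheduler": a FIFO queue of continuations.
class run_loop
{
    std::deque<std::function<void()>> _queue;
public:
    void post(std::function<void()> work) { _queue.push_back(std::move(work)); }

    // Runs queued work items until the queue is empty.
    void run()
    {
        while (!_queue.empty()) {
            std::function<void()> work = std::move(_queue.front());
            _queue.pop_front();
            work();
        }
    }
};

// One iteration of the task loop: read, then (as a continuation) write, then
// schedule the next iteration; `done` runs when an empty chunk is read.
inline void copyChunks_then(run_loop& loop, std::shared_ptr<std::istream> in,
                            std::shared_ptr<std::ostream> out,
                            std::function<void()> done, std::size_t chunkLen = 4096)
{
    loop.post([=, &loop]() {
        std::vector<char> buffer(chunkLen);
        in->read(buffer.data(), static_cast<std::streamsize>(chunkLen));
        auto chunk = std::make_shared<std::string>(
            buffer.data(), static_cast<std::size_t>(in->gcount()));
        loop.post([=, &loop]() {                        // continuation: write
            if (chunk->empty()) { done(); return; }     // end of input
            out->write(chunk->data(), static_cast<std::streamsize>(chunk->size()));
            copyChunks_then(loop, in, out, done, chunkLen); // next iteration
        });
    });
}
```

As in copyFile_repeat, the streams are held through shared pointers so that they stay alive for as long as any pending continuation refers to them.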

Debugging

And then there is the problem of debugging: if we set a breakpoint on the file->write(…) line, for example, when the execution stops there the debugger will present us with a quite strange call stack. With continuations we lose the usual “causality chain” of return stacks, typical of synchronous code. Here the lambdas that compose the bodies of the tasks are scattered across the call stack and distributed across several threads, and it can be difficult to understand what is going on.

The async-await pattern

The complexity of working with Tasks is well known to C# programmers, who have already had a few years to experiment and gain experience with the .NET TPL. This is the reason why the biggest improvement of C# 5.0 was the introduction of the Async/Await pattern, designed on the model of F# asynchronous workflows.

To see how it works, let’s start by writing in C# an iterative but blocking implementation of our CopyFile function. This can be done very easily:

        private void CopyChunk(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = input.Read(buffer, 0, buffer.Length)) != 0)
            {
                output.Write(buffer, 0, bytesRead);
            }
        }

        public void CopyFile(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                    CopyChunk(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Let’s say that we want now to make the function completely asynchronous. Ignoring the fact that the framework actually provides a Stream.CopyToAsync, we can use other asynchronous .NET APIs, like Stream.ReadAsync and Stream.WriteAsync that return a Task, together with async/await, and write this:

        private async Task CopyChunk_Async(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = await input.ReadAsync(buffer, 0, buffer.Length)) != 0)
            {
                await output.WriteAsync(buffer, 0, bytesRead);
            }
        }

        public async Task CopyFile_Async(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                    await CopyChunk_Async(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Now, this is really beautiful! We made only a few small changes to the structure of the code: we added the async modifier, changed the return type of our methods, and added a strange await keyword where we call asynchronous functions. The logic of the function has not changed at all, and yet it now executes in a completely different way. Let’s see how.

In C# a method can be declared as asynchronous by adding the async modifier to the method declaration. For example:

public static async Task<int> FooAsync() {…}

An asynchronous method is like a regular C# method, but its return type can only be of type void, Task or Task<T>, and it can also contain await expressions, like:

int result = await FooAsync();

Awaiting on an expression means launching the asynchronous execution of that expression (usually an async method) and, if it has not yet completed, immediately yielding the execution to the caller.

So asynchronous methods are, like iterator methods, a kind of coroutine. They are resumable: their execution can “magically” pause at a few points identified by the ‘await’ statements, and then “magically” resume when the asynchronous expression has completed. While the method is paused waiting for completion, its caller continues to run. This means that if the method was called from a UI message loop, for example, the UI thread can continue processing messages, and the application will remain responsive. If the method was called by a worker thread in a web server, the thread will be free to serve more requests.

But, just like iterator methods, C# asynchronous methods are not really coroutines. They are instead implemented with a state machine. The details are quite complicated (a very good description can be found here and here), but essentially the compiler transforms the function body, attaching a task continuation to the asynchronous operation so that execution resumes from the point where it left off.

On which thread does the awaitable expression execute, and on which thread will the async method resume? It’s up to the task scheduler to decide; asynchrony does not mean parallelism, and a method does not need to run on another thread to be asynchronous. The scheduler may run the task later on the same thread, or on a worker thread from the ThreadPool. Execution will then resume on the right thread (for example, the UI thread in a GUI app) according to the current synchronization context. From the developer’s point of view, this means that one cannot rely on an async method continuing its execution on the same thread where it started (with all the consequent multithreading headaches).

C++ resumable functions

So, everything is nice in the managed world of C#, but what about native programming? Is there anything like async/await that we can use with our futures? We can find the answer in N3858, another proposal made by Gustafsson et al. for C++17.

This time the changes proposed are to the language itself, not just to the library. The idea is to introduce the equivalent of C# async methods in the form of resumable functions. They can be thought of as the basis for adding support for real coroutines to C++, and are not strictly related to the <future> library, even though they have been defined especially to improve the usability of futures and promises.

Resumable functions would have almost exactly the same semantics as async/await in C#: two new keywords would be added to the language:

–        The resumable keyword, added after the argument list, defines a function as being suspendable and resumable, in the way described before. It is the equivalent of C#’s async.

–        Like in C#, the await keyword in the body of a resumable function identifies a suspension point, one of the places where the function may suspend execution and then resume it later (possibly in a different thread) when some asynchronous operation completes.

For example:

        future<int> abs_r(future<int> i_op) resumable
        {
            int i = await i_op;
            return (i < 0) ? -i : i;
        }

This would define a resumable function that takes a future<int> as argument, suspends itself waiting for the future result to be available, and produces an integer as result. Of course, a resumable function can only return a future<T> or shared_future<T>. It completes and produces its final value only when a return statement is executed.

Visual Studio 2013 CTP

Will this proposal be accepted? Will resumable functions become part of the standard? And is there a portable way to implement them?

I really cannot answer any of these questions. But, interestingly, it is already possible to play with resumable functions today. Last November Microsoft shipped the Visual Studio November 2013 CTP, with a preview of several C++ features that will be released in the next version of Visual Studio and that improve conformance to the C++ standard. Among them, there is also the first implementation of resumable functions.

To enable them, we must change the “Platform Toolset” of our C++ project to use the updated compiler from the CTP, as shown in the figure. Also, we need to include the new header <pplawait.h>, which extends the PPL <ppltasks.h>.


The syntax of the CTP’s resumable functions is very similar to the one proposed by Gustafsson, with a small change in the keywords. In order not to “taint” the language with (still) non-standard keywords, resumable has been renamed __resumable and await has been renamed __await.

So, we can now rewrite our copyFile method one last time, this time using the CTP’s resumable functions:

#include <pplawait.h>

task<void> copyFile_res(const string inFilePath, const string outFilePath) __resumable
{
	auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
	inFile->seekg(0, inFile->beg);
	auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

	while (true)
	{
		shared_ptr<string> s = __await readFileChunk(inFile, 4096);
		if (s->empty()) { break; }
		__await writeFileChunk(outFile, s);
	}
}

int _tmain(int argc, _TCHAR* argv[])
{
    …
    copyFile_res(file1, file2).get();
}

We got there! Finally we have simple, readable, maintainable asynchronous C++ code. The only small complication is that we need to pay a lot of attention to the lifetime of the objects passed to the asynchronous/resumable methods. In C# the garbage collector makes things easier, but we don’t have one here, so it is often better to wrap the objects in shared_ptr<T>s to make sure they don’t get destroyed while they are still referenced by some async task. A small tribute to pay to the Gods of asynchrony, anyway.

A kind of magic…

But how would resumable functions work in practice? To learn more, it is worth studying the <pplawait.h> header and watching the Channel 9 talk by Deon Brewis from last year’s GoingNative conference.

Gustafsson’s proposal describes two possible solutions:

  1. With a state machine, like in C#.
  2. With resumable side-stacks (aka Fibers).

The first implementation would be similar to how async/await is implemented in C#. The compiler would transform the code of the function, implementing the logic of a state machine that keeps track of the current state of the function. The implementation would be more complicated than in C#, where the garbage collector takes care of the hoisted state: in C++ the function’s local variables could not live on the stack (because the stack frame would be lost when the function is suspended), so they would have to be stored in a heap-allocated activation frame whose lifetime must be managed explicitly.
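To make the state-machine option more concrete, here is a hand-written sketch of what such a transformation could look like for the abs_r example. It is an illustration under simplifying assumptions, not the code any compiler actually generates: abs_r_frame and its resume() method are hypothetical names, and instead of a real future, the awaited value is passed directly to resume() by whoever drives the frame.

```cpp
#include <cassert>
#include <memory>
#include <optional>

// Hypothetical hand-written state machine for abs_r (not real compiler output).
// The local variable i lives in this frame, which the caller allocates on the
// heap, so it survives while the function is suspended.
struct abs_r_frame {
    int state = 0;               // 0 = not started, 1 = suspended on await, 2 = done
    int i = 0;                   // the original function's local variable
    std::optional<int> result;   // set when the "return" statement is reached

    // The first call runs up to the await; the next call delivers the awaited value.
    void resume(std::optional<int> awaited = std::nullopt) {
        switch (state) {
        case 0:
            state = 1;           // suspension point: "await i_op"
            return;              // control goes back to the caller
        case 1:
            assert(awaited.has_value());
            i = *awaited;        // continue right after the await
            result = (i < 0) ? -i : i;
            state = 2;           // completed
            return;
        default:
            return;              // already completed: nothing to do
        }
    }
};
```

A driver would allocate the frame (for example with std::make_unique), call resume() once to run up to the await, and call resume(value) again when the awaited future delivers its result; because i lives in the frame rather than on the stack, it survives the suspension.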

But the second way of implementing resumable functions seems more elegant: each function would have its own side stack, a stack separate from the thread stack, allocated by the compiler when the resumable function is invoked and destroyed when it completes. What the proposal describes is essentially identical to Windows fibers.

The very, very surprising thing is, in my opinion, that Microsoft chose the second option, the fibers, to implement resumable functions in the CTP.

Fibers

Now, this is curious: a couple of years ago I wrote a few posts about a possible way to write C#-like iterator blocks and LINQ operators in C++ using Fibers.

Fibers are an almost forgotten, old feature added to Windows NT to support cooperative multitasking, in the form of lightweight threads that must be manually scheduled by the application. If I am not mistaken, they were added to Windows to improve the performance of the first versions of SQL Server.

My implementation of iterator blocks was very questionable for many reasons (for example, I just copied the IEnumerator/IEnumerable interfaces from .NET rather than writing something that could work better with C++ STL) but I thought that the weakest point was probably the use of fibers.

In fact, it is difficult to write correct code with fibers; the entire program must be designed to support them. Over the years, many things in Windows have grown increasingly incompatible with them. .NET as a whole does not work with fibers, and an attempt to fix this proved so complicated that it was soon abandoned.

Evidently this is not necessarily a problem if fibers are used only within the limited scope of supporting native resumable functions, under the direct control of the compiler. But even so, the functions and libraries that we call from a resumable function must be fiber-safe (in addition to being thread-safe); for example, thread-local storage does not work as expected with fibers. The CRT itself was not completely fiber-safe in the past, but I guess this should have been fixed by now. And there is the problem of managing C++ exceptions: in the past they could only be caught by the fiber that threw them and could not cross fiber boundaries, but even this problem can be solved by the compiler or library.

Also, I wonder whether there is a portable way to implement side stacks on non-Windows platforms as well, and whether some support from the OS will necessarily be required.

Under the hood

But let’s look in more detail at the CTP code, to learn how resumable functions actually work. The implementation is split between the compiler and the library code in <pplawait.h>.

The code of <pplawait.h> is organized in two main classes:

template <typename TTask, typename TFunc, typename TTaskType>
class __resumable_func_setup_data
{
    __resumable_func_fiber_data* _func_data;
    LPVOID _pSystemFiber;
    LPVOID _pvLambda; // The function to execute
    task_completion_event<TTaskType> _tce;
    [...]
};
struct __resumable_func_fiber_data
{
    LPVOID threadFiber;         // The thread fiber to which to return.
    LPVOID resumableFuncFiber;  // The fiber on which this function will run
    volatile LONG refcount;     // Refcount of the fiber
    [...]
};

The first, __resumable_func_setup_data, contains the data and logic to initialize a resumable method call, and it is templated on the function type.

The second, __resumable_func_fiber_data, contains the data and methods to manage the life of a resumable method that runs on a fiber.

When a function is declared as resumable, like:

    task<int> foo(future<int> fi) __resumable

the compiler generates the code for a stub function foo_resumable.

When a resumable function is called, the compiler generates a call to this stub function, which takes care of executing foo into a new “resumable context”. The code for this is in a static function:

    TTask __resumable_func_setup_data::RunActionInResumableContext(TFunc& lambda)

which shows how a Fiber can be used to seamlessly suspend and resume a function:

  1. The first thing to do is to convert the caller thread into a fiber, by calling __resumable_func_fiber_data::ConvertCurrentThreadToFiber(). When the function completes it will convert the fiber back into a normal thread with ConvertFiberToThread.
  2. Then, a __resumable_func_setup_data struct is initialized with the data required to invoke the function on a fiber.
  3. After this, a new fiber is created, with __resumable_func_setup_data.ResumableFuncFiberProc as its entry point. Since creating and destroying a fiber is a relatively expensive operation, the library also implements a FiberPool class to manage a pool of fibers that are reused when needed.
  4. Now the current fiber can suspend itself and start the execution of this new fiber by calling SwitchToFiber.

Calling SwitchToFiber means manually doing a context switch (or rather, a fiber switch). The instruction pointer of our thread suddenly jumps to a different place, using a different stack, and all the registers are updated. But the old context and stack are not lost: they are part of the previous fiber. That fiber is just not running, because it is not associated with any thread, but it can be resumed at any moment.
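The same suspend/resume dance can be tried outside Windows. The sketch below uses the POSIX ucontext functions (getcontext, makecontext, swapcontext) as a stand-in for the fiber API: swapcontext plays the role of SwitchToFiber, a static buffer serves as the side stack, and uc_link names the context to continue with when the function completes. These functions have been dropped from recent POSIX revisions but are still provided by glibc on Linux. This illustrates the idea only, not how <pplawait.h> works, and the names fiber_demo and resumable_body are made up.

```cpp
#include <ucontext.h>
#include <cassert>
#include <vector>

// Illustrative stand-in for fiber switching, using POSIX ucontext.
namespace fiber_demo {

ucontext_t caller_ctx;                  // plays the role of Fiber 0 (the converted thread)
ucontext_t func_ctx;                    // plays the role of Fiber 1 (the resumable function)
alignas(16) char side_stack[64 * 1024]; // the function's own side stack
std::vector<int> trace;                 // records the order in which the steps run

void resumable_body() {
    trace.push_back(1);                   // running on the side stack
    swapcontext(&func_ctx, &caller_ctx);  // suspend: save our state, switch to the caller
    trace.push_back(3);                   // resumed later, with our stack intact
}                                         // returning switches to uc_link (the caller)

void run() {
    getcontext(&func_ctx);
    func_ctx.uc_stack.ss_sp = side_stack;
    func_ctx.uc_stack.ss_size = sizeof(side_stack);
    func_ctx.uc_link = &caller_ctx;       // where to continue when resumable_body returns
    makecontext(&func_ctx, resumable_body, 0);

    swapcontext(&caller_ctx, &func_ctx);  // like SwitchToFiber(fiber1): start the function
    trace.push_back(2);                   // the caller runs while the function is suspended
    swapcontext(&caller_ctx, &func_ctx);  // resume the function where it left off
    trace.push_back(4);                   // back here once the function has completed
}

}  // namespace fiber_demo
```

After a single call to fiber_demo::run(), trace holds 1, 2, 3, 4: the caller's code and the function's code interleave, yet each keeps its own stack. The sketch uses global state, so run() is meant to be called only once.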

For example, the following figure shows what happens when the resumable copyFile_res is called in our copyFile program: first the current thread (Thread A) is converted into a fiber (Fiber 0), then a new fiber (Fiber 1) is created to run copyFile_res, and the thread execution is switched to the new Fiber. At that point only Fiber 1 is running. Fiber 0 still exists, with its stack, but it’s suspended.


When the ResumableFuncFiberProc starts executing in the context of the new fiber, it:

  1. Retrieves a pointer to the __resumable_func_setup_data from its parameters,
  2. Initializes a new instance of __resumable_func_fiber_data, allocated on the new fiber’s stack, with all the data required to manage the function (like the handle of the caller fiber), and
  3. Finally executes the resumable function in the context of the fiber.

Now we are executing our functor in the context of the fiber. We can leave the function in two ways:

–        We can complete the function (with a return statement), and return the final result, or

–        We can temporarily suspend the function on an __await expression and yield execution to the caller.

In the first case, all we need to do is to complete the ResumableFuncFiberProc, store the result and call SwitchToFiber(previousFiber) to resume the previous fiber. The “terminated” fiber is now ready to be deleted or recycled.

Things get more interesting when we have an await expression, like:

    shared_ptr<string> s = __await readFileChunk(inFile, 4096);

In this case the compiler converts the __await into a call to the library function

    template <typename T> T await_task(concurrency::task<T> func)

where func is a task that wraps the awaitable expression that is being called.

await_task implements all the logic to suspend the current function/fiber and to yield control back to the previous (caller) fiber, thereby resuming it.

But before yielding execution, await_task attaches a continuation to the task func. When that task completes, we want to resume the current fiber; to do this, the continuation needs to retrieve and update the func_data_on_fiber_stack struct and call SwitchToFiber once again.
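The await_task logic can be approximated with a ucontext stand-in for fibers (POSIX, available through glibc on Linux). In this hypothetical sketch, await_value() plays the role of await_task: before switching back to the caller, it pushes a continuation into a queue (a very crude substitute for the PPL task scheduler), and that continuation, when run, stores the result and switches back into the suspended function. All names are illustrative assumptions, and the “asynchronous operation” simply completes the next time the caller drains the queue.

```cpp
#include <ucontext.h>
#include <cassert>
#include <deque>
#include <functional>

// Hypothetical sketch of the await_task idea on top of POSIX ucontext.
namespace await_demo {

ucontext_t caller_ctx;                          // the caller's "fiber"
ucontext_t func_ctx;                            // the resumable function's "fiber"
alignas(16) char side_stack[64 * 1024];
std::deque<std::function<void()>> ready;        // crude stand-in for a task scheduler
int delivered = 0;                              // value handed over by a continuation
int result = 0;
bool done = false;

// Plays the role of await_task: attach a continuation, then yield to the caller.
int await_value(int v) {
    ready.push_back([v] {
        delivered = v;                          // the "task" has completed with value v
        swapcontext(&caller_ctx, &func_ctx);    // continuation: resume the suspended function
    });
    swapcontext(&func_ctx, &caller_ctx);        // suspend and switch back to the caller
    return delivered;                           // execution continues here when resumed
}

void body() {
    int a = await_value(20);
    int b = await_value(22);
    result = a + b;
    done = true;
}                                               // returning switches to uc_link

int run() {
    getcontext(&func_ctx);
    func_ctx.uc_stack.ss_sp = side_stack;
    func_ctx.uc_stack.ss_size = sizeof(side_stack);
    func_ctx.uc_link = &caller_ctx;
    makecontext(&func_ctx, body, 0);

    swapcontext(&caller_ctx, &func_ctx);        // runs body() up to its first await
    while (!ready.empty()) {                    // the caller keeps its own loop going
        auto cont = std::move(ready.front());
        ready.pop_front();
        cont();                                 // may switch into body() and back
    }
    return result;
}

}  // namespace await_demo
```

The caller's loop keeps running while body() is suspended, which is the property that keeps a UI thread responsive; body() itself reads like straight-line code. As with any global-state sketch, run() is meant to be called once.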

The actual details are a little complicated, but the whole code in <pplawait.h> is not too difficult to read.

Changes to the Windows Fiber API

The last interesting thing to note is that the new <pplawait.h> library makes use of a new and slightly improved version of the Win32 Fiber API, defined for Windows 8.1 (where _WIN32_WINNT >= 0x0603) but actually available also on Windows 7 (at least with the most recently patched version of kernel32.dll).

The original APIs, available since WinNT 4.0, provided the following methods:

LPVOID WINAPI CreateFiber(
    _In_     SIZE_T dwStackSize,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiber(_In_opt_ LPVOID lpParameter);
VOID WINAPI DeleteFiber(_In_ LPVOID lpFiber);
BOOL WINAPI ConvertFiberToThread();
VOID WINAPI SwitchToFiber(_In_ LPVOID lpFiber);

The first two of these methods have now been superseded by an extended version:

LPVOID WINAPI CreateFiberEx(
    _In_     SIZE_T dwStackCommitSize,
    _In_     SIZE_T dwStackReserveSize,
    _In_     DWORD dwFlags,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiberEx(
    _In_opt_ LPVOID lpParameter,
    _In_     DWORD dwFlags);

The difference is that a fiber can now be created specifying different values for the CommitSize and the ReserveSize of its stack. Here ReserveSize is the total stack allocation in virtual memory, while CommitSize is the initially committed memory; fine-tuning these values can help optimize memory usage. In fact, programs with a large number of concurrent resumable operations could end up allocating a very large amount of memory for fiber stacks, since a separate fiber must be allocated for each resumable function.

There is also a dwFlags argument, whose only flag is FIBER_FLAG_FLOAT_SWITCH; it is needed to overcome an old limitation of the Fiber API, which in the past did not save the floating-point registers on a fiber switch.

Finally, there is a new function, still undocumented but already used by the CTP library:

PVOID WINAPI CalloutOnFiberStack(
    _In_      PVOID lpFiber,
    _In_      PFIBER_CALLOUT_ROUTINE lpStartAddress,
    _In_opt_  PVOID lpParameter);

It is not difficult to imagine that its purpose is to execute a function on the Fiber specified and return the result to the caller.

Generator functions

We have almost completed a quick review of asynchrony in C++. But there is one last thing to consider before finishing: asynchronous methods are very similar to iterator methods.

Both are a kind of coroutine. We can think of an async method as a strange kind of iterator, which returns only one value, at the end, but which is able to suspend its execution as iterators do after yielding each value. Or we could think of iterator blocks as strange kinds of async methods, which suspend and resume execution but also return a value to the caller each time they suspend.

In C#, the compiler implements both iterator methods and asynchronous methods by transforming the code of the method into a state machine. We have seen that in C++ the proposal is to implement async methods as real coroutines, using side stacks. So the logical question to ask is: “Can we use resumable functions to implement C#-style iterators?”

Of course we can! The same proposal N3858 explains that the concept of resumable functions can be expanded to include generator functions, which would use the same kind of interruption/resumption logic to produce a sequence of values.

Gustafsson defines a new special container, sequence<T>, to represent a sequence of values lazily generated by a generator. With this, it should be possible to write LINQ-like code, like the following:

template <typename Iter>
sequence<int> lazy_tform(Iter beg, Iter end, std::function<int(int)> func) resumable
{
    for (auto it = beg; it != end; ++it) {
        yield func(*it);
    }
}
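Even without language support, the lazy behavior of lazy_tform can be approximated by hand today, with a custom iterator that applies func only when an element is dereferenced. This is a sketch under simplifying assumptions (int elements, a minimal iterator without iterator_traits); lazy_tform_range is a made-up name and has nothing to do with the proposed sequence<T>.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hand-written lazy transform: func is applied only when an element is read.
template <typename Iter>
class lazy_tform_range {
public:
    lazy_tform_range(Iter beg, Iter end, std::function<int(int)> func)
        : beg_(beg), end_(end), func_(std::move(func)) {}

    class iterator {
    public:
        iterator(Iter it, const std::function<int(int)>* f) : it_(it), f_(f) {}
        int operator*() const { return (*f_)(*it_); }   // computed on demand
        iterator& operator++() { ++it_; return *this; }
        bool operator!=(const iterator& other) const { return it_ != other.it_; }
    private:
        Iter it_;
        const std::function<int(int)>* f_;             // points into the owning range
    };

    iterator begin() const { return iterator(beg_, &func_); }
    iterator end() const { return iterator(end_, &func_); }

private:
    Iter beg_, end_;
    std::function<int(int)> func_;
};

// Same signature as the generator in the proposal, minus the language magic.
template <typename Iter>
lazy_tform_range<Iter> lazy_tform(Iter beg, Iter end, std::function<int(int)> func) {
    return lazy_tform_range<Iter>(beg, end, std::move(func));
}
```

Nothing is computed when lazy_tform is called; each call to func happens when a range-based for loop dereferences the iterator. That is the same on-demand behavior a generator would give, paid for with considerably more boilerplate.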

But this will be the topic for a future post. Stay tuned! 🙂

Concurrency in C++11

C++11 is a much better language than its previous versions, with features like lambda expressions, initializer lists, rvalue references and automatic type deduction. The C++11 standard library now provides regular expressions, revamped smart pointers and a multithreading library.

But modern C++ is still limited in its support of parallel and asynchronous computations, especially when compared to languages like C#.

The need for asynchrony

But why do we need support for asynchrony? Computer architectures are becoming more and more parallel and distributed, with multi-core processors nearly ubiquitous and with cores distributed in the cloud. And software applications are more and more often composed of multiple components distributed across many cores, located in a single machine or across a network. Modern programming languages need to offer support to manage this parallelism.

At the same time, responsiveness has become an increasingly indispensable quality of software (this is one of the tenets of “Reactive Programming”, an emerging programming paradigm).

Responsiveness means that we don’t want to block waiting for an I/O operation to complete. On the server side, we don’t want to block a worker thread that could be doing other work in the meantime; we would rather be notified when the operation completes. On the client side, we really don’t want to block the main/GUI thread of our process, making the application sluggish and unresponsive. Being able to write asynchronous code is therefore increasingly important to manage the latency of I/O operations without blocking and losing responsiveness. For example, as a rule, in WinRT all I/O-bound APIs that could take more than 50ms only provide asynchronous interfaces, and cannot even be invoked with a “classical” blocking call.

In this and the next few posts we will look at what C++ currently provides to support concurrency, and at what new features are on their way. We’ll see what is in the standard, but also the Windows-specific PPL framework provided by Microsoft.

A simple example

To understand how we can write asynchronous code in C++ let’s play with a very simple example. Let’s imagine that we want to write a function to read a file and copy its content into another file. To do this we could write functions like the following:

#include <string> 
#include <vector> 
#include <fstream> 
#include <iostream> 
using namespace std; 

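vector<char> readFile(const string& inPath)
{
    // Open at the end (ios::ate) so that tellg() returns the file size.
    ifstream file(inPath, ios::binary | ios::ate);
    if (!file) return {};              // could not open: return an empty buffer
    size_t length = (size_t)file.tellg();
    vector<char> buffer(length);
    file.seekg(0, std::ios::beg);
    file.read(buffer.data(), length);  // data() is safe even for an empty file
    return buffer;
}

size_t writeFile(const vector<char>& buffer, const string& outPath)
{
    ofstream file(outPath, ios::binary);
    file.write(buffer.data(), buffer.size());
    return (size_t)file.tellp();       // position after the write = bytes written
}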

With these, we can easily write a simple function to copy a file into another and return the number of characters written:

size_t sync_copyFile(const string& inFile, const string& outFile) 
{ 
    return writeFile(readFile(inFile), outFile); 
} 

Clearly, we want to execute readFile and writeFile in sequence, one after the other. But should we block while waiting for them to complete? This is admittedly a contrived example: if the file is small, blocking probably does not matter much, and if the file is very large we would rather use buffering and copy it in chunks instead of returning all its content in one big vector. But both readFile and writeFile are I/O-bound, and here they just model more complex I/O operations; in real applications it is common to read some data from a network, transform it in some way, and return a response or write it somewhere.

So, let’s say that we want to execute the copyFile operation asynchronously. How can we manage to do this in standard C++?

Task-based parallelism: futures and promises

The C++11 standard library provides a few mechanisms to support concurrency. The first is std::thread, which, together with synchronization objects (std::mutex, std::lock_guard, std::condition_variable and so on), finally offers a portable way to write “classic” multithread-based concurrent code in C++.

We could modify copyFile to create a new thread to execute the copy, and use a condition_variable to be notified when the thread completes. But working at the level of threads and locks can be quite tricky. Modern frameworks (like the TPL in .NET) offer a higher level of abstraction, in the form of task-based concurrency. There, a task represents an asynchronous operation that can run in parallel with other operations, and the system hides the details of how this parallelism is implemented.
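For comparison, this is roughly what the thread-plus-condition_variable approach looks like. To keep the sketch self-contained, the work to run is passed in as a function (standing in for sync_copyFile); thread_cv_run is a hypothetical helper, not a library function.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>

// Runs `work` on a new thread and waits for it through a condition_variable.
std::size_t thread_cv_run(std::function<std::size_t()> work) {
    std::mutex m;
    std::condition_variable cv;
    bool done = false;
    std::size_t result = 0;

    std::thread th([&] {
        std::size_t r = work();                  // the blocking I/O would happen here
        {
            std::lock_guard<std::mutex> lock(m);
            result = r;
            done = true;
        }
        cv.notify_one();                         // wake up the waiting thread
    });

    // The caller could do other work here before waiting.
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return done; });     // the predicate handles spurious wakeups
    }
    th.join();
    return result;
}
```

Even in this tiny case we need a mutex, a flag, a condition variable and an explicit join just to hand back one number, which is exactly the bookkeeping that futures and promises package up.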

The C++11 library, in its new <future> header, also provides (somewhat limited) support for task-based parallelism, in the form of promises and futures. The classes std::promise<T> and std::future<T> are roughly the C++ equivalent of a .NET Task<T>, or of Java’s Future<T>. They work in pairs to separate the act of calling a function from the act of waiting for its result.

On the caller side, when we call an asynchronous function we do not receive a result of type T. What we get instead is a std::future<T>, a placeholder for the result, which will be delivered at some later point in time.

Once we have our future, we can move on to other work while the task executes on a separate thread.

A std::promise<T> object represents the result on the callee side of the asynchronous call, and it is the channel for passing the result asynchronously to the caller. When the task completes, it puts its result into the promise object by calling promise::set_value.

When the caller finally needs the result, it calls the blocking future::get() to retrieve it. If the task has already completed, the result is immediately available; otherwise, the calling thread blocks until the result value becomes available.

In our example, this is a version of copyFile written to use futures and promises:

#include <future>
#include <thread>

size_t future_copyFile(const string& inFile, const string& outFile)
{
    // Read task: its result is delivered through prom1/fut1.
    std::promise<vector<char>> prom1;
    std::future<vector<char>> fut1 = prom1.get_future();
    std::thread th1([&prom1, inFile](){
        prom1.set_value(readFile(inFile));
    });

    // Write task: blocks on fut1, then delivers the byte count through prom2/fut2.
    std::promise<size_t> prom2;
    std::future<size_t> fut2 = prom2.get_future();
    std::thread th2([&fut1, &prom2, outFile](){
        prom2.set_value(writeFile(fut1.get(), outFile));
    });

    size_t result = fut2.get();
    th1.join();
    th2.join();
    return result;
}

Note that here we have moved the execution of readFile and writeFile into separate tasks, but we also have to configure and start the threads that run them. Also, we capture references to the promise and future objects to make them available to the task functions. The first thread performs the read and, when it completes, moves its result (a big vector) into a promise. The second thread waits (blocking) on the corresponding future; when the read completes, it gets the vector and passes it to the write function. Finally, when the write completes, the number of chars written is stored in the second promise, and the caller retrieves it through the second future.

In the main function we could take advantage of this parallelism and do some lengthy operation before the call to future::get(). But when we call get() the main thread will still block if the read and write tasks have not completed yet.

Packaged tasks

We can slightly simplify the previous code with packaged_tasks. Class std::packaged_task<T> is a container for a task and its promise. Its template argument is the type of the task function (for example, vector<char>(const string&) for our read function). It is a callable type (it defines operator()) and automatically creates and manages a std::promise for us.

size_t packagedtask_copyFile(const string& inFile, const string& outFile) 
{ 
    using Task_Type_Read = vector<char>(const string&); 
    packaged_task<Task_Type_Read> pt1(readFile); 
    future<vector<char>> fut1{ pt1.get_future() }; 
    thread th1{ move(pt1), inFile }; 

    using Task_Type_Write = size_t(const string&); 
    packaged_task<Task_Type_Write> pt2([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }); 
    future<size_t> fut2{ pt2.get_future() }; 
    thread th2{ move(pt2), outFile }; 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

Note that we need to use move() to pass the packaged_task to thread because a packaged_task cannot be copied.

std::async

With packaged_tasks the logic of the function does not change much and the code becomes slightly more readable, but we still have to create the threads manually to run the tasks, and decide on which thread each task will run.

Things become much simpler if we use the std::async() function, also provided by the library. It takes as input a lambda or functor and it returns a future that will contain the return value. This is the version of copyFile modified to use std::async():

size_t async_copyFile(const string& inFile, const string& outFile) 
{ 
    auto fut1 = async(readFile, inFile); 
    auto fut2 = async([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }, 
    outFile); 

    return fut2.get(); 
} 

In a way, std::async() is the equivalent of a TPL task scheduler. It decides where to run the task: whether a new thread needs to be created, or whether an existing (or even the current) thread can be reused.

It is also possible to specify a launch policy, which can be either std::launch::async (which requires the task to run asynchronously, as if on a new thread) or std::launch::deferred (which defers the task until get() or wait() is called, and then runs it on the calling thread). If no policy is specified, both are allowed and the implementation chooses.
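The difference between the two policies can be observed by recording which thread runs the task. This is a small sketch with made-up helper names; it relies only on the standard behavior that a deferred task runs on the thread that calls get() or wait(), and that wait_for on a deferred task returns future_status::deferred without starting it.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Runs a trivial task under the given launch policy and reports which thread ran it.
std::thread::id run_with_policy(std::launch policy) {
    auto fut = std::async(policy, [] { return std::this_thread::get_id(); });
    return fut.get();
}

// A deferred task does not start until get() or wait() is called.
bool deferred_task_is_lazy() {
    bool ran = false;
    auto fut = std::async(std::launch::deferred, [&ran] { ran = true; });
    bool still_pending =
        fut.wait_for(std::chrono::seconds(0)) == std::future_status::deferred && !ran;
    fut.get();                 // only now does the task run, on this thread
    return still_pending && ran;
}
```

With std::launch::deferred the reported id is the caller's own, while std::launch::async reports a different thread; with the default policy, either outcome is allowed.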

The nice thing is that std::async hides all the implementation, platform specific details for us. Examining the <future> header file that comes with VS2013 we can see that the Windows implementation of std::async internally uses the Parallel Patterns Library (PPL), the native equivalent of .NET TPL.

PPL

What we have seen so far is what has been standardized in C++11. It is fair to say that the design of futures and promises is still quite limited, especially if compared with what is provided by C# and .NET.

The main limitation is that in C++11 futures are not composable. If we start several tasks to execute computations in parallel, we cannot wait on all the futures at once, or for any one of them to complete; we can only block on one future at a time. Also, there is no easy way to chain a set of tasks into a sequence, where each task consumes as input the result of the previous one. Composable tasks make it possible to build the whole architecture as non-blocking and event-driven. We would really like to have something like task continuations or the async/await pattern in C++ too.

With the PPL (aka Concurrency Runtime), Microsoft had the opportunity to overcome the constraints of the standard and to experiment with a more sophisticated implementation of a task library.

In the PPL, class Concurrency::task<T> (defined in the <ppltasks.h> header) represents a task. A task is the equivalent of a future; it also provides the same blocking method get() to retrieve the result. The type parameter T is the return type, and the task is initialized passing a work function (either a lambda or a function pointer or a function object).

So, let’s abandon all concerns of portability for a moment and let’s re-implement our copyFile function, this time with tasks:

size_t ppl_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<vector<char>> tsk1 = Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }); 
    Concurrency::task<size_t> tsk2 = Concurrency::create_task([&tsk1, outFile]() { 
        return writeFile(tsk1.get(), outFile); 
    }); 
    return tsk2.get(); 
} 

Here we have created two task objects, initialized with two lambda expressions, for the read and write operations.

Now we really don’t have to worry about threads anymore; it’s up to the PPL scheduler to decide where to run the tasks and to manage a thread pool. Note, however, that we are still manually coordinating the interaction of our two tasks: tsk2 keeps a reference to the first task, and explicitly waits for tsk1 to terminate before using its result. This is acceptable in a very simple example like this, but it could become quite cumbersome with more tasks and more complicated code.

Task continuations

Unlike futures, PPL tasks support composition through continuations. The task::then method adds a continuation to a task; the continuation is invoked when its antecedent task completes, and receives the value returned by the antecedent.

So, let’s rewrite the copyFile function again, but this time using a continuation:

size_t ppl_then_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<size_t> result =  
    Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
 
    return result.get(); 
} 

Now the code is really clean. We have split the logic of a copy function into two separate components (tasks) that can run in any thread and will be run by a task scheduler. And we have declared the logic of our function as the dependency graph of the tasks.

In this implementation the copyFile function still blocks, at the end, to get the final value, but in a real program it could just return a task that we would insert in the logic of our application, attaching to it a continuation to asynchronously handle its value. We would have code like this:

Concurrency::task<size_t> ppl_create_copyFile_task(const string& inFile, const string& outFile) 
{ 
    return Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
} 
... 
auto tCopy = ppl_create_copyFile_task(inFile, outFile).then([](size_t written) { 
    cout << written << endl; 
}); 
... 
tCopy.wait(); 

Finally, the PPL also provides other ways to compose tasks, with the functions when_all and when_any, useful to manage a set of tasks that run in parallel.

Given a set of tasks, when_all creates another task that completes when all the tasks complete (so it implements a join). Conversely, when_any creates a task that completes when any one of the tasks in the set completes; this can be useful, for example, to limit the number of concurrent tasks, starting a new one only when another completes.
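The semantics of the two combinators can be approximated on top of std::future. Both functions below are hypothetical sketches, not the PPL or standard API: when_all gathers every value into a vector, while when_any reports the index and value of the first future it finds ready, by polling with wait_for. The inputs are assumed to be non-deferred (launched with std::launch::async, or obtained from a std::promise), because wait_for on a deferred future never reports ready.

```cpp
#include <chrono>
#include <cstddef>
#include <future>
#include <utility>
#include <vector>

// Hypothetical when_all: ready once every input future is ready (a join).
template <typename T>
std::future<std::vector<T>> when_all(std::vector<std::future<T>> futures) {
    return std::async(std::launch::async, [fs = std::move(futures)]() mutable {
        std::vector<T> results;
        results.reserve(fs.size());
        for (auto& f : fs)
            results.push_back(f.get()); // waits for each input in turn
        return results;
    });
}

// Hypothetical when_any: ready as soon as one input is ready, carrying its
// index and value. Precondition: futures is non-empty and none is deferred.
template <typename T>
std::future<std::pair<std::size_t, T>> when_any(std::vector<std::future<T>> futures) {
    return std::async(std::launch::async, [fs = std::move(futures)]() mutable {
        for (;;) {
            for (std::size_t i = 0; i < fs.size(); ++i) {
                // Poll each input with a short timeout.
                if (fs[i].wait_for(std::chrono::milliseconds(1)) == std::future_status::ready)
                    return std::make_pair(i, fs[i].get());
            }
        }
    });
}
```

Polling only keeps the sketch short; a real implementation would be notified by the completing tasks instead of checking them repeatedly.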

But this is just the tip of the PPL iceberg: the PPL offers a rich set of functionality, almost equivalent to what is available in its managed counterpart. It also provides scheduler classes that perform the important duty of managing a pool of worker threads and allocating tasks to threads. More details can be found here.

Towards C++17

Hopefully we will soon see some of the improvements introduced by the PPL in the C++ standard. There is already a proposal (N3857) by Niklas Gustafsson et al. that suggests a few changes to the library. In particular, the idea is to make futures composable with future::then and the functions when_any and when_all, with the same semantics seen in the PPL. With the new futures, the previous example could be rewritten portably like this:

future<size_t> future_then_copyFile(const string& inFile, const string& outFile)
{
	return async([inFile]() {
		return readFile(inFile);
	}).then([outFile](const vector<char>& buffer) {
		return writeFile(buffer, outFile);
	});
}
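The std::future in the standard library does not have a then member, so the snippet above relies on the proposal. A minimal approximation with today's facilities, assuming a hypothetical free-function then in place of the proposed member and hypothetical fstream-based readFile and writeFile (with no error handling), might look like this:

```cpp
#include <cstddef>
#include <fstream>
#include <future>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// Hypothetical readFile: loads the whole file into a buffer (empty on failure).
std::vector<char> readFile(const std::string& inFile) {
    std::ifstream in(inFile, std::ios::binary);
    return std::vector<char>(std::istreambuf_iterator<char>(in),
                             std::istreambuf_iterator<char>());
}

// Hypothetical writeFile: writes the buffer and returns its size.
std::size_t writeFile(const std::vector<char>& buffer, const std::string& outFile) {
    std::ofstream out(outFile, std::ios::binary);
    out.write(buffer.data(), static_cast<std::streamsize>(buffer.size()));
    return buffer.size();
}

// Hypothetical substitute for the proposed future::then.
template <typename T, typename F>
auto then(std::future<T> antecedent, F f) {
    return std::async(std::launch::async,
        [antecedent = std::move(antecedent), f = std::move(f)]() mutable {
            return f(antecedent.get());
        });
}

std::future<std::size_t> future_then_copyFile(const std::string& inFile, const std::string& outFile) {
    // Read asynchronously, then write the buffer in a continuation.
    return then(std::async(std::launch::async, [inFile] { return readFile(inFile); }),
                [outFile](const std::vector<char>& buffer) { return writeFile(buffer, outFile); });
}
```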

There would also be a method future::is_ready, to test the future for completion before calling the blocking get(), and future::unwrap, to manage nested futures (futures that return futures); all the details of the proposal can be found in the document above.
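Both operations can be approximated with standard facilities. The is_ready and unwrap functions below are hypothetical free-function sketches, not the N3857 API: is_ready performs a zero-timeout wait_for, and unwrap starts an asynchronous task that waits on the outer future and then on the inner one (tying up a thread while it does).

```cpp
#include <chrono>
#include <future>
#include <utility>

// Hypothetical is_ready: a zero-timeout wait tells whether the value is available.
// Note: a deferred std::async future reports "deferred" here, never "ready".
template <typename T>
bool is_ready(const std::future<T>& f) {
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Hypothetical unwrap: turns future<future<T>> into future<T>, which becomes
// ready when the inner future does.
template <typename T>
std::future<T> unwrap(std::future<std::future<T>> outer) {
    return std::async(std::launch::async, [outer = std::move(outer)]() mutable {
        return outer.get().get();
    });
}
```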

Would this be the perfect solution? Not quite. Our experience with .NET teaches us that task-based asynchronous code is still hard to write, debug, and maintain. Sometimes very hard. This is the reason why new keywords were added to the C# language itself, in version 5.0, to manage asynchrony more easily through the async/await pattern.
Is there anything similar brewing for the unmanaged world of C++?