Generator functions in C++

In the previous post we had a look at the proposal to introduce resumable functions into the C++ standard, to support writing asynchronous code modeled on the C# async/await pattern.

We saw that it is already possible to experiment with the future resumable and await keywords in Visual Studio, by installing the latest November 2013 CTP. But the concept of resumable functions is not limited to asynchrony; in this post we’ll see how it can be expanded to support generator functions.

Generator functions and lazy evaluation

In several languages, like C# and Python, generator functions provide the ability to lazily produce the values of a sequence only when they are needed. In C#, a generator (or iterator) is a method that contains at least one yield statement and returns an IEnumerable<T>.

For example, the following C# code produces the sequence of Fibonacci numbers (1, 1, 2, 3, 5, 8, 13, 21, …):

IEnumerable<int> Fibonacci()
{
    int a = 0;
    int b = 1;
    while (true) {
        yield return b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

A generator acts in two phases. When it is called, it just sets up a resumable function, preparing for its execution, and returns some enumerable object (in the case of C#, an IEnumerable<T>). The actual execution is deferred to the moment when the values are enumerated and pulled from the sequence, for example with a foreach statement:

foreach (var num in Fibonacci())
{
    Console.WriteLine("{0}", num);
}

Note that the returned sequence is potentially infinite; its enumeration could go on indefinitely (if we ignore the integer overflows).

Of course there is nothing particularly special about doing the same thing in C++. While STL collections are usually eagerly evaluated (all their values are produced upfront), it is not difficult to write a collection that provides iterators that calculate their current value on the spot, based on some state or heuristic.

What gives a particular expressive power to generators is the ability to pause the execution each time a new value is generated, yielding control back to the caller, and then to resume the execution exactly from the point where it had suspended. A generator is therefore a special form of coroutine, limited in the sense that it may only yield back to its caller.

The yield statement hides all the complexity inherent in the suspension and resumption of the function; the developer can express the logic of the sequence plainly, without having to set up callbacks or continuations.

From resumable functions to generators (and beyond)

It would be nice to bring the expressive power of generators to our good old C++, and naturally there is already some work going on for this. In this proposal Gustafsson et al. explain how generator functions could be supported by the language as an extension of resumable functions, making it possible to write code like:

sequence<int> fibonacci() resumable
{
    int a = 0;
    int b = 1;
    while (true)
    {
        yield b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

Here, the proposal introduces two new concepts, the type sequence<T> and the yield keyword.

- sequence<T> is a (STL-like) collection that only supports iteration and only provides an input iterator.

- The yield statement suspends the execution of the function and returns one item from the sequence to the caller.

In C# terms, sequence<T> and its iterator are respectively the equivalent of an IEnumerable<T> and an IEnumerator<T>. But while C# generators are implemented with a state machine, in C++ the suspension and resumption would be implemented, as we’ll see, with stackful coroutines.

Once we have a lazily evaluated sequence<T> we could write client code to pull a sequence of values, generated one at a time, and only when requested:

sequence<int> fibs = fibonacci();
for (auto it = fibs.begin(); it != fibs.end(); ++it)
{
    std::cout << *it << std::endl;
}

In C++11 we could also simplify the iteration with a range-based for loop:

sequence<int> fibs = fibonacci();
for (int n : fibs)
{
    std::cout << n << std::endl;
}

More interestingly, we could define other resumable functions that manipulate the elements of a sequence, lazily producing another sequence. This example, taken from Gustafsson’s proposal, shows a lazy version of std::transform():

template<typename Iter>
sequence<int> lazy_tform(Iter beg, Iter end, std::function<int(int)> func) resumable
{
    for (auto iter = beg; iter != end; ++iter)
    {
        yield func(*iter);
    }
}

Moving further with this idea, we could pull another page out of the C# playbook and enrich the sequence class with a whole set of composable, deferred query operators, à la LINQ:

template <typename T>
class sequence
{
public:
    template <typename Predicate> bool all(Predicate predicate);
    [...]
    static sequence<int> range(int from, int to);
    template <typename TResult> sequence<TResult> select(std::function<TResult(T)> selector);
    sequence<T> take(int count);
    sequence<T> where(std::function<bool(T)> predicate);
};

Lazy sequences

Certainly, resumable generators would be a very interesting addition to the standard. But how would they work? We saw that the Visual Studio CTP comes with a first implementation of resumable functions built over the PPL task library, but in this case the CTP is of little help, since it does not support generator functions yet. Maybe they will be part of a future release… but why wait? We can implement them ourselves! 🙂

In the rest of this post I’ll describe a possible simple implementation of C++ lazy generators.

Let’s begin with the lazy sequence<T> class. This is an STL-like collection which only needs to support input iterators, with a begin() and an end() method.

Every instance of this class must somehow be initialized with a functor that represents the generator function that will produce the values of the sequence. We’ll see later what a good prototype for it could be.

As we said, the evaluation of this function must be deferred to the moment when the values are retrieved, one by one, via the iterator. All the logic for executing, suspending and resuming the generator will actually be implemented by the iterator class, which therefore needs to have a reference to the same functor.

So, our first cut at the sequence class could be something like this:

template<typename T>
class sequence_iterator
{
    // TO DO
};
template<typename T>
class sequence
{
public:
    typedef sequence_iterator<T> iterator;
    typedef ??? functor;

    sequence(functor func) : _func(func) { }
    iterator begin() {
        return iterator(_func);
    }
    iterator end() {
        return iterator();
    }

private:
    functor _func;
};

Step by step

The sequence<T> class should not do much more than create iterators. The interesting code is all in the sequence iterator, which is the object that has the ability to actually generate the values.

Let’s go back to our Fibonacci generator and write some code that iterates through it:

sequence<int> fibonacci() resumable
{
    int a = 0;
    int b = 1;
    while (true)
    {
        yield b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

auto fibs = fibonacci();
for (int n : fibs)
{
    std::cout << n << std::endl;
}

How should this code really work? Let’s follow its execution step by step.

  1. First, we call the function fibonacci(), which returns an object of type sequence<int>. Note that at this point the execution of the function has not even started yet. We just need to return a sequence object somehow associated with the body of the generator, which will be executed later.
  2. The returned sequence is copied into the variable fibs. We need to define what it means to copy a sequence: should we allow copy operations? Should we enforce move semantics?
  3. Given the sequence fibs, we call the begin() method, which returns an iterator “pointing” to the first element of the sequence. The resumable function should start running the moment the iterator is created and execute until a first value is yielded (or until it completes, in the case of an empty sequence).
  4. When the end() method is called, the sequence returns an iterator that represents the fact that the generator has completed and there are no more values to enumerate.
  5. The operator == () should behave as expected, returning true if both iterators are at the same position of the same sequence, or both pointing at the end of the sequence.
  6. The operator *() will return the value generated by the last yield statement (i.e., the current value of the sequence).
  7. At each step of the iteration, when operator ++() is called, the execution of the generator function will be resumed, and will continue until either the next yield statement updates the current value or until the function returns.

Putting it all together, we can begin to write some code for the sequence_iterator class:

template<typename T>
class sequence_iterator
{
public:
    typedef ??? functor;

    sequence_iterator(functor func) {
        // initializes the iterator from the generator functor, and executes
        // the functor until it terminates or yields.
    }
    sequence_iterator() {
        // must represent the end of the sequence
    }
    bool operator == (const sequence_iterator& rhs) {
        // true if the iterators are at the same position.
    }
    bool operator != (const sequence_iterator& rhs) {
        return !(*this==rhs);
    }
    const T& operator * () const {
        return _currentVal;
    }
    sequence_iterator& operator ++ () {
        // resume execution
        return *this;
    }

private:
    T _currentVal;
};

The behavior of the iterator is fairly straightforward, but there are a few interesting things to note. The first is that, evidently, a generator function does not do what it says: looking at the code of the fibonacci() function, there is no statement that actually returns a sequence<T>; what the code does is simply yield the sequence elements, one at a time.

So who creates the sequence<T> object? Clearly, the implementation of generators cannot be purely library-based. We can put in a library the code for the sequence<T> and for its iterators, we can also put in a library the platform-dependent code that manages the suspension and resumptions of generators. But it will be up to the compiler to generate the appropriate code that creates a sequence<T> object for a generator function. More on this later.

Also, we should note that there is no asynchrony or concurrency involved in this process. The function could resume in the same thread where it suspended.

Generators as coroutines

The next step is to implement the logic to seamlessly pause and resume a generator. A generator can be seen as an asymmetric coroutine, where the asymmetry lies in the fact that the control can be only yielded back to the caller, contrary to the case of symmetric coroutines that can yield control to any other coroutine at any time.

Unfortunately, coroutines cannot be implemented in a platform-independent way. On Windows we can use Win32 Fibers (as I described in this very old post), while on POSIX we can use the makecontext()/swapcontext() API. There is also a very nice Boost library that we could leverage for this purpose.

But let’s ignore the problems of portability, for the moment, and assume that we have a reliable way to implement coroutines. How should we use them in an iterator? We can encapsulate the non-portable code in a class __resumable_func that exposes this interface:

template <typename TRet>
class __resumable_func
{
    typedef std::function<void(__resumable_func&)> TFunc;

public:
    __resumable_func(TFunc func);

    void yieldReturn(const TRet& value);
    void yieldBreak();
    void resume();

    const TRet& getCurrent() const;
    bool isEos() const;
};

The class is templatized on the type of the values produced by the generator and provides methods to yield one value (yieldReturn()), to retrieve the current value, i.e. the latest value yielded (getCurrent()), and to resume execution and move to the next value (resume()).

It should also provide methods to terminate the enumeration (yieldBreak()) and to tell if we have arrived at the end of the sequence (isEos()).

The function object passed to the constructor represents the generator function itself that we want to run. More precisely, it is the function that will be executed as a coroutine, and its prototype tells us that this function, in order to be able to suspend execution, needs a reference to the __resumable_func object that is running the coroutine itself.

In fact the compiler should transform the code of a generator into the (almost identical) code of a lambda that uses the __resumable_func object to yield control and emit a new value.

For example, going back again to our fibonacci() generator, we could expect the C++ compiler to transform the code we wrote:

sequence<int> fibonacci() resumable
{
    int a = 0;
    int b = 1;
    while (true)
    {
        yield b;
        int tmp = a + b;
        a = b;
        b = tmp;
    }
}

into this lambda expression:

auto __fibonacci_func([](__resumable_func<int>& resFn) {
    int a = 0;
    int b = 1;
    while (true)
    {
        resFn.yieldReturn(b);
        int tmp = a + b;
        a = b;
        b = tmp;
    }
});

where the yield statement has been transformed into a call to __resumable_func::yieldReturn().

Likewise, client code that invokes this function, like:

sequence<int> fibs = fibonacci();

should be transformed by the compiler into a call to the sequence constructor, passing this lambda as argument:

sequence<int> fibs(__fibonacci_func);

Sequence iterators

We can ignore the details of the implementation of __resumable_func<T> coroutines for the moment and, assuming that we have them working, we can now complete the implementation of the sequence_iterator class:

template <typename T>
class sequence_iterator
{
    std::unique_ptr<__resumable_func<T>> _resumableFunc;

public:
    sequence_iterator() :
        _resumableFunc(nullptr)
    {
    }

    sequence_iterator(const std::function<void(__resumable_func<T>&)> func) :
        _resumableFunc(new __resumable_func<T>(func))
    {
    }

    sequence_iterator(const sequence_iterator& rhs) = delete;
    sequence_iterator& operator = (const sequence_iterator& rhs) = delete;
    sequence_iterator& operator = (sequence_iterator&& rhs) = delete;

public:
    sequence_iterator(sequence_iterator&& rhs) :
        _resumableFunc(std::move(rhs._resumableFunc))
    {
    }

    sequence_iterator& operator++()
    {
        _ASSERT(_resumableFunc != nullptr);
        _resumableFunc->resume();
        return *this;
    }

    bool operator==(const sequence_iterator& _Right) const
    {
        if (_resumableFunc == _Right._resumableFunc) {
            return true;
        }

        if (_resumableFunc == nullptr) {
            return _Right._resumableFunc->isEos();
        }

        if (_Right._resumableFunc == nullptr) {
            return _resumableFunc->isEos();
        }

        return (_resumableFunc->isEos() == _Right._resumableFunc->isEos());
    }

    bool operator!=(const sequence_iterator& _Right) const
    {
        return (!(*this == _Right));
    }

    const T& operator*() const
    {
        _ASSERT(_resumableFunc != nullptr);
        return (_resumableFunc->getCurrent());
    }
};

The logic here is very simple. Internally, a sequence_iterator contains a __resumable_func object, used to run the generator as a coroutine. The default constructor creates an iterator that points at the end of the sequence. The other constructor accepts as argument the generator function that we want to run and starts executing it in a coroutine; the function runs until it either yields a value or terminates, giving control back to the constructor. In this way we create an iterator that points at the beginning of the sequence.

If a value was yielded, we can call the dereference-operator to retrieve it from the __resumable_func object. If the function terminated, instead, the iterator will already point at the end of the sequence. The equality operator takes care of equating an iterator whose function has terminated to the end()-iterators created with the default constructor. Incrementing the iterator means resuming the execution of the coroutine, from the point it had suspended, giving it the opportunity to produce another value.

Note that, since the class owns the coroutine object, we disable copy constructors and assignment operators and only declare the move constructor, to pass the ownership of the coroutine.

Composable sequence operators

Almost there! We have completed our design, but there are still a few details to work out. The most interesting are related to the lifetime and copyability of sequence objects. What should happen with code like this?

sequence<int> fibs1 = fibonacci();
sequence<int> fibs2 = fibs1;
for (auto it1 : fibs1) {
    for (auto it2 : fibs2) {
        ...
    }
}

If we look at how we defined class sequence<T>, apparently there is no reason why we should prevent the copy of sequence objects. In fact, sequence<T> is an immutable class. Its only data member is the std::function object that wraps the functor we want to run.

However, even though we don’t modify this functor object, we do execute it. This object could have been constructed from a lambda expression that captured some variables, either by value or by reference. Since one of the captured variables could be a reference to the same sequence<T> object that created that iterator, we need to ensure that the sequence object will always outlive its functors, and allowing copy-semantics suddenly becomes complicated.

This brings us to LINQ and to the composability of sequences. Anyone who has worked with C# knows that what makes enumerable types truly powerful and elegant is the ability to apply chains of simple operators that transform the elements of a sequence into another sequence. LINQ to Objects is built on the concept of a data pipeline: we start with a data source which implements IEnumerable<T>, and we can compose together a number of query operators, defined as extension methods to the Enumerable class.

For example, this very, very useless query in C# generates the sequence of all square roots of odd integers between 0 and 10:

var result = Enumerable.Range(0, 10)
    .Where(n => n%2 == 1)
    .Select(n => Math.Sqrt(n));

Similarly, to make the C++ sequence<T> type really powerful we should make it composable and enrich it with a good range of LINQ-like operators to generate, filter, aggregate, group, sort and generally transform sequences.

These are just a few of the operators that we could define in the sequence<T> class:

template <typename T>
class sequence
{
public:
    [...]
    static sequence<int> range(int from, int to);
    template <typename TResult> sequence<TResult> select(std::function<TResult(T)> selector);
    sequence<T> where(std::function<bool(T)> predicate);
};

to finally be able to write the same (useless) query:

sequence<double> result = sequence<int>::range(0, 10)
    .where([](int n) { return n % 2 == 1; })
    .select([](int n) { return sqrt(n); });

Let’s try to implement select(), as an experiment. It is conceptually identical to the lazy_tform() function we saw before, but now defined in the sequence class. A very naïve implementation could be as follows:

// Projects each element of a sequence into a new form. (NOT WORKING!)
template <typename TResult>
sequence<TResult> select(std::function<TResult(T)> selector)
{
    auto func = [this, selector](__resumable_func<TResult>& rf) {
        for (T t : *this)
        {
            auto val = selector(t);
            rf.yieldReturn(val);
        }
    };
    return sequence<TResult>(func);
}

It should be now clear how it works: first we create a generator functor, in this case with a lambda expression, and then we return a new sequence constructed on this functor. The point is that the lambda needs to capture the “parent” sequence object to be able to iterate through the values of its sequence.

Unfortunately this code is very brittle. What happens when we compose more operators, using the result of one as the input of the next one in the chain? When we write:

sequence<double> result = sequence<int>::range(0, 10)
    .where([](int n) { return n % 2 == 1; })
    .select([](int n) { return sqrt(n); });

there are (at least) three temporary objects of type sequence<T> created here; their lifetime ends with the full expression, so they are already gone when the resulting sequence is later enumerated.

A chain of sequences

The situation is like in the figure: the functor of each sequence in the chain is a lambda that has captured a pointer to the previous sequence object. The problem is in the deferred execution: nothing really happens until we enumerate the resulting sequence through its iterator, but as soon as we do so each sequence starts pulling values from its predecessor, which has already been deleted.

Temporary objects and deferred execution really do not get along nicely at all. On one hand in order to compose sequences we have to deal with temporaries that can be captured in a closure and then deleted long before being used. On the other hand, the sequence iterators, and their underlying coroutines, should not be copied and can outlive the instance of the sequence that generated them.
We can enforce move semantics on the sequence<T> class, but then what do we capture in a generator like select() that acts on a sequence?

As often happens, a possible solution requires adding another level of indirection. We introduce a new class, sequence_impl<T>, which represents a particular application of a generator function closure:

template <typename T>
class sequence_impl
{
public:
    typedef sequence_iterator<T> iterator;
    typedef std::function<void(__resumable_func<T>&)> functor;

private:
    const functor _func;

    sequence_impl(const sequence_impl& rhs) = delete;
    sequence_impl(sequence_impl&& rhs) = delete;
    sequence_impl& operator = (const sequence_impl& rhs) = delete;
    sequence_impl& operator = (sequence_impl&& rhs) = delete;

public:
    sequence_impl(functor func) : _func(std::move(func)) {}

    sequence_iterator<T> begin() const
    {
        // return an iterator for the beginning of the sequence
        return sequence_iterator<T>(_func);
    }
    sequence_iterator<T> end() const
    {
        // return an iterator for the end of the sequence
        return sequence_iterator<T>();
    }
};

A sequence_impl<T> is neither copyable nor movable and only provides methods to iterate through it.

The sequence<T> class now keeps only a shared pointer to the unique instance of a sequence_impl<T> that represents that particular application of the generator function. Now we can support chained sequences by allowing move semantics on the sequence<T> class.

template <typename T>
class sequence
{
    std::shared_ptr<sequence_impl<T>> _impl;

    sequence(const sequence& rhs) = delete;
    sequence& operator = (const sequence& rhs) = delete;

public:
    typedef typename sequence_impl<T>::iterator iterator;
    typedef typename sequence_impl<T>::functor functor;

    sequence(functor func) :
        _impl(std::make_shared<sequence_impl<T>>(std::move(func))) { }
    sequence(sequence&& rhs) {
        _impl = std::move(rhs._impl);
    }
    sequence& operator = (sequence&& rhs) {
        _impl = std::move(rhs._impl);
        return *this;
    }

    iterator begin() const {
        return _impl->begin();
    }
    iterator end() const {
        return _impl->end();
    }
};

The diagram below illustrates the relationships between the classes involved in the implementation of lazy sequences:

[Class diagram: sequence<T> holds a shared_ptr to a sequence_impl<T>, which creates sequence_iterator<T> objects, each owning a __resumable_func<T> coroutine.]

LINQ operators

OK, now we are really almost done. The only thing left to do, if we want, is to write a few sequence-manipulation operators, modeled on the example of LINQ-to-Objects. I’ll list just a few, as examples:

// Determines whether all elements of a sequence satisfy a condition.
bool all(std::function<bool(T)> predicate)
{
    if (nullptr == predicate) {
        throw std::exception();
    }

    for (auto t : *_impl)
    {
        if (!predicate(t)) {
            return false;
        }
    }
    return true;
}

// Returns an empty sequence
static sequence<T> empty()
{
    auto fn = [](__resumable_func<T>& rf) {
        rf.yieldBreak();
    };
    return sequence<T>(fn);
}

// Generates a sequence of integral numbers within a specified range [from, to).
static sequence<int> range(int from, int to)
{
    if (to < from) {
        throw std::exception();
    }

    auto fn = [from, to](__resumable_func<int>& rf) {
        for (int i = from; i < to; i++) {
            rf.yieldReturn(i);
        }
    };
    return sequence<int>(fn);
}

// Projects each element of a sequence into a new form.
template <typename TResult>
sequence<TResult> select(std::function<TResult(T)> selector)
{
    if (nullptr == selector) {
        throw std::exception();
    }

    std::shared_ptr<sequence_impl<T>> impl = _impl;
    auto fn = [impl, selector](__resumable_func<TResult>& rf) {
        for (T t : *impl)
        {
            auto val = selector(t);
            rf.yieldReturn(val);
        }
    };
    return sequence<TResult>(fn);
}

// Returns a specified number of contiguous elements from the start of a sequence.
sequence<T> take(int count)
{
    if (count < 0) {
        throw std::exception();
    }

    std::shared_ptr<sequence_impl<T>> impl = _impl;
    auto fn = [impl, count](__resumable_func<T>& rf) {
        auto it = impl->begin();
        for (int i = 0; i < count && it != impl->end(); i++, ++it) {
            rf.yieldReturn(*it);
        }
    };
    return sequence<T>(fn);
}

// Filters a sequence of values based on a predicate.
sequence<T> where(std::function<bool(T)> predicate)
{
    if (nullptr == predicate) {
        throw std::exception();
    }

    std::shared_ptr<sequence_impl<T>> impl = _impl;
    auto fn = [impl, predicate](__resumable_func<T>& rf) {
        for (auto item : *impl)
        {
            if (predicate(item)) {
                rf.yieldReturn(item);
            }
        }
    };
    return sequence<T>(fn);
}

We could write many more, but I think these should convey the idea.

Example: a prime numbers generator

As a final example, the following query lazily produces the sequence of prime numbers smaller than a given maximum, using a very simple, brute-force algorithm. It is definitely not the fastest prime number generator, and it’s maybe a little cryptic, but it’s undoubtedly quite compact!

sequence<int> primes(int max)
{
    return sequence<int>::range(2, max)
        .where([](int i) {
            return sequence<int>::range(2, (int)sqrt(i) + 2)
                .all([i](int j) { return j == i || (i % j) != 0; });
        });
}

Conclusion

In this article I rambled about generators in C++, describing a new sequence<T> type that models lazy enumerators and that could be implemented as an extension of resumable functions, as specified in N3858. I have described a possible implementation based on coroutines and introduced the possibility of extending the sequence class with a set of composable operators.

If you are curious and want to play with my sample implementation, you can find a copy of the sources here. Nothing too fancy, just the code that I showed in this post.

Appendix – Coroutines in Win32

Having completed my long ramble on the “platform independent” aspects of C++ generators, it’s time to go back to the point we left open: how to implement, on Windows, the coroutines that we encapsulated in the __resumable_func class?

We saw that the Visual Studio CTP comes with a first implementation of resumable functions, built over the PPL task library and using Win32 fibers as side-stacks. Even though the CTP does not support generator functions yet, my first idea was to just extend the <pplawait.h> library to implement them. However, the code there is specialized for resumable functions that suspend awaiting for some task, and it turns out that we can reuse only part of it because, even if we are still dealing with resumable functions, the logic of await and that of yield are quite different.

In the case of await, functions can be suspended (possibly multiple times) waiting for some other task to complete. This means switching to a fiber associated to the task after having set up a continuation that will be invoked after the task completes, to switch the control back to the current fiber. When the function terminates, the control goes back to the calling fiber, returning the single return value of the async resumable function.

In the case of yield, we never suspend to call external async methods. Instead, we can suspend multiple times going back to the calling fiber, each time by returning one of the values that compose the sequence. So, while the implementation of the await keyword needs to leverage the support of PPL tasks, the concept of generator functions does not imply any concurrency or multithreading and using the PPL is not necessary.

Actually, there are ways to implement yield on top of await, but I could not find a simple way to use the new __await keyword without spawning new threads (maybe this would be possible with a custom PPL scheduler?).

So I chose to write the code for coroutines myself; the idea here is not very different from the one I described in a very old post (it looks like I keep rewriting the same post :-)) but now I can take advantage of the fiber-based code from the CTP’s <pplawait.h> library.

Win32 Fibers

Let’s delve into the details of the implementation. First of all, let me summarize once again the Win32 Fiber API.

Fibers were added to Windows NT to support cooperative multitasking. They can be thought of as lightweight threads that must be manually scheduled by the application. In other words, fibers are a perfect tool to implement coroutines.

When a fiber is created, with CreateFiber, it is passed a fiber-start function. The OS then assigns it a separate stack and sets up execution to begin at this fiber-start function. To schedule a fiber we need to “switch” to it manually with SwitchToFiber; once it is running, a fiber can suspend itself only by explicitly yielding execution to another fiber, again by calling SwitchToFiber.

SwitchToFiber only works from one fiber to another, so the first thing to do is to convert the current thread into a fiber, with ConvertThreadToFiber. Finally, when we are done using fibers, we can convert the main fiber back into a normal thread with ConvertFiberToThread.

The __resumable_func class

We want to put all the logic to handle the suspension and resumption of a function in the __resumable_func<T> class, as described before.

In our case we don’t need symmetric coroutines; we just need the ability of returning control to the calling fiber. So our class will contain a handle to the “caller” fiber and a handle to the fiber we want to run.

#include <functional>
#include <pplawait.h>

template <typename TRet>
class __resumable_func : __resumable_func_base
{
    typedef std::function<void(__resumable_func&)> TFunc;

    TFunc _func;
    TRet _currentValue;
    LPVOID _pFiber;
    LPVOID _pCallerFiber;
    Concurrency::details::__resumable_func_fiber_data* _pFuncData;

public:
    __resumable_func(TFunc func);
    ~__resumable_func();

    void yieldReturn(TRet value);
    void yieldBreak();
    void resume();

    const TRet& getCurrent() const { return _currentValue; }
    bool isEos() const { return _pFiber == nullptr; }

private:
    static void yield();
    static VOID CALLBACK ResumableFuncFiberProc(PVOID lpParameter);
};

The constructor stores a copy of the generator function to run, creates a new fiber object specifying ResumableFuncFiberProc as the function to execute, and immediately switches the execution to this fiber:

    __resumable_func(TFunc func) :
        _currentValue(TRet()),
        _pFiber(nullptr),
        _func(func),
        _pFuncData(nullptr)
    {
        // Convert the current thread to a fiber. This is needed because the thread needs to "be"
        // a fiber in order to be able to switch to another fiber.
        ConvertCurrentThreadToFiber();
        _pCallerFiber = GetCurrentFiber();

        // Create a new fiber (or re-use an existing one from the pool)
        _pFiber = Concurrency::details::POOL CreateFiberEx(Concurrency::details::fiberPool.commitSize,
            Concurrency::details::fiberPool.allocSize, FIBER_FLAG_FLOAT_SWITCH, &ResumableFuncFiberProc, this);
        if (!_pFiber) {
            throw std::bad_alloc();
        }

        // Switch to the newly created fiber. When this "returns" the functor has either returned,
        // or issued a 'yield' statement.
        ::SwitchToFiber(_pFiber);

        _pFuncData->suspending = false;
        _pFuncData->Release();
    }

The fiber will start from the fiber procedure, whose only task is to run the generator function in the context of the fiber:

    // Entry proc for the Resumable Function Fiber.
    static VOID CALLBACK ResumableFuncFiberProc(PVOID lpParameter)
    {
        LPVOID threadFiber;

        // This function does not formally return, due to the SwitchToFiber call at the bottom.
        // This scope block is needed for the destructors of the locals in this block to fire
        // before we do the SwitchToFiber.
        {
            Concurrency::details::__resumable_func_fiber_data funcDataOnFiberStack;
            __resumable_func* pThis = (__resumable_func*)lpParameter;

            // The callee needs to setup some more stuff after we return (which would be either on
            // yield or an ordinary return). Hence the callee needs the pointer to the func_data
            // on our stack. This is not unsafe since the callee has a refcount on this structure
            // which means the fiber will continue to live.
            pThis->_pFuncData = &funcDataOnFiberStack;

            Concurrency::details::POOL SetFiberData(&funcDataOnFiberStack);

            funcDataOnFiberStack.threadFiber = pThis->_pCallerFiber;
            funcDataOnFiberStack.resumableFuncFiber = GetCurrentFiber();

            // Finally calls the function in the context of the fiber. The execution can be
            // suspended by calling yield
            pThis->_func(*pThis);

            // Here the function has completed. We set return to true meaning this is the
            // final 'real' return and not one of the 'yield' returns.
            funcDataOnFiberStack.returned = true;
            pThis->_pFiber = nullptr;
            threadFiber = funcDataOnFiberStack.threadFiber;
        }

        // Return to the calling fiber.
        ::SwitchToFiber(threadFiber);

        // On a normal fiber this function won't exit after this point. However, if the fiber is
        // in a fiber-pool and re-used we can get control back. So just exit this function, which
        // will cause the fiber pool to spin around and re-enter.
    }

There are two ways to suspend the execution of the generator function running in the fiber and to yield control back to the caller. The first is to yield a value, which will be stored in a data member:

    void yieldReturn(TRet value)
    {
        _currentValue = value;
        yield();
    }

The second is to immediately terminate the sequence, for example with a return statement or reaching the end of the function. The compiler should translate a return into a call to the yieldBreak method:

void yieldBreak()
{
    _pFiber = nullptr;
    yield();
}

To yield control we just need to switch back to the calling fiber:

    static void yield()
    {
        _ASSERT(IsThreadAFiber());
        Concurrency::details::__resumable_func_fiber_data* funcDataOnFiberStack =
            Concurrency::details::__resumable_func_fiber_data::GetCurrentResumableFuncData();

        // Add-ref's the fiber. Even though there can only be one thread active in the fiber
        // context, there can be multiple threads accessing the fiber data.
        funcDataOnFiberStack->AddRef();

        _ASSERT(funcDataOnFiberStack);
        funcDataOnFiberStack->verify();

        // Mark as busy suspending. We cannot run the code in the 'then' statement
        // concurrently with the await doing the setting up of the fiber.
        _ASSERT(!funcDataOnFiberStack->suspending);
        funcDataOnFiberStack->suspending = true;

        // Make note of the thread that we're being called from (Note that we'll always resume
        // on the same thread).
        funcDataOnFiberStack->awaitingThreadId = GetCurrentThreadId();

        _ASSERT(funcDataOnFiberStack->resumableFuncFiber == GetCurrentFiber());

        // Return to the calling fiber.
        ::SwitchToFiber(funcDataOnFiberStack->threadFiber);
    }

Once we have suspended, incrementing the iterator will resume the execution by calling resume, which will switch to this object’s fiber:

    void resume()
    {
        _ASSERT(IsThreadAFiber());
        _ASSERT(_pFiber != nullptr);
        _ASSERT(_pFuncData != nullptr);
        _ASSERT(!_pFuncData->suspending);
        _ASSERT(_pFuncData->awaitingThreadId == GetCurrentThreadId());

        // Switch to the fiber. When this "returns" the functor has either returned, or issued
        // a 'yield' statement.
        ::SwitchToFiber(_pFiber);

        _ASSERT(_pFuncData->returned || _pFuncData->suspending);
        _pFuncData->suspending = false;
        if (_pFuncData->returned) {
            _pFiber = nullptr;
        }
        _pFuncData->Release();
    }

The destructor just needs to convert the current fiber back to a normal thread, but only when there are no more fibers running in the thread. For this reason we need to keep a per-thread fiber count, which is incremented every time we create a __resumable_func and decremented every time we destroy it.

~__resumable_func()
{
    if (_pCallerFiber != nullptr) {
        ConvertFiberBackToThread();
    }
}

class __resumable_func_base
{
    __declspec(thread) static int ts_count;

protected:
    // Convert the thread to a fiber.
    static void ConvertCurrentThreadToFiber()
    {
        if (!IsThreadAFiber())
        {
            // Convert the thread to a fiber. Use FIBER_FLAG_FLOAT_SWITCH on x86.
            LPVOID threadFiber = ConvertThreadToFiberEx(nullptr, FIBER_FLAG_FLOAT_SWITCH);
            if (threadFiber == NULL) {
                throw std::bad_alloc();
            }
            ts_count = 1;
        }
        else
        {
            ts_count++;
        }
    }

    // Convert the fiber back to a thread.
    static void ConvertFiberBackToThread()
    {
        if (--ts_count == 0)
        {
            if (ConvertFiberToThread() == FALSE) {
                throw std::bad_alloc();
            }
        }
    }
};
__declspec(thread) int __resumable_func_base::ts_count = 0;

And this is all we need to have resumable generators in C++, on Windows. The complete source code can be found here.
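To make the mechanism concrete outside Windows too, here is a much-simplified portable analogue of the class above, with a POSIX ucontext side-stack standing in for the fiber (purely illustrative: no fiber pool, no reference counting, and the class name is invented):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <ucontext.h>
#include <vector>

// Portable sketch of __resumable_func<T>: a ucontext side-stack stands in
// for the Win32 fiber. Simplified on purpose: no fiber pool, no refcounting.
template <typename TRet>
class resumable_func
{
    using TFunc = std::function<void(resumable_func&)>;

    TFunc _func;
    TRet _current{};
    bool _done = false;
    ucontext_t _caller{}, _self{};
    std::vector<char> _stack;

    // makecontext can only pass ints, so 'this' travels as two 32-bit halves.
    static void trampoline(unsigned hi, unsigned lo)
    {
        auto self = reinterpret_cast<resumable_func*>(
            (static_cast<std::uintptr_t>(hi) << 32) | lo);
        self->_func(*self);   // run the generator body on the side stack
        self->_done = true;   // the final, 'real' return (like yieldBreak)
    }

public:
    explicit resumable_func(TFunc f) : _func(std::move(f)), _stack(64 * 1024)
    {
        getcontext(&_self);
        _self.uc_stack.ss_sp = _stack.data();
        _self.uc_stack.ss_size = _stack.size();
        _self.uc_link = &_caller;              // resume the caller when the body ends
        auto p = reinterpret_cast<std::uintptr_t>(this);
        makecontext(&_self, reinterpret_cast<void (*)()>(&trampoline), 2,
                    static_cast<int>(p >> 32), static_cast<int>(p & 0xffffffffu));
        resume();                              // run up to the first yield
    }

    void yieldReturn(TRet value)               // called inside the generator
    {
        _current = value;
        swapcontext(&_self, &_caller);         // like SwitchToFiber(threadFiber)
    }

    void resume()                              // called when the iterator advances
    {
        if (!_done)
            swapcontext(&_caller, &_self);     // like SwitchToFiber(_pFiber)
    }

    bool isEos() const { return _done; }
    const TRet& getCurrent() const { return _current; }
};
```

A Fibonacci generator written against this class simply calls yieldReturn(b) in a loop, and the caller enumerates it with getCurrent()/resume() until isEos() returns true.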

Async-Await in C++

In the previous post we had a quick look at the problem of writing concurrent, asynchronous code with C++. We saw that the <future> library, introduced with C++11, offers a partial solution to this problem by separating the act of initiating an operation from the act of waiting for its result. But we saw that the act of waiting for a result is still synchronous, and that std::futures are not easily composable.

We talked of the platform-specific improvements offered by Microsoft’s PPL tasks, which have become the model for a few interesting changes proposed to the C++ standard library.

But are these improvements enough?

The problems with Tasks

Having composable futures would certainly be a big improvement for C++. But the experience with C# tells us that it’s not a perfect solution. A purely library-based solution is bound to have limitations. In particular, .NET’s TPL has the defect of making the code overly complex.

It is difficult to write robust, readable and debuggable code with TPL. It is too easy to end up with spaghetti code: a large number of Task<T>s interconnected in inextricable chains of continuations.

I found this out the hard way when my last employer gave me a .NET component to maintain that had been architected as an incredibly convoluted graph of interconnected TPL Tasks, often joined with conditional ContinueWith() calls.

Loops and conditionals

It is particularly difficult to write iterations and branches with tasks.

In my previous post I used as an example a copyFile function that asynchronously read all the file content into a single string and copied it into another file. Of course such an implementation would not be practical with very large files, so let’s rewrite it so that it reads and writes the file chunk by chunk.

Here I am using the PPL, but very similar code could be written with std::future (especially once it supports task continuations with future::then).

We can easily write an iterative function that uses tasks to asynchronously read and write every chunk of a file, but that still blocks waiting for these tasks to complete:

Concurrency::task<string> readFileChunk(ifstream& file, int chunkLength)
{
    return Concurrency::create_task([&file, chunkLength](){
        vector<char> buffer(chunkLength);
        file.read(&buffer[0], chunkLength);
        return string(&buffer[0], (unsigned int)file.gcount());
    });
}

Concurrency::task<void> writeFileChunk(ofstream& file, const string& chunk)
{
    return Concurrency::create_task([&file, chunk](){
        file.write(chunk.c_str(), chunk.length());
    });
}

void copyFile_get(const string& inFilePath, const string& outFilePath)
{
    ifstream inFile(inFilePath, ios::binary | ios::ate);
    inFile.seekg(0, inFile.beg);
    ofstream outFile(outFilePath, ios::binary);

    string chunk;
    while (chunk = readFileChunk(inFile, 4096).get(), !chunk.empty()) {
        writeFileChunk(outFile, chunk).get();
    }
}

The problem is that this code is not much better than a purely synchronous loop of blocking calls to read and write the file; if our thread schedules asynchronous operations but then immediately blocks waiting for them to complete, we cannot really do much else concurrently.

It would be better to chain a sequence of tasks that read and write the file chunk by chunk, so that they can be scheduled one after the other leaving our thread free to do other work while these tasks are blocked in some I/O operation.

It turns out that writing a “task loop” correctly is not trivial, especially if there are many possible error cases or exceptions to handle. It is actually so tricky that there are MSDN pages that explain what to do and what the possible pitfalls are, and the PPL Asynchronous Sample Pack even provides sample code for this.

The code I put together is neither clean nor concise:

task<shared_ptr<string>> readFileChunk(shared_ptr<ifstream> file, int chunkLength)
{
    return create_task([file, chunkLength](){
        vector<char> buffer(chunkLength);
        file->read(&buffer[0], chunkLength);
        return make_shared<string>(&buffer[0], (unsigned int)file->gcount());
    });
}

task<bool> writeFileChunk(shared_ptr<ofstream> file, shared_ptr<string> chunk)
{
    return create_task([file, chunk](){
        file->write(chunk->c_str(), chunk->length());
        return chunk->length() == 0;
    });
}

task<void> copyFile_repeat(shared_ptr<ifstream> inFile, shared_ptr<ofstream> outFile)
{
    return readFileChunk(inFile, 4096)
        .then([=](shared_ptr<string> chunk){
            return writeFileChunk(outFile, chunk);
        })
        .then([=](bool eof){
            if (!eof) {
                return copyFile_repeat(inFile, outFile);
            }
            else {
                return task_from_result();
            }
        });
}

task<void> copyFile_then(const string& inFilePath, const string& outFilePath)
{
    auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
    inFile->seekg(0, inFile->beg);
    auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

    return copyFile_repeat(inFile, outFile);
}

It is probably easy to do better, to write cleaner code that executes in a loop a simple continuation chain like this one, but it is also quite evident that when the algorithm becomes more complicated the asynchronous code can become very convoluted.

Debugging

And then there is the problem of debugging: if we set a breakpoint in the file->write(…) line, for example, when the execution stops there the debugger will present us with a quite strange call stack. With continuations we lose the usual “causality chain” of return stacks, typical of synchronous code. Here the lambdas that compose the body of the tasks are scattered through the call stack and distributed across several threads, and sometimes it can be difficult to understand what is going on.

Image

Image

The async-await pattern

The complexity of working with Tasks is well known to C# programmers, who have already had a few years to experiment and gain experience with the .NET TPL. This is the reason why the biggest improvement of C# 5.0 was the introduction of the Async/Await pattern, designed on the model of F# asynchronous workflows.

To see how it works, let’s start by writing in C# an iterative but blocking implementation of our CopyFile function. This can be done very easily:

        private void CopyChunk(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = input.Read(buffer, 0, buffer.Length)) != 0)
            {
                output.Write(buffer, 0, bytesRead);
            }
        }

        public void CopyFile(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                    CopyChunk(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Let’s say that we want now to make the function completely asynchronous. Ignoring the fact that the framework actually provides a Stream.CopyToAsync, we can use other asynchronous .NET APIs, like Stream.ReadAsync and Stream.WriteAsync that return a Task, together with async/await, and write this:

        private async Task CopyChunk_Async(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = await input.ReadAsync(buffer, 0, buffer.Length)) != 0)
            {
                await output.WriteAsync(buffer, 0, bytesRead);
            }
        }

        public async Task CopyFile_Async(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                   await CopyChunk_Async(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Now, this is really beautiful! We made only a few small changes to the structure of the code, adding the async modifier, changing the return type of our methods, and adding a strange await keyword in the places where we call asynchronous functions. The logic of the function has not changed at all, and yet it now executes in a completely different way. Let’s see how.

In C# a method can be declared as asynchronous by adding the async modifier to the method declaration. For example:

public static async Task<int> FooAsync() {…}

An asynchronous method is like a regular C# method, but its return type can only be of type void, Task or Task<T>, and it can also contain await expressions, like:

int result = await FooAsync();

Awaiting on an expression means launching the asynchronous execution of that expression (usually an async method) and, if it has not yet completed, immediately yielding the execution to the caller.

So asynchronous methods are, like iterator methods, a kind of coroutine. They are resumable: their execution can “magically” pause at a few points identified by the ‘await’ statements, and then “magically” resume when the asynchronous expression has completed. While the method is paused waiting for the completion, its caller continues to run. This means that if the method was called from a UI message loop, for example, the UI thread can continue processing messages and the application will remain responsive. If the method was called by a worker thread in a web server, the thread will be free to serve more requests.

But just like iterator methods, in C# asynchronous methods are not really coroutines. They are instead implemented with a state machine. The details are quite complicated (a very good description can be found here and here), but what happens is that the compiler transforms the function body by attaching a task continuation to the asynchronous operation, so that the execution will resume from the point where it left off.
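In C++ terms, this transformation can be modeled as a class whose MoveNext-style method is re-entered as the continuation of each awaited operation. The sketch below is only a conceptual model (the names are invented, and the asynchronous read is faked with a synchronous callback):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Conceptual model of the compiler's rewrite: the method body becomes a state
// machine whose MoveNext is re-entered as the continuation of each 'await'.
// readAsync stands in for a real asynchronous operation; here it invokes its
// callback synchronously so the example stays self-contained.
struct CopyStateMachine
{
    std::vector<int> source;   // data to "read asynchronously"
    std::vector<int> copied;
    std::size_t pos = 0;
    int state = 0;             // which await point the method is suspended at
    int awaited = 0;           // the result of the last awaited operation

    void readAsync(std::function<void(int)> continuation)
    {
        continuation(pos < source.size() ? source[pos++] : 0);
    }

    // Models:  while ((v = await readAsync()) != 0) { copied.push_back(v); }
    void MoveNext()
    {
        switch (state) {
        case 0:                       // initial entry
            goto schedule;
        case 1:                       // resumed after the await completed
            if (awaited == 0) return; // loop condition failed: method completes
            copied.push_back(awaited);
        schedule:
            state = 1;                // remember where to resume
            readAsync([this](int v) { awaited = v; MoveNext(); });
            return;
        }
    }
};
```

A single call to MoveNext() drives the whole "asynchronous" loop, with each callback re-entering the state machine exactly where it left off.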

In which thread does the awaitable expression execute, and in which thread will the async method resume? It’s up to the task scheduler to decide; asynchrony does not mean parallelism, and a method does not need to run on another thread to be asynchronous. The scheduler may run the task later in the same thread, or it may run it on a worker thread from the ThreadPool. The execution will then resume on the right thread (for example the UI thread in a GUI app) according to the current synchronization context. From the developer’s point of view, this means that one cannot rely on an async method continuing its execution on the same thread where it started (with all the consequent multithreading headaches).

C++ resumable functions

So, everything is nice in the managed world of C#, but what about native programming? Is there anything like async/await that we can use with our futures? We can find the answer in N3858, another proposal made by Gustafsson et al. for C++17.

This time the changes proposed are to the language itself and not just to the library. The idea is to introduce the equivalent of C# async methods in the form of resumable functions. They can be thought of as the basis for adding support for real coroutines to C++, and are not strictly related to the <future> library, even though they have been defined especially to improve the usability of futures and promises.

Resumable functions would have almost exactly the same semantics as async/await in C#; two new keywords would be added to the language:

–        The resumable keyword, added after the argument list, defines a function as being suspendable and resumable, in the way described before. It is the equivalent of C#’s async.

–        Like in C#, the await keyword in the body of a resumable function identifies a suspension point, one of the places where the function may suspend execution and then resume it later (possibly in a different thread) when some asynchronous operation completes.

For example:

        future<int> abs_r(future<int> i_op) resumable
        {
            int i = await i_op;
            return (i < 0) ? -i : i;
        }

This would define a resumable function that takes a future<int> as argument, suspends itself waiting for the future result to be available, and produces an integer as result. Of course, a resumable function can only return a future<T> or shared_future<T>. It completes and produces its final value only when a return statement is executed.

Visual Studio 2013 CTP

Will this proposal be accepted? Will resumable functions become part of the standard? And is there a portable way to implement them?

I really cannot answer any of these questions. But, interestingly, it is already possible to play with resumable functions today. Last November Microsoft shipped the Visual Studio November 2013 CTP with a preview of several C++ features that will be released in the next version of Visual Studio and that improve the conformance to the C++ standard. Between those, there is also the first implementation of resumable functions.

To enable them we must change the “Platform Toolset” of our C++ project to use the updated compiler from the CTP, as shown in the figure. Also, we need to include the new header <pplawait.h>, which extends the PPL <ppltasks.h>.

Image

The syntax of CTP’s resumable functions is very similar to the one proposed by Gustafsson, with a small change in the keywords. In order not to “taint” the language with (still) non-standard keywords, async has been renamed as __resumable and await as __await.

So, we can now rewrite one last time our copyFile method, this time using the CTP’s resumable functions:

#include <pplawait.h>

task<void> copyFile_res(const string inFilePath, const string outFilePath) __resumable
{
	auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
	inFile->seekg(0, inFile->beg);
	auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

	while (true)
	{
		shared_ptr<string> s = __await readFileChunk(inFile, 4096);
		if (s->empty()) { break; }
		__await writeFileChunk(outFile, s);
	}
}

int _tmain(int argc, _TCHAR* argv[])
{
    …
    copyFile_res(file1, file2).get();
}

We got there! Finally we have simple, readable, maintainable asynchronous C++ code. The only small complication is that it is necessary to pay a lot of attention to the lifetime of the objects passed to the asynchronous/resumable methods. In C# things are easier, but we don’t have a garbage collector here, so it is often better to wrap the objects in shared_ptr<T>s to make sure they don’t get destroyed while they are still referenced by some async task. A small tribute to pay to the Gods of asynchrony, anyway.

A kind of magic…

But how would resumable functions work, in practice? To learn more about this it is interesting to study the <pplawait.h> header and to watch the interesting Channel 9 talk by Deon Brewis, from last year’s GoingNative conference.

Gustafsson’s proposal describes two possible solutions:

  1. With a state machine, like in C#.
  2. With resumable side-stacks (aka Fibers).

The first implementation would be similar to how async/await is implemented in C#. The compiler would transform the code of the function, implementing the logic of a state machine that keeps track of the current state of the function. The implementation would be more complicated than in C# because C++ does not have garbage collection; the function variables could not live on the stack (because the stack frame would be lost when the function is suspended), so they would have to be stored in a heap-allocated activation frame.
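A minimal sketch of this idea: the locals of the resumable function move into a heap-allocated frame that each continuation keeps alive through a shared_ptr (the names and the scheduling are invented for illustration; here the continuation is simply invoked recursively):

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical activation frame: the "locals" of the resumable function.
struct SumFrame
{
    int i = 0;
    int sum = 0;
};

// One "resumption" of a resumable function summing 0..4; each recursive call
// stands for the continuation that a scheduler would invoke after an await.
// The shared_ptr keeps the frame alive across suspension points.
void sum_step(std::shared_ptr<SumFrame> frame, std::function<void(int)> onDone)
{
    if (frame->i == 5) {
        onDone(frame->sum);   // the 'return' statement of the resumable function
        return;
    }
    frame->sum += frame->i++;
    sum_step(frame, onDone);  // in reality, scheduled later by the task scheduler
}
```

Because the frame lives on the heap, it survives however long the scheduler takes between resumptions, which is exactly what a stack frame cannot do.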

But the second way of implementing resumable functions seems more elegant: each function would have its own side stack, a stack separate from the thread stack, allocated by the compiler when the resumable function is invoked and destroyed when the resumable function completes. What the proposal describes is effectively something identical to fibers, in Windows.

The very, very surprising thing is, in my opinion, that Microsoft chose the second option, the fibers, to implement resumable functions in the CTP.

Fibers

Now, this is curious: a couple of years ago I wrote a few posts about a possible way to write C#-like iterator blocks and LINQ operators in C++ using Fibers.

Fibers are an almost forgotten, old feature added to Windows NT to support cooperative multitasking, in the form of lightweight threads that must be manually scheduled by the application. If I am not wrong, they were added to Windows to improve the performance of the first versions of SQL Server.

My implementation of iterator blocks was very questionable for many reasons (for example, I just copied the IEnumerator/IEnumerable interfaces from .NET rather than writing something that could work better with C++ STL) but I thought that the weakest point was probably the use of fibers.

In fact it is difficult to write correct code with fibers; the entire program must be designed to support them. Over the years, many things in Windows have grown increasingly incompatible with them. The whole of .NET does not work with fibers, and an attempt to fix this proved so complicated that it was soon abandoned.

Evidently this is not necessarily a problem if fibers are used in the limited scope of supporting native resumable functions, and under the direct control of our compiler. But even so, the functions and libraries that we call from our resumable function must be fiber-safe (in addition to being thread-safe); for example, thread-local storage does not work with fibers. The CRT itself was not completely fiber-safe in the past, but I guess this should have been fixed by now. And there is the problem of managing C++ exceptions: in the past they could only be caught by the fiber that threw them, they could not cross fiber boundaries, but even this problem can be solved by the compiler or library.

Also, I wonder whether there is a portable way to implement side-stacks on non-Windows platforms, and whether some support from the OS will necessarily be required.

Under the hood

But let’s look in more detail at the CTP code, to learn how resumable functions actually work. The implementation is split between the compiler and the library code in <pplawait.h>.

The code of <pplawait.h> is organized in two main classes:

template <typename TTask, typename TFunc, typename TTaskType>
class __resumable_func_setup_data
{
    __resumable_func_fiber_data* _func_data;
    LPVOID _pSystemFiber;
    LPVOID _pvLambda; // The function to execute
    task_completion_event<TTaskType> _tce;
    [...]
};
struct __resumable_func_fiber_data
{
    LPVOID threadFiber;         // The thread fiber to which to return.
    LPVOID resumableFuncFiber;  // The fiber on which this function will run
    volatile LONG refcount;     // Refcount of the fiber
    [...]
};

The first, __resumable_func_setup_data, contains the data and logic to initialize a resumable method call, and it’s templatized on the function type.

The second, __resumable_func_fiber_data, contains the data and methods to manage the life of a resumable method that runs on a fiber.

When a function is declared as resumable, like:

    task<int> foo(future<int> fi) __resumable

the compiler generates the code for a stub function foo_resumable.

When a resumable function is called, the compiler generates a call to this stub function, which takes care of executing foo into a new “resumable context”. The code for this is in a static function:

    TTask __resumable_func_setup_data::RunActionInResumableContext(TFunc& lambda)

which shows how a Fiber can be used to seamlessly suspend and resume a function:

  1. The first thing to do is to convert the caller thread into a fiber, by calling __resumable_func_fiber_data::ConvertCurrentThreadToFiber(). When the function completes it will convert the fiber back into a normal thread with ConvertFiberToThread.
  2. Then, a __resumable_func_setup_data struct is initialized with the data required to invoke the function on a fiber.
  3. After this, a new fiber is created, specifying __resumable_func_setup_data::ResumableFuncFiberProc as the entry point. Since creating and destroying a fiber is a relatively expensive operation, the library also implements a FiberPool class to manage a pool of fibers, which are reused when needed.
  4. Now the current fiber can suspend itself and start the execution of this new fiber by calling SwitchToFiber.

Calling SwitchToFiber means manually doing a context switch (or better, a fiber switch). The instruction pointer of our thread suddenly jumps to a different place, using a different stack, and all the registers are updated. But the old context and stack are not lost: they are part of the previous fiber. It is just not running, because it is not associated with any thread, but it can be resumed at any moment.

For example, the following figure shows what happens when the resumable copyFile_res is called in our copyFile program: first the current thread (Thread A) is converted into a fiber (Fiber 0), then a new fiber (Fiber 1) is created to run copyFile_res, and the thread execution is switched to the new Fiber. At that point only Fiber 1 is running. Fiber 0 still exists, with its stack, but it’s suspended.

Image

When the ResumableFuncFiberProc starts executing in the context of the new fiber, it:

  1. Retrieves a pointer to the __resumable_func_setup_data from its parameters,
  2. Initializes a new instance of __resumable_func_fiber_data, allocated on the thread stack, with all the data required to manage the function (like the handle of the caller-fiber), and
  3. Finally executes the resumable function in the context of the fiber.

Now we are executing our functor in the context of the fiber. We can leave the function in two ways:

–        We can complete the function (with a return statement), and return the final result, or

–        We can temporarily suspend the function on an ­­__await expression and yield the execution to the caller.

In the first case, all we need to do is to complete the ResumableFuncFiberProc, store the result and call SwitchToFiber(previousFiber) to resume the previous fiber. The “terminated” fiber is now ready to be deleted or recycled.

Things get more interesting when we have an await expression, like:

    shared_ptr<string> s = __await readFileChunk(inFile, 4096);

In this case the compiler converts the __await into a call to the library function

    template <typename T> T await_task(concurrency::task<T> func)

where func is a task that wraps the awaitable expression that is being called.

await_task implements all the logic to suspend the current function/fiber and to yield control back to the previous (caller) fiber, thus resuming it.

But before yielding execution, await_task attaches a continuation to the task func. When that task completes, we want to resume the current fiber, and to do this the continuation needs to retrieve and update the func_data_on_fiber_stack struct and call SwitchToFiber once again.

The actual details are a little complicated, but the whole code in <pplawait.h> is not too difficult to read.

Changes to the Windows Fiber API

The last interesting thing to note is that the new <pplawait.h> library makes use of a new and slightly improved version of the Win32 Fiber API, defined for Windows 8.1 (where _WIN32_WINNT >= 0x0603) but actually available also on Windows 7 (at least with the most recently patched version of kernel32.dll).

The original APIs, available since WinNT 4.0, provided the following methods:

LPVOID WINAPI CreateFiber(
    _In_     SIZE_T dwStackSize,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiber(_In_opt_ LPVOID lpParameter);
VOID WINAPI DeleteFiber(_In_ LPVOID lpFiber);
BOOL WINAPI ConvertFiberToThread();
VOID WINAPI SwitchToFiber(_In_ LPVOID lpFiber);

The first two of these methods have now been superseded by an extended version:

LPVOID WINAPI CreateFiberEx(
    _In_     SIZE_T dwStackCommitSize,
    _In_     SIZE_T dwStackReserveSize,
    _In_     DWORD dwFlags,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiberEx(
    _In_opt_ LPVOID lpParameter,
    _In_     DWORD dwFlags);

The difference is that a fiber can now be created specifying different values for the CommitSize and the ReserveSize of the fiber stack. Here ReserveSize represents the total stack allocation in virtual memory, while CommitSize is the initially committed memory; it can be useful to fine-tune these values to optimize memory usage. In fact, programs with a large number of concurrent resumable operations could end up allocating a very large amount of memory for the fiber stacks, since a different fiber must be allocated for each resumable function.
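The reserve/commit distinction itself is not fiber-specific; on a POSIX system it can be illustrated with mmap and mprotect (a sketch under the assumption that stacks grow downwards; the function name is invented):

```cpp
#include <cassert>
#include <cstddef>
#include <sys/mman.h>

// POSIX sketch of the reserve/commit distinction behind dwStackReserveSize
// and dwStackCommitSize: reserve a large range of address space that is
// inaccessible (PROT_NONE), then commit only part of it at the top, since
// stacks grow downwards.
char* reserve_stack(std::size_t reserveSize, std::size_t commitSize)
{
    void* base = mmap(nullptr, reserveSize, PROT_NONE,
                      MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (base == MAP_FAILED)
        return nullptr;

    char* top = static_cast<char*>(base) + reserveSize;
    // Make only 'commitSize' bytes usable; touching the rest would fault.
    if (mprotect(top - commitSize, commitSize, PROT_READ | PROT_WRITE) != 0)
        return nullptr;
    return top;   // a stack pointer would start here and grow down
}
```

Reserving a large range costs only address space, while committing costs real memory: the same trade-off CreateFiberEx exposes for each fiber stack.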

There is also a dwFlags argument, whose only possible value is FIBER_FLAG_FLOAT_SWITCH; this flag is necessary to overcome an old limitation of the Fiber API, which in the past did not save the floating-point registers on a fiber switch.

Finally, there is a new function, still undocumented but already used by the CTP library:

PVOID WINAPI CalloutOnFiberStack(
    _In_      PVOID lpFiber,
    _In_      PFIBER_CALLOUT_ROUTINE lpStartAddress,
    _In_opt_  PVOID lpParameter);

It is not difficult to imagine that its purpose is to execute a function on the Fiber specified and return the result to the caller.

Generator functions

We have almost completed our quick review of asynchrony in C++. But there is one last thing to consider before finishing: asynchronous methods are very similar to iterator methods.

They are both kinds of coroutines. We can think of an async method as a strange kind of iterator, one that returns only a single value, at the end, but that is able to suspend its execution just as iterators do after yielding each value. Or we can think of iterator blocks as strange kinds of async methods, because they can suspend and resume execution, but only by returning a value to the caller.

In C# the compiler implements both iterator methods and asynchronous methods by transforming the code of the method into a state machine. We have seen that in C++ the proposal is to implement async methods as real coroutines, using side stacks. So the logical question to ask is: “Can we use resumable functions to implement C#-style iterators?”

Of course we can! The same proposal N3858 explains that the concept of resumable functions can be expanded to include generator functions, which would use the same kind of interruption/resumption logic to produce a sequence of values.

Gustafsson defines a new special container, sequence<T>, to represent a sequence of values lazily generated by a generator. With this, it should be possible to write LINQ-like code, like the following:

template <typename Iter>
sequence<int> lazy_tform(Iter beg, Iter end, std::function<int(int)> func) resumable
{
    for (auto it = beg; it != end; ++it) {
        yield func(*it);
    }
}

But this will be the topic for a future post. Stay tuned! 🙂

Implementing iterator blocks in C++ (part 2: Win32 Fibers)

Fibers were added to Windows NT to support cooperative multitasking. They can be thought of as lightweight threads that must be manually scheduled by the application.

When a fiber is created, it is passed a fiber-start function. The OS then assigns it a separate stack and sets up execution to begin at this fiber-start function. To schedule a fiber, you need to switch to it manually; once running, a fiber can suspend itself by yielding execution to another fiber, or back to the “calling” fiber. In other words, fibers are a perfect tool for implementing coroutine sequencing.

These two articles, from Dr.Dobb’s and MSDN Magazine, explain how to implement coroutines using the Fiber API. The MSDN Magazine article also shows how to do this in .NET (it was written before the release of .NET 2.0, so before iterators were available in C#; in any case, fibers don’t get along well with the CLR). Together with an old series of Raymond Chen’s blog posts, these articles have been the main source of inspiration for this small project. (In his article Duffy also proposes another possible implementation of coroutines based on threads, which is however quite inefficient.)

Interestingly, a Fiber-based implementation does not have the limitations of (state machine based) C# iterators; with fibers it is possible to yield the control from any function in a stack frame. There are other problems, though.

Fibers are like dynamite

The Win32 Fiber API is quite simple. The first thing to do is to convert the thread on which the fibers will run into a fiber, by calling ConvertThreadToFiber. After this, additional fibers are created with CreateFiber, passing as a parameter the address of the function to execute, just like the threadProc for real threads. A fiber can then suspend itself and start the execution of another fiber by calling SwitchToFiber. Finally, when the application is done using fibers, it can convert the “main” fiber back to a normal thread with ConvertFiberToThread.

There are a few important caveats to consider:

  • It is difficult to write a library or framework that uses fibers, because the entire program must be designed to support them. For example, the function ConvertThreadToFiber must be called only once per thread.
  • Since each fiber has its own stack, it also has its own SEH exception chain. This means that if a fiber throws an exception, only that fiber can catch it. The same is true for C++ exceptions. Exceptions cannot pass across fibers’ “boundaries”.
  • The default stack size is 1MB, so using many fibers can consume a lot of memory.
  • The code must be “fiber-safe”, but most code is designed to be just “thread-safe”. For example, using thread-local storage does not work with fibers, and in fact Windows Vista introduced an API for Fiber local storage. More importantly, the CRT was not completely fiber-safe in the past, and I am not sure it is now. There are also compiler options to set in Visual Studio, like /GT, which enables only Fiber-safe code optimizations.

In other words, as Raymond Chen put it, “Fibers are like dynamite. Mishandle them and your process explodes”. Therefore, the framework for iterators and Linq-like operators that we’ll define should be used VERY CAREFULLY in real programs.
(And if you think things are bad in C++, consider that fibers are practically unusable with .NET managed code!)

Implementation details

Putting all these caveats aside, let’s see how coroutines can be implemented with fibers. This is the declaration of a Fiber class I created as a thin wrapper over the Win32 Fiber API.

class Fiber
{
public:
    Fiber();
    virtual ~Fiber();

    static void enableFibersInCurrentThread();
    static bool disableFibersInCurrentThread();

    void* main();
    void* resume();

protected:
    virtual void run() = 0;
    void yield(bool goOn);

private:
    static void WINAPI fiberProc(void* lpFiberParameter);

    PFIBER_START_ROUTINE _fiber;
    PFIBER_START_ROUTINE _previousFiber;

    // Used to forward exceptions back to the previous fiber (see main()).
    std::exception _exception;
    bool _exceptionCaught;

    enum FiberState {
        FiberCreated, FiberRunning, FiberStopPending, FiberStopped
    };
    FiberState _state;
};

The Fiber class exposes the static functions enableFibersInCurrentThread and disableFibersInCurrentThread that wrap the initialization/termination functions.
The constructor creates a new fiber object, specifying Fiber::fiberProc as the function to execute.

Fiber::Fiber() :
    _state(FiberCreated),
    _previousFiber(nullptr),
    _exception(),
    _exceptionCaught(false)
{
    _fiber = (PFIBER_START_ROUTINE)::CreateFiber(256 * 1024, fiberProc, this);
}

As mentioned, fibers use cooperative multitasking: the execution of a fiber must be explicitly scheduled by the application by calling SwitchToFiber. This is encapsulated in the resume method below:

bool Fiber::resume()
{
    if (nullptr == _fiber || _state == FiberStopped) {
        return false;
    }

    _previousFiber = (PFIBER_START_ROUTINE)::GetCurrentFiber();
    assert(_previousFiber != _fiber);

    ::SwitchToFiber(_fiber);

    if (_exceptionCaught) {
        throw _exception;
    }

    return (FiberRunning == _state);
}

When the fiber is started with SwitchToFiber, it begins executing from the fiberProc method, which calls main:

void CALLBACK Fiber::fiberProc(void* pObj)
{
    Fiber* pFiber = (Fiber*)pObj;
    void* previousFiber = pFiber->main();
    ::SwitchToFiber(previousFiber);
}

What main does is simply to call the abstract function run, which any class derived from Fiber needs to implement.
Inside run we can at some point yield execution to a different Fiber object by calling its resume method, effectively creating a “stack” of fibers nested into each other. Or we can yield execution back to the previously running fiber (the one that launched the current one) by calling the method yield (the equivalent of yield return and yield break in C#):

void Fiber::yield(bool goOn)
{
    if (! goOn) {
        _state = FiberStopped; // yield break
    }
    ::SwitchToFiber(_previousFiber);
}

Logically this works like the sequence of nested function calls in a thread stack, but of course there is no physical stack here, and we can return to the caller only by storing a pointer to the previous fiber and explicitly yielding control to it.

Since exceptions cannot travel outside a fiber, the call to run is wrapped in a try-catch block that tries to catch any kind of exception (even Win32 exceptions). If an exception is caught, its data is stored, the fiber is stopped, and fiberProc ends by resuming the execution of the previous fiber, inside the resume function. There the exception can be re-created and re-thrown in the context of the previous fiber, effectively forwarding it back up the “fiber stack”. I am not sure this is a very elegant workaround, but I could not find a better solution.

void* Fiber::main()
{
    _state = FiberRunning;
    _exceptionCaught = false;

    try {
        run();
    }
    catch (StopFiberException&)
    {
        _state = FiberStopped;
    }
    catch (std::exception& e)
    {
        _exception = e;
        _exceptionCaught = true;
        _state = FiberStopped;
    }
    catch (...)
    {
        _exception = std::exception("win32 exception");
        _exceptionCaught = true;
        _state = FiberStopped;
    }

    _state = FiberStopped;
    return _previousFiber;
}

Finally, when we are done with a fiber object, we can delete it, releasing all the resources it had allocated and the memory of its stack.

Fiber::~Fiber()
{
    if (_state == FiberRunning) {
        _previousFiber = (PFIBER_START_ROUTINE)::GetCurrentFiber();
        _state = FiberStopPending;
        ::SwitchToFiber(_fiber);
    }
    ::DeleteFiber(_fiber);
}

Next…

The Fiber class is the main building block in the implementation of coroutines. What is left to do is just to write classes that inherit from Fiber and implement in the virtual function run the actual coroutines code to execute.
In the next posts we’ll see how to use the Fiber class to implement iterator blocks that work like in C#, implementing the IEnumerable and IEnumerator interfaces, and how to use this to reimplement many Linq operators.

If you are curious, you can look at the sources of this small project. A first code drop (still quite a preliminary version) is available in this source repository on Google Code.
Let me know your comments and suggestions, if you are so kind to have a look.