Async-Await in C++

In the previous post we had a quick look at the problem of writing concurrent, asynchronous code with C++. We saw that the <future> library, introduced with C++11, offers a partial solution to this problem by separating the act of initiating an operation from the act of waiting for its result. But we saw that the act of waiting for a result is still synchronous, and that std::futures are not easily composable.

We talked about the platform-specific improvements offered by Microsoft’s PPL tasks, which have become the model for a few interesting changes proposed for the C++ standard library.

But are these improvements enough?

The problems with Tasks

Having composable futures would certainly be a big improvement for C++. But the experience with C# tells us that it’s not a perfect solution. A purely library-based solution is bound to have limitations. In particular, .NET’s TPL has the defect of making the code overly complex.

It is difficult to write robust, readable and debuggable code with TPL. It is too easy to end up with spaghetti code: a large number of Task<T>s interconnected in inextricable chains of continuations.

I found this out the hard way when my last employer gave me a .NET component to maintain that had been architected as an incredibly convoluted graph of interconnected TPL Tasks, often joined with conditional ContinueWith().

Loops and conditionals

It is particularly difficult to write iterations and branches with tasks.

In my previous post I used as an example a copyFile function that asynchronously read the whole file content into a single string and copied it into another file. Of course such an implementation would not be practical with very large files, so let’s rewrite it so that it reads and writes the file chunk by chunk.

Here I am using the PPL, but very similar code could be written with std::future (especially once it supports task continuations with future::then).

We can easily write an iterative function that uses tasks to read and write every chunk of a file asynchronously, but that still blocks waiting for these tasks to complete:

Concurrency::task<string> readFileChunk(ifstream& file, int chunkLength)
{
    return Concurrency::create_task([&file, chunkLength](){
        vector<char> buffer(chunkLength);
        file.read(&buffer[0], chunkLength);
        return string(&buffer[0], (unsigned int)file.gcount());
    });
}

Concurrency::task<void> writeFileChunk(ofstream& file, const string& chunk)
{
    return Concurrency::create_task([&file, chunk](){
        file.write(chunk.c_str(), chunk.length());
    });
}

void copyFile_get(const string& inFilePath, const string& outFilePath)
{
    ifstream inFile(inFilePath, ios::binary | ios::ate);
    inFile.seekg(0, inFile.beg);
    ofstream outFile(outFilePath, ios::binary);

    string chunk;
    while (chunk = readFileChunk(inFile, 4096).get(), !chunk.empty()) {
        writeFileChunk(outFile, chunk).get();
    }
}

The problem is that this code is not much better than a purely synchronous loop of blocking calls to read and write the file; if our thread schedules asynchronous operations but then immediately blocks waiting for them to complete, we cannot really do much else concurrently.

It would be better to chain a sequence of tasks that read and write the file chunk by chunk, so that they can be scheduled one after the other leaving our thread free to do other work while these tasks are blocked in some I/O operation.

It turns out that writing a “task loop” correctly is not trivial, especially if there are many possible error cases or exceptions to handle. It is actually so tricky that there are MSDN pages that explain what to do and what the possible pitfalls are, and the PPL Asynchronous Sample Pack even provides sample code for this.

The code I put together is neither clean nor concise:

task<shared_ptr<string>> readFileChunk(shared_ptr<ifstream> file, int chunkLength)
{
    return create_task([file, chunkLength]() {
        vector<char> buffer(chunkLength);
        file->read(&buffer[0], chunkLength);
        return make_shared<string>(&buffer[0], (unsigned int)file->gcount());
    });
}

task<bool> writeFileChunk(shared_ptr<ofstream> file, shared_ptr<string> chunk)
{
    return create_task([file, chunk](){
        file->write(chunk->c_str(), chunk->length());
        return chunk->length() == 0;
    });
}

task<void> copyFile_repeat(shared_ptr<ifstream> inFile, shared_ptr<ofstream> outFile)
{
    return readFileChunk(inFile, 4096)
        .then([=](shared_ptr<string> chunk){
            return writeFileChunk(outFile, chunk);
        })
        .then([=](bool eof){
            if (!eof) {
                return copyFile_repeat(inFile, outFile);
            }
            else {
                return task_from_result();
            }
        });
}

task<void> copyFile_then(const string& inFilePath, const string& outFilePath)
{
    auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
    inFile->seekg(0, inFile->beg);
    auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

    return copyFile_repeat(inFile, outFile);
}

It is probably not difficult to do better and write cleaner code that runs a simple continuation chain like this one in a loop, but it is also quite evident that when the algorithm becomes more complicated the asynchronous code can become very convoluted.

Debugging

And then there is the problem of debugging: if we set a breakpoint in the file->write(…) line, for example, when the execution stops there the debugger will present us with a quite strange call stack. With continuations we lose the usual “causality chain” of return stacks that is typical of synchronous code. Here the lambdas that compose the body of the tasks are scattered across the call stacks of several threads, and sometimes it can be difficult to understand what is going on.

[Figure: debugger call stacks for a task continuation, scattered across several threads]

The async-await pattern

The complexity of working with Tasks is well known to C# programmers, who have already had a few years to experiment and gain experience with the .NET TPL. This is the reason why the biggest improvement of C# 5.0 was the introduction of the Async/Await pattern, designed on the model of F# asynchronous workflows.

To see how it works, let’s start by writing in C# an iterative but blocking implementation of our CopyFile function. This can be done very easily:

        private void CopyChunk(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = input.Read(buffer, 0, buffer.Length)) != 0)
            {
                output.Write(buffer, 0, bytesRead);
            }
        }

        public void CopyFile(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                    CopyChunk(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Let’s say that we want now to make the function completely asynchronous. Ignoring the fact that the framework actually provides a Stream.CopyToAsync, we can use other asynchronous .NET APIs, like Stream.ReadAsync and Stream.WriteAsync that return a Task, together with async/await, and write this:

        private async Task CopyChunk_Async(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = await input.ReadAsync(buffer, 0, buffer.Length)) != 0)
            {
                await output.WriteAsync(buffer, 0, bytesRead);
            }
        }

        public async Task CopyFile_Async(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                   await CopyChunk_Async(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Now, this is really beautiful! We made only a few small changes to the structure of the code: we added the async modifier, changed the return type of our methods, and added a strange await keyword in the places where we call asynchronous functions. The logic of the function has not changed at all, and yet it now executes in a completely different way. Let’s see how.

In C# a method can be declared as asynchronous by adding the async modifier to the method declaration. For example:

public static async Task<int> FooAsync() {…}

An asynchronous method is like a regular C# method, but its return type can only be of type void, Task or Task<T>, and it can also contain await expressions, like:

int result = await FooAsync();

Awaiting on an expression means launching the asynchronous execution of that expression (usually an async method) and, if it has not yet completed, immediately yielding the execution to the caller.

So asynchronous methods are, like iterator methods, a kind of coroutine. They are resumable: their execution can “magically” pause at a few points identified by the ‘await’ expressions, and then “magically” resume when the awaited expression has completed. While the method is paused waiting for the completion, its caller continues to run. This means that if the method was called from a UI message loop, for example, the UI thread can continue processing messages and the application will remain responsive. If the method was called by a worker thread in a web server, the thread will be free to serve more requests.

But just like iterator methods, in C# asynchronous methods are not really coroutines. They are instead implemented with a state machine. The details are quite complicated (a very good description can be found here and here), but what happens is that the compiler transforms the function body by attaching a task continuation to the asynchronous operation, so that execution will resume from the point where it left off.

On which thread does the awaitable expression execute, and on which thread will the async method resume? It’s up to the task scheduler to decide; asynchrony does not mean parallelism, and a method does not need to run on another thread to be asynchronous. The scheduler may run the task later in the same thread, or it may run it on a worker thread from the ThreadPool. The execution will then resume on the right thread (for example the UI thread in a GUI app) according to the current synchronization context. From the point of view of the developer, this means that we cannot rely on an async method continuing its execution on the same thread where it started (with all the consequent multithreading headaches).

C++ resumable functions

So, everything is nice in the managed world of C#, but what about native programming? Is there anything like async/await that we can use with our futures? We can find the answer in N3858, another proposal made by Gustafsson et al. for C++17.

This time the changes proposed are to the language itself, not just to the library. The idea is to introduce the equivalent of C# async methods in the form of resumable functions. They can be thought of as the basis for adding support for real coroutines to C++, and they are not strictly related to the <future> library, even though they have been designed especially to improve the usability of futures and promises.

Resumable functions would have almost exactly the same semantics as async/await in C#; two new keywords would be added to the language:

–        The resumable keyword, added after the argument list, defines a function as being suspendable and resumable, in the way described before. It is the equivalent of C#’s async.

–        Like in C#, the await keyword in the body of a resumable function identifies a suspension point, one of the places where the function may suspend execution and then resume it later (possibly in a different thread) when some asynchronous operation completes.

For example:

        future<int> abs_r(future<int> i_op) resumable
        {
            int i = await i_op;
            return (i < 0) ? -i : i;
        }

This would define a resumable function that takes a future<int> as argument, suspends itself waiting for the future result to be available, and produces an integer as result. Of course, a resumable function can only return a future<T> or shared_future<T>. It completes and produces its final value only when a return statement is executed.
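A resumable function can also contain more than one suspension point. As a sketch in the same proposed syntax (hypothetical: no compiler implements these keyword spellings verbatim; the CTP described below uses different ones), a function awaiting two futures in sequence would look like this:

        future<int> sum_r(future<int> a_op, future<int> b_op) resumable
        {
            int a = await a_op;    // first suspension point
            int b = await b_op;    // second suspension point
            return a + b;          // completes the returned future<int>
        }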

Visual Studio 2013 CTP

Will this proposal be accepted? Will resumable functions become part of the standard? And is there a portable way to implement them?

I really cannot answer any of these questions. But, interestingly, it is already possible to play with resumable functions today. Last November Microsoft shipped the Visual Studio November 2013 CTP, with a preview of several C++ features that will be released in the next version of Visual Studio and that improve conformance to the C++ standard. Among those, there is also the first implementation of resumable functions.

To enable them, we must change the “Platform Toolset” of our C++ project to use the updated compiler from the CTP, as shown in the figure below. We also need to include the new header <pplawait.h>, which extends the PPL’s <ppltasks.h>.

[Figure: selecting the Visual Studio November 2013 CTP Platform Toolset in the project properties]

The syntax of the CTP’s resumable functions is very similar to the one proposed by Gustafsson, with a small change in the keywords. In order not to “taint” the language with (still) non-standard keywords, resumable has been renamed __resumable and await has become __await.

So, we can now rewrite one last time our copyFile method, this time using the CTP’s resumable functions:

#include <pplawait.h>

task<void> copyFile_res(const string inFilePath, const string outFilePath) __resumable
{
	auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
	inFile->seekg(0, inFile->beg);
	auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

	while (true)
	{
		shared_ptr<string> s = __await readFileChunk(inFile, 4096);
		if (s->empty()) { break; }
		__await writeFileChunk(outFile, s);
	}
}

int _tmain(int argc, _TCHAR* argv[])
{
    …
    copyFile_res(file1, file2).get();
}

We got there! Finally we have simple, readable, maintainable asynchronous C++ code. The only small complication is that we need to pay a lot of attention to the lifetime of the objects passed to asynchronous/resumable methods. In C# things are easier, but we don’t have a garbage collector here, so it is often better to wrap the objects in shared_ptr<T>s to make sure they don’t get destroyed while they are still referenced by some async task. A small tribute to pay to the gods of asynchrony, anyway.
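To illustrate the pitfall, here is a minimal sketch (the file name is just an example): capturing a local stream by reference leaves the task with a dangling reference, while a shared_ptr keeps the stream alive as long as any task holds it.

// DANGEROUS: 'file' is captured by reference, but the task may still be
// running after risky() returns and 'file' has been destroyed.
task<void> risky()
{
	ifstream file("data.bin", ios::binary);
	return create_task([&file]() {
		// ... would read from a dangling reference
	});
}

// SAFE: the shared_ptr keeps the stream alive as long as the task needs it.
task<void> safe()
{
	auto file = make_shared<ifstream>("data.bin", ios::binary);
	return create_task([file]() {
		// ... 'file' is guaranteed to be alive here
	});
}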

A kind of magic…

But how do resumable functions work, in practice? To learn more, it is interesting to study the <pplawait.h> header and to watch the interesting Channel 9 talk by Deon Brewis, from last year’s GoingNative conference.

Gustafsson’s proposal describes two possible solutions:

  1. With a state machine, like in C#.
  2. With resumable side-stacks (aka Fibers).

The first implementation would be similar to how async/await is implemented in C#. The compiler would transform the code of the function, implementing the logic of a state machine that keeps track of the current state of the function. The implementation would be more complicated than in C# because C++ does not have garbage collection; the function’s variables could not live on the stack (because the stack frame would be lost when the function is suspended), so they would have to be stored in a heap-allocated activation frame.

But the second way of implementing resumable functions seems more elegant: each function would have its own side stack, a stack separate from the thread stack, allocated by the compiler when the resumable function is invoked and destroyed when the resumable function completes. What the proposal describes is actually something identical to fibers, in Windows.

The very, very surprising thing is, in my opinion, that Microsoft chose the second option, the fibers, to implement resumable functions in the CTP.

Fibers

Now, this is curious: a couple of years ago I wrote a few posts about a possible way to write C#-like iterator blocks and LINQ operators in C++ using Fibers.

Fibers are an almost forgotten, old feature added to Windows NT to support cooperative multitasking, in the form of lightweight threads that must be manually scheduled by the application. If I am not wrong, they were added to Windows to improve the performance of the first versions of SQL Server.

My implementation of iterator blocks was very questionable for many reasons (for example, I just copied the IEnumerator/IEnumerable interfaces from .NET rather than writing something that could work better with the C++ STL), but I thought that the weakest point was probably the use of fibers.

In fact it is difficult to write correct code with fibers; the entire program must be designed to support them. Over the years, many things in Windows have grown increasingly incompatible with them. The whole of .NET does not work with fibers, and an attempt to fix this proved so complicated that it was soon abandoned.

Evidently this is not necessarily a problem if fibers are used in the limited scope of supporting native resumable functions, and under the direct control of our compiler. But even so, the functions and libraries that we call from our resumable function must be fiber-safe (in addition to being thread-safe); for example, thread-local storage does not work with fibers. The CRT itself was not completely fiber-safe in the past, but I guess this should have been fixed by now. And there is the problem of managing C++ exceptions: in the past they could only be caught by the fiber that threw them and could not pass across fiber boundaries, but even this problem can be solved by the compiler or library.
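To see why thread-local storage is a problem, consider this illustrative sketch (someAsyncOperation is a hypothetical awaitable): thread-local data belongs to the thread, not the fiber, so a fiber resumed on a different thread sees different values.

// Illustrative only: a value written into TLS before a suspension point
// may "disappear" if the fiber is resumed on a different thread.
thread_local int g_tlsValue = 0;

task<void> tlsPitfall() __resumable
{
	g_tlsValue = 42;                // stored in thread A's TLS slot
	__await someAsyncOperation();   // hypothetical awaitable; may resume on thread B
	// g_tlsValue may not be 42 here: we could be reading thread B's TLS slot
}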

Also, I wonder whether there is a portable way to implement side-stacks on non-Windows platforms, and whether some support from the OS will necessarily be required.

Under the hood

But let’s look in more detail at the CTP code, to learn how resumable functions actually work. The implementation is split between the compiler and the library code in <pplawait.h>.

The code of <pplawait.h> is organized in two main classes:

template <typename TTask, typename TFunc, typename TTaskType>
class __resumable_func_setup_data
{
    __resumable_func_fiber_data* _func_data;
    LPVOID _pSystemFiber;
    LPVOID _pvLambda; // The function to execute
    task_completion_event<TTaskType> _tce;
    [...]
};
struct __resumable_func_fiber_data
{
    LPVOID threadFiber;         // The thread fiber to which to return.
    LPVOID resumableFuncFiber;  // The fiber on which this function will run
    volatile LONG refcount;     // Refcount of the fiber
    [...]
};

The first, __resumable_func_setup_data, contains the data and logic to initialize a resumable method call, and it’s templatized on the function type.

The second, __resumable_func_fiber_data, contains the data and methods to manage the life of a resumable method that runs on a fiber.

When a function is declared as resumable, like:

    task<int> foo(future<int> fi) __resumable

the compiler generates the code for a stub function foo_resumable.

When a resumable function is called, the compiler generates a call to this stub function, which takes care of executing foo into a new “resumable context”. The code for this is in a static function:

    TTask __resumable_func_setup_data::RunActionInResumableContext(TFunc& lambda)

which shows how a Fiber can be used to seamlessly suspend and resume a function:

  1. The first thing to do is to convert the caller thread into a fiber, by calling __resumable_func_fiber_data::ConvertCurrentThreadToFiber(). When the function completes it will convert the fiber back into a normal thread with ConvertFiberToThread.
  2. Then, a __resumable_func_setup_data struct is initialized with the data required to invoke the function on a fiber.
  3. After this, a new fiber is created, specifying __resumable_func_setup_data.ResumableFuncFiberProc as its entry point. Since creating and destroying a fiber is a relatively expensive operation, the library also implements a FiberPool class to manage a pool of fibers, which are reused when needed.
  4. Now the current fiber can suspend itself and start the execution of this new fiber by calling SwitchToFiber.

Calling SwitchToFiber means manually doing a context switch (or better, a fiber switch). The instruction pointer of our thread suddenly jumps to a different place, using a different stack, and all the registers are updated. But the old context and stack are not lost: they are part of the previous fiber. It is just not running, because it is not associated with any thread, but it can be resumed at any moment.
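To make the mechanism concrete, here is a minimal, self-contained sketch (not taken from the CTP) that uses the raw Win32 Fiber API to suspend and resume a function:

#include <windows.h>
#include <cstdio>

LPVOID g_mainFiber = nullptr;

// The fiber entry point must never return (that would exit the thread);
// it always leaves by switching to another fiber.
VOID CALLBACK FiberProc(PVOID /*param*/)
{
    std::printf("step 1, running on the side stack\n");
    ::SwitchToFiber(g_mainFiber);    // suspend: jump back to the main fiber
    std::printf("step 2, resumed later\n");
    ::SwitchToFiber(g_mainFiber);    // done: give control back for good
}

int main()
{
    g_mainFiber = ::ConvertThreadToFiber(nullptr);
    LPVOID worker = ::CreateFiber(0, FiberProc, nullptr);

    ::SwitchToFiber(worker);         // runs FiberProc until it suspends
    std::printf("back in main, worker suspended\n");
    ::SwitchToFiber(worker);         // resumes after the suspension point

    ::DeleteFiber(worker);
    ::ConvertFiberToThread();
}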

For example, the following figure shows what happens when the resumable copyFile_res is called in our copyFile program: first the current thread (Thread A) is converted into a fiber (Fiber 0), then a new fiber (Fiber 1) is created to run copyFile_res, and the thread execution is switched to the new Fiber. At that point only Fiber 1 is running. Fiber 0 still exists, with its stack, but it’s suspended.

[Figure: Thread A converted into Fiber 0, with copyFile_res running on the newly created Fiber 1]

When the ResumableFuncFiberProc starts executing in the context of the new fiber, it:

  1. Retrieves a pointer to the __resumable_func_setup_data from its parameters,
  2. Initializes a new instance of __resumable_func_fiber_data, allocated on the thread stack, with all the data required to manage the function (like the handle of the caller-fiber), and
  3. Finally executes the resumable function in the context of the fiber.

Now we are executing our functor in the context of the fiber. We can leave the function in two ways:

–        We can complete the function (with a return statement), and return the final result, or

–        We can temporarily suspend the function on an __await expression and yield the execution to the caller.

In the first case, all we need to do is to complete the ResumableFuncFiberProc, store the result and call SwitchToFiber(previousFiber) to resume the previous fiber. The “terminated” fiber is now ready to be deleted or recycled.

Things get more interesting when we have an await expression, like:

    shared_ptr<string> s = __await readFileChunk(inFile, 4096);

In this case the compiler converts the __await into a call to the library function

    template <typename T> T await_task(concurrency::task<T> func)

where func is a task that wraps the awaitable expression that is being called.

await_task implements all the logic to suspend the current function/fiber and to yield the control back to the previous (caller) fiber, so resuming it.

But before yielding the execution, await_task attaches a continuation to the task func. When that task completes, we want to resume the current fiber; to do this, the continuation needs to retrieve and update the func_data_on_fiber_stack struct and to call SwitchToFiber once again.
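In spirit, the suspension logic looks something like the following sketch. Here fiber_context is a hypothetical stand-in for the CTP’s __resumable_func_fiber_data, and the real code also deals with races, reference counting and exceptions that this sketch ignores:

#include <windows.h>
#include <ppltasks.h>

// Hypothetical, simplified stand-in for __resumable_func_fiber_data.
struct fiber_context
{
    LPVOID threadFiber;        // the caller's fiber, to yield back to
    LPVOID resumableFuncFiber; // the fiber running the resumable function
};

template <typename T>
T await_task_sketch(concurrency::task<T> func, fiber_context* ctx)
{
    // Before yielding, attach a continuation: when the awaited task
    // completes, it switches back to this fiber to resume the function.
    func.then([ctx](concurrency::task<T>) {
        ::SwitchToFiber(ctx->resumableFuncFiber);
    });

    // Suspend: yield execution back to the caller's fiber.
    ::SwitchToFiber(ctx->threadFiber);

    // We get here only after the continuation has resumed us, so the
    // task has completed and get() returns without blocking.
    return func.get();
}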

The actual details are a little complicated, but the whole code in <pplawait.h> is not too difficult to read.

Changes to the Windows Fiber API

The last interesting thing to note is that the new <pplawait.h> library makes use of a new and slightly improved version of the Win32 Fiber API, defined for Windows 8.1 (where _WIN32_WINNT >= 0x0603) but actually available also on Windows 7 (at least with the most recently patched version of kernel32.dll).

The original APIs, available since WinNT 4.0, provided the following methods:

LPVOID WINAPI CreateFiber(
    _In_     SIZE_T dwStackSize,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiber(_In_opt_ LPVOID lpParameter);
VOID WINAPI DeleteFiber(_In_ LPVOID lpFiber);
BOOL WINAPI ConvertFiberToThread();
VOID WINAPI SwitchToFiber(_In_ LPVOID lpFiber);

The first two of these methods have now been superseded by an extended version:

LPVOID WINAPI CreateFiberEx(
    _In_     SIZE_T dwStackCommitSize,
    _In_     SIZE_T dwStackReserveSize,
    _In_     DWORD dwFlags,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiberEx(
    _In_opt_ LPVOID lpParameter,
    _In_     DWORD dwFlags);

The difference is that a fiber can now be created specifying different values for the CommitSize and the ReserveSize of the fiber stack. Here ReserveSize represents the total stack allocation in virtual memory, while CommitSize is the initially committed memory; it can be useful to fine-tune these values to optimize the memory usage. In fact, programs with a large number of concurrent resumable operations could end up allocating a very large amount of memory for the fiber stacks, since a different fiber must be allocated for each resumable function.

There is also a dwFlags argument, whose only possible value is FIBER_FLAG_FLOAT_SWITCH; it is necessary to overcome an old limitation of the Fiber API, which in the past did not save the floating point registers in a fiber switch.
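For illustration, a fiber with a small committed stack and a larger reserved address range (the sizes here are arbitrary), and with floating point switching enabled, could be created like this (reusing the FiberProc entry point from the earlier sketch):

LPVOID fiber = ::CreateFiberEx(
    64 * 1024,                // dwStackCommitSize: memory committed up front
    1024 * 1024,              // dwStackReserveSize: total virtual address reservation
    FIBER_FLAG_FLOAT_SWITCH,  // also save/restore floating point state on switches
    FiberProc,
    nullptr);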

Finally, there is a new function, still undocumented but already used by the CTP library:

PVOID WINAPI CalloutOnFiberStack(
    _In_      PVOID lpFiber,
    _In_      PFIBER_CALLOUT_ROUTINE lpStartAddress,
    _In_opt_  PVOID lpParameter);

It is not difficult to imagine that its purpose is to execute a function on the Fiber specified and return the result to the caller.

Generator functions

We have almost completed a quick review of asynchrony in C++. But there is one last thing to consider before finishing: asynchronous methods are very similar to iterator methods.

They are both a kind of coroutine. We can think of an async method as a strange kind of iterator that returns only one value, at the end, but that is able to suspend its execution as iterators do after yielding each value. Or we can think of iterator blocks as strange kinds of async methods, because they can suspend and resume execution, but only to return a value to the caller.

In C# the compiler implements both iterator methods and asynchronous methods by transforming the code of the method into a state machine. We have seen that in C++ the proposal is to implement async methods as real coroutines, using side-stacks. So, the logical question to ask is: can we use resumable functions to implement C#-style iterators?

Of course we can! The same proposal N3858 explains that the concept of resumable functions can be expanded to include generator functions, which would use the same kind of interruption/resumption logic to produce a sequence of values.

Gustafsson defines a new special container, sequence<T>, to represent a sequence of values lazily generated by a generator. With this, it should be possible to write LINQ-like code, like the following:

template <typename Iter>
sequence<int> lazy_tform(Iter beg, Iter end, std::function<int(int)> func) resumable
{
    for (auto it = beg; it != end; ++it) {
        yield func(*it);
    }
}

But this will be the topic for a future post. Stay tuned! 🙂

Concurrency in C++11

C++11 is a much better language than its previous versions, with features like lambda expressions, initializer lists, rvalue references, automatic type deduction. The C++11 standard library now provides regular expressions, revamped smart pointers and a multi-threading library.

But modern C++ is still limited in its support of parallel and asynchronous computations, especially when compared to languages like C#.

The need for asynchrony

But why do we need support for asynchrony? Computer architectures are becoming more and more parallel and distributed, with multi-core processors nearly ubiquitous and with cores distributed in the cloud. And software applications are more and more often composed by multiple components distributed across many cores located in a single machine or across a network. Modern programming languages need to offer support to manage this parallelism.

At the same time, responsiveness has become a more and more indispensable quality of software (This is one of the tenets of “Reactive Programming”, an emerging programming paradigm).

Responsiveness means that we don’t want to block waiting for an I/O operation to complete. On the server side, we don’t want to block a worker thread when it could be used to do some other work and be alerted when an operation completes. And on the client side, we really don’t want to block in the main/GUI thread of our process, making the application sluggish and unresponsive. Being able to write asynchronous code is therefore more and more important to manage the latency of I/O operations without blocking and losing responsiveness. For example, as a rule, in WinRT all I/O-bound APIs that could take more than 50ms only provide asynchronous interfaces and cannot even be invoked with a “classic” blocking call.

In this and in the next few posts we will have a look at what C++ currently provides to support concurrency, and at what new features are on their way. We’ll look at what is in the standard, but also at the Windows-specific PPL framework provided by Microsoft.

A simple example

To understand how we can write asynchronous code in C++ let’s play with a very simple example. Let’s imagine that we want to write a function to read a file and copy its content into another file. To do this we could write functions like the following:

#include <string> 
#include <vector> 
#include <fstream> 
#include <iostream> 
using namespace std; 

vector<char> readFile(const string& inPath) 
{ 
    ifstream file(inPath, ios::binary | ios::ate); 
    size_t length = (size_t)file.tellg(); 
    vector<char> buffer(length); 
    file.seekg(0, std::ios::beg); 
    file.read(&buffer[0], length); 
    return buffer; 
} 

size_t writeFile(const vector<char>& buffer, const string& outPath) 
{ 
    ofstream file(outPath, ios::binary); 
    file.write(&buffer[0], buffer.size()); 
    return (size_t)file.tellp(); 
} 

With these, we can easily write a simple function to copy a file into another and return the number of characters written:

size_t sync_copyFile(const string& inFile, const string& outFile) 
{ 
    return writeFile(readFile(inFile), outFile); 
} 

Evidently we want to execute readFile and writeFile in sequence, one after the other. But should we block while waiting for them to complete? Well, this is evidently a contrived example: if the file is not very big it probably does not matter much, and if the file is very large we would rather use buffering and copy it in chunks rather than returning all its content in a big vector. But both readFile and writeFile are I/O-bound and represent here just a model for more complex I/O operations; in real applications it is common to have to read some data from a network, transform it in some way, and return a response or write it somewhere.

So, let’s say that we want to execute the copyFile operation asynchronously. How can we manage to do this in standard C++?

Task-based parallelism: futures and promises

The C++11 standard library provides a few mechanisms to support concurrency. The first is std::thread, which together with synchronization objects (std::mutex, std::lock_guards, std::condition_variables and so on) finally offer a portable way to write “classic” multithread-based concurrent code in C++.

We could modify copyFile to instantiate a new thread to execute the copy, and use a condition_variable to be notified when the thread completes. But working at the level of threads and locks can be quite tricky. Modern frameworks (like the TPL in .NET) offer a higher level of abstraction, in the form of task-based concurrency. There, a task represents an asynchronous operation that can run in parallel with other operations, and the system hides the details of how this parallelism is implemented.
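For the record, a sketch of that thread-and-condition_variable version could look like the following (it needs <thread>, <mutex> and <condition_variable>), and it already shows how much ceremony is involved:

#include <thread>
#include <mutex>
#include <condition_variable>

size_t thread_copyFile(const string& inFile, const string& outFile)
{
    size_t result = 0;
    bool done = false;
    std::mutex m;
    std::condition_variable cv;

    std::thread worker([&]() {
        size_t written = writeFile(readFile(inFile), outFile);
        {
            std::lock_guard<std::mutex> lock(m);
            result = written;
            done = true;
        }
        cv.notify_one();
    });

    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [&]{ return done; });  // block until the worker signals
    lock.unlock();
    worker.join();
    return result;
}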

The C++11 library, in its new <future> header, also provides (somewhat limited) support for task-based parallelism, in the form of promises and futures. The classes std::promise<T> and std::future<T> are roughly the C++ equivalent of a .NET Task<T>, or of a Future<T> of Java 8. They work in pairs to separate the act of calling a function from the act of waiting for the call results.

At the caller-side when we call the asynchronous function we do not receive a result of type T. What is returned instead is a std::future<T>, a placeholder for the result, which will be delivered at some point in time, later.

Once we get our future we can move on doing other work, while the task executes on a separate thread.

A std::promise<T> object represents a result in the callee-side of the asynchronous call, and it is the channel for passing the result asynchronously to the caller. When the task completes, it puts its result into a promise object calling promise::set_value.

When the caller finally needs to access the result it will call the blocking future::get() to retrieve it. If the task has already completed the result will be immediately available, otherwise, the caller thread will suspend until the result value becomes available.

In our example, this is a version of copyFile written to use futures and promises:

#include <future> 

size_t future_copyFile(const string& inFile, const string& outFile) 
{ 
    std::promise<vector<char>> prom1; 
    std::future<vector<char>> fut1 = prom1.get_future(); 
    std::thread th1([&prom1, inFile](){ 
        prom1.set_value(readFile(inFile)); 
    }); 

    std::promise<size_t> prom2;
    std::future<size_t> fut2 = prom2.get_future();
    std::thread th2([&fut1, &prom2, outFile](){ 
        prom2.set_value(writeFile(fut1.get(), outFile)); 
    }); 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

Note that here we have moved the execution of readFile and writeFile into separate tasks, but we also have to configure and start the threads that run them. Also, we capture references to the promise and future objects to make them available to the task functions. The first thread implements the read and, when it completes, moves its result into a promise, in the form of a big vector. The second thread waits (blocking) on the corresponding future and, when the read completes, gets the vector and passes it to the write function. Finally, when the write completes, the number of characters written is put into the second promise.

In the main function we could take advantage of this parallelism and do some lengthy operation before the call to future::get(). But when we call get() the main thread will still block if the read and write tasks have not completed yet.

Packaged tasks

We can slightly simplify the previous code with packaged_tasks. Class std::packaged_task<T> is a container for a task and its promise. Its template type is the type of the task function (for example, vector<char>(const string&) for our read function). It is a callable type (it defines operator()) and automatically creates and manages a std::promise<T> for us.

size_t packagedtask_copyFile(const string& inFile, const string& outFile) 
{ 
    using Task_Type_Read = vector<char>(const string&); 
    packaged_task<Task_Type_Read> pt1(readFile); 
    future<vector<char>> fut1{ pt1.get_future() }; 
    thread th1{ move(pt1), inFile }; 

    using Task_Type_Write = size_t(const string&); 
    packaged_task<Task_Type_Write> pt2([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }); 
    future<size_t> fut2{ pt2.get_future() }; 
    thread th2{ move(pt2), outFile }; 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

Note that we need to use move() to pass the packaged_task to thread because a packaged_task cannot be copied.

std::async

With packaged_tasks the logic of the function does not change much, the code becomes slightly more readable, but we still have to manually create the threads to run the tasks, and decide on which thread the task will run.

Things become much simpler if we use the std::async() function, also provided by the library. It takes as input a lambda or functor and it returns a future that will contain the return value. This is the version of copyFile modified to use std::async():

size_t async_copyFile(const string& inFile, const string& outFile) 
{ 
    auto fut1 = async(readFile, inFile); 
    auto fut2 = async([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }, 
    outFile); 

    return fut2.get(); 
} 

In a way std::async() is the equivalent of TPL task schedulers. It decides where to run the task, if a new thread needs to be created or if an old (or even the current) thread can be reused.

It is also possible to specify a launch policy, which can be either “async” (which requires the task to execute asynchronously, possibly in a different thread) or “deferred” (which asks to execute the task only at the moment when get() is called).
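For example, this small illustrative snippet shows both policies:

// "async" forces (potentially parallel) asynchronous execution; "deferred"
// runs the work synchronously, only when get() or wait() is called.
auto f1 = std::async(std::launch::async,    []{ return 6 * 7; });
auto f2 = std::async(std::launch::deferred, []{ return 6 * 7; });

int a = f1.get();  // may have run on another thread
int b = f2.get();  // runs right here, on the calling thread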

The nice thing is that std::async hides all the implementation, platform specific details for us. Examining the <future> header file that comes with VS2013 we can see that the Windows implementation of std::async internally uses the Parallel Patterns Library (PPL), the native equivalent of .NET TPL.

PPL

What we have seen so far is what has been standardized in C++11. It is fair to say that the design of futures and promises is still quite limited, especially if compared with what is provided by C# and .NET.

The main limitation is that in C++11 futures are not composable. If we start several tasks to execute computations in parallel, we cannot wait for whichever of them completes first; we can only block on one specific future at a time. Also, there is no easy way to combine a set of tasks into a sequence, with each task consuming as input the result of the previous task. Composable tasks allow us to make the whole architecture non-blocking and event-driven. We would really like to have in C++, too, something like task continuations or the async/await pattern.

With the PPL (aka Concurrency Runtime) Microsoft had the possibility of overcoming the constraints of the standards and to experiment with a more sophisticated implementation of a task library.

In the PPL, class Concurrency::task<T> (defined in the <ppltasks.h> header) represents a task. A task is the equivalent of a future; it also provides the same blocking method get() to retrieve the result. The type parameter T is the return type, and the task is initialized passing a work function (either a lambda or a function pointer or a function object).

So, let’s abandon all concerns of portability for a moment and let’s re-implement our copyFile function, this time with tasks:

size_t ppl_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<vector<char>> tsk1 = Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }); 
    Concurrency::task<size_t> tsk2 = Concurrency::create_task([&tsk1, outFile]() { 
        return writeFile(tsk1.get(), outFile); 
    }); 
    return tsk2.get(); 
} 

Here we have created two task objects, initialized with two lambda expressions, for the read and write operations.

Now we really don’t have to worry about threads anymore; it’s up to the PPL scheduler to decide where to run the tasks and to manage a thread pool. Note however that we are still manually coordinating the interaction of our two tasks: tsk2 keeps a reference to the first task, and explicitly waits for tsk1 to terminate before using its result. This is acceptable in a very simple example like this, but it could become quite cumbersome when we deal with more tasks and with more complicated code.

Task continuations

Unlike futures, PPL tasks support composition through continuations. The task::then method allows us to add a continuation task to a task; the continuation will be invoked when its antecedent task completes and will receive the value returned by the antecedent task.

So, let’s rewrite the copyFile function again, but this time using a continuation:

size_t ppl_then_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<size_t> result =  
    Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
 
    return result.get(); 
} 

Now the code is really clean. We have split the logic of a copy function into two separate components (tasks) that can run in any thread and will be run by a task scheduler. And we have declared the logic of our function as the dependency graph of the tasks.

In this implementation the copyFile function still blocks, at the end, to get the final value, but in a real program it could just return a task that we would insert in the logic of our application, attaching to it a continuation to asynchronously handle its value. We would have code like this:

Concurrency::task<size_t> ppl_create_copyFile_task(const string& inFile, const string& outFile) 
{ 
    return Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
} 
... 
auto tCopy = ppl_create_copyFile_task(inFile, outFile).then([](size_t written) { 
    cout << written << endl; 
}); 
... 
tCopy.wait(); 

Finally, the PPL also provides other ways to compose tasks, with the functions when_all and when_any, useful to manage a set of tasks that run in parallel.

Given a set of tasks, when_all creates another task that completes when all the tasks complete (so, it implements a join). Instead when_any creates a task that completes when one of the tasks in the set completes; this can be useful for example to limit the number of concurrent tasks, and start a new one only when another completes.
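A minimal sketch of both combinators follows; the PPL also overloads the && and || operators on tasks as shorthand for when_all and when_any:

auto t1 = Concurrency::create_task([] { return 1; });
auto t2 = Concurrency::create_task([] { return 2; });

// when_all (also spelled t1 && t2): completes when both tasks complete.
auto joined = (t1 && t2).then([](std::vector<int> results) {
    return results[0] + results[1];
});

// when_any (also spelled t1 || t2): completes as soon as one task completes.
auto first = (t1 || t2).then([](int firstResult) {
    return firstResult;
});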

But this is just the tip of the PPL iceberg… the PPL offers a rich set of functionalities, almost equivalent to what is available in its managed version. It also provides scheduler classes that perform the important duty of managing a pool of worker threads and allocating tasks to threads. More details can be found here.

Towards C++17

Hopefully we could soon see some of the improvements introduced by the PPL in the C++ standard. There is already a document (N3857), written by Niklas Gustafsson et al., that proposes a few changes to the library. In particular, the idea is to enable the composability of futures with future::then, when_any and when_all, with the same semantics seen in the PPL. The previous example, with the new futures, would be rewritten in a portable way like this:

future<size_t> future_then_copyFile(const string& inFile, const string& outFile)
{
	return async([inFile]() {
		return readFile(inFile);
	}).then([outFile](const vector<char>& buffer) {
		return writeFile(buffer, outFile);
	});
}

There would also be a method future::is_ready to test the future for completion before calling the blocking get(), and future::unwrap to manage nested futures (futures that return futures); of course, all the details of this proposal can be found in the document above.
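A small sketch of how these would be used, written against the proposed (not yet standard) API:

// A lambda launched with async() that itself returns a future produces a
// nested future<future<int>>; unwrap would flatten it.
future<future<int>> nested = async([] {
	return async([] { return 42; });
});
future<int> flat = nested.unwrap();   // proposed: future<future<T>> -> future<T>

if (flat.is_ready())                  // proposed: non-blocking completion test
	cout << flat.get() << endl;       // here get() would not block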

Would this be the perfect solution? Not quite. Our experience with .NET teaches us that task-based asynchronous code is still hard to write, debug, and maintain. Sometimes very hard. This is the reason why new keywords were added to the C# language itself, in version 5.0, to more easily manage asynchrony through the Async/Await pattern.
Is there anything similar brewing even for the unmanaged world of C++?

Blunders!


Blunders! is my first Windows 8 store app. It is a client for the Free Internet Chess Server (FICS). The name comes from a quote by Savielly Tartakower: “Chess is a fairy tale of 1001 blunders”, which could tersely summarize my chess playing style.

The genesis of the app was quite simple: some time ago I received a nice Surface as a gift (thank you Steve! :-)) and I could not find any good client to play chess, so I thought that it could be interesting to write my own, to learn more about Windows 8, WinRT, XAML, MVVM and all the new cool stuff.

Blunders! is finally in the Windows App store and you can download it, for free, of course, from here. I also hope to publish the (open) sources soon on CodePlex.

These are a few screenshots, starting with the main page, with the FICS console window. Non-standard chess rules (like Fischer random, atomic, bughouse, suicide) are not supported yet.

[Screenshot: the main page]

You can challenge other players:

[Screenshot: the list of players]

Or accept one of the “game ads” posted by other players:

[Screenshot: the game ads posted by other players]

And, of course, you can play, or observe games being played:

[Screenshot: a game in progress]

Happy playing! 🙂

Implementing iterator blocks in C++ (part 3: LINQ)

Now that we know how to implement coroutines with fibers, we can use them to port to the unmanaged world of C++ a few familiar C# constructs, like iterator blocks and LINQ operators. The result will be in the form of a small template class library. A first code drop of the cppLinq library is available in this source repository on Google code.

The cppLinq library compiles with the Developer Preview of Visual Studio 2011; I have used a few features from the latest C++11 standard that were not well implemented in VS2010 (for example, VS2010 had a few problems with nested lambda expressions). I am also using the new C++ unit test framework to implement the unit tests.

Small disclaimer: this is still a work in progress; I have not yet implemented all the LINQ operators, and the code could certainly use some refactoring and cleaning up. Also, using fibers in a Windows application is a bit dangerous and should be done carefully. In my opinion, this code is mostly useful as a reference for how C# iterators and LINQ actually work. I doubt I will ever use this code in a real program, but I certainly know better how to use these constructs in C# now!

Iterator blocks

We have seen that in C# an iterator is a function that returns an IEnumerable<T> or an IEnumerator<T>. When the C# compiler compiles such a function, it generates a hidden iterator class that implements the IEnumerator<T> and IEnumerable<T> interfaces, and generates code for a state machine that produces the sequence of values “yielded” by the iterator. (See part 1 or, even better, this article for a detailed description).

We can base the C++ implementation of iterator blocks on the equivalent autogenerated C# code, using fibers in place of the state machine. In C++, we can define IEnumerator and IEnumerable as follows:

template <typename T>
class IEnumerator
{
public:
    virtual void Reset() = 0;
    virtual bool MoveNext() = 0;
    virtual T& get_Current() = 0;
};

template <typename T>
class IEnumerable
{
public:
    virtual std::shared_ptr<IEnumerator<T>> GetEnumerator() = 0;
};

Collecting our garbage

Note that GetEnumerator returns a std::shared_ptr. Managing the lifespan of iterator blocks turns out to be an interesting problem: iterators can be generated inside other iterators and can be composed in a data pipeline, so it is not easy to manually dispose of them. The code of iterator blocks is naturally more elegant when written in languages that support garbage collection. We don’t have GC in C++, but we can use shared pointers; as long as we guarantee that we always use shared pointers to manage the lifetime of our IEnumerables and IEnumerators (and that there are no circular references between them) we can be sure that they will be automatically deleted when the reference count associated to the shared pointer goes to zero.

By the way, I think that the standard implementation of shared_ptr is very cool. It also provides ways to generate a shared_ptr from this and, in case of multiple inheritance, to convert a shared_ptr<Base1> into a shared_ptr<Base2>.
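For example (an illustrative snippet, independent of the library code):

struct Base1 { virtual ~Base1() {} };
struct Base2 { virtual ~Base2() {} };

struct Derived : Base1, Base2,
                 std::enable_shared_from_this<Derived>
{
    // enable_shared_from_this lets an object hand out shared_ptrs to itself
    std::shared_ptr<Derived> self() { return shared_from_this(); }
};

auto d = std::make_shared<Derived>();
std::shared_ptr<Base1> b1 = d;  // implicit upcast to one base
std::shared_ptr<Base2> b2 = std::dynamic_pointer_cast<Base2>(b1);  // cross-cast to the other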

The IteratorBlock class

Going on with the implementation, we can write a template class IteratorBlock<T> that implements an iterator that produces a sequence of T objects through the IEnumerator and IEnumerable interfaces:

template <typename TSource>
class IteratorBlock : public IEnumerable<TSource>,
                      public IEnumerator<TSource>,
                      public Fiber
{
public:
    // IEnumerable
    virtual std::shared_ptr<IEnumerator<TSource>> GetEnumerator() {
        if (::GetCurrentThreadId() == _threadId && ! _enumeratorCreated) {
            _enumeratorCreated = true;
            return std::dynamic_pointer_cast<IEnumerator<TSource>>(shared_from_this());
        }

        std::shared_ptr<IteratorBlock<TSource>> cloned = clone();
        return cloned->GetEnumerator();
    }

    // IEnumerator
    virtual void Reset() {
        throw InvalidOperationException();
    }

    virtual bool MoveNext() {
        return resume();
    }

    virtual TSource& get_Current() {
        return _current;
    }

    void yieldReturn(TSource returnValue) {
        _current = returnValue;
        yield(true);
    }

    void yieldBreak() {
        yield(false);
    }

protected:
    IteratorBlock() :
        _current(),
        _enumeratorCreated(false),
        _threadId(::GetCurrentThreadId()) {
    }
    virtual ~IteratorBlock() {}

    virtual std::shared_ptr<IteratorBlock<TSource>> clone() const = 0;

private:
    TSource _current;
    bool _enumeratorCreated;
    DWORD _threadId;
};

Class IteratorBlock is designed to be used as the base class of an iterator class. Its implementation works as a coroutine, based on the Fiber class from which it inherits.

In order to implement an iterator block, we need to inherit from IteratorBlock<T> and provide an implementation for the abstract methods clone() and run().

As in C# iterators, the method Reset should never be called and throws an exception.

The method MoveNext simply restarts the coroutine, calling Fiber::resume. From here, the execution goes to the overridden method run. Inside run we can suspend the execution of the coroutine by calling yieldReturn and yieldBreak, which behave, respectively, like the C# yield return and yield break statements. A call to yieldReturn also specifies the current value to be returned by the iterator, stored in the data member _current.

The implementation of GetEnumerator deserves a few more words. An iterator block is an IEnumerable, from which we can ask for one or more instances of an IEnumerator. The first time we call GetEnumerator the function can return a pointer to the object itself, since it implements both interfaces, but subsequent calls to GetEnumerator return a pointer to a copy of the iterator block object. The abstract method clone needs to be implemented to create this copy by cloning an instance of the iterator class.

Example: a Fibonacci generator

As an example, we can implement a generator for the Fibonacci sequence by writing a class like the following, putting all the logic in the overridden run function.

class FibonacciIterator : public IteratorBlock<long>
{
public:
    virtual void run() {
        long a = 0;
        long b = 1;
        yieldReturn(a);
        yieldReturn(b);
        for (int i = 2; i < 10; i++) {
            long tmp = a + b;
            a = b;
            b = tmp;
            yieldReturn(b);
        }
    }

    virtual std::shared_ptr<IteratorBlock<long>> clone() const {
        return std::shared_ptr<IteratorBlock<long>>(new FibonacciIterator());
    }
};

The foreach loop

Client code can interact with an iterator with code like this:

auto source = std::shared_ptr<IEnumerable<long>>(new FibonacciIterator());
auto e = source->GetEnumerator();
while (e->MoveNext())
{
    long value = e->get_Current();
    // use ‘value’
}

This is the equivalent of a foreach statement, and can be encapsulated in a foreach function, templatized on the type of a function that will be applied to each item in the sequence:

template <typename T, typename Func>
static void foreach(std::shared_ptr<IEnumerable<T>> enumerable, Func f)
{
    std::shared_ptr<IEnumerator<T>> e = enumerable->GetEnumerator();
    while (e->MoveNext()) {
        f(e->get_Current());
    }
}

This allows us to write code like the following:

std::vector<std::string> greek;
greek.push_back("alpha"); // initializer lists not supported in VS11
greek.push_back("beta");
greek.push_back("gamma");

auto it = std::shared_ptr<IEnumerable<std::string>>(
    new StlEnumerable<std::vector<std::string>, std::string>(greek));

foreach<std::string>(it, [](std::string& s) -> void {
    std::cout << s << std::endl;
});

Here we assume we have an iterator class StlEnumerable<Container, T>, which generates a sequence by iterating over the items of an STL container of T’s. (You can find this class in the sources; a possible sketch is shown below.)
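A possible sketch of StlEnumerable, built on the IteratorBlock class above (the real implementation is in the sources):

template <typename Container, typename T>
class StlEnumerable : public IteratorBlock<T>
{
public:
    StlEnumerable(const Container& c) : _container(c) {}

    // yield every element of the container, in order
    virtual void run() {
        for (auto it = _container.begin(); it != _container.end(); ++it) {
            this->yieldReturn(*it);
        }
    }

    virtual std::shared_ptr<IteratorBlock<T>> clone() const {
        return std::shared_ptr<IteratorBlock<T>>(
            new StlEnumerable<Container, T>(_container));
    }

private:
    const Container& _container;  // not owned: must outlive the iterator
};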

The client code above simply creates an iterator from an STL vector, and then applies a lambda expression to each item in the vector, with a foreach loop. Nothing very useful, but we are now almost ready to extend this with composition and LINQ-like operators.

Closures

A class like FibonacciIterator is a closure: it encapsulates some behavior (the function run) and also the context (the data members) on which the function operates.

Writing code like this (that is, a class that derives from IteratorBlock and overrides the method run) works well, but can be a little “verbose”, since it forces us to define a different class for each different iterator. In C++11 we can instead implement our coroutine with a lambda expression, which will automatically implement a closure for us.

The following class, _IteratorBlock, inherits from the previous IteratorBlock class and adds the capability of passing a lambda expression to the constructor, which will be executed by the method run.

template <typename TSource>
class _IteratorBlock : public IteratorBlock<TSource>
{
protected:
    struct IF {
        virtual void run(IteratorBlock<TSource>* pThis) = 0;
    };

    template <typename Func>
    struct F : public IF
    {
        F(Func func) : _func(func) {
        }

        virtual void run(IteratorBlock<TSource>* pThis) {
            _func(pThis);
        }

        Func _func;
    };

public:
    template <typename _F>
    _IteratorBlock(_F f) :
        _f(std::shared_ptr<IF>(new F<_F>(f))) {
    }

    _IteratorBlock(const _IteratorBlock& rhs) :
        _f(rhs._f) {
    }

protected:
    virtual void run() {
        _f->run(this);
    }

    virtual std::shared_ptr<IteratorBlock<TSource>> clone() const {
        return std::shared_ptr<IteratorBlock<TSource>>(new _IteratorBlock<TSource>(*this));
    }

private:
    std::shared_ptr<IF> _f;
};

Using this specialized _IteratorBlock class we can simply create an iterator by defining a lambda. (Interestingly, it is the compiler that creates a closure class for us, storing captured variables in data members, but we don’t need to worry about this implementation detail.)

So, we can define our Fibonacci iterator more simply, as follows:

auto fn = [](IteratorBlock<long>* it) {
    long a = 0;
    long b = 1;
    it->yieldReturn(a);
    it->yieldReturn(b);
    for (int i = 2; i < 10; i++) {
        long tmp = a + b;
        a = b;
        b = tmp;
        it->yieldReturn(b);
    }
};
return std::shared_ptr<IEnumerable<long>>(new _IteratorBlock<long>(fn));

Reimplementing LINQ to Objects

Now we have finally all the pieces ready to reimplement LINQ to objects.

How to proceed? I could have tried to reimplement all the LINQ clauses/methods myself, or I could have used Reflector to find out how they are actually implemented in System.Linq.dll. But since I am very lazy, I decided to just “reuse” the work of Jon Skeet, who some time ago wrote a long and interesting series of articles about reimplementing the whole of LINQ to Objects in C#.

What is better, he also wrote an exhaustive test suite for his reimplementation. All I had to do was to convert his code and tests from C# to C++… 🙂

Of course, C++ does not have extension methods, so I just added new methods to the IEnumerable<T> interface. For example, this is the code for the Where LINQ clause:

template <typename T>
class IEnumerable : public std::enable_shared_from_this<IEnumerable<T>>
{
public:
    virtual std::shared_ptr<IEnumerator<T>> GetEnumerator() = 0;

    // Linq operators
    …
    template <typename Predicate>
    std::shared_ptr<IEnumerable<T>> Where(Predicate predicate) {
        if (nullptr == predicate) {
            throw ArgumentNullException();
        }

        // deferred
        std::shared_ptr<IEnumerable<T>> source = shared_from_this();
        auto fn =  [source, predicate](IteratorBlock<T>* it) {
            foreach<T>(source, [it, predicate](T& item) {
                if (predicate(item)) {
                    it->yieldReturn(item);
                }
            });
        };
        return std::shared_ptr<IEnumerable<T>>(new _IteratorBlock<T>(fn));
    }
};

LINQ methods use deferred execution: until a client starts trying to fetch items from the output sequence, they won’t start fetching items from the input sequence. Consequently, a method like Where is divided into two parts: argument validation, which executes immediately, and the actual iterative code, which runs later, “on demand”.

The Where method is templatized on the type of the predicate function passed as argument. This allows us to pass as predicate a lambda expression, a std::function, or any “functor” class that implements operator(). The implementation of Where is simply a lambda expression passed to an instance of the _IteratorBlock class; this lambda iterates over the source sequence and yields to the caller every item that satisfies the predicate.

Again, LINQ operators are lazily evaluated: expressions are evaluated only when their value is actually required.
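
To make the deferred execution visible, here is a small sketch (reusing the Vector adapter and the foreach helper that appear elsewhere in this post; nothing touches the source until the output sequence is enumerated):

int v[] = { 1, 2, 3, 4, 5 };
std::shared_ptr<IEnumerable<int>> source(new Vector<int>(v, ARRAYSIZE(v)));

// No iteration happens here: Where only validates its argument
// and builds the iterator block.
std::function<bool(const int&)> isEven = [](int x) { return (x % 2) == 0; };
auto evens = source->Where(isEven);

// Only now, when the enumerator is driven, does the predicate run.
foreach<int>(evens, [](int& x) {
    printf("%d\n", x);    // prints 2 and 4
});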

Data pipelines

What makes LINQ operators particularly useful is the ability to compose them into data pipelines, using the output of one iterator as the source of the next iterator in the pipeline.

For example, the code in the following (quite contrived) example prints the square of all even integers smaller than 10:

auto source = IEnumerable<int>::Range(0, 10);

auto it = source->Where([](int val) { return ((val % 2) == 0); })
                ->Select<double>([](int val) -> double { return (val * val); });

foreach<double>(it, [](double& val){
    printf("%.2f\n", val);
});

Unit Tests

Unit tests are written using the new “CppUnitTest” framework that ships with the preview of VS11. As said, the tests are based on (and by “based on” I mean copied from :-)) the unit tests that Jon Skeet wrote for his “EduLinq” blog series.

For example, this is one of the unit tests for the Where method:

TEST_CLASS(WhereTest)
{
public:
    ...
    TEST_METHOD(WhereTest_SimpleFiltering2)
    {
        int v[] = { 1, 3, 4, 2, 8, 1 };
        std::shared_ptr<IEnumerable<int>> source(new Vector<int>(v, ARRAYSIZE(v)));
        std::function<bool(const int&)> predicate = [](int x) { return x < 4; };

        auto result = source->Where(predicate);

        int expected[] = { 1, 3, 2, 1 };
        Assert::IsTrue(result->SequenceEqual(expected, ARRAYSIZE(expected)));
    }
    ...
};

Finally…

This ends my quick excursus in the world of unmanaged iterator blocks. The sources of the small cppLinq library can be found here. A few final comments:

  • As I said, this is still a work in progress. There are a few LINQ methods still left to implement (mainly the OrderBy method, which is the most laborious to write), but I hope to be able to complete them soon.
  • Basing the implementation on Win32 fibers has many disadvantages, but also one advantage: C# iterators are implemented as state machines, so it is not possible to yield from more than one stack frame deep. Here, we don’t have that limitation.
  • It is very interesting to play with some of the features of the new C++ standard. Used together, templates and lambdas are a very powerful tool and allow us to make the C++ code very similar to its C# equivalent. But I found that this kind of C++ code is a bit too complicated to write correctly, and when something is wrong the resulting error messages are not always easy to “decrypt”.
  • I learned a lot about how LINQ actually works by writing this.

Implementing iterator blocks in C++ (part 2: Win32 Fibers)

Fibers were added to Windows NT to support cooperative multitasking. They can be thought of as lightweight threads that must be manually scheduled by the application.

When a fiber is created, it is passed a fiber-start function. The OS assigns it a separate stack and sets up execution to begin at this fiber-start function. To schedule a fiber, you need to switch to it manually; once running, a fiber can suspend itself by yielding execution to another fiber, or back to the “calling” fiber. In other words, fibers are a perfect tool to implement coroutines.

These two articles, from Dr. Dobb’s and MSDN Magazine, explain how to implement coroutines using the Fiber API. The MSDN Magazine article also shows how to do this in .NET (it was written before the release of .NET 2.0, so before iterators were available in C#; in practice, fibers don’t get along well with the CLR). Together with an old series of Raymond Chen’s blog posts, these articles have been the main source of inspiration for this small project. (In this article Duffy proposes another possible implementation of coroutines, based on threads, which is however quite inefficient.)

Interestingly, a fiber-based implementation does not have the limitations of (state-machine based) C# iterators; with fibers it is possible to yield control from any function in the call stack. There are other problems, though.

Fibers are like dynamite

The Win32 Fiber API is quite simple. The first thing to do is to convert the thread on which the fibers will run into a fiber, by calling ConvertThreadToFiber. After this, additional fibers are created using CreateFiber, passing as parameter the address of the function to execute, just like the threadProc of a real thread. A fiber can then suspend itself and start the execution of another fiber by calling SwitchToFiber. Finally, when the application is done using fibers, it can convert the “main” fiber back to a normal thread with ConvertFiberToThread.
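
Put together, a minimal use of the raw API looks more or less like this (a sketch: error handling omitted, and the names other than the Win32 calls are just for illustration):

#include <windows.h>
#include <cstdio>

void* g_mainFiber = nullptr;

void CALLBACK myFiberProc(void* /*param*/)
{
    printf("running in the fiber\n");
    ::SwitchToFiber(g_mainFiber);   // suspend, yielding back to the "calling" fiber
    // not reached here: returning from a fiber-start function would exit the thread
}

int main()
{
    g_mainFiber = ::ConvertThreadToFiber(nullptr);          // once per thread
    void* fiber = ::CreateFiber(0, myFiberProc, nullptr);   // 0 = default stack size

    ::SwitchToFiber(fiber);                  // schedule the fiber manually...
    printf("back in the main fiber\n");      // ...and we are back here

    ::DeleteFiber(fiber);
    ::ConvertFiberToThread();                // done using fibers on this thread
    return 0;
}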

There are a few important caveats to consider:

  • It is difficult to write a library or framework that uses fibers, because the entire program must be designed to support them. For example, ConvertThreadToFiber must be called only once per thread.
  • Since each fiber has its own stack, it also has its own SEH exception chain. This means that if a fiber throws an exception, only that fiber can catch it. The same is true for C++ exceptions. Exceptions cannot pass across fibers’ “boundaries”.
  • The default stack size is 1MB, so using many fibers can consume a lot of memory.
  • The code must be “fiber-safe”, but most code is designed only to be “thread-safe”. For example, thread-local storage does not work with fibers; in fact, Windows Vista introduced a separate API for fiber-local storage. More importantly, the CRT was not completely fiber-safe in the past, and I am not sure it is now. There are also compiler options to set in Visual Studio, like /GT, which enables only fiber-safe code optimizations.

In other words, as Raymond Chen put it, “Fibers are like dynamite. Mishandle them and your process explodes”. Therefore, the framework for iterators and LINQ-like operators that we’ll define should be used VERY CAREFULLY in real programs.
(And if you think things are bad in C++, consider that fibers are practically unusable with .NET managed code!)

Implementation details

Putting all these caveats aside, let’s see how coroutines can be implemented with fibers. This is the declaration of a Fiber class I created as a thin wrapper over the Win32 Fiber API.

class Fiber
{
public:
    Fiber();
    virtual ~Fiber();

    static void enableFibersInCurrentThread();
    static bool disableFibersInCurrentThread();

    void* main();
    bool resume();

protected:
    virtual void run() = 0;
    void yield(bool goOn);

private:
    static void WINAPI fiberProc(void* lpFiberParameter);

    PFIBER_START_ROUTINE _fiber;
    PFIBER_START_ROUTINE _previousFiber;

    std::exception _exception;     // exception caught inside the fiber,
    bool _exceptionCaught;         // re-thrown by resume() in the caller's context

    enum FiberState {
        FiberCreated, FiberRunning, FiberStopPending, FiberStopped
    };
    FiberState _state;
};

The Fiber class exposes the static functions enableFibersInCurrentThread and disableFibersInCurrentThread, which wrap the ConvertThreadToFiber/ConvertFiberToThread initialization and termination functions.
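
A plausible implementation of these two wrappers is just a thin veneer over the corresponding Win32 calls (a sketch; the actual library may add some extra bookkeeping):

void Fiber::enableFibersInCurrentThread()
{
    // Must be called once per thread, before any Fiber is resumed.
    ::ConvertThreadToFiber(nullptr);
}

bool Fiber::disableFibersInCurrentThread()
{
    // Converts the "main" fiber back into a normal thread.
    return (::ConvertFiberToThread() != FALSE);
}
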
The constructor creates a new fiber object, specifying Fiber::fiberProc as the function to execute.

Fiber::Fiber() :
    _state(FiberCreated),
    _previousFiber(nullptr),
    _exception(),
    _exceptionCaught(false)
{
    _fiber = (PFIBER_START_ROUTINE)::CreateFiber(256 * 1024, fiberProc, this);
}

As said, fibers use cooperative multitasking: the execution of a fiber must be explicitly scheduled by the application by calling SwitchToFiber. This is encapsulated by the method resume below:

bool Fiber::resume()
{
    if (nullptr == _fiber || _state == FiberStopped) {
        return false;
    }

    _previousFiber = (PFIBER_START_ROUTINE)::GetCurrentFiber();
    assert(_previousFiber != _fiber);

    ::SwitchToFiber(_fiber);

    if (_exceptionCaught) {
        throw _exception;
    }

    return (FiberRunning == _state);
}

When the fiber is started with SwitchToFiber, it begins executing from the fiberProc method, which calls main:

void CALLBACK Fiber::fiberProc(void* pObj)
{
    Fiber* pFiber = (Fiber*)pObj;
    void* previousFiber = pFiber->main();
    ::SwitchToFiber(previousFiber);
}

What main does is simply to call the abstract function run, which any class derived from Fiber must implement.
Inside run we can, at some point, yield execution to a different Fiber object by calling its resume method, effectively creating a “stack” of fibers nested into each other. Or we can yield execution back to the previously running fiber (the one that launched the current one) by calling the method yield (the equivalent of yield return and yield break in C#):

void Fiber::yield(bool goOn)
{
    if (! goOn) {
        _state = FiberStopped; // yield break
    }
    ::SwitchToFiber(_previousFiber);
}

Logically this works like a sequence of nested function calls in a thread’s stack, but of course there is no real call stack linking the fibers: we can return to the caller only by storing a pointer to the previous fiber and explicitly yielding control to it.

Since exceptions cannot travel outside a fiber, the call to run is wrapped in a try-catch block that catches any kind of exception (even Win32 structured exceptions). When an exception is caught, its data is stored, the fiber is stopped, and fiberProc resumes the execution of the previous fiber, inside the resume function. There the exception can be re-created and re-thrown in the context of the previous fiber, effectively forwarding it back up the “fiber stack”. I am not sure this is a very elegant workaround, but I could not find a better solution.

void* Fiber::main()
{
    _state = FiberRunning;
    _exceptionCaught = false;

    try {
        run();
    }
    catch (StopFiberException&)
    {
        _state = FiberStopped;
    }
    catch (std::exception& e)
    {
        _exception = e;    // note: this copy slices any derived exception type
        _exceptionCaught = true;
        _state = FiberStopped;
    }
    catch (...)
    {
        _exception = std::exception("win32 exception");
        _exceptionCaught = true;
        _state = FiberStopped;
    }

    _state = FiberStopped;
    return _previousFiber;
}

Finally, when we are done with a fiber object we can delete it, releasing all the resources it had allocated, including the memory for its stack.

Fiber::~Fiber()
{
    if (_state == FiberRunning) {
        // Ask the fiber to unwind: switch to it one last time with the state
        // set to FiberStopPending (the fiber is expected to notice this and
        // stop, e.g. by throwing the StopFiberException caught in main).
        _previousFiber = (PFIBER_START_ROUTINE)::GetCurrentFiber();
        _state = FiberStopPending;
        ::SwitchToFiber(_fiber);
    }
    ::DeleteFiber(_fiber);
}

Next…

The Fiber class is the main building block of our implementation of coroutines. What is left to do is to write classes that inherit from Fiber and implement the actual coroutine code in the virtual function run, as sketched below.
In the next posts we’ll see how to use the Fiber class to implement iterator blocks that work as in C#, implementing the IEnumerable and IEnumerator interfaces, and how to use this to reimplement many LINQ operators.
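
For example, a trivial coroutine that yields three values back to its caller could look like this (a hypothetical sketch, just to show the protocol):

class CountingCoroutine : public Fiber
{
public:
    int current;

protected:
    virtual void run() {
        for (int i = 1; i <= 3; i++) {
            current = i;
            yield(true);     // suspend, passing control back to the caller
        }
        yield(false);        // "yield break": mark the fiber as stopped
    }
};

// Caller side (after Fiber::enableFibersInCurrentThread()):
CountingCoroutine c;
while (c.resume()) {         // resume returns false when the coroutine stops
    printf("%d\n", c.current);    // prints 1, 2, 3
}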

If you are curious, you can look at the sources of this small project: a first code drop (still quite a preliminary version) is available in this source repository on Google Code.
Let me know your comments and suggestions, if you are so kind as to have a look.

Implementing iterator blocks in C++ (part 1)

Recently I had to port some C# code to C++. The C# code made limited use of the .NET framework, so most of the porting turned out to be fairly simple. (I only had to pay attention to the difference between value and reference types, and to fix small issues with uninitialized data members where the code relied on C#’s default initialization.)

But part of the code was also using some of the features of C# 3.0: implicit types, lambda expressions, iterator blocks and LINQ. Some of this code is not difficult to port to C++, if we target the new C++11 standard. For example C# ‘var’ maps to C++ ‘auto’; C# lambda expressions can be translated into C++ lambdas.

But there is nothing in C++ that matches the elegance of IEnumerable<T> sequences and LINQ, so for the problem at hand I just ended up rewriting the LINQ-based code using STL containers and algorithms. That worked fine, but it left me wondering: is there a way to implement C# iterators in C++ and, consequently, to have something similar to LINQ-to-objects in unmanaged code?

The answer is… kind of. In this short series of posts I’ll try to present a possible solution, based on co-routines implemented with Win32 Fibers.

Caveats

A few caveats: this is really just an experiment, a proof of concept, a toy project. The best way to deal with algorithms and containers in C++ is certainly to use the STL. I found that using Fibers has many drawbacks in Windows and makes it very difficult to write correct code. Also, the absence of a garbage collector makes things much trickier, as we will see.

But still, I think that emulating the behavior of C# iterators in C++ is a very interesting exercise, and a great way to learn how LINQ actually works. In many ways it also made me appreciate even more the simplicity and elegance of C#. It’s not the destination, but the journey.

C# iterator blocks

Iterators are objects that allow you to traverse a sequence of elements. Iterators in .NET are the basis of the foreach statement and of the LINQ-to-Objects framework. A very good introduction to iterators, iterator blocks and data pipelines can be found here, or in Jon Skeet’s “C# in Depth”.

To summarize, the iterator pattern in .NET is encapsulated in the IEnumerable/IEnumerator interfaces, which are defined as follows:

public interface IEnumerable<T> : IEnumerable {
     IEnumerator<T> GetEnumerator();
}

public interface IEnumerator<T> : IEnumerator, IDisposable {
     void Reset();
     bool MoveNext();
     T Current { get; }

     void Dispose();
}

An object that wants to expose a sequence of data implements the IEnumerable<T> interface. Data consumers ask the IEnumerable<T> for an IEnumerator<T> by calling GetEnumerator(), and then iterate over the sequence by calling MoveNext and reading the Current property. MoveNext returns false when there is no more data to return.

A foreach statement is transformed by the C# compiler into code like this:

IEnumerator<int> it = source.GetEnumerator();

while (it.MoveNext()) {
     int value = it.Current;
     // use ‘value’
}

Note that the Reset() method is never called.

C# 2.0 added support for iterator blocks, with the yield keyword. An iterator block is a function that contains a yield statement and returns an IEnumerable<T>. For example, this function returns the sequence of the first 10 Fibonacci numbers:

class Test
{
     public static IEnumerable<long> Fibonacci()
     {
         long a = 0;
         long b = 1;
         yield return a;
         yield return b;
         for (int i = 2; i < 10; i++)
         {
             long tmp = a + b;
             a = b;
             b = tmp;
             yield return b;
         }
     }
}

Coroutines

It is interesting to understand how this function returns the sequence of numbers. A function like this is completely different from a normal method of a class. When a normal method is invoked, execution begins at its entry point, and when the method exits it is finished: an invocation of a method returns only once. With iterators, instead, execution is paused each time a value is yielded and resumed later. In this sense, C# iterators behave like coroutines.

More precisely, coroutines can be defined as generalized subroutines with the following three properties:

  • Local data of a coroutine persists between successive calls.
  • Execution is suspended at the point where control leaves the coroutine (perhaps to allow another coroutine to produce something for it). When control re-enters the coroutine, execution resumes at the point where it previously left off.
  • Control can be passed from one coroutine to another, causing the current coroutine to suspend its execution and transfer control to the new coroutine.

C# iterators can be considered a basic form of coroutine. In this article Joe Duffy explains how they can be used to implement a kind of coroutine scheduler, which can schedule lightweight tasks for simulated concurrent execution.

State machine

However, C# iterators are not really coroutines. They are implemented by the C# compiler by generating the code of a state machine. More precisely, for a function like the one above, the compiler generates a class that represents the closure for the block of code in the function, and that exposes both the IEnumerable and the IEnumerator interfaces. The effect of this compiler magic is that an instance of this iterator function can suspend (or “yield”) itself and be resumed at well-defined points.

The core of the implementation is in the MoveNext method: this is where the state machine is carefully crafted to produce the sequence of values dictated by the yield statements. When a yield statement is met, MoveNext exits returning the corresponding value, and the state machine records the exact state of the iterator at that point. When MoveNext is called again, the state machine resumes execution from the instruction that follows the last yield.
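
To get a feeling for this transformation, here is a rough, hand-written C++ analogue of the state machine the compiler generates for the Fibonacci iterator above (purely illustrative: the real generated class also implements IEnumerable<T>/IEnumerator<T> and more bookkeeping):

// Hand-written analogue of the compiler-generated enumerator.
class FibonacciEnumerator
{
    int _state;      // where MoveNext should resume from on the next call
    int _i;
    long _a, _b;

public:
    long current;

    FibonacciEnumerator() : _state(0), _i(0), _a(0), _b(1) {}

    bool MoveNext()
    {
        switch (_state) {
        case 0:                                     // yield return a;
            current = _a; _state = 1; return true;
        case 1:                                     // yield return b;
            current = _b; _i = 2; _state = 2; return true;
        case 2:                                     // the for loop
            if (_i < 10) {
                long tmp = _a + _b;
                _a = _b; _b = tmp; _i++;
                current = _b; return true;          // yield return b;
            }
            _state = 3;
            return false;                           // end of the sequence
        default:
            return false;
        }
    }
};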

This implementation has one drawback: it is possible to yield only from one stack frame deep. The C# compiler can transform a single function into a state machine, but it obviously cannot do the same for the entire call stack. Real coroutines can yield from an arbitrarily nested call stack, and have that entire stack restored when they are resumed.

For example, C# iterators cannot be used to implement an efficient in-order traversal of a binary tree. The following code implements a recursive iterator. It works, but with poor performance: if the tree has N nodes, it builds O(N) temporary iterator objects, which is not ideal.

class TreeNode<T>
{
    public TreeNode<T> left;
    public TreeNode<T> right;
    public T value;

    public static IEnumerable<T> InOrder(TreeNode<T> t)
    {
        if (t != null)
        {
            foreach (var i in InOrder(t.left)) { yield return i; }
            yield return t.value;
            foreach (var i in InOrder(t.right)) { yield return i; }
        }
    }
}

// build tree
TreeNode<int> root = new TreeNode<int> { … };

// in-order traversal
var query = TreeNode<int>.InOrder(root);
foreach (int i in query) {
     Console.WriteLine("{0}", i); // works, but allocates a temporary iterator per node
}

We’ll see how this limitation can be overcome with an implementation of coroutines based on Fibers.
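
As a preview, with the fiber-based IteratorBlock developed in this series the same traversal can yield directly from the recursive calls, without temporary iterators. A sketch (assuming a C++ TreeNode<int> tree and a root pointer built beforehand):

template <typename T>
struct TreeNode
{
    TreeNode* left;
    TreeNode* right;
    T value;
};

// Recursive helper: yieldReturn is called from nested stack frames,
// something a C# iterator block cannot do.
template <typename T>
void inOrder(IteratorBlock<T>* it, TreeNode<T>* t)
{
    if (t != nullptr) {
        inOrder(it, t->left);
        it->yieldReturn(t->value);
        inOrder(it, t->right);
    }
}

auto fn = [root](IteratorBlock<int>* it) {
    inOrder<int>(it, root);
};
std::shared_ptr<IEnumerable<int>> query(new _IteratorBlock<int>(fn));

foreach<int>(query, [](int& i) {
    printf("%d\n", i);    // prints all the nodes, in order
});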

I will not delve into more implementation details, since very detailed explanations can be found here and here. It can also be very interesting to look at the generated code using the Reflector tool.

Next…

In the next posts we’ll see a possible way to reproduce in C++ the behavior of C# iterators (deferred execution, lazy evaluation, “yield” semantics and so on) using Win32 Fibers. And we’ll use iterators as the base for reimplementing LINQ-style queries in C++.

Of course, I can’t think of a way to reproduce the declarative query syntax in C++, with keywords like from, where and select. But we should be able to write code like the following, with queries defined as a pipeline of query operators:

auto source = IEnumerable<int>::Range(0, 10);

auto it = source->Where([](int val) { return ((val % 2) == 0); })
                ->Select<double>([](int val) -> double { return (val * val); });

foreach<double>(it, [](double& val){
     printf("%.2f\n", val);
});

iThanks

“I always thought, for him to die young, it seems so strange. Because other people of his magnitude, like Henry Ford and Thomas Edison, you sort of feel, like, we wrung everything out of them, they were old when they died. With Steve Jobs you really got the sense like “Ah! We’ve not done with you yet!” and there is this sense that, you know: “So what are we supposed to do now, how do we know what’s next?”. It’s sorta like an alien that comes down and gives you this new technology and then kind of show you how to use it and takes off in the spaceship and you’re like “Ahh! What’s this green button?””

Jon Stewart, “The Daily Show”, October 6th 2011.

 

Also, a moving tribute by Stephen Colbert: http://thecolbertreport.cc.com/videos/fottda/tribute-to-steve-jobs.

Thanks for everything, indeed.