Async-Await in C++

In the previous post we had a quick look at the problem of writing concurrent, asynchronous code in C++. We saw that the <future> library, introduced with C++11, offers a partial solution to this problem by separating the act of initiating an operation from the act of waiting for its result. But the act of waiting for a result is still synchronous, and std::futures are not easily composable.
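As a one-function refresher in standard C++ (answer_via_future is an illustrative name, not from the previous post), the future can be created early, but consuming it still blocks:

```cpp
#include <future>

// std::async separates starting the work from consuming its result,
// but future::get() still blocks the calling thread, and C++11 offers
// no standard way to attach a continuation to the future.
int answer_via_future() {
    std::future<int> f = std::async(std::launch::async, [] { return 6 * 7; });
    // ... the caller could do some other work here ...
    return f.get();   // blocks the calling thread until the task completes
}
```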

We also talked about the platform-specific improvements offered by Microsoft’s PPL tasks, which have become the model for several interesting changes proposed for the C++ standard library.

But are these improvements enough?

The problems with Tasks

Having composable futures would certainly be a big improvement for C++. But the experience with C# tells us that it’s not a perfect solution. A purely library-based solution is bound to have limitations. In particular, .NET’s TPL has the defect of making the code overly complex.

It is difficult to write robust, readable and debuggable code with TPL. It is too easy to end up with spaghetti code: a large number of Task<T>s interconnected in inextricable chains of continuations.

I found this out the hard way when my last employer gave me a .NET component to maintain that had been architected as an incredibly convoluted graph of interconnected TPL Tasks, often joined with conditional ContinueWith() calls.

Loops and conditionals

It is particularly difficult to write iterations and branches with tasks.

In my previous post I used as an example a copyFile function that asynchronously reads all the file content into a single string and copies it into another file. Of course such an implementation would not be practical with very large files, so let’s rewrite it to read and write the file chunk by chunk.

Here I am using the PPL, but very similar code could be written with std::future (especially once it supports task continuations with future::then).

We can easily write an iterative function that uses tasks to read and write every chunk of a file asynchronously, but that still blocks waiting for these tasks to complete:

Concurrency::task<string> readFileChunk(ifstream& file, int chunkLength)
{
    return Concurrency::create_task([&file, chunkLength](){
        vector<char> buffer(chunkLength);
        file.read(&buffer[0], chunkLength);
        return string(&buffer[0], (unsigned int)file.gcount());
    });
}

Concurrency::task<void> writeFileChunk(ofstream& file, const string& chunk)
{
    return Concurrency::create_task([&file, chunk](){
        file.write(chunk.c_str(), chunk.length());
    });
}

void copyFile_get(const string& inFilePath, const string& outFilePath)
{
    ifstream inFile(inFilePath, ios::binary | ios::ate);
    inFile.seekg(0, inFile.beg);
    ofstream outFile(outFilePath, ios::binary);

    string chunk;
    while (chunk = readFileChunk(inFile, 4096).get(), !chunk.empty()) {
        writeFileChunk(outFile, chunk).get();
    }
}

The problem is that this code is not much better than a purely synchronous loop of blocking calls to read and write the file; if our thread schedules asynchronous operations but then immediately blocks waiting for them to complete, we cannot really do much else concurrently.

It would be better to chain a sequence of tasks that read and write the file chunk by chunk, so that they can be scheduled one after the other leaving our thread free to do other work while these tasks are blocked in some I/O operation.

It turns out that writing a “task loop” correctly is not trivial, especially if there are many possible error cases or exceptions to handle. It is actually so tricky that there are MSDN pages that explain what to do and what the possible pitfalls are, and the PPL Asynchronous Sample Pack even provides sample code for this.

The code I put together is neither clean nor concise:

task<shared_ptr<string>> readFileChunk(shared_ptr<ifstream> file, int chunkLength)
{
    return create_task([file, chunkLength](){
        vector<char> buffer(chunkLength);
        file->read(&buffer[0], chunkLength);
        return make_shared<string>(&buffer[0], (unsigned int)file->gcount());
    });
}

task<bool> writeFileChunk(shared_ptr<ofstream> file, shared_ptr<string> chunk)
{
    return create_task([file, chunk](){
        file->write(chunk->c_str(), chunk->length());
        return chunk->length() == 0;
    });
}

task<void> copyFile_repeat(shared_ptr<ifstream> inFile, shared_ptr<ofstream> outFile)
{
    return readFileChunk(inFile, 4096)
        .then([=](shared_ptr<string> chunk){
            return writeFileChunk(outFile, chunk);
        })
        .then([=](bool eof){
            if (!eof) {
                return copyFile_repeat(inFile, outFile);
            }
            else {
                return task_from_result();
            }
        });
}

task<void> copyFile_then(const string& inFilePath, const string& outFilePath)
{
    auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
    inFile->seekg(0, inFile->beg);
    auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

    return copyFile_repeat(inFile, outFile);
}

It is probably possible to do better and write cleaner code that executes a simple continuation chain like this one in a loop, but it is also quite evident that as the algorithm becomes more complicated, the asynchronous code can become very convoluted.

Debugging

And then there is the problem of debugging: if we set a breakpoint on the file->write(…) line, for example, when execution stops there the debugger will present us with a rather strange call stack. With continuations we lose the usual “causality chain” of return stacks that is typical of synchronous code. Here the lambdas that compose the body of the tasks are scattered through the call stack and distributed across several threads, and sometimes it can be difficult to understand what is going on.

[Figure: debugger call stacks, with the task lambdas scattered across several threads]

The async-await pattern

The complexity of working with Tasks is well known to C# programmers, who have already had a few years to experiment and gain experience with the .NET TPL. This is the reason why the biggest improvement of C# 5.0 was the introduction of the Async/Await pattern, designed on the model of F# asynchronous workflows.

To see how it works, let’s start by writing in C# an iterative but blocking implementation of our CopyFile function. This can be done very easily:

        private void CopyChunk(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = input.Read(buffer, 0, buffer.Length)) != 0)
            {
                output.Write(buffer, 0, bytesRead);
            }
        }

        public void CopyFile(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                    CopyChunk(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Let’s say that we want now to make the function completely asynchronous. Ignoring the fact that the framework actually provides a Stream.CopyToAsync, we can use other asynchronous .NET APIs, like Stream.ReadAsync and Stream.WriteAsync that return a Task, together with async/await, and write this:

        private async Task CopyChunk_Async(Stream input, Stream output)
        {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = await input.ReadAsync(buffer, 0, buffer.Length)) != 0)
            {
                await output.WriteAsync(buffer, 0, bytesRead);
            }
        }

        public async Task CopyFile_Async(string inFile, string outFile)
        {
            using (StreamReader sr = new StreamReader(inFile))
            {
                using (StreamWriter sw = new StreamWriter(outFile))
                {
                   await CopyChunk_Async(sr.BaseStream, sw.BaseStream);
                }
            }
        }

Now, this is really beautiful! We made only a few small changes to the structure of the code: we added the async modifier, changed the return type of our methods, and added a strange await keyword in the places where we call asynchronous functions. The logic of the function has not changed at all, and yet it now executes in a completely different way. Let’s see how.

In C# a method can be declared as asynchronous by adding the async modifier to the method declaration. For example:

public static async Task<int> FooAsync() {…}

An asynchronous method is like a regular C# method, but its return type can only be of type void, Task or Task<T>, and it can also contain await expressions, like:

int result = await FooAsync();

Awaiting on an expression means launching the asynchronous execution of that expression (usually an async method) and, if it has not yet completed, immediately yielding the execution to the caller.

So asynchronous methods are, like iterator methods, a kind of coroutine. They are resumable: their execution can “magically” pause at a few points identified by the await expressions, and then “magically” resume when the awaited expression has completed. While the method is paused waiting for the completion, its caller continues to run. This means that if the method was called from a UI message loop, for example, the UI thread can continue processing messages and the application will remain responsive. If the method was called by a worker thread in a web server, the thread will be free to serve more requests.

But just like iterator methods, C# asynchronous methods are not really coroutines. They are instead implemented with a state machine. The details are quite complicated (a very good description can be found here and here), but essentially the compiler transforms the function body by attaching a task continuation to the asynchronous operation, so that execution will resume from the point where it left off.

In which thread does the awaitable expression execute, and in which thread will the async method resume? It’s up to the task scheduler to decide; asynchrony does not mean parallelism, and a method does not need to run on another thread to be asynchronous. The scheduler may run the task later on the same thread, or it may run it on a worker thread from the ThreadPool. Execution will then resume on the right thread (for example, the UI thread in a GUI app) according to the current synchronization context. From the point of view of the developer, this means that an async method cannot be relied upon to continue its execution on the same thread where it started (with all the consequent multithreading headaches).

C++ resumable functions

So, everything is nice in the managed world of C#, but what about native programming? Is there anything like async/await that we can use with our futures? We can find the answer in N3858, another proposal made by Gustafsson et al. for C++17.

This time the changes proposed are to the language itself and not just to the library. The idea is to introduce the equivalent of C# async methods in the form of resumable functions. They can be thought of as the basis for adding real coroutine support to C++, and are not strictly related to the <future> library, even though they have been defined especially to improve the usability of futures and promises.

Resumable functions would have almost exactly the same semantics as async/await in C#; two new keywords would be added to the language:

- The resumable keyword, added after the argument list, defines a function as suspendable and resumable, in the way described above. It is the equivalent of C#’s async.

- Like in C#, the await keyword in the body of a resumable function identifies a suspension point: one of the places where the function may suspend execution and later resume it (possibly on a different thread) when some asynchronous operation completes.

For example:

        future<int> abs_r(future<int> i_op) resumable
        {
            int i = await i_op;
            return (i < 0) ? -i : i;
        }

This would define a resumable function that takes a future<int> as argument, suspends itself waiting for the future result to be available, and produces an integer as result. Of course, a resumable function can only return a future<T> or shared_future<T>. It completes and produces its final value only when a return statement is executed.

Visual Studio 2013 CTP

Will this proposal be accepted? Will resumable functions become part of the standard? And is there a portable way to implement them?

I really cannot answer any of these questions. But, interestingly, it is already possible to play with resumable functions today. Last November Microsoft shipped the Visual Studio November 2013 CTP with a preview of several C++ features that will be released in the next version of Visual Studio and that improve conformance to the C++ standard. Among these is also the first implementation of resumable functions.

To enable them we must change the “Platform Toolset” of our C++ project to use the updated compiler from the CTP, as shown in the figure. We also need to include the new header <pplawait.h>, which extends the PPL’s <ppltasks.h>.

[Figure: selecting the November 2013 CTP platform toolset in the project properties]

The syntax of the CTP’s resumable functions is very similar to the one proposed by Gustafsson, with a small change in the keywords: in order not to “taint” the language with (still) non-standard keywords, resumable has become __resumable and await has become __await.

So, we can now rewrite one last time our copyFile method, this time using the CTP’s resumable functions:

#include <pplawait.h>

task<void> copyFile_res(const string inFilePath, const string outFilePath) __resumable
{
	auto inFile = make_shared<ifstream>(inFilePath, ios::binary | ios::ate);
	inFile->seekg(0, inFile->beg);
	auto outFile = make_shared<ofstream>(outFilePath, ios::binary);

	while (true)
	{
		shared_ptr<string> s = __await readFileChunk(inFile, 4096);
		if (s->empty()) { break; }
		__await writeFileChunk(outFile, s);
	}
}

int _tmain(int argc, _TCHAR* argv[])
{
    …
    copyFile_res(file1, file2).get();
}

We got there! Finally we have simple, readable, maintainable asynchronous C++ code. The only small complication is that we must pay close attention to the lifetime of the objects passed to asynchronous/resumable methods. In C# things are easier, but here we don’t have a garbage collector, so it is often better to wrap the objects in shared_ptr<T>s to make sure they are not destroyed while still referenced by some async task. A small tribute to pay to the gods of asynchrony, anyway.

A kind of magic…

But how do resumable functions work in practice? To learn more, it is worth studying the <pplawait.h> header and watching the Channel 9 talk that Deon Brewis gave at last year’s GoingNative conference.

Gustafsson’s proposal describes two possible solutions:

  1. With a state machine, like in C#.
  2. With resumable side-stacks (aka Fibers).

The first implementation would be similar to how async/await is implemented in C#. The compiler would transform the code of the function, implementing the logic of a state machine that keeps track of the current state of the function. The implementation would be more complicated than in C# because C++ does not have garbage collection: the function’s variables could not live on the stack (the stack frame would be lost when the function is suspended), so they would have to be stored in a heap-allocated activation frame.
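To make the idea concrete, here is a hand-rolled sketch (not a compiler’s actual output) of how the abs_r example above could be lowered: the locals move into a heap-allocated frame, and a continuation resumes the work when the awaited future is ready. abs_r_lowered is a made-up name, and a detached thread stands in for a real continuation mechanism like future::then.

```cpp
#include <future>
#include <memory>
#include <thread>

// Sketch of the "state machine" lowering of:
//   future<int> abs_r(future<int> i_op) resumable { int i = await i_op; return (i < 0) ? -i : i; }
std::future<int> abs_r_lowered(std::future<int> i_op) {
    struct Frame {                        // heap-allocated activation frame
        std::promise<int> result;         // delivers the final value
        std::future<int>  pending;        // the operation being awaited
    };
    auto frame = std::make_shared<Frame>();
    frame->pending = std::move(i_op);
    // A real scheduler would attach a continuation instead of burning a
    // thread on a blocking get(); the thread just makes the idea runnable.
    std::thread([frame] {
        int i = frame->pending.get();     // the "await" point
        frame->result.set_value(i < 0 ? -i : i);
    }).detach();
    return frame->result.get_future();
}
```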

But the second way of implementing resumable functions seems more elegant: each function would have its own side stack, a stack separate from the thread stack, allocated by the compiler when the resumable function is invoked and destroyed when the resumable function completes. What the proposal describes is actually using something identical to fibers on Windows.
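To get a feel for how side stacks behave, here is a sketch using the POSIX ucontext API, the closest non-Windows analogue of fibers (deprecated in POSIX.1-2008 but still available with glibc on Linux). This is only an illustration of the mechanism, not what the CTP does:

```cpp
#include <ucontext.h>
#include <string>

// Each ucontext owns its own side stack; swapcontext() plays the role of
// SwitchToFiber(). The side function suspends mid-body and is later resumed
// exactly where it left off, with its stack and locals intact.
static ucontext_t caller_ctx, side_ctx;
static std::string trace;

static void side_func() {                 // runs on its own 64 KB side stack
    trace += '1';
    swapcontext(&side_ctx, &caller_ctx);  // "await": suspend, resume caller
    trace += '3';                         // stack survived the suspension
}

std::string fiber_demo() {
    static char stack[64 * 1024];
    trace.clear();
    getcontext(&side_ctx);
    side_ctx.uc_stack.ss_sp = stack;
    side_ctx.uc_stack.ss_size = sizeof stack;
    side_ctx.uc_link = &caller_ctx;       // where to go when side_func returns
    makecontext(&side_ctx, side_func, 0);

    swapcontext(&caller_ctx, &side_ctx);  // start the "fiber"
    trace += '2';                         // side_func is parked mid-body here
    swapcontext(&caller_ctx, &side_ctx);  // resume it where it left off
    return trace;                         // "123": interleaved execution
}
```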

The very, very surprising thing, in my opinion, is that Microsoft chose the second option, fibers, to implement resumable functions in the CTP.

Fibers

Now, this is curious: a couple of years ago I wrote a few posts about a possible way to write C#-like iterator blocks and LINQ operators in C++ using Fibers.

Fibers are an old, almost forgotten feature, added to Windows NT to support cooperative multitasking in the form of lightweight threads that must be manually scheduled by the application. If I am not mistaken, they were added to Windows to improve the performance of the first versions of SQL Server.

My implementation of iterator blocks was very questionable for many reasons (for example, I just copied the IEnumerator/IEnumerable interfaces from .NET rather than writing something that could work better with C++ STL) but I thought that the weakest point was probably the use of fibers.

In fact it is difficult to write correct code with fibers; the entire program must be designed to support them. Over the years, many things in Windows have grown increasingly incompatible with them. The whole of .NET does not work with fibers, and an attempt to fix this proved so complicated that it was soon abandoned.

Evidently this is not necessarily a problem if fibers are used in the limited scope of supporting native resumable functions, under the direct control of our compiler. But even so, the functions and libraries that we call from a resumable function must be fiber-safe (in addition to being thread-safe); for example, thread-local storage does not work with fibers. The CRT itself was not completely fiber-safe in the past, though I guess this should have been fixed by now. And there is the problem of managing C++ exceptions: in the past they could only be caught by the fiber that threw them and could not cross fiber boundaries, but even this can be solved by the compiler or the library.

Also, I wonder whether there is a portable way to implement side stacks on non-Windows platforms, and whether some support from the OS will necessarily be required.

Under the hood

But let’s look in more detail at the CTP code, to learn how resumable functions actually work. The implementation is split between the compiler and the library code in <pplawait.h>.

The code of <pplawait.h> is organized in two main classes:

template <typename TTask, typename TFunc, typename TTaskType>
class __resumable_func_setup_data
{
    __resumable_func_fiber_data* _func_data;
    LPVOID _pSystemFiber;
    LPVOID _pvLambda; // The function to execute
    task_completion_event<TTaskType> _tce;
    [...]
};
struct __resumable_func_fiber_data
{
    LPVOID threadFiber;         // The thread fiber to which to return.
    LPVOID resumableFuncFiber;  // The fiber on which this function will run
    volatile LONG refcount;     // Refcount of the fiber
    [...]
};

The first, __resumable_func_setup_data, contains the data and logic to initialize a resumable method call, and it’s templatized on the function type.

The second, __resumable_func_fiber_data, contains the data and methods to manage the life of a resumable method that runs on a fiber.

When a function is declared as resumable, like:

    task<int> foo(future<int> fi) __resumable

the compiler generates the code for a stub function foo_resumable.

When a resumable function is called, the compiler generates a call to this stub function, which takes care of executing foo into a new “resumable context”. The code for this is in a static function:

    TTask __resumable_func_setup_data::RunActionInResumableContext(TFunc& lambda)

which shows how a Fiber can be used to seamlessly suspend and resume a function:

  1. The first thing to do is to convert the caller thread into a fiber, by calling __resumable_func_fiber_data::ConvertCurrentThreadToFiber(). When the function completes it will convert the fiber back into a normal thread with ConvertFiberToThread.
  2. Then, a __resumable_func_setup_data struct is initialized with the data required to invoke the function on a fiber.
  3. After this, a new fiber is created, specifying __resumable_func_setup_data.ResumableFuncFiberProc as its entry point. Since creating and destroying a fiber is a relatively expensive operation, the library also implements a FiberPool class to manage a pool of fibers that can be reused when needed.
  4. Now the current fiber can suspend itself and start the execution of this new fiber by calling SwitchToFiber.

Calling SwitchToFiber means manually performing a context switch (or better, a fiber switch). The instruction pointer of our thread suddenly jumps to a different place, using a different stack, and all the registers are updated. But the old context and stack are not lost: they are part of the previous fiber. They are just not running, because they are not associated with any thread, and they can be resumed at any moment.

For example, the following figure shows what happens when the resumable copyFile_res is called in our copyFile program: first the current thread (Thread A) is converted into a fiber (Fiber 0), then a new fiber (Fiber 1) is created to run copyFile_res, and the thread execution is switched to the new Fiber. At that point only Fiber 1 is running. Fiber 0 still exists, with its stack, but it’s suspended.

[Figure: Thread A is converted into Fiber 0, and execution is switched to Fiber 1, which runs copyFile_res]

When the ResumableFuncFiberProc starts executing in the context of the new fiber, it:

  1. Retrieves a pointer to the __resumable_func_setup_data from its parameters,
  2. Initializes a new instance of __resumable_func_fiber_data, allocated on the thread stack, with all the data required to manage the function (like the handle of the caller-fiber), and
  3. Finally executes the resumable function in the context of the fiber.

Now we are executing our functor in the context of the fiber. We can leave the function in two ways:

- We can complete the function (with a return statement) and return the final result, or

- We can temporarily suspend the function on an __await expression and yield execution to the caller.

In the first case, all we need to do is to complete the ResumableFuncFiberProc, store the result and call SwitchToFiber(previousFiber) to resume the previous fiber. The “terminated” fiber is now ready to be deleted or recycled.

Things get more interesting when we have an await expression, like:

    shared_ptr<string> s = __await readFileChunk(inFile, 4096);

In this case the compiler converts the __await into a call to the library function

    template <typename T> T await_task(concurrency::task<T> func)

where func is a task that wraps the awaitable expression that is being called.

await_task implements all the logic to suspend the current function/fiber and to yield the control back to the previous (caller) fiber, so resuming it.

But before yielding execution, await_task attaches a continuation to the task func. When that task completes, we want to resume the current fiber; to do this, the continuation retrieves and updates the func_data_on_fiber_stack struct and calls SwitchToFiber once again.

The actual details are a little complicated, but the whole code in <pplawait.h> is not too difficult to read.

Changes to the Windows Fiber API

The last interesting thing to note is that the new <pplawait.h> library makes use of a new, slightly improved version of the Win32 Fiber API, defined for Windows 8.1 (where _WIN32_WINNT >= 0x0603) but actually available on Windows 7 as well (at least with the most recently patched version of kernel32.dll).

The original APIs, available since WinNT 4.0, provided the following methods:

LPVOID WINAPI CreateFiber(
    _In_     SIZE_T dwStackSize,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiber(_In_opt_ LPVOID lpParameter);
VOID WINAPI DeleteFiber(_In_ LPVOID lpFiber);
BOOL WINAPI ConvertFiberToThread();
VOID WINAPI SwitchToFiber(_In_ LPVOID lpFiber);

The first two of these methods have now been superseded by an extended version:

LPVOID WINAPI CreateFiberEx(
    _In_     SIZE_T dwStackCommitSize,
    _In_     SIZE_T dwStackReserveSize,
    _In_     DWORD dwFlags,
    _In_     LPFIBER_START_ROUTINE lpStartAddress,
    _In_opt_ LPVOID lpParameter);
LPVOID WINAPI ConvertThreadToFiberEx(
    _In_opt_ LPVOID lpParameter,
    _In_     DWORD dwFlags);

The difference is that a fiber can now be created specifying different values for the CommitSize and the ReserveSize of the fiber stack. Here ReserveSize represents the total stack allocation in virtual memory, while CommitSize is the initially committed memory; it can be useful to fine-tune these values to optimize memory usage. In fact, programs with a large number of concurrent resumable operations could end up allocating a very large amount of memory for the fiber stacks, since a different fiber must be allocated for each resumable function.

There is also a dwFlags argument that can take the single possible value FIBER_FLAG_FLOAT_SWITCH, which is necessary to overcome an old limitation of the Fiber API: in the past, a fiber switch did not save the floating-point registers.

Finally, there is a new function, still undocumented but already used by the CTP library:

PVOID WINAPI CalloutOnFiberStack(
    _In_      PVOID lpFiber,
    _In_      PFIBER_CALLOUT_ROUTINE lpStartAddress,
    _In_opt_  PVOID lpParameter);

It is not difficult to imagine that its purpose is to execute a function on the Fiber specified and return the result to the caller.

Generator functions

We have almost completed our quick review of asynchrony in C++. There is just one last thing to consider before finishing: asynchronous methods are very similar to iterator methods.

They are both a kind of coroutine. We can think of an async method as a strange kind of iterator that returns only one value, at the end, but that is able to suspend its execution just as iterators do after yielding each value. Or we can think of iterator blocks as a strange kind of async method, which can suspend and resume execution, but only to return a value to the caller.

In C# the compiler implements both iterator methods and asynchronous methods by transforming the code of the method into a state machine. We have seen that in C++ the proposal is instead to implement async methods as real coroutines, using side stacks. So the logical question to ask is: can we use resumable functions to implement C#-style iterators?

Of course we can! The same proposal N3858 explains that the concept of resumable functions can be expanded to include generator functions, which would use the same kind of interruption/resumption logic to produce a sequence of values.

Gustafsson defines a new special container, sequence<T>, to represent a sequence of values lazily generated by a generator. With this, it should be possible to write LINQ-like code, like the following:

template <typename Iter>
sequence<int> lazy_tform(Iter beg, Iter end, std::function<int(int)> func) resumable
{
    for (auto it = beg; it != end; ++it) {
        yield func(*it);
    }
}
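As a rough library-only approximation of what such a generator computes, we can invert control and push each value to a caller-supplied callback instead of yielding it lazily. lazy_tform_cb is a hypothetical name, not part of N3858, and unlike a true generator the loop runs to completion rather than suspending at each value:

```cpp
#include <functional>
#include <vector>

// Library-only stand-in for the proposed generator: instead of a lazily
// generated sequence<int>, each transformed value is pushed to a sink.
template <typename Iter>
void lazy_tform_cb(Iter beg, Iter end, std::function<int(int)> func,
                   std::function<void(int)> sink) {
    for (auto it = beg; it != end; ++it)
        sink(func(*it));   // the proposal's 'yield' becomes a plain call
}
```

For example, collecting the squares of {1, 2, 3} into a std::vector through the sink produces {1, 4, 9}.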

But this will be the topic for a future post. Stay tuned! 🙂

Concurrency in C++11

C++11 is a much better language than its previous versions, with features like lambda expressions, initializer lists, rvalue references, automatic type deduction. The C++11 standard library now provides regular expressions, revamped smart pointers and a multi-threading library.

But modern C++ is still limited in its support of parallel and asynchronous computations, especially when compared to languages like C#.

The need for asynchrony

But why do we need support for asynchrony? Computer architectures are becoming more and more parallel and distributed, with multi-core processors nearly ubiquitous and with cores distributed in the cloud. And software applications are more and more often composed of multiple components distributed across many cores, located in a single machine or across a network. Modern programming languages need to offer support to manage this parallelism.

At the same time, responsiveness has become a more and more indispensable quality of software (This is one of the tenets of “Reactive Programming”, an emerging programming paradigm).

Responsiveness means that we don’t want to block waiting for an I/O operation to complete. On the server side we don’t want to block a worker thread that could be doing other work and be notified when an operation completes. And on the client side we really don’t want to block the main/GUI thread of our process, making the application sluggish and unresponsive. Being able to write asynchronous code is therefore more and more important for managing the latency of I/O operations without blocking and losing responsiveness. For example, as a rule, in WinRT all I/O-bound APIs that could take more than 50ms provide only asynchronous interfaces, and cannot even be invoked with a “classical” blocking call.

In this and the next few posts we will have a look at what C++ currently provides to support concurrency, and at what new features are on their way. We’ll look at what is in the standard, but also at the Windows-specific PPL framework provided by Microsoft.

A simple example

To understand how we can write asynchronous code in C++ let’s play with a very simple example. Let’s imagine that we want to write a function to read a file and copy its content into another file. To do this we could write functions like the following:

#include <string> 
#include <vector> 
#include <fstream> 
#include <iostream> 
using namespace std; 

vector<char> readFile(const string& inPath) 
{ 
    ifstream file(inPath, ios::binary | ios::ate); 
    size_t length = (size_t)file.tellg(); 
    vector<char> buffer(length); 
    file.seekg(0, std::ios::beg); 
    file.read(&buffer[0], length); 
    return buffer; 
} 

size_t writeFile(const vector<char>& buffer, const string& outPath) 
{ 
    ofstream file(outPath, ios::binary); 
    file.write(&buffer[0], buffer.size()); 
    return (size_t)file.tellp(); 
} 

With these, we can easily write a simple function to copy a file into another and return the number of characters written:

size_t sync_copyFile(const string& inFile, const string& outFile) 
{ 
    return writeFile(readFile(inFile), outFile); 
} 

Evidently we want to execute readFile and writeFile in sequence, one after the other. But should we block while waiting for them to complete? Admittedly this is a contrived example: if the file is not very big it probably does not matter much, and if the file is very large we would rather use buffering and copy it in chunks than return all its content in a big vector. But readFile and writeFile are both I/O bound and serve here as a model for more complex I/O operations; in real applications it is common to have to read some data from a network, transform it in some way, and return a response or write it somewhere.

So, let’s say that we want to execute the copyFile operation asynchronously. How can we manage to do this in standard C++?

Task-based parallelism: futures and promises

The C++11 standard library provides a few mechanisms to support concurrency. The first is std::thread, which together with synchronization objects (std::mutex, std::lock_guards, std::condition_variables and so on) finally offer a portable way to write “classic” multithread-based concurrent code in C++.

We could modify copyFile to instantiate a new thread to execute the copy, and use a condition_variable to be notified when the thread completes. But working at the level of threads and locks can be quite tricky. Modern frameworks (like the TPL in .NET) offer a higher level of abstraction, in the form of task-based concurrency. There, a task represents an asynchronous operation that can run in parallel with other operations, and the system hides the details of how this parallelism is implemented.
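That thread-plus-condition_variable approach would look roughly like this; run_and_wait and its doubling “work” lambda are illustrative stand-ins, not the file-copy code:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Run some work on a std::thread and wait, via a condition_variable,
// to be notified of its completion.
int run_and_wait(int input) {
    std::mutex m;
    std::condition_variable cv;
    bool done = false;
    int result = 0;

    std::thread worker([&] {
        int r = input * 2;               // stands in for the real work
        {
            std::lock_guard<std::mutex> lk(m);
            result = r;
            done = true;
        }
        cv.notify_one();
    });

    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [&] { return done; });   // block until the worker signals
    lk.unlock();
    worker.join();
    return result;
}
```

Even in this tiny example, the bookkeeping (mutex, flag, notification) dwarfs the actual work, which is exactly why higher-level abstractions are attractive.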

The C++11 library, in its new <future> header, also provides a (somewhat limited) support for task-based parallelism, in the form of promises and futures. The classes std::promise<T> and std::future<T> are roughly the C++ equivalent of a .NET Task<T>, or of a Future<T> of Java 8. They work in pairs to separate the act of calling a function from the act of waiting for the call results.

On the caller side, when we call the asynchronous function we do not receive a result of type T. What is returned instead is a std::future<T>, a placeholder for the result, which will be delivered at some later point in time.

Once we get our future we can move on to other work, while the task executes on a separate thread.

A std::promise<T> object represents a result on the callee side of the asynchronous call, and it is the channel for passing the result asynchronously to the caller. When the task completes, it puts its result into the promise object by calling promise::set_value.

When the caller finally needs to access the result it will call the blocking future::get() to retrieve it. If the task has already completed the result will be immediately available, otherwise, the caller thread will suspend until the result value becomes available.

In our example, this is a version of copyFile written to use futures and promises:

#include <future> 

size_t future_copyFile(const string& inFile, const string& outFile) 
{ 
    std::promise<vector<char>> prom1; 
    std::future<vector<char>> fut1 = prom1.get_future(); 
    std::thread th1([&prom1, inFile](){ 
        prom1.set_value(readFile(inFile)); 
    }); 

    std::promise<size_t> prom2; 
    std::future<size_t> fut2 = prom2.get_future(); 
    std::thread th2([&fut1, &prom2, outFile](){ 
        prom2.set_value(writeFile(fut1.get(), outFile)); 
    }); 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

Note that here we have moved the execution of readFile and writeFile into separate tasks, but we also have to configure and start the threads that run them. Also, we capture references to the promise and future objects to make them available to the task functions. The first thread implements the read and, when it completes, moves its result (a big vector) into the first promise. The second thread blocks on the corresponding future; when the read completes, it gets the vector that was read and passes it to the write function. Finally, when the write completes, the number of characters written is put into the second promise.

In the main function we could take advantage of this parallelism and do some lengthy operation before the call to future::get(). But when we call get() the main thread will still block if the read and write tasks have not completed yet.

Packaged tasks

We can slightly simplify the previous code with packaged_tasks. Class std::packaged_task<T> is a container for a task and its promise. Its template type is the type of the task function (for example, vector<char>(const string&) for our read function). It is a callable type (it defines operator()) and automatically creates and manages a std::promise<T> for us.

size_t packagedtask_copyFile(const string& inFile, const string& outFile) 
{ 
    using Task_Type_Read = vector<char>(const string&); 
    packaged_task<Task_Type_Read> pt1(readFile); 
    future<vector<char>> fut1{ pt1.get_future() }; 
    thread th1{ move(pt1), inFile }; 

    using Task_Type_Write = size_t(const string&); 
    packaged_task<Task_Type_Write> pt2([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }); 
    future<size_t> fut2{ pt2.get_future() }; 
    thread th2{ move(pt2), outFile }; 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

Note that we need to use move() to pass the packaged_task to thread because a packaged_task cannot be copied.

std::async

With packaged_tasks the logic of the function does not change much and the code becomes slightly more readable, but we still have to manually create the threads that run the tasks, and decide on which thread each task will run.

Things become much simpler if we use the std::async() function, also provided by the library. It takes as input a lambda or functor and it returns a future that will contain the return value. This is the version of copyFile modified to use std::async():

size_t async_copyFile(const string& inFile, const string& outFile) 
{ 
    auto fut1 = async(readFile, inFile); 
    auto fut2 = async([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }, 
    outFile); 

    return fut2.get(); 
} 

In a way std::async() is the equivalent of TPL task schedulers. It decides where to run the task, if a new thread needs to be created or if an old (or even the current) thread can be reused.

It is also possible to specify a launch policy, which can be either std::launch::async (which requires the task to be executed asynchronously, possibly on a different thread) or std::launch::deferred (which defers execution of the task until the moment get() is called).

The nice thing is that std::async hides all the implementation- and platform-specific details for us. Examining the <future> header file that comes with VS2013 we can see that the Windows implementation of std::async internally uses the Parallel Patterns Library (PPL), the native equivalent of .NET TPL.

PPL

What we have seen so far is what has been standardized in C++11. It is fair to say that the design of futures and promises is still quite limited, especially if compared with what is provided by C# and .NET.

The main limitation is that in C++11 futures are not composable. If we start several tasks to execute computations in parallel, we cannot block on the whole set, waiting for any of them to complete; we can only block on one future at a time. Also, there is no easy way to combine a set of tasks into a sequence, with each task consuming as input the result of the previous task. Composable tasks would allow us to make the whole architecture non-blocking and event-driven. We would really like to have in C++, too, something like task continuations or the async/await pattern.

With the PPL (aka Concurrency Runtime) Microsoft had the possibility of overcoming the constraints of the standards and to experiment with a more sophisticated implementation of a task library.

In the PPL, class Concurrency::task<T> (defined in the <ppltasks.h> header) represents a task. A task is the equivalent of a future; it also provides the same blocking method get() to retrieve the result. The type parameter T is the return type, and the task is initialized passing a work function (either a lambda or a function pointer or a function object).

So, let’s abandon all concerns of portability for a moment and let’s re-implement our copyFile function, this time with tasks:

size_t ppl_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<vector<char>> tsk1 = Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }); 
    Concurrency::task<size_t> tsk2 = Concurrency::create_task([&tsk1, outFile]() { 
        return writeFile(tsk1.get(), outFile); 
    }); 
    return tsk2.get(); 
} 

Here we have created two task objects, initialized with two lambda expressions, for the read and write operations.

Now we really don’t have to worry about threads anymore; it’s up to the PPL scheduler to decide where to run the tasks and to manage a thread pool. Note however that we are still manually coordinating the interaction of our two tasks: tsk2 keeps a reference to the first task, and explicitly waits for tsk1 to terminate before using its result. This is acceptable in a very simple example like this, but it could become quite cumbersome when we deal with more tasks and with more complicated code.

Task continuations

Unlike futures, PPL tasks support composition through continuations. The task::then method allows us to attach a continuation to a task; the continuation will be invoked when its antecedent task completes and will receive the value returned by the antecedent task.

So, let’s rewrite the copyFile function again, but this time using a continuation:

size_t ppl_then_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<size_t> result =  
    Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
 
    return result.get(); 
} 

Now the code is really clean. We have split the logic of a copy function into two separate components (tasks) that can run in any thread and will be run by a task scheduler. And we have declared the logic of our function as the dependency graph of the tasks.

In this implementation the copyFile function still blocks, at the end, to get the final value, but in a real program it could just return a task that we would insert in the logic of our application, attaching to it a continuation to asynchronously handle its value. We would have code like this:

Concurrency::task<size_t> ppl_create_copyFile_task(const string& inFile, const string& outFile) 
{ 
    return Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
} 
... 
auto tCopy = ppl_create_copyFile_task(inFile, outFile).then([](size_t written) { 
    cout << written << endl; 
}); 
... 
tCopy.wait(); 

Finally, the PPL also provides other ways to compose tasks, with the when_all and when_any functions, useful to manage a set of tasks that run in parallel.

Given a set of tasks, when_all creates another task that completes when all the tasks in the set complete (so, it implements a join). Instead, when_any creates a task that completes when any one of the tasks in the set completes; this can be useful, for example, to limit the number of concurrent tasks, starting a new one only when another completes.

But this is just the tip of the PPL iceberg… the PPL offers a rich set of functionalities, almost equivalent to what is available in its managed version. It also provides scheduler classes that perform the important duty of managing a pool of worker threads and allocating tasks to threads. More details can be found here.

Towards C++17

Hopefully we will soon see some of the improvements introduced by the PPL in the C++ standard. There is already a document (N3857), written by Niklas Gustafsson et al., that proposes a few changes to the library. In particular, the idea is to enable the composability of futures with future::then, future::when_any and future::when_all, with the same semantics seen in the PPL. The previous example, with the new futures, would be rewritten in a portable way like this:

future<size_t> future_then_copyFile(const string& inFile, const string& outFile)
{
	return async([inFile]() {
		return readFile(inFile);
	}).then([outFile](const vector<char>& buffer) {
		return writeFile(buffer, outFile);
	});
}

There would also be a method future::is_ready, to test the future for completion before calling the blocking get(), and future::unwrap, to manage nested futures (futures that return futures); of course all the details of this proposal can be found in the document above.

Would this be the perfect solution? Not quite. Our experience with .NET teaches us that task-based asynchronous code is still hard to write, debug, and maintain. Sometimes very hard. This is the reason why new keywords were added to the C# language itself, in version 5.0, to more easily manage asynchrony through the async/await pattern.
Is there anything similar brewing even for the unmanaged world of C++?