Concurrency in C++11

C++11 is a much better language than its previous versions, with features like lambda expressions, initializer lists, rvalue references, and automatic type deduction. The C++11 standard library now provides regular expressions, revamped smart pointers, and a multithreading library.

But modern C++ is still limited in its support for parallel and asynchronous computation, especially when compared to languages like C#.

The need for asynchrony

But why do we need support for asynchrony? Computer architectures are becoming more and more parallel and distributed, with multi-core processors nearly ubiquitous and with cores distributed in the cloud. And software applications are increasingly composed of multiple components distributed across many cores located in a single machine or across a network. Modern programming languages need to offer support to manage this parallelism.

At the same time, responsiveness has become an increasingly indispensable quality of software (this is one of the tenets of “Reactive Programming”, an emerging programming paradigm).

Responsiveness means that we don’t want to block waiting for an I/O operation to complete. On the server side we don’t want to block a worker thread that could be doing other work while waiting to be notified that an operation has completed. And on the client side we really don’t want to block the main/GUI thread of our process, making the application sluggish and unresponsive. Being able to write asynchronous code is therefore more and more important to manage the latency of I/O operations without blocking and losing responsiveness. For example, as a rule, in WinRT all I/O-bound APIs that could take more than 50ms only provide asynchronous interfaces, and cannot even be invoked with a “classical” blocking call.

In this and in the next few posts we will have a look at what C++ currently provides to support concurrency, and what new features are on their way. We’ll look both at what is in the standard and at the Windows-specific PPL framework provided by Microsoft.

A simple example

To understand how we can write asynchronous code in C++ let’s play with a very simple example. Let’s imagine that we want to write a function to read a file and copy its content into another file. To do this we could write functions like the following:

#include <string> 
#include <vector> 
#include <fstream> 
#include <iostream> 
using namespace std; 

vector<char> readFile(const string& inPath) 
{ 
    // open at the end to get the file length, then rewind and read it all 
    ifstream file(inPath, ios::binary | ios::ate); 
    size_t length = (size_t)file.tellg(); 
    vector<char> buffer(length); 
    file.seekg(0, std::ios::beg); 
    file.read(buffer.data(), length); 
    return buffer; 
} 

size_t writeFile(const vector<char>& buffer, const string& outPath) 
{ 
    ofstream file(outPath, ios::binary); 
    file.write(buffer.data(), buffer.size()); 
    return (size_t)file.tellp();   // number of characters written 
} 

With these, we can easily write a simple function to copy a file into another and return the number of characters written:

size_t sync_copyFile(const string& inFile, const string& outFile) 
{ 
    return writeFile(readFile(inFile), outFile); 
} 

Clearly we want to execute readFile and writeFile in sequence, one after the other. But should we block while waiting for them to complete? This is, of course, a contrived example: if the file is not very big it probably does not matter much, and if the file is very large we would rather use buffering and copy it in chunks rather than returning all its content in one big vector. But both readFile and writeFile are I/O bound and stand in here for more complex I/O operations; in real applications it is common to read some data from a network, transform it in some way, and return a response or write it somewhere.

So, let’s say that we want to execute the copyFile operation asynchronously. How can we manage to do this in standard C++?

Task-based parallelism: futures and promises

The C++11 standard library provides a few mechanisms to support concurrency. The first is std::thread, which together with synchronization objects (std::mutex, std::lock_guard, std::condition_variable and so on) finally offers a portable way to write “classic” multithreaded concurrent code in C++.

We could modify copyFile to spawn a new thread to execute the copy, and use a condition_variable to be notified when the thread completes. But working at the level of threads and locks can be quite tricky. Modern frameworks (like the TPL in .NET) offer a higher level of abstraction, in the form of task-based concurrency: a task represents an asynchronous operation that can run in parallel with other operations, and the system hides the details of how this parallelism is implemented.
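
Just to give an idea of the thread-and-locks approach, here is a minimal sketch of what such a version could look like (the thread_copyFile name and its structure are only illustrative, reusing the readFile and writeFile functions defined above):

#include <condition_variable> 
#include <mutex> 
#include <thread> 

size_t thread_copyFile(const string& inFile, const string& outFile) 
{ 
    std::mutex m; 
    std::condition_variable cv; 
    bool done = false; 
    size_t written = 0; 

    // worker thread: performs the copy and signals completion 
    std::thread worker([&]() { 
        size_t n = writeFile(readFile(inFile), outFile); 
        std::lock_guard<std::mutex> lock(m); 
        written = n; 
        done = true; 
        cv.notify_one(); 
    }); 

    // ...the caller could do other work here... 

    std::unique_lock<std::mutex> lock(m); 
    cv.wait(lock, [&] { return done; });   // block until the worker signals completion 
    worker.join(); 
    return written; 
} 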

The C++11 library, in its new <future> header, also provides some (somewhat limited) support for task-based parallelism, in the form of promises and futures. The classes std::promise<T> and std::future<T> are roughly the C++ equivalent of a .NET Task<T>, or of Java's Future<T>. They work in pairs to separate the act of calling a function from the act of waiting for its result.

On the caller side, when we call the asynchronous function we do not receive a result of type T. What is returned instead is a std::future<T>, a placeholder for the result, which will be delivered at some later point in time.

Once we get our future we can move on and do other work, while the task executes on a separate thread.

A std::promise<T> object represents a result on the callee side of the asynchronous call, and is the channel for passing that result asynchronously to the caller. When the task completes, it puts its result into the promise by calling promise::set_value.

When the caller finally needs to access the result it calls the blocking future::get() to retrieve it. If the task has already completed, the result is immediately available; otherwise the calling thread blocks until the result value becomes available.

In our example, this is a version of copyFile written to use futures and promises:

#include <future> 

size_t future_copyFile(const string& inFile, const string& outFile) 
{ 
    // first task: read the file and publish the buffer through prom1 
    std::promise<vector<char>> prom1; 
    std::future<vector<char>> fut1 = prom1.get_future(); 
    std::thread th1([&prom1, inFile](){ 
        prom1.set_value(readFile(inFile)); 
    }); 

    // second task: wait for the read to complete, then write the buffer 
    std::promise<size_t> prom2; 
    std::future<size_t> fut2 = prom2.get_future(); 
    std::thread th2([&fut1, &prom2, outFile](){ 
        prom2.set_value(writeFile(fut1.get(), outFile)); 
    }); 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

Note that here we have moved the execution of readFile and writeFile into separate tasks, but we also have to configure and start the threads that run them. Also, we capture references to the promise and future objects to make them available to the task functions. The first thread performs the read and, when it completes, moves its result (a big vector) into the first promise. The second thread blocks on the corresponding future; when the read completes, it gets the vector and passes it to the write function. Finally, when the write completes, the number of characters written is put into the second promise.

In the calling code we could take advantage of this parallelism and do some lengthy operation before the call to future::get(). But when we call get() the calling thread will still block if the read and write tasks have not completed yet.
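
For example, the general pattern is a minimal sketch like the following, which reuses sync_copyFile from above (doSomeOtherWork is just a hypothetical placeholder for that lengthy operation, and inFile/outFile are assumed to be in scope):

std::promise<size_t> prom; 
std::future<size_t> fut = prom.get_future(); 
std::thread worker([&prom, inFile, outFile]() { 
    prom.set_value(sync_copyFile(inFile, outFile)); 
}); 

doSomeOtherWork();            // runs concurrently with the copy 
size_t written = fut.get();   // blocks only if the copy has not finished yet 
worker.join(); 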

Packaged tasks

We can slightly simplify the previous code with packaged_tasks. Class std::packaged_task is a container for a task and its promise. Its template parameter is the type of the task function (for example, vector<char>(const string&) for our read function). It is a callable type (it defines operator()) and automatically creates and manages a std::promise for us.

size_t packagedtask_copyFile(const string& inFile, const string& outFile) 
{ 
    // package the read function together with its promise 
    using Task_Type_Read = vector<char>(const string&); 
    packaged_task<Task_Type_Read> pt1(readFile); 
    future<vector<char>> fut1{ pt1.get_future() }; 
    thread th1{ move(pt1), inFile }; 

    // package the write step, which waits on the read's future 
    using Task_Type_Write = size_t(const string&); 
    packaged_task<Task_Type_Write> pt2([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }); 
    future<size_t> fut2{ pt2.get_future() }; 
    thread th2{ move(pt2), outFile }; 

    size_t result = fut2.get(); 
    th1.join(); 
    th2.join(); 
    return result; 
} 

Note that we need to use move() to pass the packaged_task to thread because a packaged_task cannot be copied.

std::async

With packaged_tasks the logic of the function does not change much and the code becomes slightly more readable, but we still have to manually create the threads that run the tasks, and decide on which thread each task will run.

Things become much simpler if we use the std::async() function, also provided by the library. It takes as input a lambda or a functor and returns a future that will contain the function's return value. This is the version of copyFile modified to use std::async():

size_t async_copyFile(const string& inFile, const string& outFile) 
{ 
    auto fut1 = async(readFile, inFile); 
    auto fut2 = async([&fut1](const string& path){ 
        return writeFile(fut1.get(), path); 
    }, 
    outFile); 

    return fut2.get(); 
} 

In a way std::async() is the equivalent of the TPL task schedulers. It decides where to run the task: whether a new thread needs to be created or an existing (or even the current) thread can be reused.

It is also possible to specify a launch policy, which can be either “async” (which requires the task to be executed asynchronously, possibly on a different thread) or “deferred” (which asks for the task to be executed only at the moment when get() is called).
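
For example, a minimal sketch of the two policies, reusing the readFile function from above (the file name is just a placeholder):

// runs readFile as if on a new thread, starting right away 
auto f1 = std::async(std::launch::async, readFile, string("data.bin")); 

// defers the call: readFile runs only when get() (or wait()) is invoked, 
// on the thread that calls it 
auto f2 = std::async(std::launch::deferred, readFile, string("data.bin")); 

auto buffer1 = f1.get(); 
auto buffer2 = f2.get();   // the deferred read happens here 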

The nice thing is that std::async hides all the implementation and platform-specific details for us. Examining the <future> header file that comes with VS2013 we can see that the Windows implementation of std::async internally uses the Parallel Patterns Library (PPL), the native equivalent of the .NET TPL.

PPL

What we have seen so far is what has been standardized in C++11. It is fair to say that the design of futures and promises is still quite limited, especially if compared with what is provided by C# and .NET.

The main limitation is that in C++11 futures are not composable. If we start several tasks to execute computations in parallel, we cannot wait on all the futures at once, or for the first of them to complete; we can only block on one future at a time. Also, there is no easy way to chain a set of tasks into a sequence, with each task consuming as input the result of the previous one. Composable tasks would allow us to make the whole architecture non-blocking and event-driven. We would really like to have in C++, too, something like task continuations or the async/await pattern.

With the PPL (aka the Concurrency Runtime) Microsoft had the chance to overcome the constraints of the standard and to experiment with a more sophisticated implementation of a task library.

In the PPL, class Concurrency::task<T> (defined in the <ppltasks.h> header) represents a task. A task is the equivalent of a future; it also provides the same blocking method get() to retrieve the result. The type parameter T is the return type, and the task is initialized by passing it a work function (a lambda, a function pointer, or a function object).

So, let’s abandon all concerns of portability for a moment and let’s re-implement our copyFile function, this time with tasks:

#include <ppltasks.h> 

size_t ppl_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<vector<char>> tsk1 = Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }); 
    Concurrency::task<size_t> tsk2 = Concurrency::create_task([&tsk1, outFile]() { 
        return writeFile(tsk1.get(), outFile); 
    }); 
    return tsk2.get(); 
} 

Here we have created two task objects, initialized with two lambda expressions, for the read and write operations.

Now we really don’t have to worry about threads anymore; it’s up to the PPL scheduler to decide where to run the tasks and to manage a thread pool. Note however that we are still manually coordinating the interaction of our two tasks: tsk2 keeps a reference to the first task, and explicitly waits for tsk1 to terminate before using its result. This is acceptable in a very simple example like this, but it could become quite cumbersome when we deal with more tasks and with more complicated code.

Task continuations

Unlike futures, PPL tasks support composition through continuations. The task::then method allows us to attach a continuation task to a task; the continuation will be invoked when its antecedent task completes and will receive the value returned by the antecedent task.

So, let’s rewrite the copyFile function again, but this time using a continuation:

size_t ppl_then_copyFile(const string& inFile, const string& outFile) 
{ 
    Concurrency::task<size_t> result = Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
 
    return result.get(); 
} 

Now the code is really clean. We have split the logic of a copy function into two separate components (tasks) that can run on any thread and are executed by a task scheduler. And we have expressed the logic of our function as a dependency graph of tasks.

In this implementation the copyFile function still blocks, at the end, to get the final value, but in a real program it could just return a task that we would insert into the logic of our application, attaching to it a continuation to asynchronously handle its value. We would have code like this:

Concurrency::task<size_t> ppl_create_copyFile_task(const string& inFile, const string& outFile) 
{ 
    return Concurrency::create_task([inFile]() { 
        return readFile(inFile); 
    }).then([outFile](const vector<char>& buffer) { 
        return writeFile(buffer, outFile); 
    }); 
} 
... 
auto tCopy = ppl_create_copyFile_task(inFile, outFile).then([](size_t written) { 
    cout << written << endl; 
}); 
... 
tCopy.wait(); 

Finally, the PPL also provides other ways to compose tasks, with the when_all and when_any functions, useful for managing a set of tasks that run in parallel.

Given a set of tasks, when_all creates another task that completes when all the tasks complete (so it implements a join). when_any, instead, creates a task that completes as soon as one of the tasks in the set completes; this can be useful, for example, to limit the number of concurrent tasks, starting a new one only when another completes.
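
For instance, here is a minimal sketch of both combinators, reusing readFile from above (the file names are just placeholders, and this assumes the iterator-based overloads of when_all/when_any):

std::vector<Concurrency::task<vector<char>>> tasks; 
for (auto path : { "a.dat", "b.dat", "c.dat" }) 
    tasks.push_back(Concurrency::create_task([path]() { return readFile(path); })); 

// join: completes when all the reads have completed 
auto all = Concurrency::when_all(tasks.begin(), tasks.end()) 
    .then([](const std::vector<vector<char>>& buffers) { 
        // all the file contents are available here 
    }); 

// completes as soon as the first read completes 
auto any = Concurrency::when_any(tasks.begin(), tasks.end()) 
    .then([](std::pair<vector<char>, size_t> result) { 
        // result.second is the index of the task that completed first 
    }); 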

But this is just the tip of the PPL iceberg… the PPL offers a rich set of functionalities, almost equivalent to what is available in its managed counterpart. It also provides scheduler classes that perform the important duty of managing a pool of worker threads and allocating tasks to threads. More details can be found in the PPL documentation.

Towards C++17

Hopefully we will soon see some of the improvements introduced by the PPL in the C++ standard. There is already a document (N3857), written by Niklas Gustafsson et al., that proposes a few changes to the library. In particular, the idea is to make futures composable with future::then and with the when_all and when_any functions, with the same semantics we have seen in the PPL. The previous example, with the new futures, could be rewritten in a portable way like this:

future<size_t> future_then_copyFile(const string& inFile, const string& outFile)
{
	return async([inFile]() {
		return readFile(inFile);
	}).then([outFile](const vector<char>& buffer) {
		return writeFile(buffer, outFile);
	});
}

There would also be a method future::is_ready to test the future for completion before calling the blocking get(), and future::unwrap to manage nested futures (futures that return futures); of course, all the details of this proposal can be found in the document above.
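
For instance, with the proposed interface something like the following sketch should become possible (this is based on the proposal, not on standard C++11, and the file name is just a placeholder):

future<vector<char>> fut = async(readFile, string("data.bin")); 
// ... do some other work ... 
if (fut.is_ready()) { 
    vector<char> buffer = fut.get();   // guaranteed not to block here 
} 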

Would this be the perfect solution? Not quite. Our experience with .NET teaches us that task-based asynchronous code is still hard to write, debug, and maintain. Sometimes very hard. This is the reason why new keywords were added to the C# language itself, in version 5.0, to more easily manage asynchrony through the async/await pattern.
Is there anything similar brewing for the unmanaged world of C++?
