Question

I am trying to do a recursive directory listing using a multi threaded approach. The following code works fine when replacing the async calls as a normal single threaded recursive function call but when implemented with async, the recursively started threads all seem to terminate when the initial async call made from main finishes as the output shows several calls to the function starting but the only directory where all of the files are output is the initial one and "Finished" is only output once although "Started" is output several times and some other directory's files are also output. I suspect I am missing something fundamental. Can anyone explain what is wrong with this code?

#include <filesystem>
#include <future>
#include <functional>
#include <concurrent_vector.h>
#include <concurrent_queue.h>
#include <iostream>

using namespace std;
using namespace std::tr2::sys;
using namespace concurrency;

concurrent_vector<future<void>> taskList;

void searchFiles(wstring path, concurrent_queue<wstring>& fileList)
{
    wcout << L"Started " << path << endl;
    wdirectory_iterator directoryIterator(path);
    wdirectory_iterator endDirectory;
    for( ; directoryIterator != endDirectory; ++directoryIterator)
    {
        wcout << path + L"/" + (wstring)directoryIterator->path() << endl;
        if ( is_directory(directoryIterator->status() ) )
        {
            taskList.push_back( async( launch::async, searchFiles, path + 
            L"/" + (wstring)directoryIterator->path(), ref(fileList) ));
        }
        else
        {
            fileList.push( path + L"/" + (wstring)directoryIterator->path() );
        }
    }
    wcout << L"Finished " << path <<  endl;
}

int main()
{
    concurrent_queue<wstring> fileList;
    wstring path = L"..";
    taskList.push_back( async( launch::async, searchFiles, path, ref(fileList) ));
    for (auto &x: taskList)
        x.wait();
} 

BTW some might ask why I am not using wrecursive_directory_iterator. Apparently wrecursive_directory_iterator will throw an exception and stop with no way to continue if you do not have read permission so this method should allow you to continue on in that case.

Was it helpful?

Solution

The problem is the range-based for loop.

If we take a look at how the range-based for statement is defined, we see that the end-iterator of the loop will only be calculated once. At the time of entering the loop, there is probably(this is a race) only one future in your vector(the one you pushed back in the line above). Thus after that task finishes, the iterator will be incremented and be equal to your old end-iterator and the loop will finish even though the vector might now contain more elements which got pushed back in your first task. There are even more problems to this.

The destructor of the vector which will be called after finishing the loop should normally call the destructor of all its elements which for a future from std::async would be equal to calling wait, though you are still adding elements to the vector while it's already in its destructor, which is probably UB.

Another point is that the end-iterator you created on entering the for-loop will be invalidated as soon as you push_back to your vector in your first thread, this means that you are operating on invalidated iterators.

As a solution I would propose to avoid the global task-list and instead use a local task-list in your searchFiles function, you can then wait on all your local futures in your searchFiles function on each level. This is a common pattern in non-managed recursive parallelism.

Note: I don't know all the details from the ppl concurrent_vector but I assume it behaves similar to a std::vector.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top