Question

Edit: OK, here's a much, much simpler example illustrating my problem. Why is only the first task ever put onto the queue?

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        pool.put( task!simpleWorker(depth,maxDepth,pool));
    }
}

void main(){
    auto pool = new TaskPool();
    pool.put(task!simpleWorker(0,5,pool));
    pool.finish(true);
    writeln("Done");
}

Original:

I need to traverse this DAG. When I visit a node I clean it. I can't clean a node until all of its parents are clean.

The way I'm attempting it is to have the worker thread's current node check all of its children to see which ones can be processed. Any that can be processed are added to the TaskPool.

My problem is I can't figure out how to add new tasks to the TaskPool and get them processed. This just cleans the first node in the DAG, and then exits, leaving everything else dirty.

void cleanNode(Node node, TaskPool pool){
    node.doProcess();
    foreach (client; node.clients){
        if (client.canProcess()){
            pool.put(task!cleanNode(client, pool));
        }
    }
}

void main(){
    auto dag = mkTestDag(5);
    auto pool = new TaskPool();

    pool.put( task!cleanNode(dag[0], pool));
    pool.finish(true); 

    writeln("\n\nOutput:");
    foreach (d;dag){
        writeln(d);
        writeln(d.dirty ? "dirty" : "clean","\n");
    }
}

The full code is here: http://pastebin.com/LLfMyKVp

Solution

It is because an Error is thrown by the put call inside simpleWorker.

This version shows the error:

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        try {
            pool.put( task!simpleWorker(depth,maxDepth,pool));
        } catch (Error e) {
            writeln("Fail: ", e.msg);
        }
    }
}

void main(){
    auto pool = new TaskPool();
    pool.put(task!simpleWorker(0,5,pool));
    pool.finish(true);
    writeln("Done");
}

And the output:

Depth is: 0
Fail: Cannot submit a new task to a pool after calling finish() or stop().
Done

Hopefully someone else can explain the correct way to use TaskPool.

Edit

Got it working by forcing the tasks to run, like this:

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        try 
        {
            auto subWorker = task!simpleWorker(depth,maxDepth, pool);
            pool.put(subWorker);
            subWorker.yieldForce();
        } catch (Error t) {
            writeln("Fail: (", typeid(t), ") ", t.msg); // typeid gives the dynamic type of the thrown object
        }
    }
}

void main(){
    auto pool = new TaskPool();

    auto worker = task!simpleWorker(0,5, pool);
    pool.put(worker);
    worker.yieldForce();

    pool.finish(true);
    writeln("Done");
}

Output:

Depth is: 0
Depth is: 1
Depth is: 2
Depth is: 3
Depth is: 4
Done
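Applied to the original DAG code, the same yieldForce pattern looks like the sketch below (assuming the Node type, canProcess(), and doProcess() from the question's pastebin). Submitting all the ready children before forcing any of them keeps siblings running in parallel instead of one after another:

```d
void cleanNode(Node node, TaskPool pool)
{
    node.doProcess();

    // Submit every ready child first, then force them all,
    // so the siblings can run concurrently on the pool.
    typeof(task!cleanNode(node, pool))[] subs;
    foreach (client; node.clients)
    {
        if (client.canProcess())
        {
            auto sub = task!cleanNode(client, pool);
            pool.put(sub);
            subs ~= sub;
        }
    }
    foreach (sub; subs)
        sub.yieldForce(); // block until each subtask (and its subtree) finishes
}
```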

OTHER TIPS

You see this behaviour because canProcess() returns false for every node except the root node. Since cleanNodeSimple() calls canProcess() to decide whether to put a new task on the pool, it never does, I believe... Check the output of this (modified) program: http://dpaste.dzfl.pl/ea0c393a .

EDIT: Furthermore, after some analysis, the loop body executes only once as well, so nothing new is added to the task pool. :)

Code is on DPaste, check it out. Here is the most interesting part that may help you solve the problem:

// Dejan: will be called only once!
// Because canProcess() returns false for all children.
void cleanNodeSimple(Node node, TaskPool pool){
  node.doProcess();
  writeln("cleanNodeSimple() called for node `", node, "`"); 
  foreach (cli; node.clients){
    writeln("I am ", cli, " and I am executed only once, because I am only RootNode's client.");
    if (cli.canProcess()) {
      writeln("I will be executed only once because ", cli, " has ", cli.clients.length, " clients.");
      writeln("And its master #1 is ", cli.clients[0].masters[0].dirty ? "dirty" : "clean");
      pool.put( task!cleanNodeSimple(cli, pool) );
    }
  }
}

To iterate in a parallel fashion you can use TaskPool.parallel:

foreach (client; pool.parallel(node.clients,1)){
    if (client.canProcess()){
        cleanNode(client, pool);//<- EDIT: no need to make a task here
    }
}

This will block until all clients have been iterated over (letting the current thread do some of the work as well).

To start it off, you need to force the first task in main() instead of just calling finish() on the pool.
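Putting the two tips together, a sketch of the DAG cleaner (again assuming Node, canProcess(), doProcess(), and mkTestDag() from the question's pastebin):

```d
void cleanNode(Node node, TaskPool pool)
{
    node.doProcess();
    // parallel blocks until all clients have been iterated over,
    // so the recursive call needs no explicit task or yieldForce here.
    foreach (client; pool.parallel(node.clients, 1))
    {
        if (client.canProcess())
            cleanNode(client, pool);
    }
}

void main()
{
    auto dag  = mkTestDag(5);
    auto pool = new TaskPool();

    auto root = task!cleanNode(dag[0], pool);
    pool.put(root);
    root.yieldForce(); // force the first task instead of relying on finish()

    pool.finish(true);
}
```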

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow