You can represent the abort status with a `bool` instead of a `continue_msg`.
Each `process_node` receives the status of its predecessor node, processes its task only when that status is `true` (i.e. the predecessor succeeded), and sends the updated abort status on to its successor nodes.
// Callable used as the body of each processing node in the graph built by
// main(). It receives the predecessor's status flag and returns the status
// to pass on to successors.
struct body
{
    std::string my_name;  // label printed in every message from this node

    body( const char *name ) : my_name(name) {}

    // avail: true when the upstream work succeeded, false when it was
    //        aborted or skipped.
    // Returns false when the input was false or when this node is the one
    // named "B" (which always reports failure); returns true otherwise.
    bool operator()( bool avail ) const
    {
        const char *label = my_name.c_str();

        // Upstream already failed: do no work, just propagate the abort.
        if (!avail) {
            printf("%s skipped\n", label);
            return false;
        }

        // The node named "B" is hard-wired to fail its task.
        if (my_name == "B") {
            printf("%s fail\n", label);
            return false;
        }

        // Normal path: pause for one second (presumably standing in for
        // real work), then report completion.
        sleep(1);
        printf("%s\n", label);
        return true;
    }
};
// Builds a small flow graph in which every node forwards a bool status
// (true = predecessor succeeded, false = aborted/skipped), then feeds `true`
// into the graph three times and waits for each run to finish.
int main()
{
    graph g;
    using process_node = function_node<bool, bool>;
    using bool_pair = std::tuple<bool, bool>;

    // Entry point: forwards the initial status to both A and B.
    broadcast_node< bool > start(g);
    process_node a( g, unlimited, body("A"));
    process_node b( g, unlimited, body("B"));
    process_node c( g, unlimited, body("C"));

    // C depends on both A and B: join_c collects their two statuses into a
    // tuple and and_c reduces the tuple to one flag that is true only when
    // both inputs are true.
    join_node<bool_pair> join_c(g);
    function_node<bool_pair, bool> and_c(g, unlimited, [](const bool_pair& in)->bool {
        return std::get<0>(in) && std::get<1>(in);
    });
    process_node d( g, unlimited, body("D"));
    process_node e( g, unlimited, body("E"));

    /*
     *  start -+-> A -+-> E
     *         |       \
     *         |        \
     *         |         join_c -> and_c -> C -> D
     *         |        /
     *         |       /
     *         +-> B --
     */
    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, input_port<0>(join_c) );
    make_edge( b, input_port<1>(join_c) );
    make_edge( join_c, and_c );
    make_edge( and_c, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i) {
        try {
            start.try_put( true );
            g.wait_for_all();
        } catch (...) {
            printf("Caught exception\n");
            // A graph cancelled by an exception may be left in an
            // indeterminate state; reset it so the next iteration starts
            // from a clean graph instead of reusing stale node state.
            g.reset();
        }
    }
    return 0;
}