Question

Let's assume that I start with a TransformBlock<Uri, string> (which is itself an implementation of IPropagatorBlock<Uri, string>) that takes a Uri and downloads its content as a string (this is a web crawler):

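// Assumes a single shared HttpClient for all downloads.
var client = new HttpClient();
var downloader = new TransformBlock<Uri, string>(async uri => {
    // Download the page and return its content as a string.
    return await client.GetStringAsync(uri);
});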

Once I have the content as a string, I parse it for links. Since a page can have multiple links, I use a TransformManyBlock<string, Uri> to map the single result (the content) to many links:

// The discovered item block.
var parser = new TransformManyBlock<string, Uri>(s => {
    // Parse the content here, return an IEnumerable<Uri>.
});

The key to the parser is that it can return an empty sequence, indicating that there are no more items for it to parse.

However, an empty sequence only ends one branch of the tree (or section of the web), not the whole crawl.

I then link the downloader to the parser, and then back to the downloader, like so:

downloader.LinkTo(parser);
parser.LinkTo(downloader);

Now, I know I can get everything to stop from outside of the blocks (by calling Complete on one of them), but how can I signal completion from inside the blocks?

Or do I have to somehow manage this state myself?

Right now, it just hangs, because the downloader block is starved after all the content has been downloaded and parsed.

Here's a fully self-contained test method which hangs on the call to Wait:

[TestMethod]
public void TestSpider()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Transforms a (number, prefix) tuple into a string.
    var downloader = new TransformBlock<Tuple<int, string>, string>(
        t => t.Item2 + t.Item1.ToString(CultureInfo.InvariantCulture),

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

    // Gets the next set of strings.
    var parser = new TransformManyBlock<string, Tuple<int, string>>(s => {
        // If the string length is greater than three, return an
        // empty sequence.
        // This is the signal for this branch to stop.
        if (s.Length > 3) return Enumerable.Empty<Tuple<int, string>>();

        // Branch out.
        return numbers.Select(n => new Tuple<int, string>(n, s));
    }, 
    // These are simple transformations/parsing, so there's no reason not
    // to parallelize. The dataflow blocks will handle the task allocation.
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    // For broadcasting to an action.
    var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
        // Clone.
        t => new Tuple<int, string>(t.Item1, t.Item2));

    // Indicate what was parsed.
    var parserConsumer = new ActionBlock<Tuple<int, string>>(
        t => Debug.WriteLine(
            string.Format(CultureInfo.InvariantCulture, 
                "Consumed - Item1: {0}, Item2: \"{1}\"",
            t.Item1, t.Item2)));

    // Link downloader to parser.
    downloader.LinkTo(parser);

    // Parser to broadcaster.
    parser.LinkTo(parserBroadcaster);

    // Broadcaster to consumer.
    parserBroadcaster.LinkTo(parserConsumer);

    // Broadcaster back to the downloader.
    parserBroadcaster.LinkTo(downloader);

    // Start the downloader.
    downloader.Post(new Tuple<int, string>(1, ""));

    // Wait on the consumer to finish.
    parserConsumer.Completion.Wait();
}

Its output (as expected, before it hangs) is:

Consumed - Item1: 1, Item2: "1"
Consumed - Item1: 2, Item2: "1"
Consumed - Item1: 1, Item2: "11"
Consumed - Item1: 2, Item2: "11"
Consumed - Item1: 1, Item2: "12"
Consumed - Item1: 2, Item2: "12"
Consumed - Item1: 1, Item2: "111"
Consumed - Item1: 2, Item2: "111"
Consumed - Item1: 1, Item2: "112"
Consumed - Item1: 2, Item2: "112"
Consumed - Item1: 1, Item2: "121"
Consumed - Item1: 2, Item2: "121"
Consumed - Item1: 1, Item2: "122"
Consumed - Item1: 2, Item2: "122"
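The hang doesn't depend on anything crawler-specific. The sketch below reproduces it with hypothetical string stand-ins for the download and parse steps (the names download and parse, and the three-character cut-off, are illustrative assumptions). Every branch ends with an empty sequence, yet the cycle of linked blocks never completes on its own. Using Task.Wait(TimeSpan), which returns false on timeout, lets a test fail fast instead of blocking forever:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for downloading: append a character.
var download = new TransformBlock<string, string>(s => s + "x");

// Hypothetical stand-in for parsing: branch in two until the string is
// three characters long, then return an empty sequence (end of branch).
var parse = new TransformManyBlock<string, string>(
    s => s.Length >= 3 ? Array.Empty<string>() : new[] { s, s });

// Link the blocks into a cycle, as in the question.
download.LinkTo(parse);
parse.LinkTo(download);

download.Post("");

// Every branch ends, but nothing calls Complete(), so Completion never
// finishes; Wait(TimeSpan) gives up after one second and returns false.
bool completed = download.Completion.Wait(TimeSpan.FromSeconds(1));
Console.WriteLine(completed); // False
```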

Solution

TPL Dataflow doesn't have anything that handles this out of the box; it's really a state-management issue.

That said, the key is to keep track of which URLs have already been processed and which ones still need to be.

The ideal place to handle this is the parser block; this is the point where you have the content (which will be transformed into more links to download) and the URL that the content was downloaded from.
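The bookkeeping can be sketched on its own, outside of any dataflow block. This is a minimal single-threaded illustration assuming string URLs; the pending set and the RecordParsed helper are hypothetical names, not part of TPL Dataflow. The lock turns the removal, the additions, and the emptiness check into a single step, and a return value of true marks the point at which the pipeline would call Complete():

```csharp
using System;
using System.Collections.Generic;

// Hypothetical state: URLs that have been discovered but not yet parsed.
var pending = new HashSet<string>();

// Hypothetical helper, called once per parsed page with the URL it came
// from and the links found on it. Returns true when no work is left.
bool RecordParsed(string url, IEnumerable<string> links)
{
    lock (pending)
    {
        // The page has been handled, so it is no longer pending.
        pending.Remove(url);

        // Every newly discovered link becomes pending work.
        foreach (var link in links)
            pending.Add(link);

        // Nothing left anywhere in the tree: time to call Complete().
        return pending.Count == 0;
    }
}

// Prime the set with the root before it is posted anywhere.
pending.Add("/");

bool done1 = RecordParsed("/", new[] { "/a", "/b" });   // branches out
bool done2 = RecordParsed("/a", Array.Empty<string>()); // one branch ends
bool done3 = RecordParsed("/b", Array.Empty<string>()); // last branch ends
Console.WriteLine($"{done1} {done2} {done3}"); // False False True
```

One caveat of keying the set by URL: if the same URL is discovered twice while it is still pending, the set only tracks it once, so a real crawler would probably also need to avoid posting URLs it has already seen.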

Building on the sample above, we need a way to capture the download result together with the URI it was downloaded from (I would have used a Tuple, but it would have made things too confusing):

public class DownloadResult
{
    public Tuple<int, string> Uri { get; set; }
    public string Content { get; set; }
}

From there, the download block is pretty much the same, just updated to output the above structure:

[TestMethod]
public void TestSpider2()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Performs the downloading.
    var downloader = new TransformBlock<Tuple<int, string>, DownloadResult>(
        t => new DownloadResult { 
            Uri = t, 
            Content = t.Item2 + 
                t.Item1.ToString(CultureInfo.InvariantCulture) 
        },

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

The consumer of the parser doesn't need to change, but it does need to be declared earlier: the parser has to signal the consumer to stop consuming, so the consumer must be captured in the closure passed to the parser:

// Indicate what was parsed.
var parserConsumer = new ActionBlock<Tuple<int, string>>(
    t => Debug.WriteLine(
        string.Format(CultureInfo.InvariantCulture, 
            "Consumed - Item1: {0}, Item2: \"{1}\"",
            t.Item1, t.Item2)));

Now the state manager has to be introduced:

// The set of items that still need to be processed.
var itemsToProcess = new HashSet<Tuple<int, string>>();

At first, I thought of going with just a ConcurrentDictionary<TKey, TValue>, but the removal, the check for remaining items, and the multiple additions have to be performed as one atomic operation, which a concurrent collection alone doesn't provide. A simple lock statement is the best option here.

The parser is what changes the most. It parses the items normally, but also does the following atomically:

  • Removes the URL from the state machine (itemsToProcess).
  • Adds new URLs to the state machine.
  • If no items exist in the state machine after processing the above, signals to the consumer block that it's done by calling the Complete method on the IDataflowBlock interface.

That looks like this:

// Changes content into items and new URLs to download.
var parser = new TransformManyBlock<DownloadResult, Tuple<int, string>>(
    r => {
        // The parsed items.
        IEnumerable<Tuple<int, string>> parsedItems;

        // If the content length is greater than three, return an
        // empty sequence (same rule as the question's parser).
        // This is the signal for this branch to stop.
        parsedItems = (r.Content.Length > 3) ? 
            Enumerable.Empty<Tuple<int, string>>() :
            numbers.Select(n => new Tuple<int, string>(n, r.Content));

        // Materialize the list.
        IList<Tuple<int, string>> materializedParsedItems = 
            parsedItems.ToList();

        // Lock here; the removal from the set of items to process
        // and the addition of the new items must be atomic.
        lock (itemsToProcess)
        {
            // Remove the item.
            itemsToProcess.Remove(r.Uri);

            // If this page produced no new items and nothing else is
            // pending, the whole crawl is finished.
            if (materializedParsedItems.Count == 0 && 
                itemsToProcess.Count == 0)
            {
                // Complete the consumer block.
                parserConsumer.Complete();
            }

            // Add the items.
            foreach (Tuple<int, string> newItem in materializedParsedItems) 
                itemsToProcess.Add(newItem);

            // Return the items.
            return materializedParsedItems;
        }
    }, 

    // These are simple transformations/parsing, so there's no reason
    // not to parallelize. The dataflow blocks will handle the task
    // allocation.
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

The broadcaster and the links are the same:

// For broadcasting to an action.
var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
    // Clone.
    t => new Tuple<int, string>(t.Item1, t.Item2));

// Link downloader to parser.
downloader.LinkTo(parser);

// Parser to broadcaster.
parser.LinkTo(parserBroadcaster);

// Broadcaster to consumer.
parserBroadcaster.LinkTo(parserConsumer);

// Broadcaster back to the downloader.
parserBroadcaster.LinkTo(downloader);

When starting the blocks, the state machine has to be primed with the URL to download before the root is passed to the Post method:

// The initial post to download.
var root = new Tuple<int, string>(1, "");

// Add to the items to process.
itemsToProcess.Add(root);

// Post to the downloader.
downloader.Post(root);

And the call to the Wait method on the Task class is the same, but it now completes instead of hanging:

    // Wait on the consumer to finish.
    parserConsumer.Completion.Wait();
}
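A variation on the same idea, swapped in here rather than taken from the answer above, is to replace the set with an Interlocked counter of outstanding items. The sketch below is a console version of the test under that assumption: value tuples stand in for Tuple<int, string>, and the names pending and consumed are hypothetical. The parser adds its children to the counter before decrementing for the item it just handled, so the count cannot reach zero while work is still in flight. A counter is lighter than a set, but it only tells you how many items are outstanding, not which ones:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

var numbers = new[] { 1, 2 };

// Hypothetical counters: items posted but not yet parsed, and items consumed.
int pending = 0;
int consumed = 0;

// Stand-in "download": append the number to the prefix, as in the test.
var downloader = new TransformBlock<(int N, string Prefix), string>(
    t => t.Prefix + t.N.ToString(CultureInfo.InvariantCulture),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// Declared before the parser so the parser's closure can complete it.
var consumer = new ActionBlock<(int N, string Prefix)>(
    t => Interlocked.Increment(ref consumed));

var parser = new TransformManyBlock<string, (int N, string Prefix)>(content =>
{
    // Same stopping rule as the test: stop once the content exceeds three characters.
    List<(int N, string Prefix)> children = content.Length > 3
        ? new List<(int N, string Prefix)>()
        : numbers.Select(n => (N: n, Prefix: content)).ToList();

    // Register the children first, then release this item; if that
    // brings the count to zero, no work is left anywhere in the cycle.
    Interlocked.Add(ref pending, children.Count);
    if (Interlocked.Decrement(ref pending) == 0)
        consumer.Complete();

    return children;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// Value tuples are immutable, so the broadcaster's "clone" is the identity.
var broadcaster = new BroadcastBlock<(int N, string Prefix)>(t => t);

downloader.LinkTo(parser);
parser.LinkTo(broadcaster);
broadcaster.LinkTo(consumer);    // linked first, as in the test
broadcaster.LinkTo(downloader);

// Count the root before posting it, for the same reason the set is primed.
pending = 1;
downloader.Post((N: 1, Prefix: ""));

consumer.Completion.Wait();
Console.WriteLine(consumed); // 14, the same items as the question's output
```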
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow