Question

I'm trying to write a script which will download at most N files simultaneously via HTTP.

I've previously used AnyEvent::Worker::Pool to manage a pool of BLOCKING tasks. I've also used AnyEvent::HTTP in combination with AnyEvent->condvar to manage NON-BLOCKING downloads individually.

I thought that it should be pretty simple to combine the two approaches so that AnyEvent->condvar makes AnyEvent::HTTP::http_get look BLOCKING from the perspective of AnyEvent::Worker::Pool.

However, I'm getting some errors I don't understand, presumably due to implementation details of AnyEvent::Worker. Here's a really cut-down version of the script that demonstrates the issue:

use EV;
use AnyEvent 5;
use AnyEvent::Worker::Pool;
use AnyEvent::HTTP;
use 5.10.0;
use strict;

my $pool_size = 2;
my $num_jobs  = 7;

# Create a pool of $pool_size workers
my $workers = AnyEvent::Worker::Pool->new($pool_size, sub {
  my ($job) = @_;
  eval {
    my $cv = AnyEvent->condvar;
    print "worker starting download [$job] ...\n";
    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
      my ($data, $headers) = @_;
      if ($headers->{Status} =~ /^2/) { 
        print "download [$job] succeeded.\n"; 
      } else { 
        print "download [$job] failed.\n"; 
      }
      $cv->send; # notification of download complete/exit.
    };

    $cv->recv; # wait for download to complete/exit before returning to pool
  }; if ($@) {
    print "worker payload error: $@\n";
  }
  return 1;
});

# dispatch the full list of downloads
my ($need,$done) = ($num_jobs, 0);
for my $job (0 .. ($need - 1)) {
  print "dispatching job $job...\n";
  $workers->do($job, sub {
    print "worker [$job] payload threw exception: $@\n" if $@;
    print "worker [$job] payload completed successfully!\n" unless $@;
    EV::unloop if ++$done == $need;
  });
}

EV::loop; # wait here for all downloads to complete
print "We're done!\n"; # some useful code to follow here...

Demo output is as follows:

user@host:~$ ./test.pl
dispatching job 0...
dispatching job 1...
dispatching job 2...
dispatching job 3...
dispatching job 4...
dispatching job 5...
dispatching job 6...
worker starting download [0] ...
worker starting download [1] ...
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
worker [6] payload threw exception: no worker connection
EV: error in callback (ignoring): no worker connection at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 60

^C
user@host:~$
user@host:~$
user@host:~$ download [1] failed.
unable to write results: Broken pipe at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 139.
  ...caught at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 145.

Why AnyEvent::HTTP?

In my real script I'm using many more features of AnyEvent::HTTP; particularly, I'm combining the on_body callback with Term::StatusBar to show a progress bar for the end-user of the script; additionally, I'm strategically 'pausing' in the on_body callback such that I maintain a transfer rate equal to or less than a rate pre-defined by the end-user.

Please feel free to suggest an alternative with those features (or an easy way to hack them in!)

Why AnyEvent::Worker::Pool?

I was familiar with it already. Alternative suggestions welcome.

Why EV?

It's fast. Again, alternative suggestions welcome.


Solution

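You shouldn't use AnyEvent::Worker::Pool for this task. It is meant for running blocking code in separate worker processes, whereas AnyEvent::HTTP is already non-blocking, so you can cap the number of concurrent downloads inside a single process.
I'd also recommend avoiding loop-specific calls such as EV::loop and EV::unloop, because they make your code incompatible with other event loop implementations. A condition variable (AnyEvent->condvar) does the same job portably.

Your code could be rewritten like this: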

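use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;

my $pool_size = 2;   # maximum number of simultaneous downloads
my $num_jobs  = 7;   # jobs that have not been started yet
my $cur_job   = 0;   # index of the next job to start

my $cv = AnyEvent->condvar;
$cv->begin();        # guard so $cv cannot fire while we are still dispatching

# Start the first batch: one download chain per pool slot.
for (1..($pool_size < $num_jobs ? $pool_size : $num_jobs)) {
    $cv->begin();
    make_job($cur_job++);
}

$cv->end();          # release the guard

sub make_job {
    my $job = shift;
    $num_jobs--;

    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
        my ($data, $headers) = @_;
        if ($headers->{Status} =~ /^2/) {
            print "download [$job] succeeded.\n";
        } else {
            print "download [$job] failed.\n";
        }

        if ($num_jobs > 0) {
            # Reuse this slot for the next pending job.
            make_job($cur_job++);
        }
        else {
            # Nothing left to start: this slot is finished.
            $cv->end();
        }
    };
}

# Run the event loop until every slot has called end().
$cv->recv();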
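
The same begin/end pattern can be pulled out into a reusable helper. The following is only a minimal sketch under some assumptions: run_limited is a hypothetical name (it is not part of AnyEvent), the job list is an ordinary array, and a short AnyEvent timer stands in for the real download so the example runs without network access.

```perl
use strict;
use warnings;
use AnyEvent;

# Hypothetical helper (not part of AnyEvent): call $start->($job, $done) for
# every job in @$jobs, keeping at most $limit jobs in flight at a time.
# Returns a condvar that fires once every job has invoked its $done callback.
sub run_limited {
    my ($limit, $jobs, $start) = @_;
    my @queue = @$jobs;
    my $cv    = AnyEvent->condvar;

    my $next;
    $next = sub {
        return unless @queue;          # nothing left to start
        my $job = shift @queue;
        $cv->begin;
        $start->($job, sub {
            $next->();                 # refill the freed slot first
            $cv->end;                  # then mark this job as finished
        });
    };

    $cv->begin;                        # guard: keeps $cv from firing early
    $next->() for 1 .. $limit;         # fill the initial slots
    $cv->end;                          # release the guard
    return $cv;
}

# Demo: timers simulate downloads; we record the peak number in flight.
my ($active, $peak, @finished) = (0, 0);
my %timers;

my $cv = run_limited(2, [0 .. 6], sub {
    my ($job, $done) = @_;
    $active++;
    $peak = $active if $active > $peak;
    $timers{$job} = AnyEvent->timer(after => 0.01, cb => sub {
        delete $timers{$job};          # drop the watcher once it has fired
        $active--;
        push @finished, $job;
        $done->();
    });
});

$cv->recv;
print "finished ", scalar(@finished), " jobs, peak concurrency $peak\n";
```

With AnyEvent::HTTP, the starter callback would call http_get for the job's URL and invoke $done at the end of the http_get callback, in place of the timer used here.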
Licensed under: CC-BY-SA with attribution