Question

I've got an issue with part of my Perl script that has been bothering me for days. In short, the purpose is to read a large file in chunks and perform some operation on the input stream (the operation itself isn't relevant to my question). When I first implemented it, I simply looped over the file and processed each chunk, like this:

while (read(FILE, $buffer, $chunksize)) {
    callSomeOperation($buffer);
    # Do some other stuff
}

Unfortunately, the file is really big and the operation is fairly complex, with many function calls, so memory usage kept growing steadily until Perl could no longer allocate memory and the script failed. I investigated and tried several things to reduce the memory overhead (declaring variables outside the loop, setting them to undef, and so on). That slowed the growth of allocated memory, but in the end the script still failed. (And if I understand correctly, Perl returning freed memory to the OS is something that won't happen in practice.)

So I decided to move the function call and all its definitions into a worker thread, wait for it to finish, join it, and then start a new thread for the next chunk:

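use threads;

while (read(FILE, $buffer, $chunksize)) {
    # Process this chunk in a new thread and wait for it before reading the next one
    my $thr = threads->create(\&thrWorker, $buffer);
    $thr->join();
}

sub thrWorker {
    my ($chunk) = @_;
    # Do the stuff here!
}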

That might have been a solution, if only the thread would join! But it never does. If I use $thr->detach() instead, everything works fine, except that I get hundreds of threads running at the same time, which is not a good idea; in this case I need them to run consecutively.

So I looked further into this join issue and read that there might be a bug in Perl 5.16.1, so I updated to 5.16.2, but it still never joins. On some mailing list (I can't remember which) I read that someone managed to get threads to join using the CPAN module Thread::Queue, but that didn't work for me either.

So I gave up on threads and tried fork instead. But with fork, the total number of forks seems to be limited: it worked fine until somewhere between the 13th and 20th iteration, then gave up with a message saying it couldn't fork anymore.

my $pid = fork();
die "fork failed: $!" unless defined $pid;
if ($pid == 0) {
    # Child: process one chunk, then exit
    thrWorker($buffer);
    exit 0;
}

I also tried it with CPAN modules Parallel::ForkManager and Proc::Fork but that didn't help.
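For comparison, note that the fork snippet above never calls waitpid in the parent, so finished children are never reaped and keep piling up, which is a common cause of "cannot fork" errors. The following is a minimal sketch, assuming a hypothetical thrWorker and a hard-coded list of chunks standing in for the read() loop, in which the parent waits for each child before forking the next one. Each chunk is processed in its own short-lived process, so whatever memory the processing grows is given back when that child exits.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the question's per-chunk worker.
sub thrWorker {
    my ($chunk) = @_;
    # ... expensive processing of $chunk would go here ...
    return;
}

# Hypothetical stand-in for: while (read(FILE, $buffer, $chunksize)) { ... }
my @chunks    = map { "chunk $_" } 1 .. 50;
my $processed = 0;

for my $buffer (@chunks) {
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: process one chunk; its memory is released when it exits.
        thrWorker($buffer);
        exit 0;
    }

    # Parent: block until this child has finished, so children never accumulate.
    waitpid($pid, 0);
    die "worker failed for '$buffer'" if $? != 0;
    $processed++;
}

print "processed $processed chunks\n";
```

Because the parent blocks in waitpid, at most one child exists at any time, which also gives the consecutive processing order the question asks for.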

So now I'm stuck and can't figure this out on my own. Maybe somebody else can! Any suggestions are greatly appreciated!

  1. How can I get this to work with threads or child processes?
  2. Or, failing that, how can I force Perl to free memory so I can do this in the same process?

Some additional information about my system:
OS: Windows 7 64-bit / Ubuntu Server 12.10
Perl on Windows: Strawberry Perl 5.16.2 64-bit

This is one of my first posts on Stack Overflow. I hope I did it right :-)


Solution

I recommend reading: this

I usually use Thread::Queue to manage the input to my threads. Sample code:

use threads;
use Thread::Queue;

use constant NUM_THREADS => 4;    # number of worker threads (example value)

my @threads;                      # was "= {}", which put a stray hashref in the array
my $Q = Thread::Queue->new();

# Start the threads
for my $i (0 .. NUM_THREADS - 1) {
    $threads[$i] = threads->create(\&insert_1_thread, $Q);
}

# Get the list of sites and put them in the work queue
# ($ref is the array ref of rows obtained elsewhere in the script)
foreach my $row (@{$ref}) {
    $Q->enqueue($row->[0]);
    #sleep 1 while $Q->pending > 100;
}

# Signal we are done: one undef per thread ends each worker's dequeue loop
$Q->enqueue(undef) for 1 .. NUM_THREADS;

my $count = 0;
# Now wait for the threads to complete before going on to the next step
$count += $_->join() for @threads;

And for the worker thread:

sub insert_1_thread {
    my ($Q) = @_;
    my $tid = threads->tid();
    my $count = 0;
    Log("Started thread #$tid");

    # dequeue() blocks until an item is available; undef means "no more work"
    while (defined(my $row = $Q->dequeue())) {
        # PROCESS ME... (the real work on $row goes here)
        $count++;
    }

    Log("Thread #$tid done");
    return $count;
}
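Applied to the question's use case, the same pattern needs only one long-lived worker thread: the main loop enqueues each chunk, and the worker processes them in order, so no thread is created or joined per chunk. The following is a minimal sketch, assuming an in-memory string as a stand-in for the file and a byte count as a hypothetical stand-in for callSomeOperation. It signals the end of input with undef rather than Thread::Queue's end() method, since the Thread::Queue shipped with Perl 5.16 may predate end().

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $q = Thread::Queue->new();

# One worker thread consumes chunks in the order they were enqueued.
my $worker = threads->create(sub {
    my $bytes = 0;
    while (defined(my $chunk = $q->dequeue())) {
        # Hypothetical stand-in for callSomeOperation($chunk)
        $bytes += length $chunk;
    }
    return $bytes;    # handed back to the main thread by join()
});

# Stand-in for: while (read(FILE, $buffer, $chunksize)) { ... }
my $data      = 'x' x 10_000;
my $chunksize = 4096;
for (my $off = 0; $off < length $data; $off += $chunksize) {
    $q->enqueue(substr($data, $off, $chunksize));
}

$q->enqueue(undef);    # tell the worker there is no more input
my $total = $worker->join();
print "worker processed $total bytes\n";
```

Note that this keeps a single interpreter thread alive for the whole run, so it does not by itself cap memory growth inside the worker; it only avoids creating and joining a thread per chunk.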

Other tips

I don't know whether this is a solution for you, but you could create an array of shared chunk objects and process them in a thread like this:

#!/usr/bin/perl

package Object; {
    use threads;
    use threads::shared;

    sub new {
        my $class = shift;
        share(my %this);               # the object's hash is shared between threads
        return bless(\%this, $class);
    }

    sub set {
        my ($this, $value) = @_;
        lock($this);                   # lock() follows the ref to the shared hash
        # If $value can be a reference, store a shared copy instead:
        # $this->{"data"} = shared_clone($value);
        $this->{"data"} = $value;
    }

    sub get {
        my $this = shift;
        return $this->{"data"};
    }
}


package main; {

    use strict;
    use warnings;

    use threads;
    use threads::shared;

    my @objs;
    foreach (0 .. 2) {
        my $o = Object->new();
        $o->set($_);
        push @objs, $o;
    }

    # Hand all chunk objects to one thread and wait for it to finish
    threads->create(\&run, \@objs)->join();

    sub run {
        my ($objs) = @_;
        print $objs->[$_]->get(), "\n" foreach 0 .. 2;
    }
}
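The commented-out shared_clone line in set matters as soon as a chunk is stored as a reference rather than a plain scalar: threads::shared rejects assigning an unshared reference to a shared variable. The following is a minimal sketch, assuming an array of numbers as a stand-in for chunk data, that shows the direct assignment failing, the shared_clone copy succeeding, and another thread reading the stored data.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my %store :shared;

# Storing a plain (unshared) array ref in a shared hash dies.
my $ok = eval { $store{data} = [1, 2, 3]; 1 };
print $ok ? "direct store worked\n" : "direct store failed: $@";

# shared_clone() makes a deep shared copy that can be stored.
$store{data} = shared_clone([1, 2, 3]);

# Another thread can read the shared data.
my $thr = threads->create(sub {
    my $s = 0;
    $s += $_ for @{ $store{data} };
    return $s;
});
my $sum = $thr->join();
print "sum seen by thread: $sum\n";
```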
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow