Question

I'm using the Perl client of beanstalkd. I need a simple way to not enqueue the same work twice.

I need something that needs to basically wait until there are K elements, and then groups them together. To accomplish this, I have the producer:

insert item(s) into DB
insert a queue item into beanstalkd

And the consumer:

while ( 1 ) {
   beanstalkd.retrieve
   if ( DB items >= K )
       func_to_process_all_items
   kill job
}

This is linear in the number of requests/processing, but in the case of:

insert 1 item
... repeat many times ...
insert 1 item

Assuming all these insertions happened before a job was retrieved, this would add N queue items, and it would do something as such:

check DB, process N items
check DB, no items
... many times ...
check DB, no items

Is there a smarter way to do this so that it does not insert/process the later job requests unnecessarily?

Was it helpful?

Solution

Will this work for you?:

  1. Create two Tubes "buffer" and "live". Your producer always only adds to the "buffer" tube.
  2. Create two workers one watches the "buffer" and the other watches the "live" that call the blocking reserve() call
  3. Whenever the "buffer" worker returns on reserve, it buries the job if there are less than K items. If there are exactly K, then it "kicks" all K jobs and transfers them to the "live" tube.
  4. The "live" watcher will now return on its own reserve()

You just need to take care that a job does not ever return to the buffer queue from the buried state. A failsafe way to do this might be to delete it and then add it to live.

The two separate queues are only for cleaner separation. You could do the same with a single queue by burying everyjob until there are K-1 and then on the arrival of the K-th job, kicking all of them live.

OTHER TIPS

I had a related requirement. I only wanted to process a specific job once within a few minutes, but the producer could queue several instances of the same job. I used memcache to store the job identifier and set the expiry of the key to just a few minutes.

When a worker tried to add the job identifier to memcache, only the first would succeed - on failure to add the job id, the worker would delete the job. After a few minutes, the key expires from memcache and the job can be processed again.

Not particularly elegant, but it works.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top