Is there any way to limit the concurrency of promises using the Q promises library?

This question is related to "How can I limit Q promise concurrency?", but my problem is that I'm trying to do something like this:

for (var i = 0; i <= 1000; i++) {
  return Q.all([ task1(i), task2(i) ]); // <-- limit this to 2 at a time.
}

The real use case is:

  1. Fetch posts from DB
  2. Loop over every post from the DB, e.g. posts.forEach(function(post) {})
  3. For every post do task1, task2, task3 (retrieve social counters, retrieve comments count, etc)
  4. Save new post data in DB.

But the problem is that Node executes all tasks for all posts at the same time, for example asking Facebook for the "likes count" of 500 posts at once.

How can I limit Q.all() so that only 2 posts at a time are executing their tasks? Or what other solutions could apply here?

Note: Most of the tasks (if not all) rely on the request library.


Solution

Thanks to Dan, his answer, and his help integrating it with my code, this can be done using his gist and a snippet like this:

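var Q = require('q');
var request = require('request');
var qlimit = require('../libs/qlimit'); // Dan's gist, saved locally

// A task that performs one slow HTTP request and resolves with the body.
var test = function(id) {
  console.log('Running ' + id);
  return Q.nfcall(request, 'some dummy url which takes some time to process, for example a php file with sleep(5)').spread(function(response, body) {
    console.log('Response ' + id);
    return body;
  });
};

// Wrap the task ONCE, before the loop, so all calls share the same limiter.
test = qlimit.limitConcurrency(test, 1);

var data = [0, 1, 2];

data.forEach(function(id) {
  console.log('Starting item ' + id);
  Q.all([ test(id) ]);
});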

This way you get something like:

  • Starting item 0
  • Starting item 1
  • Starting item 2
  • Running 0
  • Response 0
  • Running 1
  • Response 1
  • Running 2
  • Response 2

This is clearly one request at a time.

The point I was missing in my implementation is that you need to re-declare the function using limitConcurrency BEFORE starting the loop, not inside it. Wrapping it inside the loop creates a fresh limiter on every iteration, so nothing is ever queued.
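The gist itself is not reproduced here, so the following is only a minimal sketch of how such a wrapper could work, not Dan's actual code. It uses native promises so that it runs without dependencies; since Q promises are thenables, a function returning a Q promise could be wrapped the same way. The name limitConcurrency mirrors the call above, while its implementation and the fakeTask helper are assumptions made for illustration.

```javascript
// Hypothetical limiter, NOT the code from the gist: wraps fn so that at most
// `max` calls are in flight; further calls wait in a FIFO queue.
function limitConcurrency(fn, max) {
  var running = 0;
  var waiting = [];

  function next() {
    if (running >= max || waiting.length === 0) { return; }
    var job = waiting.shift();
    running++;
    // Starting from Promise.resolve() turns a synchronous throw into a rejection.
    Promise.resolve()
      .then(function () { return fn.apply(null, job.args); })
      .then(job.resolve, job.reject)
      .then(function () {
        running--;
        next(); // a slot is free, start the next waiting call if any
      });
  }

  return function () {
    var args = Array.prototype.slice.call(arguments);
    return new Promise(function (resolve, reject) {
      waiting.push({ args: args, resolve: resolve, reject: reject });
      next();
    });
  };
}

// Demo: a fake 20 ms task (hypothetical) that records the peak concurrency.
var active = 0;
var peak = 0;
function fakeTask(id) {
  active++;
  peak = Math.max(peak, active);
  return new Promise(function (resolve) {
    setTimeout(function () {
      active--;
      resolve('done ' + id);
    }, 20);
  });
}

// Wrap once, outside the loop, so every call shares the same queue.
var limited = limitConcurrency(fakeTask, 2);
var demo = Promise.all([0, 1, 2, 3, 4].map(function (id) {
  return limited(id);
})).then(function (results) {
  console.log(results, 'peak concurrency:', peak);
  return results;
});
```

Because the counter and the queue live in the closure created by limitConcurrency, the wrapper only limits anything if the same wrapped function is reused for every call, which is why it has to be created before the loop.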

Other tips

I asked a very similar question a few days ago: Node.js/Express and parallel queues

The solution I've found (see my own answer) was to use Caolan's async. It allows you to create "operation queues", and you can limit how many can run concurrently: see the "queue" method.

In your case, Node's main loop would pull elements from Q and create a task in the queue for each of them. You could also limit this (so as not to re-create the whole queue outside of Q), for example by adding N new elements to the queue only when the last one is being executed (the "empty" callback of the "queue" method).
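As a rough, dependency-free illustration of feeding work N items at a time, the sketch below processes items in consecutive batches. It is not async's queue API; it is a simpler batching technique swapped in for illustration, and processInBatches, updatePost, and the post ids are hypothetical names. It uses native promises, and with Q the Promise.all call could be replaced by Q.all.

```javascript
// Hypothetical helper: runs `worker` over `items` in consecutive batches of
// `size`, starting a batch only after the previous one has fully resolved.
function processInBatches(items, size, worker) {
  var results = [];
  var index = 0;

  function nextBatch() {
    if (index >= items.length) { return Promise.resolve(results); }
    var batch = items.slice(index, index + size);
    index += size;
    var started = batch.map(function (item) { return worker(item); });
    return Promise.all(started).then(function (batchResults) {
      results = results.concat(batchResults);
      return nextBatch();
    });
  }

  return nextBatch();
}

// Demo: pretend each post needs a 10 ms remote call (hypothetical worker).
var inFlight = 0;
var maxInFlight = 0;
function updatePost(postId) {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  return new Promise(function (resolve) {
    setTimeout(function () {
      inFlight--;
      resolve('post ' + postId + ' updated');
    }, 10);
  });
}

var batchDemo = processInBatches([1, 2, 3, 4, 5], 2, updatePost)
  .then(function (results) {
    console.log(results, 'max in flight:', maxInFlight);
    return results;
  });
```

The trade-off is that each batch waits for its slowest item before the next batch starts, so a real queue with a concurrency limit keeps the workers busier when task durations vary.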

Here is the code I use to throttle q promises.

I just ripped it out of a project that I needed it for. If more people are interested, I could split it out into a module or something.

Check out the methods spex.page and spex.sequence. They were designed to implement any possible strategy for data throttling and load balancing with promises.

Below are a couple of examples from the project's documentation.

Balanced Page Source

The example below uses method page to initiate a sequence of 5 pages, and then logs the resolved data into the console. The source function serves each page with a half-second delay.

var $q = require('q');
var spex = require('spex')($q);

function source(index, data, delay) {
    return new $q.Promise(function (resolve, reject) {
        setTimeout(function () {
            resolve([
                "page-" + index, // simple value;
                $q.resolve(Date.now()) // promise value;
            ])
        }, 500); // wait 1/2 second before serving the next page;
    });
}

function logger(index, data, delay) {
    console.log("LOG:", data);
}

spex.page(source, {dest: logger, limit: 5})
    .then(function (data) {
        console.log("FINISHED:", data);
    });

Output:

LOG: [ 'page-0', 1446050705823 ]
LOG: [ 'page-1', 1446050706327 ]
LOG: [ 'page-2', 1446050706834 ]
LOG: [ 'page-3', 1446050707334 ]
LOG: [ 'page-4', 1446050707839 ]
FINISHED: { pages: 5, total: 10, duration: 2520 }

Balanced Sequence Receiver

In the following example we have a sequence that returns data while the index is less than 5, and a destination function that enforces a 1-second delay on processing each value resolved from the source.

var $q = require('q');
var spex = require('spex')($q);

function source(index, data, delay) {
    console.log("SOURCE:", index, data, delay);
    if (index < 5) {
        return $q.resolve(index);
    }
}

function dest(index, data, delay) {
    console.log("DEST:", index, data, delay);
    return new $q.Promise(function (resolve, reject) {
        setTimeout(function () {
            resolve();
        }, 1000);
    });
}

spex.sequence(source, dest)
    .then(function (data) {
        console.log("DATA:", data);
    });

Output:

SOURCE: 0 undefined undefined
DEST: 0 0 undefined
SOURCE: 1 0 1011
DEST: 1 1 1001
SOURCE: 2 1 1001
DEST: 2 2 1001
SOURCE: 3 2 1000
DEST: 3 3 1000
SOURCE: 4 3 1001
DEST: 4 4 1001
SOURCE: 5 4 1000
DATA: { total: 5, duration: 5013 }
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow