admin管理员组

文章数量:1325761

Is there any method to limit concurrency of promises using Q promises library?

This question is kinda related to How can I limit Q promise concurrency?

but the problem is that I'm trying to do something like this:

for (var i = 0; i <= 1000; i++) {
  return Q.all([ task1(i), task2(i) ]); // <-- limit this to 2 at a time.
}

The real use case is:

  1. Fetch posts from DB
  2. Loop every post in DB like posts.forEach(function(post) {}
  3. For every post do task1, task2, task3 (retrieve social counters, retrieve ments count, etc)
  4. Save new post data in DB.

But the problem is that node is executing all tasks for all posts at the same time, like asking facebook for the "likes count" for 500 posts at the same time.

How i can limit Q.all() so only 2 posts at a time are executing their tasks? Or what other possible solutions can apply here?

Note: Most of the tasks (if not all) rely on request library

Is there any method to limit concurrency of promises using Q promises library?

This question is kinda related to How can I limit Q promise concurrency?

but the problem is that I'm trying to do something like this:

for (var i = 0; i <= 1000; i++) {
  return Q.all([ task1(i), task2(i) ]); // <-- limit this to 2 at a time.
}

The real use case is:

  1. Fetch posts from DB
  2. Loop every post in DB like posts.forEach(function(post) {}
  3. For every post do task1, task2, task3 (retrieve social counters, retrieve ments count, etc)
  4. Save new post data in DB.

But the problem is that node is executing all tasks for all posts at the same time, like asking facebook for the "likes count" for 500 posts at the same time.

How i can limit Q.all() so only 2 posts at a time are executing their tasks? Or what other possible solutions can apply here?

Note: Most of the tasks (if not all) rely on request library

Share Improve this question edited May 23, 2017 at 11:59 CommunityBot 11 silver badge asked Mar 12, 2014 at 23:59 Félix SanzFélix Sanz 1,8924 gold badges16 silver badges27 bronze badges 1
  • For the same task I implemented methods sequence and page within spex library. – vitaly-t Commented Oct 11, 2015 at 1:28
Add a ment  | 

4 Answers 4

Reset to default 3

Thanks to Dan, his answer and his help to integrate it with my code, it can be done using his gist and a snipplet like this:

var qlimit = require('../libs/qlimit');

var test = function(id) {
  console.log('Running ' + id);
  return Q.nfcall(request, 'some dummy url which takes some time to process, for example a php file with sleep(5)').spread(function(response, body) {
    console.log('Response ' + id);
    return body;
  });
}

test = qlimit.limitConcurrency(test, 1);

var data = [0, 1, 2];

data.forEach(function(id) {
  console.log('Starting item ' + id);
  Q.all([ test(id) ]);
});

This way you get something like:

  • Starting item 0
  • Starting item 1
  • Starting item 2
  • Running 0
  • Response 0
  • Running 1
  • Response 1
  • Running 2
  • Response 2

Which clearly is 1 request at a time.

The whole point that i was missing in the implementation is that you need to re-declare the function using limitConcurrency BEFORE starting the loop, not inside it.

I asked a very similar question a few days ago: Node.js/Express and parallel queues

The solution I've found (see my own answer) was to use Caolan's async. It allows you to create "operation queues", and you can limit how many can run concurrently: see the "queue" method.

In your case, Node's main loop would pull elements from Q and create a task in the queue for each of them. You could also limit this (so not to basically re-create the queue outside of Q), for example by adding N new elements to the queue only when the last one is being executed (the "empty" callback for the "queue" method).

Here is the code I use to throttle q promises.

I just ripped it out of a project that I needed it for. If there is more people interested I could part it out into a module or something.

Check out methods spex.page and spex.sequence. They were designed to implement any possible strategy for data throttling + load balancing for promises.

See below couple examples from the project's documentation.

Balanced Page Source

The example below uses method page to initiate a sequence of 5 pages, and then logs the resolved data into the console. The source function serves each page with a half-second delay.

var $q = require('q');    
var spex = require('spex')($q);

function source(index, data, delay) {
    return new $q.Promise(function (resolve, reject) {
        setTimeout(function () {
            resolve([
                "page-" + index, // simple value;
                $q.resolve(Date.now()) // promise value;
            ])
        }, 500); // wait 1/2 second before serving the next page;
    });
}

function logger(index, data, delay) {
    console.log("LOG:", data);
}

spex.page(source, {dest: logger, limit: 5})
    .then(function (data) {
        console.log("FINISHED:", data);
    });

Output:

LOG: [ 'page-0', 1446050705823 ]
LOG: [ 'page-1', 1446050706327 ]
LOG: [ 'page-2', 1446050706834 ]
LOG: [ 'page-3', 1446050707334 ]
LOG: [ 'page-4', 1446050707839 ]
FINISHED: { pages: 5, total: 10, duration: 2520 }

Balanced Sequence Receiver

In the following example we have a sequence that returns data while the index is less than 5, and the destination function that enforces 1 second delay on processing each data resolved from the source.

var $q = require('q');    
var spex = require('spex')($q);

function source(index, data, delay) {
    console.log("SOURCE:", index, data, delay);
    if (index < 5) {
        return $q.resolve(index);
    }
}

function dest(index, data, delay) {
    console.log("DEST:", index, data, delay);
    return new $q.Promise(function (resolve, reject) {
        setTimeout(function () {
            resolve();
        }, 1000);
    });
}

spex.sequence(source, dest)
    .then(function (data) {
        console.log("DATA:", data);
    });

Output:

SOURCE: 0 undefined undefined
DEST: 0 0 undefined
SOURCE: 1 0 1011
DEST: 1 1 1001
SOURCE: 2 1 1001
DEST: 2 2 1001
SOURCE: 3 2 1000
DEST: 3 3 1000
SOURCE: 4 3 1001
DEST: 4 4 1001
SOURCE: 5 4 1000
DATA: { total: 5, duration: 5013 }

本文标签: javascriptConcurrency limit in Q promisesnodeStack Overflow