'use strict';
exports.__esModule = true;
exports.eachSeries = eachSeries;
exports.queue = queue;
/**
* Smaller version of the async library constructs.
*
* @namespace async
*/
/**
* Noop function
*
* @ignore
* @function
* @memberof async
*/
function _noop() {} /* empty */
/**
* Iterates an array in series.
*
* @memberof async
* @param {Array.<*>} array - Array to iterate.
* @param {function} iterator - Function to call for each element.
* @param {function} callback - Function to call when done, or on error.
* @param {boolean} [deferNext=false] - Break synchronous each loop by calling next with a setTimeout of 1.
*/
function eachSeries(array, iterator, callback, deferNext) {
var i = 0;
var len = array.length;
(function next(err) {
if (err || i === len) {
if (callback) {
callback(err);
}
return;
}
if (deferNext) {
setTimeout(function () {
iterator(array[i++], next);
}, 1);
} else {
iterator(array[i++], next);
}
})();
}
/**
* Ensures a function is only called once.
*
* @ignore
* @memberof async
* @param {function} fn - The function to wrap.
* @return {function} The wrapping function.
*/
function onlyOnce(fn) {
return function onceWrapper() {
if (fn === null) {
throw new Error('Callback was already called.');
}
var callFn = fn;
fn = null;
callFn.apply(this, arguments);
};
}
/**
* Async queue implementation,
*
* @memberof async
* @param {function} worker - The worker function to call for each task.
* @param {number} concurrency - How many workers to run in parrallel.
* @return {*} The async queue object.
*/
function queue(worker, concurrency) {
if (concurrency == null) {
// eslint-disable-line no-eq-null,eqeqeq
concurrency = 1;
} else if (concurrency === 0) {
throw new Error('Concurrency must not be zero');
}
var workers = 0;
var q = {
_tasks: [],
concurrency: concurrency,
saturated: _noop,
unsaturated: _noop,
buffer: concurrency / 4,
empty: _noop,
drain: _noop,
error: _noop,
started: false,
paused: false,
push: function push(data, callback) {
_insert(data, false, callback);
},
kill: function kill() {
workers = 0;
q.drain = _noop;
q.started = false;
q._tasks = [];
},
unshift: function unshift(data, callback) {
_insert(data, true, callback);
},
process: function process() {
while (!q.paused && workers < q.concurrency && q._tasks.length) {
var task = q._tasks.shift();
if (q._tasks.length === 0) {
q.empty();
}
workers += 1;
if (workers === q.concurrency) {
q.saturated();
}
worker(task.data, onlyOnce(_next(task)));
}
},
length: function length() {
return q._tasks.length;
},
running: function running() {
return workers;
},
idle: function idle() {
return q._tasks.length + workers === 0;
},
pause: function pause() {
if (q.paused === true) {
return;
}
q.paused = true;
},
resume: function resume() {
if (q.paused === false) {
return;
}
q.paused = false;
// Need to call q.process once per concurrent
// worker to preserve full concurrency after pause
for (var w = 1; w <= q.concurrency; w++) {
q.process();
}
}
};
function _insert(data, insertAtFront, callback) {
if (callback != null && typeof callback !== 'function') {
// eslint-disable-line no-eq-null,eqeqeq
throw new Error('task callback must be a function');
}
q.started = true;
if (data == null && q.idle()) {
// eslint-disable-line no-eq-null,eqeqeq
// call drain immediately if there are no tasks
setTimeout(function () {
return q.drain();
}, 1);
return;
}
var item = {
data: data,
callback: typeof callback === 'function' ? callback : _noop
};
if (insertAtFront) {
q._tasks.unshift(item);
} else {
q._tasks.push(item);
}
setTimeout(function () {
return q.process();
}, 1);
}
function _next(task) {
return function next() {
workers -= 1;
task.callback.apply(task, arguments);
if (arguments[0] != null) {
// eslint-disable-line no-eq-null,eqeqeq
q.error(arguments[0], task.data);
}
if (workers <= q.concurrency - q.buffer) {
q.unsaturated();
}
if (q.idle()) {
q.drain();
}
q.process();
};
}
return q;
}
//# sourceMappingURL=async.js.map