time-to-botec/squiggle/node_modules/@stdlib/utils/parallel/lib/node/exec.js

318 lines
7.6 KiB
JavaScript
Raw Normal View History

/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
// MODULES //
var fork = require( 'child_process' ).fork;
var path = require( 'path' );
var logger = require( 'debug' );
var objectKeys = require( './../../../keys' );
var getOpts = require( './options.js' );
// VARIABLES //
var debug = logger( 'parallel:exec' );
var WORKER_FILEPATH = path.resolve( __dirname, './worker/index.js' );
// MAIN //
/**
* Executes scripts in parallel.
*
* @private
* @param {StringArray} files - script absolute file paths
* @param {Options} opts - options
* @param {PositiveInteger} opts.concurrency - number of scripts to execute concurrently
* @param {PositiveInteger} opts.workers - number of workers
* @param {string} opts.cmd - executable file/command
* @param {boolean} opts.ordered - boolean indicating whether to preserve order of script output
* @param {(NonNegativeInteger|null)} opts.uid - process user identity
* @param {(NonNegativeInteger|null)} opts.gid - process group identity
* @param {string} opts.encoding - `stdio` encoding
* @param {NonNegativeInteger} opts.maxBuffer - max child process `stdio` buffer size
* @param {Callback} clbk - callback to invoke after executing all scripts
*/
function exec( files, opts, clbk ) {
var numClosed;
var workers;
var pending;
var fopts;
var args;
var proc;
var pids;
var pid;
var idx;
var err;
var i;
debug( 'Options: %s.', JSON.stringify( opts ) );
numClosed = 0;
debug( 'Creating %d workers...', opts.workers );
workers = {};
args = [];
fopts = getOpts( opts );
for ( i = 0; i < opts.workers; i++ ) {
debug( 'Creating child process...' );
proc = fork( WORKER_FILEPATH, args, fopts );
proc.on( 'error', onError( proc ) );
proc.on( 'close', onClose( proc ) );
proc.on( 'exit', onExit( proc ) );
proc.on( 'disconnect', onDisconnect( proc ) );
proc.on( 'message', onMessage( proc ) );
debug( 'Child process created. pid: %d.', proc.pid );
workers[ proc.pid ] = proc;
}
pids = objectKeys( workers );
debug( '%d workers created.', pids.length );
debug( 'Running %d scripts concurrently...', opts.concurrency );
pending = {};
idx = -1;
for ( i = 0; i < opts.concurrency; i++ ) {
pid = pids[ i%pids.length ];
next( workers[ pid ] ); // eslint-disable-line callback-return
}
/**
* Instructs a child process to run the next script.
*
* @private
* @param {Object} child - child process
* @returns {void}
*/
function next( child ) {
var numPending;
idx += 1;
if ( idx >= files.length ) {
numPending = objectKeys( pending ).length;
if ( numPending > 0 ) {
debug( '%d scripts are pending.', numPending );
return;
}
debug( 'All scripts have finished.' );
return close();
}
debug( 'Instructing child process to run script: %s. pid: %d.', files[ idx ], child.pid );
child.send( files[ idx ] );
pending[ files[ idx ] ] = true;
debug( '%d of %d scripts have been processed.', idx, files.length );
}
/**
* Returns a callback to be invoked upon receiving a message from a child process.
*
* @private
* @param {Object} child - child process
* @returns {Callback} callback
*/
function onMessage( child ) {
return listener;
/**
* Callback invoked upon receiving a message from a child process.
*
* @private
* @param {string} filepath - script filepath
*/
function listener( filepath ) {
debug( 'Child process message: %s. pid: %d.', filepath, child.pid );
// Remove the script from the listing of pending scripts:
delete pending[ filepath ];
// Indicate that the child process is ready for its next task:
next( child );
}
}
/**
* Returns a callback to be invoked upon child process close.
*
* @private
* @param {Object} child - child process
* @returns {Callback} callback
*/
function onClose( child ) {
return listener;
/**
* Callback invoked upon child process close.
*
* @private
* @param {(number|null)} code - exit code
* @param {(string|null)} signal - termination signal
*/
function listener( code, signal ) {
debug( 'Child process closed. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
processExit( code, signal );
childClosed();
}
}
/**
* Callback invoked if a child closes.
*
* @private
*/
function childClosed() {
numClosed += 1;
debug( '%d of %d child processes have closed.', numClosed, opts.workers );
if ( numClosed === opts.workers ) {
done(); // eslint-disable-line callback-return
}
}
/**
* Returns a callback to be invoked upon child process exit.
*
* @private
* @param {Object} child - child process
* @returns {Callback} callback
*/
function onExit( child ) {
return listener;
/**
* Callback invoked upon child process exit.
*
* @private
* @param {(number|null)} code - exit code
* @param {(string|null)} signal - termination signal
*/
function listener( code, signal ) {
debug( 'Child process exited. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
processExit( code, signal );
}
}
/**
* Closes all workers.
*
* @private
* @param {Error} [error] - error object
*/
function close( error ) {
var pids;
var pid;
var i;
if ( error && !err ) {
err = error;
}
debug( 'Instructing child processes to close...' );
pids = objectKeys( workers );
for ( i = 0; i < pids.length; i++ ) {
pid = pids[ i ];
debug( 'Instructing child process (pid: %d) to close...', pid );
workers[ pid ].send( 'close' );
}
}
/**
* Returns a callback to be invoked upon child process disconnect.
*
* @private
* @param {Object} child - child process
* @returns {Callback} callback
*/
function onDisconnect( child ) {
return listener;
/**
* Callback invoked upon child process disconnect.
*
* @private
*/
function listener() {
debug( 'Child process disconnected. pid: %d.', child.pid );
}
}
/**
* Returns a callback to be invoked upon encountering a child process error.
*
* @private
* @param {Object} child - child process
* @returns {Callback} callback
*/
function onError( child ) {
return listener;
/**
* Callback invoked upon a child process error.
*
* @private
* @param {Error} error - error object
*/
function listener( error ) {
debug( 'Child process error: %s. pid: %d.', error.message, child.pid );
close( error );
}
}
/**
* Processes process exit values. If provided a non-zero exit code or termination signal, instructs the process to close.
*
* @private
* @param {(number|null)} code - exit code
* @param {(string|null)} signal - termination signal
* @returns {void}
*/
function processExit( code, signal ) {
var error;
if ( err ) {
return;
}
if ( code !== null && code !== 0 ) {
error = new Error( 'Child process failed with exit code: '+code+'.' );
} else if ( signal !== null ) {
error = new Error( 'Child process failed due to termination signal: '+signal+'.' );
}
if ( error ) {
error.code = code;
error.signal = signal;
return close( error );
}
}
/**
* Callback invoked once all tasks are finished.
*
* @private
* @returns {void}
*/
function done() {
if ( err ) {
return clbk( err );
}
clbk();
}
}
// EXPORTS //
module.exports = exec;