/**
* @license Apache-2.0
*
* Copyright (c) 2018 The Stdlib Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

// MODULES //

var fork = require( 'child_process' ).fork;
var path = require( 'path' );
var logger = require( 'debug' );
var objectKeys = require( './../../../keys' );
var getOpts = require( './options.js' );


// VARIABLES //

var debug = logger( 'parallel:exec' );
var WORKER_FILEPATH = path.resolve( __dirname, './worker/index.js' );


// MAIN //

/**
* Executes scripts in parallel.
*
* @private
* @param {StringArray} files - script absolute file paths
* @param {Options} opts - options
* @param {PositiveInteger} opts.concurrency - number of scripts to execute concurrently
* @param {PositiveInteger} opts.workers - number of workers
* @param {string} opts.cmd - executable file/command
* @param {boolean} opts.ordered - boolean indicating whether to preserve order of script output
* @param {(NonNegativeInteger|null)} opts.uid - process user identity
* @param {(NonNegativeInteger|null)} opts.gid - process group identity
* @param {string} opts.encoding - `stdio` encoding
* @param {NonNegativeInteger} opts.maxBuffer - max child process `stdio` buffer size
* @param {Callback} clbk - callback to invoke after executing all scripts
*/
function exec( files, opts, clbk ) {
	var numClosed;
	var workers;
	var pending;
	var fopts;
	var args;
	var proc;
	var pids;
	var pid;
	var idx;
	var err;
	var i;

	debug( 'Options: %s.', JSON.stringify( opts ) );
	numClosed = 0;

	debug( 'Creating %d workers...', opts.workers );
	workers = {};
	args = [];
	fopts = getOpts( opts );
	for ( i = 0; i < opts.workers; i++ ) {
		debug( 'Creating child process...' );
		proc = fork( WORKER_FILEPATH, args, fopts );

		proc.on( 'error', onError( proc ) );
		proc.on( 'close', onClose( proc ) );
		proc.on( 'exit', onExit( proc ) );
		proc.on( 'disconnect', onDisconnect( proc ) );
		proc.on( 'message', onMessage( proc ) );

		debug( 'Child process created. pid: %d.', proc.pid );
		workers[ proc.pid ] = proc;
	}
	pids = objectKeys( workers );
	debug( '%d workers created.', pids.length );

	debug( 'Running %d scripts concurrently...', opts.concurrency );
	pending = {};
	idx = -1;
	for ( i = 0; i < opts.concurrency; i++ ) {
		pid = pids[ i%pids.length ];
		next( workers[ pid ] ); // eslint-disable-line callback-return
	}

	/**
	* Instructs a child process to run the next script.
	*
	* @private
	* @param {Object} child - child process
	* @returns {void}
	*/
	function next( child ) {
		var numPending;
		idx += 1;
		if ( idx >= files.length ) {
			numPending = objectKeys( pending ).length;
			if ( numPending > 0 ) {
				debug( '%d scripts are pending.', numPending );
				return;
			}
			debug( 'All scripts have finished.' );
			return close();
		}
		debug( 'Instructing child process to run script: %s. pid: %d.', files[ idx ], child.pid );
		child.send( files[ idx ] );
		pending[ files[ idx ] ] = true;

		debug( '%d of %d scripts have been processed.', idx, files.length );
	}

	/**
	* Returns a callback to be invoked upon receiving a message from a child process.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onMessage( child ) {
		return listener;

		/**
		* Callback invoked upon receiving a message from a child process.
		*
		* @private
		* @param {string} filepath - script filepath
		*/
		function listener( filepath ) {
			debug( 'Child process message: %s. pid: %d.', filepath, child.pid );

			// Remove the script from the listing of pending scripts:
			delete pending[ filepath ];

			// Indicate that the child process is ready for its next task:
			next( child );
		}
	}

	/**
	* Returns a callback to be invoked upon child process close.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onClose( child ) {
		return listener;

		/**
		* Callback invoked upon child process close.
		*
		* @private
		* @param {(number|null)} code - exit code
		* @param {(string|null)} signal - termination signal
		*/
		function listener( code, signal ) {
			debug( 'Child process closed. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
			processExit( code, signal );
			childClosed();
		}
	}

	/**
	* Callback invoked if a child closes.
	*
	* @private
	*/
	function childClosed() {
		numClosed += 1;
		debug( '%d of %d child processes have closed.', numClosed, opts.workers );
		if ( numClosed === opts.workers ) {
			done(); // eslint-disable-line callback-return
		}
	}

	/**
	* Returns a callback to be invoked upon child process exit.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onExit( child ) {
		return listener;

		/**
		* Callback invoked upon child process exit.
		*
		* @private
		* @param {(number|null)} code - exit code
		* @param {(string|null)} signal - termination signal
		*/
		function listener( code, signal ) {
			debug( 'Child process exited. Code: %d. Signal: %s. pid: %d.', code, signal, child.pid );
			processExit( code, signal );
		}
	}

	/**
	* Closes all workers.
	*
	* @private
	* @param {Error} [error] - error object
	*/
	function close( error ) {
		var pids;
		var pid;
		var i;
		if ( error && !err ) {
			err = error;
		}
		debug( 'Instructing child processes to close...' );
		pids = objectKeys( workers );
		for ( i = 0; i < pids.length; i++ ) {
			pid = pids[ i ];
			debug( 'Instructing child process (pid: %d) to close...', pid );
			workers[ pid ].send( 'close' );
		}
	}

	/**
	* Returns a callback to be invoked upon child process disconnect.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onDisconnect( child ) {
		return listener;

		/**
		* Callback invoked upon child process disconnect.
		*
		* @private
		*/
		function listener() {
			debug( 'Child process disconnected. pid: %d.', child.pid );
		}
	}

	/**
	* Returns a callback to be invoked upon encountering a child process error.
	*
	* @private
	* @param {Object} child - child process
	* @returns {Callback} callback
	*/
	function onError( child ) {
		return listener;

		/**
		* Callback invoked upon a child process error.
		*
		* @private
		* @param {Error} error - error object
		*/
		function listener( error ) {
			debug( 'Child process error: %s. pid: %d.', error.message, child.pid );
			close( error );
		}
	}

	/**
	* Processes process exit values. If provided a non-zero exit code or termination signal, instructs the process to close.
	*
	* @private
	* @param {(number|null)} code - exit code
	* @param {(string|null)} signal - termination signal
	* @returns {void}
	*/
	function processExit( code, signal ) {
		var error;
		if ( err ) {
			return;
		}
		if ( code !== null && code !== 0 ) {
			error = new Error( 'Child process failed with exit code: '+code+'.' );
		} else if ( signal !== null ) {
			error = new Error( 'Child process failed due to termination signal: '+signal+'.' );
		}
		if ( error ) {
			error.code = code;
			error.signal = signal;
			return close( error );
		}
	}

	/**
	* Callback invoked once all tasks are finished.
	*
	* @private
	* @returns {void}
	*/
	function done() {
		if ( err ) {
			return clbk( err );
		}
		clbk();
	}
}


// EXPORTS //

module.exports = exec;