BdpTaskAdapter

new BdpTaskAdapter()

License:
  • MIT Licensed Copyright (c) 2021 Chi Yang(楊崎) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Members

(inner) concurrencyDelayCount :number

Description:
  • When the concurrency setting is high, too many jobs could be submitted at once. To prevent this, the delay count is incremented by 1 each time a job is sent, and the job is held for the delay interval multiplied by this delay count before it is processed. As a result, even when many jobs in the queue are submitted concurrently, the actual commands are delayed and processed one by one, separated by the delay interval. After a job finishes, the adapter waits briefly to determine whether job submissions are still frequent. If they are not, meaning the delay count is unchanged after the waiting period, the delay count is reset to 0.

Default Value:
  • 1

Type:
  • number
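
The staggering described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the adapter's own code: the class name StaggeredSubmitter, the settleTime parameter, and the modeling of jobs as plain async functions are all hypothetical. The 300 ms default mirrors the documented default of delayInterval.

```javascript
// Hypothetical sketch of delay-count staggering; names are illustrative only.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

class StaggeredSubmitter {
  constructor(delayInterval = 300, settleTime = 1000) {
    this.delayInterval = delayInterval; // ms per delay step (documented default: 300)
    this.settleTime = settleTime; // hypothetical wait before the reset check
    this.delayCount = 0;
  }

  async submit(job) {
    // Each submission bumps the counter, so concurrent jobs get growing delays.
    this.delayCount += 1;
    await sleep(this.delayInterval * this.delayCount);
    const result = await job();

    // After the job finishes, wait briefly. If no new submission changed the
    // counter in the meantime, submissions are infrequent, so reset it to 0.
    const snapshot = this.delayCount;
    setTimeout(() => {
      if (this.delayCount === snapshot) this.delayCount = 0;
    }, this.settleTime);
    return result;
  }
}
```

Scheduling the reset check on a timer, rather than awaiting it, keeps the settle period from delaying the job's own result.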

(inner) delayInterval :number

Description:
  • The delay interval, in milliseconds, used to space out concurrent job submissions. It prevents the adapter from submitting too many jobs at once when many concurrent jobs are waiting to be submitted. This value can be set via the options.

Default Value:
  • 300

Type:
  • number

(inner, readonly) isStopping :boolean

Description:
  • Indicates whether the adapter is stopping. You may check it to avoid unnecessary waiting in while loops.

Type:
  • boolean
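
A sketch of the while-loop pattern mentioned above. It assumes a hypothetical adapter object that exposes isStopping and a hypothetical checkDone callback supplied by the caller; only isStopping comes from this documentation.

```javascript
// Hypothetical polling loop that stops waiting once the adapter is stopping.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function waitUntil(adapter, checkDone, pollMs = 50) {
  // Poll only while the adapter is still running.
  while (!adapter.isStopping) {
    if (await checkDone()) return true; // work finished normally
    await sleep(pollMs);
  }
  return false; // gave up because the adapter is stopping
}
```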

(inner) previousStatus :BatchStatus

Description:
  • This object stores the progress of batch tasks.

Type:
  • BatchStatus

(inner) queue :queue

Description:
  • A queue object from https://www.npmjs.com/package/queue

Type:
  • queue

(inner) taskLogs :Map.<string, JobDefinition>

Description:
  • This variable stores all the task objects.

Default Value:
  • {}

Type:
  • Map.<string, JobDefinition>

Methods

(async) emitJobStatus(jobId, exitCode, signal)

Description:
  • Emits the 'finish' event when the job exit code is 0, and the 'error' event otherwise. Calling this function does not stop the job process; it only notifies the adapter that the job has finished.

Parameters:
  • jobId (*): The job ID.
  • exitCode (*): The job exit code.
  • signal (*, optional): The job exit signal.
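
The exit-code rule can be illustrated with Node's built-in events module. This is a hypothetical sketch: the standalone emitJobStatus helper, the per-job EventEmitter, and the { jobId, exitCode, signal } payload are assumptions, and the adapter's real emitter and event arguments may differ. Node throws when an 'error' event is emitted with no listener attached, so both listeners are registered first.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: 'finish' for exit code 0, 'error' for anything else.
function emitJobStatus(emitter, jobId, exitCode, signal = null) {
  const payload = { jobId, exitCode, signal };
  if (exitCode === 0) {
    emitter.emit("finish", payload);
  } else {
    emitter.emit("error", payload); // throws if no 'error' listener exists
  }
  // Only notifies listeners; no process is stopped here.
}

// Usage: attach both listeners before any job can exit.
const jobEmitter = new EventEmitter();
const seen = [];
jobEmitter.on("finish", (p) => seen.push(["finish", p.jobId]));
jobEmitter.on("error", (p) => seen.push(["error", p.jobId, p.exitCode]));
emitJobStatus(jobEmitter, "job-1", 0);
emitJobStatus(jobEmitter, "job-2", 1);
```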

getAllJobIds() → {Array.<string>}

Description:
  • This function returns all jobIds.

Returns:
  • Array.<string>: All job IDs.

getJobById(jobId) → {JobDefinition}

Description:
  • Returns the job definition object for the given jobId.

Parameters:
  • jobId (*): The job ID.

Returns:
  • JobDefinition: The job definition object.
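
Since taskLogs is typed as Map.<string, JobDefinition>, both getAllJobIds and getJobById can be expressed as ordinary Map operations. A minimal sketch, assuming job IDs are the map keys; the plain objects and their command field are hypothetical stand-ins for JobDefinition.

```javascript
// Hypothetical stand-in for taskLogs: job ID -> job definition.
const taskLogs = new Map([
  ["job-a", { command: "echo a" }],
  ["job-b", { command: "echo b" }],
]);

// Mirrors getAllJobIds(): the Map keys, in insertion order.
function getAllJobIds() {
  return [...taskLogs.keys()];
}

// Mirrors getJobById(jobId): undefined when the ID is unknown.
function getJobById(jobId) {
  return taskLogs.get(jobId);
}
```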

initialize() → {TaskAdapterInstance}

Description:
  • Initializes the whole adapter process. This function must be called to obtain the adapter instance.

Returns:
  • TaskAdapterInstance: An object that developers can use directly to run batch jobs.

(async) parseRecipe(jobObj, argumentRecipe) → {JobDefinition}

Description:
  • This function can be called inside BdpTaskAdapter.jobOverrides when you implement the jobOverrides function. It parses the recipe file (in YAML format), overrides the JobDefinition object accordingly, and returns the updated job object. Note that the original job object is mutated.

Parameters:
  • jobObj (JobDefinition): The job object.
  • argumentRecipe (string): The path to the recipe file.

Returns:
  • JobDefinition: The updated job object.
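
Because parseRecipe mutates the job object it receives, pass a copy when the original must be preserved. The sketch below skips YAML parsing, which needs a third-party parser, and assumes the recipe is already a plain object. The applyRecipe helper, the field names, and the shallow Object.assign merge are hypothetical and only illustrate the mutation semantics.

```javascript
// Hypothetical: merge already-parsed recipe fields into the job, mutating it.
function applyRecipe(jobObj, recipe) {
  Object.assign(jobObj, recipe); // shallow override; same object is returned
  return jobObj;
}

const recipe = { cpus: 4, image: "ubuntu:22.04" };

// Mutating call: the returned object IS the original.
const original = { command: "run.sh", cpus: 1 };
const mutated = applyRecipe(original, recipe);

// Non-mutating alternative: hand over a shallow copy instead.
const pristine = { command: "run.sh", cpus: 1 };
const updated = applyRecipe({ ...pristine }, recipe);
```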

(async, private, inner) _addJobs() → {Array.<JobDefinition>}

Description:
  • An async function that constructs each task object in this.jobStore.

Returns:
  • Array.<JobDefinition>: The constructed job objects. This process should fill the this.taskLog object with an entry for each job process.

(async, private, inner) _checkStatus() → {BatchStatus}

Description:
  • Checks the job status and returns the latest status object. If finished or exited jobs are detected, it emits the finish or error event on the this.runningJobs.get(jobId).jobEmitter event emitter.

Returns:
  • BatchStatus: The updated batch status.
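
A status snapshot of this kind can be built by tallying jobs by state. This sketch reuses the field names listed under _printStatus, but assumes each job carries a hypothetical status string equal to one of those names; how the adapter actually tracks job state is not documented here.

```javascript
// Field names from the BatchStatus description; job.status is hypothetical.
const STATES = ["pending", "queued", "running", "finishing", "exit", "done"];

// Tally job states into a BatchStatus-shaped object.
function summarize(jobs) {
  const status = Object.fromEntries(STATES.map((s) => [s, 0]));
  for (const job of jobs) {
    if (!STATES.includes(job.status)) {
      throw new Error(`unknown status: ${job.status}`);
    }
    status[job.status] += 1;
  }
  status.total = jobs.length;
  return status;
}
```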

(private, inner) _handleProxy(jobId, Proxy)

Description:
  • This private function handles proxy registration and unregistration.

Parameters:
  • jobId (string): The job ID.
  • Proxy (Proxy, optional): The proxy object to register. If it is omitted, the function unregisters the proxy.

(async, private, inner) _jobAfterExitCallback(jobId, fileHandler)

Description:
  • This internal function is called after the jobExitCallback function. This function stores the resulting stdout/stderr files and updates the status of all jobs.

Parameters:
  • jobId (string)
  • fileHandler (FileHandler)

(async, private, inner) _jobBeforeExitCallback(jobId, fileHandler)

Description:
  • This internal function is called right after a job has exited (finished or errored) and before the jobExitCallback function. It collects the remaining stdout/stderr messages, cleans up the streaming resources, and unregisters the proxy.

Parameters:
  • jobId (string)
  • fileHandler (FileHandler)
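
The exit handling runs in a fixed order around the jobExitCallback you implement: _jobBeforeExitCallback, then jobExitCallback, then _jobAfterExitCallback. The sketch below only records that order; handleExit and the hook functions are hypothetical placeholders for the real cleanup and log-storing work.

```javascript
// Hypothetical exit pipeline showing the documented call order.
async function handleExit(jobId, hooks) {
  await hooks.before(jobId); // collect output, clean up streams, unregister proxy
  await hooks.userExit(jobId); // the jobExitCallback you implement
  await hooks.after(jobId); // store stdout/stderr files, update all job statuses
}

const calls = [];
const hooks = {
  before: async (id) => calls.push(`before:${id}`),
  userExit: async (id) => calls.push(`jobExitCallback:${id}`),
  after: async (id) => calls.push(`after:${id}`),
};
```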

(async, private, inner) _loadProgress()

Description:
  • Loads the index.txt file in options.taskLogFolder to check which jobs finished with an exit code of 0 (normal exit). Finished jobs are skipped. This is used to resume a pipeline that stopped unexpectedly.
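
The skip-finished-jobs logic can be sketched as below. The on-disk format of index.txt is not documented here, so this sketch assumes a hypothetical one-line-per-job "jobId<TAB>exitCode" layout; finishedJobIds and jobsToRun are illustrative names, not adapter methods.

```javascript
// Hypothetical progress format: one "jobId<TAB>exitCode" line per job.
function finishedJobIds(indexText) {
  const finished = new Set();
  for (const line of indexText.split(/\r?\n/)) {
    if (!line.trim()) continue; // skip blank lines
    const [jobId, exitCode] = line.split("\t");
    // Only a literal exit code of 0 counts as a normal exit.
    if (exitCode !== undefined && exitCode.trim() === "0") finished.add(jobId);
  }
  return finished;
}

// Jobs that still need to run when resuming.
function jobsToRun(allJobIds, indexText) {
  const done = finishedJobIds(indexText);
  return allJobIds.filter((id) => !done.has(id));
}
```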

(async, private, inner) _monitorJobs(queue)

Description:
  • Monitors the jobs by periodically checking their status.

Parameters:
  • queue: The actual queue object that runs the this.runProcess function of each job.

(inner) _printStatus(currentStatus)

Description:
  • Prints the progress of jobs in a batch task to stdout and stderr. You may customize this function to suit your preferred style.

Parameters:
  • currentStatus (BatchStatus): The current status object, with the fields {pending, queued, running, finishing, exit, done, total}.
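
As an example of a custom style, the hypothetical formatter below renders a BatchStatus-shaped object as a single line. The field names come from the description above; the layout and the percentage computed from done/total are assumptions.

```javascript
// Hypothetical one-line progress formatter for a BatchStatus-like object.
function formatStatus(s) {
  const pct = s.total ? Math.floor((s.done / s.total) * 100) : 0;
  return (
    `[${pct}%] pending ${s.pending} | queued ${s.queued} | running ${s.running}` +
    ` | finishing ${s.finishing} | exit ${s.exit} | done ${s.done}/${s.total}`
  );
}

// Usage: console.log(formatStatus(currentStatus));
```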

(async, private, inner) _resumeJobs()

Description:
  • Tries to fetch remote jobs. This is particularly useful when the adapter process has closed unexpectedly and the same adapter process is run again. Instead of directly re-creating unfinished jobs, the adapter tries to fetch the job history and can then determine whether any jobs need to be re-submitted.

(async, private, inner) _runJob(jobId) → {promise}

Description:
  • An async function used to run a single job.

Parameters:
  • jobId (string): The auto-generated, unique job ID.

Returns:
  • promise: A promise that resolves when the task finishes normally and rejects when the task fails (exitCode !== 0) or exits unexpectedly.
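
A promise that resolves on exit code 0 and rejects otherwise can be sketched with Node's child_process.spawn. This assumes a job is a single local command, which is a simplification; the adapter's actual job runner and its notion of exiting unexpectedly may differ, and runJob here is a hypothetical name.

```javascript
const { spawn } = require("child_process");

// Hypothetical: run one command and settle a promise when it exits.
function runJob(command, args = []) {
  return new Promise((resolve, reject) => {
    const child = spawn(command, args, { stdio: "ignore" });
    child.on("error", reject); // e.g. the command could not be started
    child.on("exit", (code, signal) => {
      if (code === 0) resolve({ code, signal });
      else reject(new Error(`job failed: code=${code} signal=${signal}`));
    });
  });
}
```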

(async, private, inner) _runTasks()

Description:
  • The core function that starts the queue to process tasks.
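
The queue itself comes from the queue package, whose API is not reproduced here. To show the idea of processing tasks under a concurrency limit, the sketch below uses a small hand-written worker pool instead; runWithConcurrency is a hypothetical name.

```javascript
// Minimal concurrency-limited runner standing in for the queue package;
// it does not reproduce that package's API.
async function runWithConcurrency(tasks, concurrency) {
  const results = new Array(tasks.length);
  let next = 0;

  async function worker() {
    // Each worker pulls the next unstarted task until none remain.
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const count = Math.min(concurrency, tasks.length);
  await Promise.all(Array.from({ length: count }, () => worker()));
  return results; // results keep the original task order
}
```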

(private, inner) _sendMsgToBdpClient(action, content)

Description:
  • This private function sends messages to bdp-server with process.send.

Parameters:
  • action (string): The message action type. Currently, the possible values are 'registerProxy' and 'unregisterProxy'.
  • content (object): The message content, which may contain values of any type.
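
A sketch of how the proxy registration messages might be sent, assuming a hypothetical { action, content } message shape. process.send only exists when the process was spawned with an IPC channel, so the helper does nothing otherwise. The extra send parameter exists only for testing and is not part of the documented signature; handleProxy mirrors the register-or-unregister rule described for _handleProxy.

```javascript
// Hypothetical message shape { action, content }; the real payload may differ.
function sendMsgToBdpClient(action, content, send = process.send?.bind(process)) {
  if (typeof send !== "function") return false; // no IPC channel available
  send({ action, content });
  return true;
}

// Register when a proxy is given, otherwise unregister.
function handleProxy(jobId, proxy, send) {
  if (proxy) {
    return sendMsgToBdpClient("registerProxy", { jobId, proxy }, send);
  }
  return sendMsgToBdpClient("unregisterProxy", { jobId }, send);
}
```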

(async, private, inner) _waitingJobStatus(timeout)

Description:
  • This internal function waits until all job objects are resolved (that is, until each has its own exit code).

Parameters:
  • timeout (*)
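
A polling sketch of waiting until every job has an exit code. The timeout parameter is untyped above, so treating it as milliseconds is an assumption, as are the waitForExitCodes name and the exitCode property on each job.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical: poll until every job has an exit code, or give up at the deadline.
async function waitForExitCodes(jobs, timeout, pollMs = 20) {
  const allResolved = () => jobs.every((job) => job.exitCode !== undefined);
  const deadline = Date.now() + timeout; // timeout assumed to be in ms
  while (Date.now() < deadline) {
    if (allResolved()) return true;
    await sleep(pollMs);
  }
  return allResolved(); // final check at the deadline
}
```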

(async, private, inner) _writeJobLogs()

Description:
  • Writes the task progress to the progress-index.txt file.

(inner) setOptions(options) → {AdapterOption}

Description:
  • You may use this in your adapter class, for example super._setOptions(options).

Parameters:
  • options (AdapterOption)

Returns:
  • AdapterOption

BdpTaskAdapter

new BdpTaskAdapter(globalOption)

Description:
  • The constructor receives the option object, which contains the runtime settings. These global options are overridden by task options. By default, the constructor prints the adapter name and authors. Set globalOption.simplifiedTitle = true to print a simplified title graphic. Feel free to change the style to your own.

Parameters:
  • globalOption (AdapterOption): An option object. See setOptions for more information.
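
The precedence described above, where task options override global options, can be sketched with object spread, in which later sources win. The option keys are assumptions: delayInterval reuses the documented default of 300 ms but its exact option name is not given here, and simplifiedTitle is the only key this description names. resolveOptions is a hypothetical helper.

```javascript
// Hypothetical defaults; only simplifiedTitle is named in the documentation.
const DEFAULTS = { delayInterval: 300, simplifiedTitle: false };

// Precedence: defaults < global options < task options.
function resolveOptions(globalOption = {}, taskOption = {}) {
  // Later spreads overwrite earlier keys, so task options win.
  return { ...DEFAULTS, ...globalOption, ...taskOption };
}
```

Note that spread copies keys whose value is undefined, so an explicit undefined in task options would blank out a global setting.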
