new BdpTaskAdapter()
- Source:
- Implements:
- License:
- MIT Licensed
Copyright (c) 2021 Chi Yang(楊崎)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Members
(inner) concurrencyDelayCount :number
- Description:
Setting a high concurrency number can cause too many jobs to be submitted at one time. To prevent this, 1 is added to a delay count each time a job is sent, and the job is held for the delay interval multiplied by this delay count before it is processed. This way, even if many queued jobs are submitted concurrently, the actual commands are delayed and processed one by one at that interval. Once a job is finished, there is a short waiting period to determine whether job submission events are frequent. If they are not frequent, meaning that the delayCount remains the same after the waiting period, the delayCount is reset to 0.
- Source:
- Default Value:
- 1
Type:
- number
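The delay-count mechanism described above can be sketched as follows. This is a minimal illustration rather than the adapter's actual code: createSubmissionThrottle, nextDelay, jobFinished, and the settle wait are hypothetical, and the starting value of the count and the order of incrementing versus computing the delay are assumptions.

```javascript
// Hypothetical sketch of the delay-count idea; all names are illustrative only.
function createSubmissionThrottle(delayInterval = 300) {
  let delayCount = 0; // assumed starting value
  return {
    // Called each time a job is sent: returns how long (ms) to hold the job.
    nextDelay() {
      delayCount += 1;
      return delayInterval * delayCount;
    },
    // Called after a job finishes: if no new job was sent during the
    // settle wait (the count is unchanged), reset the count to 0.
    jobFinished(settleMs = 1000) {
      const seen = delayCount;
      return new Promise((resolve) => {
        setTimeout(() => {
          if (delayCount === seen) delayCount = 0;
          resolve(delayCount);
        }, settleMs);
      });
    },
    get count() {
      return delayCount;
    },
  };
}

const throttle = createSubmissionThrottle(300);
const delays = [throttle.nextDelay(), throttle.nextDelay(), throttle.nextDelay()];
console.log(delays); // [ 300, 600, 900 ]
```

Three jobs sent back to back are held for increasing multiples of the interval, which spreads the actual commands out in time.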
(inner) delayInterval :number
- Description:
The delay interval, in milliseconds, for concurrent job submission. It prevents too many jobs from being submitted at the same time, especially when many jobs are ready to be submitted concurrently. It can be set via options.
- Source:
- Default Value:
- 300
Type:
- number
(inner, readonly) isStopping :boolean
- Description:
Use this property to check whether the adapter is stopping. You may use it to avoid extra waiting in while loops.
- Source:
Type:
- boolean
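A waiting loop that honors isStopping might look like the sketch below. The helper waitUntil, the sleep function, and the stand-in adapter object are hypothetical; only the isStopping property comes from this page.

```javascript
// Hypothetical polling helper; `adapter` is any object exposing a boolean isStopping.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function waitUntil(adapter, isReady, pollMs = 50) {
  while (!isReady()) {
    // Leave early instead of waiting further once the adapter is stopping.
    if (adapter.isStopping) return false;
    await sleep(pollMs);
  }
  return true;
}

// Usage with a stand-in object instead of a real adapter.
const fakeAdapter = { isStopping: false };
setTimeout(() => {
  fakeAdapter.isStopping = true;
}, 20);
const result = waitUntil(fakeAdapter, () => false, 5);
result.then((ready) => console.log(ready)); // false
```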
(inner) previousStatus :BatchStatus
This object stores the progress of batch tasks.
Type:
(inner) queue :queue
A queue object from https://www.npmjs.com/package/queue
Type:
- queue
(inner) taskLogs :Map.<string, JobDefinition>
- Description:
This variable stores all the task objects.
- Source:
- Default Value:
- {}
Type:
- Map.<string, JobDefinition>
Methods
(async) emitJobStatus(jobId, exitCode, signal)
- Description:
This function emits the 'finish' event when the job exit code is 0, or the 'error' event otherwise. Note that calling this function does not stop the job process; it only notifies the adapter that the job has finished.
- Source:
Parameters:
Name | Type | Description |
---|---|---|
jobId | * | The job ID. |
exitCode | * | The job exit code. |
signal | * | (optional) The job exit signal. |
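The finish/error rule can be illustrated with a plain Node.js EventEmitter. This is a sketch under assumptions: emitJobStatusSketch and the payload shape { exitCode, signal } are hypothetical and do not reflect the adapter's internals.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the per-job emitter logic.
function emitJobStatusSketch(jobEmitter, exitCode, signal) {
  if (exitCode === 0) {
    jobEmitter.emit('finish', { exitCode, signal });
  } else {
    // An 'error' event with no listener attached would throw,
    // so listeners are registered before this is called.
    jobEmitter.emit('error', { exitCode, signal });
  }
}

const events = [];
const jobEmitter = new EventEmitter();
jobEmitter.on('finish', (info) => events.push(['finish', info.exitCode]));
jobEmitter.on('error', (info) => events.push(['error', info.exitCode]));

emitJobStatusSketch(jobEmitter, 0);
emitJobStatusSketch(jobEmitter, 137, 'SIGKILL');
console.log(events); // [ [ 'finish', 0 ], [ 'error', 137 ] ]
```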
getAllJobIds() → {Array.<string>}
Returns:
Returns all job Ids.
- Type
- Array.<string>
getJobById(jobId) → {JobDefinition}
- Description:
This function returns the job definition object for the given jobId.
- Source:
Parameters:
Name | Type | Description |
---|---|---|
jobId | * | The job ID. |
Returns:
Returns the job definition object.
- Type
- JobDefinition
initialize() → {TaskAdapterInstance}
- Description:
This function initializes the whole adapter process and must be called to get the Adapter object instance.
- Source:
Returns:
An object that can be directly used by the developers. Used to run batch jobs.
- Type
- TaskAdapterInstance
(async) parseRecipe(jobObj, argumentRecipe) → {JobDefinition}
- Description:
This function can be called inside BdpTaskAdapter.jobOverrides when you implement the jobOverrides function. It parses the recipe file (in YAML format), overrides the JobDefinition object accordingly, and returns the updated job object. (Note: the original job object is mutated.)
- Source:
Parameters:
Name | Type | Description |
---|---|---|
jobObj | JobDefinition | The job object. |
argumentRecipe | string | The recipe file path. |
Returns:
- Type
- JobDefinition
(async, private, inner) _addJobs() → {Array.<JobDefinition>}
- Description:
An async function that constructs each task object in this.jobStore.
- Source:
Returns:
This process should fill the this.taskLog object with each process.
- Type
- Array.<JobDefinition>
(async, private, inner) _checkStatus() → {BatchStatus}
- Description:
This function checks job status and returns the latest status object. If finished or exited jobs are detected, it emits the finish/error event on the this.runningJobs.get(jobId).jobEmitter eventEmitter.
- Source:
Returns:
The updated batch status.
- Type
- BatchStatus
(private, inner) _handleProxy(jobId, Proxy)
- Description:
This private function handles proxy registration and unregistration.
- Source:
Parameters:
Name | Type | Description |
---|---|---|
jobId | string | The job ID. |
Proxy | Proxy | (Optional) The proxy object to register. When this proxy is not given, the function unregisters the proxy. |
(async, private, inner) _jobAfterExitCallback(jobId, fileHandler)
- Description:
This internal function is called after the jobExitCallback function. It stores the resulting stdout/stderr files and updates the status of all jobs.
- Source:
Parameters:
Name | Type | Description |
---|---|---|
jobId | string | |
fileHandler | FileHandler | |
(async, private, inner) _jobBeforeExitCallback(jobId, fileHandler)
- Description:
This internal function is called right after a job has exited (finished or errored) and before the jobExitCallback function. It gets the remaining stdout/stderr messages, cleans up the streaming resources, and unregisters the proxy.
- Source:
Parameters:
Name | Type | Description |
---|---|---|
jobId | string | |
fileHandler | FileHandler | |
(async, private, inner) _loadProgress()
- Description:
This function loads the index.txt file in the options.taskLogFolder to check whether jobs have finished with an exitCode of 0 (normal exit). If a job has finished, it is skipped. This is used to resume pipelines that stopped unexpectedly while running.
- Source:
(async, private, inner) _monitorJobs(queue)
- Description:
This function monitors the jobs and periodically checks their status.
- Source:
Parameters:
Name | Type | Description |
---|---|---|
queue | | The real queue object to deal the |
(inner) _printStatus(currentStatus)
- Description:
This function prints the job progress of a batch task to stdout and stderr. You may customize this function for your preferred style.
Parameters:
Name | Type | Description |
---|---|---|
currentStatus | BatchStatus | The current status object with the fields {pending, queued, running, finishing, exit, done, total}. |
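A customized status printer could format the status fields into a single line, as in the sketch below. The function formatStatus is hypothetical, and it assumes each field holds a numeric count; only the field names come from this page.

```javascript
// Hypothetical formatter; only the field names are taken from the documentation.
function formatStatus(currentStatus) {
  const fields = ['pending', 'queued', 'running', 'finishing', 'exit', 'done'];
  const parts = fields.map((name) => `${name}: ${currentStatus[name]}`);
  return `${parts.join(' | ')} (total ${currentStatus.total})`;
}

const line = formatStatus({
  pending: 2, queued: 1, running: 3, finishing: 0, exit: 1, done: 5, total: 12,
});
console.log(line);
// pending: 2 | queued: 1 | running: 3 | finishing: 0 | exit: 1 | done: 5 (total 12)
```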
(async, private, inner) _resumeJobs()
- Description:
This function tries to fetch remote jobs. This is particularly useful when the adapter process was somehow closed and the same adapter process is run again. Instead of directly re-creating unfinished jobs, the adapter tries to fetch the job history. The adapter can then decide whether jobs need to be re-submitted.
- Source:
(async, private, inner) _runJob(jobId) → {promise}
Parameters:
Name | Type | Description |
---|---|---|
jobId | string | The auto-generated and unique job ID. |
Returns:
A promise object that is resolved when the task finishes normally and rejected when the task has errors (exitCode !== 0) or exits unexpectedly.
- Type
- promise
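The resolve/reject behavior of the returned promise can be sketched with an event emitter standing in for a running job. The helper jobPromise and the event payloads are hypothetical; the real method may settle its promise differently.

```javascript
const { EventEmitter } = require('events');

// Hypothetical wrapper: settles a promise from a job emitter's events.
function jobPromise(jobEmitter) {
  return new Promise((resolve, reject) => {
    jobEmitter.once('finish', () => resolve(0));
    jobEmitter.once('error', (exitCode) =>
      reject(new Error(`Job failed with exit code ${exitCode}`)));
  });
}

const okJob = new EventEmitter();
const badJob = new EventEmitter();
const okResult = jobPromise(okJob);
const badResult = jobPromise(badJob).catch((err) => err.message);

// The listeners are already attached because the promise executor runs synchronously.
okJob.emit('finish');
badJob.emit('error', 2);

okResult.then((code) => console.log(code)); // 0
badResult.then((msg) => console.log(msg)); // Job failed with exit code 2
```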
(async, private, inner) _runTasks()
(private, inner) _sendMsgToBdpClient(action, content)
- Description:
This private function sends messages to bdp-server with process.send.
- Source:
Parameters:
Name | Type | Description |
---|---|---|
action | string | The message action type. The current possible values are 'registerProxy' or 'unregisterProxy'. |
content | object | The message content object, of any type. |
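Sending a message with process.send can be sketched as follows. The function sendMsgSketch and the { action, content } message shape are assumptions for illustration; note that process.send is only defined when the Node.js process was spawned with an IPC channel, so the sketch accepts a stub for testing.

```javascript
// Hypothetical sketch; the { action, content } message shape is an assumption.
function sendMsgSketch(
  action,
  content,
  send = typeof process.send === 'function' ? process.send.bind(process) : undefined
) {
  const message = { action, content };
  // Without an IPC channel there is nothing to send to.
  if (typeof send !== 'function') return false;
  send(message);
  return true;
}

// Usage with a stub in place of the real IPC channel.
const sent = [];
const delivered = sendMsgSketch('registerProxy', { jobId: 'job-1' }, (msg) => sent.push(msg));
console.log(delivered, sent[0].action); // true registerProxy
```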
(async, private, inner) _waitingJobStatus(timeout)
- Description:
This internal function waits until all job objects are resolved (that is, have their own exit codes).
- Source:
Parameters:
Name | Type | Description |
---|---|---|
timeout | * | |
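Waiting for every job to obtain an exit code can be sketched as a polling loop with a deadline. Everything here is an assumption for illustration: waitForExitCodes, the exitCode field on the job objects, the polling interval, and the interpretation of timeout as a maximum wait in milliseconds.

```javascript
// Hypothetical sketch: polls until every job has an exit code or the timeout passes.
function waitForExitCodes(jobs, timeout = 1000, pollMs = 10) {
  const deadline = Date.now() + timeout;
  return new Promise((resolve) => {
    const check = () => {
      const allResolved = [...jobs.values()].every((job) => job.exitCode !== undefined);
      if (allResolved || Date.now() >= deadline) return resolve(allResolved);
      setTimeout(check, pollMs);
    };
    check();
  });
}

// Job "b" receives its exit code after 30 ms, well before the 500 ms deadline.
const jobs = new Map([['a', { exitCode: 0 }], ['b', {}]]);
setTimeout(() => {
  jobs.get('b').exitCode = 1;
}, 30);
const waited = waitForExitCodes(jobs, 500);
waited.then((done) => console.log(done)); // true
```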
(async, private, inner) _writeJobLogs()
(inner) setOptions(options) → {AdapterOption}
- Description:
You may use this in your adapter class, such as super._setOptions(options).
- Source:
Parameters:
Name | Type | Description |
---|---|---|
options | AdapterOption | |
Returns:
- Type
- AdapterOption
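One common way to implement an options setter is to merge caller-supplied values over defaults. The sketch below assumes that approach; setOptionsSketch and DEFAULT_OPTIONS are hypothetical, and only the delayInterval default of 300 is taken from this page.

```javascript
// Hypothetical defaults; only delayInterval (300) is documented on this page.
const DEFAULT_OPTIONS = { delayInterval: 300 };

function setOptionsSketch(options = {}) {
  // Later properties win, so caller-supplied values override the defaults.
  return { ...DEFAULT_OPTIONS, ...options };
}

console.log(setOptionsSketch({ delayInterval: 1000 }).delayInterval); // 1000
console.log(setOptionsSketch().delayInterval); // 300
```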