Hooks are custom code that can run at different stages during the execution of a flow to modify the behavior of export and import processes. You can think of them as Celigo platform events that are consumed during the flow to allow custom processing. Stack-based hooks are hosted and run on your own server or an AWS Lambda function. When you use a stack-based hook, integrator.io makes a call to your server or AWS Lambda function at the appropriate stage of the flow, and the server or function then executes the code. This gives you more flexibility and control over the execution environment, which is useful for complex tasks or when you want to use resources or libraries hosted on your own server.
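For illustration, here is a minimal sketch of what a self-hosted stack endpoint might look like. The request/response shape is an assumption for this sketch, not something documented in this article: it assumes integrator.io POSTs the hook's options object as the JSON request body and treats the JSON response body as the hook's return value. Verify the exact contract for your stack configuration before relying on this.

```javascript
// Hypothetical self-hosted stack endpoint (request/response shape assumed).
const express = require('express');
const app = express();

app.use(express.json({ limit: '10mb' })); // pages of records can be large

app.post('/preSavePageFunction', (req, res) => {
  const options = req.body; // assumed to be the hook 'options' payload
  // Pass the page through unchanged, mirroring the preSavePage stub below
  res.json({
    data: options.data,
    errors: options.errors,
    abort: false,
    newErrorsAndRetryData: []
  });
});

app.listen(3000); // put this behind HTTPS in production
```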
The preSavePage hook is invoked on a page of records before the page is sent to subsequent steps in your flow. This hook can be used to add, update, or delete records, and it is a great place to execute logic on batches of records at the same time. For example:
- Filter or group records in batches (see the filtering sketch after the example below)
- Apply calculations in batches
- Perform complex data transformations in batches
- Validate data and return errors for records
```javascript
/*
 * preSavePageFunction stub:
 *
 * The name of the function can be changed to anything you like.
 *
 * The function will be passed one 'options' argument that has the following fields:
 *   'bearerToken' - a one-time bearer token which can be used to invoke selected integrator.io API routes.
 *   'data' - an array of records representing one page of data. A record can be an object {} or array [] depending on the data source.
 *   'errors' - an array of errors where each error has the structure {code: '', message: '', source: '', retryDataKey: ''}.
 *   'files' - file exports only. files[i] contains source file metadata for data[i]. i.e. files[i].fileMeta.fileName.
 *   'retryData' - a dictionary object containing the retry data for all errors: {retryDataKey: { data: ..., stage: '', traceKey: ''}}.
 *   '_exportId' - the _exportId currently running.
 *   '_connectionId' - the _connectionId currently running.
 *   '_flowId' - the _flowId currently running.
 *   '_integrationId' - the _integrationId currently running.
 *   '_parentIntegrationId' - the parent of the _integrationId currently running.
 *   'pageIndex' - 0 based. context is the batch export currently running.
 *   'lastExportDateTime' - delta exports only.
 *   'currentExportDateTime' - delta exports only.
 *   'settings' - all custom settings in scope for the export currently running.
 *   'testMode' - boolean flag that executes the script only in test mode and preview/send actions.
 *
 * The function needs to return an object that has the following fields:
 *   'data' - your modified data.
 *   'errors' - your modified errors.
 *   'abort' - instruct the batch export currently running to stop generating new pages of data.
 *   'newErrorsAndRetryData' - return brand new errors linked to retry data: [{retryData: ..., errors: []}].
 *
 * Throwing an exception will signal a fatal error and stop the flow.
 */
exports.preSavePageFunction = function (options, callback) {
  // sample code that simply passes on what has been exported
  return callback(null, {
    data: options.data,
    errors: options.errors,
    abort: false,
    newErrorsAndRetryData: []
  })
}
```
This preSavePage hook function replaces a potentially large child object in each data record with an error message if the object's serialized size exceeds 500,000 characters (approximately 1 MB).
```javascript
function swapHugeSubObjectWithInlineErrorMsg (options, callback) {
  options.data.forEach(function (d) {
    // Measure the serialized size of the child object (guard against
    // records where the field is absent)
    let size = JSON.stringify(d.potentiallyBigSubObject || {}).length
    if (size > 500000) {
      d.potentiallyBigSubObject = { errorMessage: "potentiallyBigSubObject > 1MB" }
    }
  })
  return callback(null, {
    data: options.data,
    errors: options.errors,
    abort: false
  })
}
```
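Another pattern from the list above is filtering records in batches. Because the preSavePage hook can add, update, or delete records, it may return fewer records than it received. Here is a minimal sketch; the status field and the 'cancelled' value are hypothetical, so substitute a field from your own source records:

```javascript
function filterCancelledRecords (options, callback) {
  // Keep only records that pass the filter ('status' is a hypothetical field)
  const data = options.data.filter(record => record.status !== 'cancelled')
  return callback(null, {
    data: data,
    errors: options.errors,
    abort: false
  })
}
```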
The preMap hook is invoked on a page of records before the records are mapped from source to destination structures. This hook can be used to validate, update, or ignore records before mapping rules are run. Changes made to source records in this hook will only persist for the duration of the import, and will not carry over to downstream applications in your flow. This hook is a great place to execute logic on batches of records to simplify the field mappings you build in Flow Builder. For example:
- Reformat fields or object structures to avoid complex mappings
- Perform calculations on lists to avoid tedious mapping expressions
- Load the destination record to pre-populate data needed by mapping logic
```javascript
/*
 * preMapFunction stub:
 *
 * The name of the function can be changed to anything you like.
 *
 * The function will be passed one 'options' argument that has the following fields:
 *   'bearerToken' - a one-time bearer token which can be used to invoke selected integrator.io API routes.
 *   'data' - an array of records representing the page of data before it has been mapped. A record can be an object {} or array [] depending on the data source.
 *   '_importId' - the _importId currently running.
 *   '_connectionId' - the _connectionId currently running.
 *   '_flowId' - the _flowId currently running.
 *   '_integrationId' - the _integrationId currently running.
 *   '_parentIntegrationId' - the parent of the _integrationId currently running.
 *   'settings' - all custom settings in scope for the import currently running.
 *   'testMode' - boolean flag that executes the script only in test mode and preview/send actions.
 *
 * The function needs to return an array, and the length MUST match the options.data array length.
 * Each element in the array represents the actions that should be taken on the record at that index.
 * Each element in the array should have the following fields:
 *   'data' - the modified/unmodified record that should be passed along for processing.
 *   'errors' - used to report one or more errors for the specific record. Each error must have the following fields: {code: '', message: '', source: ''}
 * Returning an empty object {} for a specific record will indicate that the record should be ignored.
 * Returning both 'data' and 'errors' for a specific record will indicate that the record should be processed but errors should also be logged.
 * Throwing an exception will fail the entire page of records.
 */
exports.preMapFunction = function (options, callback) {
  return callback(null, options.data.map((d) => { return { data: d } }))
}
```
This function calculates and sets the total field on each record in the data array before the mapping has been applied.
```javascript
function calculateTotalsUsingPreMap (options, callback) {
  return callback(null, options.data.map(record => {
    // Initialize total to 0 for each record
    let total = 0;
    // Check if the record has items and iterate over them
    if (record.items && Array.isArray(record.items)) {
      record.items.forEach(item => {
        // Calculate total based on item properties, e.g., quantity and price
        total += (item.quantity || 0) * (item.price || 0);
      });
    }
    // Set the total on the record
    record.total = total;
    // Return the modified record
    return { data: record };
  }));
}
```
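The stub above also notes that returning an empty object {} for a record tells integrator.io to ignore it. Here is a minimal sketch of that pattern; the isTest flag is a hypothetical source field:

```javascript
function ignoreTestRecordsUsingPreMap (options, callback) {
  return callback(null, options.data.map(record => {
    if (record.isTest) {
      return {}; // empty object: ignore this record
    }
    return { data: record }; // pass the record along for mapping
  }));
}
```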
The postMap hook is invoked on a page of records after the records are mapped from source to destination structures. This hook can be used to validate, update, or ignore records before they are submitted to the destination application. Changes made to source records in this hook will only persist for the duration of the import, and will not carry over to downstream applications in your flow. This hook is a great place to execute logic on batches of records to simplify building the final payload. For example:
- Reformat fields or object structures to avoid complex handlebars expressions
- Perform calculations on lists to avoid tedious handlebars expressions
- Load the destination record to dynamically change the fields being submitted
```javascript
/*
 * postMapFunction stub:
 *
 * The name of the function can be changed to anything you like.
 *
 * The function will be passed one 'options' argument that has the following fields:
 *   'bearerToken' - a one-time bearer token which can be used to invoke selected integrator.io API routes.
 *   'preMapData' - an array of records representing the page of data before it was mapped. A record can be an object {} or array [] depending on the data source.
 *   'postMapData' - an array of records representing the page of data after it was mapped. A record can be an object {} or array [] depending on the data source.
 *   '_importId' - the _importId currently running.
 *   '_connectionId' - the _connectionId currently running.
 *   '_flowId' - the _flowId currently running.
 *   '_integrationId' - the _integrationId currently running.
 *   '_parentIntegrationId' - the parent of the _integrationId currently running.
 *   'settings' - all custom settings in scope for the import currently running.
 *   'testMode' - boolean flag that executes the script only in test mode and preview/send actions.
 *
 * The function needs to return an array, and the length MUST match the options.postMapData array length.
 * Each element in the array represents the actions that should be taken on the record at that index.
 * Each element in the array should have the following fields:
 *   'data' - the modified/unmodified record that should be passed along for processing.
 *   'errors' - used to report one or more errors for the specific record. Each error must have the following fields: {code: '', message: '', source: ''}
 * Returning an empty object {} for a specific record will indicate that the record should be ignored.
 * Returning both 'data' and 'errors' for a specific record will indicate that the record should be processed but errors should also be logged.
 * Throwing an exception will fail the entire page of records.
 */
exports.postMapFunction = function (options, callback) {
  return callback(null, options.postMapData.map((d) => { return { data: d } }))
}
```
This function calculates and sets the total field on each record in the postMapData array after the mapping has been applied.
```javascript
function calculateTotalsUsingPostMap (options, callback) {
  return callback(null, options.postMapData.map(record => {
    // Initialize total to 0 for each record
    let total = 0;
    // Check if the record has items and iterate over them
    if (record.items && Array.isArray(record.items)) {
      record.items.forEach(item => {
        // Calculate total based on item properties, e.g., quantity and price
        total += (item.quantity || 0) * (item.price || 0);
      });
    }
    // Set the total on the record
    record.total = total;
    // Return the modified record
    return { data: record };
  }));
}
```
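Per the stub above, a record can also be returned together with errors, so that it is still processed while the problems are logged. Here is a minimal sketch of validating a mapped field this way; the email field and the error code are hypothetical:

```javascript
function validateEmailUsingPostMap (options, callback) {
  return callback(null, options.postMapData.map(record => {
    // Flag mapped records that are missing a required field ('email' is hypothetical)
    if (!record.email) {
      return {
        data: record, // still process the record...
        errors: [{   // ...but log an error alongside it
          code: 'MISSING_EMAIL',
          message: 'Record is missing a required email address.',
          source: 'postMap hook'
        }]
      };
    }
    return { data: record };
  }));
}
```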
The postSubmit hook is invoked on a page of records after the records are submitted to the destination application. This hook can be used to enhance error messages and/or modify the response objects returned by the destination application. Changes made to the response object are localized, and must be mapped back into the source record using a ‘response mapping’ to be visible in the flow. This hook is a great place to execute logic on batches of records to mitigate errors and/or to optimize response structures needed by subsequent steps in the flow. For example:
- Ignore errors that have a specific code or message (see the sketch after the example below)
- Enhance confusing or cryptic error messages returned by external APIs
- Delete sensitive data returned in the response _json
```javascript
/*
 * postSubmitFunction stub:
 *
 * The name of the function can be changed to anything you like.
 *
 * The function will be passed one 'options' argument that has the following fields:
 *   'bearerToken' - a one-time bearer token which can be used to invoke selected integrator.io API routes.
 *   'preMapData' - an array of records representing the page of data before it was mapped. A record can be an object {} or array [] depending on the data source.
 *   'postMapData' - an array of records representing the page of data after it was mapped. A record can be an object {} or array [] depending on the data source.
 *   'responseData' - an array of responses for the page of data that was submitted to the import application. An individual response will have the following fields:
 *     'statusCode' - 200 is a success. 422 is a data error. 403 means the connection went offline.
 *     'errors' - [{code: '', message: '', source: ''}]
 *     'ignored' - true if the record was filtered/skipped, false otherwise.
 *     'id' - the id from the import application response.
 *     '_json' - the complete response data from the import application.
 *     'dataURI' - if possible, a URI for the data in the import application (populated only for errored records).
 *   '_importId' - the _importId currently running.
 *   '_connectionId' - the _connectionId currently running.
 *   '_flowId' - the _flowId currently running.
 *   '_integrationId' - the _integrationId currently running.
 *   '_parentIntegrationId' - the parent of the _integrationId currently running.
 *   'settings' - all custom settings in scope for the import currently running.
 *   'testMode' - boolean flag that executes the script only in test mode and preview/send actions.
 *
 * The function needs to return the responseData array provided by options.responseData. The length of the responseData array MUST remain unchanged. Elements within the responseData array can be modified to enhance error messages, modify the complete _json response data, etc...
 * Throwing an exception will fail the entire page of records.
 */
exports.postSubmitFunction = function (options, callback) {
  return callback(null, options.responseData)
}
```
This function enhances cryptic error messages returned by an external application.
```javascript
function enhanceErrorMessages (options, callback) {
  const responseData = options.responseData.map(response => {
    if (response.errors && response.errors.length > 0) {
      response.errors = response.errors.map(error => {
        // Example: customize the error message for a specific error code
        if (error.code === 'SPECIFIC_ERROR_CODE') {
          error.message = 'A more descriptive and user-friendly error message.';
        }
        return error;
      });
    }
    return response;
  });
  // Return the (same-length) responseData array once, after all responses
  // have been processed
  return callback(null, responseData);
}
```
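The first item in the list above is ignoring errors with a specific code or message. One approach, sketched below, is to remove the matching errors from response.errors and mark the response successful when none remain. This is an assumption about how integrator.io interprets the modified response, not something documented here, and the DUPLICATE code is hypothetical; verify the behavior against your own flow:

```javascript
function ignoreDuplicateErrors (options, callback) {
  options.responseData.forEach(response => {
    if (response.errors && response.errors.length > 0) {
      // Drop errors we consider harmless ('DUPLICATE' is a hypothetical code)
      response.errors = response.errors.filter(error => error.code !== 'DUPLICATE');
      if (response.errors.length === 0) {
        response.statusCode = 200; // assume: treat the record as successful
      }
    }
  });
  // The array length must remain unchanged
  return callback(null, options.responseData);
}
```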
The postAggregate hook is invoked after the final aggregated file is uploaded to the destination service. This hook is used to get information about the final file that was aggregated and uploaded to the external destination. This hook will not execute when the 'Skip aggregation' field is set to true.
```javascript
/*
 * postAggregateFunction stub:
 *
 * The name of the function can be changed to anything you like.
 *
 * The function will be passed one 'options' argument that has the following fields:
 *   'bearerToken' - a one-time bearer token which can be used to invoke selected integrator.io API routes.
 *   'postAggregateData' - a container object with the following fields:
 *     'success' - true if data aggregation was successful, false otherwise.
 *     '_json' - information about the aggregated data transfer. For example, the name of the aggregated file on the FTP site.
 *     'code' - error code if data aggregation failed.
 *     'message' - error message if data aggregation failed.
 *     'source' - error source if data aggregation failed.
 *   '_importId' - the _importId currently running.
 *   '_connectionId' - the _connectionId currently running.
 *   '_flowId' - the _flowId currently running.
 *   '_integrationId' - the _integrationId currently running.
 *   '_parentIntegrationId' - the parent of the _integrationId currently running.
 *   'settings' - all custom settings in scope for the import currently running.
 *   'testMode' - boolean flag that executes the script only in test mode and preview/send actions.
 *
 * The function doesn't need a return value.
 * Throwing an exception will signal a fatal error.
 */
exports.postAggregateFunction = function (options, callback) {
  return callback(null)
}
```
This function sends the id of a successfully uploaded file to a listener for subsequent processing.
```javascript
const axios = require('axios')

function sendSuccessfulFileIdToListener (options, callback) {
  // Check if aggregation was successful and _json contains the 'id'
  if (options.postAggregateData.success && options.postAggregateData._json && options.postAggregateData._json.id) {
    // Extract the 'id' from the _json object
    const id = options.postAggregateData._json.id;
    // Send the 'id' to the listener
    axios.post(`https://api.integrator.io/v1/exports/${id}/data`, { id: id }, {
      headers: { Authorization: `Bearer ${options.bearerToken}` }
    })
      .then(function (response) { return callback() })
      .catch(function (error) { return callback(error) })
  } else {
    // Nothing to send; finish without error
    return callback()
  }
}
```
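Conversely, if you need the flow to fail loudly when aggregation did not succeed, the stub above notes that throwing an exception signals a fatal error. Here is a minimal sketch that instead passes an Error as the first callback argument, following the stub's callback signature; verify that this matches the throwing behavior in your environment. It uses only the documented success, code, and message fields:

```javascript
function failFlowOnAggregationError (options, callback) {
  if (!options.postAggregateData.success) {
    // Surface the destination's error details from the documented fields
    return callback(new Error(
      options.postAggregateData.code + ': ' + options.postAggregateData.message
    ));
  }
  return callback(null);
}
```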