Datalake Salesforce Export
I need to set-up flows to run on a nightly basis to feed data from our Salesforce HealthCloud instance to our AWS S3 datalake.
Ideally, I would like to pull the delta on objects (i.e. Accounts, Contacts, etc) nightly, but I am not sure the best approach to pulling all fields from each object. I haven't used the real time export before, but from reading the article in the community it seems like this triggers anytime there is an update to the object instead of being able to schedule it and pull all updates on a nightly basis.
The second approach seems like it would be to write a large SOQL query, but that doesn't seem ideal either and cumbersome to update anytime a new field is added to the object.
What would be the best approach to pulling data from objects on a nightly basis and pushing to AWS S3?
-
Hi Dave. With respect to the "nightly pull", I would use the scheduling feature to pick when you want it to run and then the "Delta" processing feature of the export step. That should handle the frequency that you want. The node would look something like this.
Unfortunately, Salesforce has an odd limit imposed if you use the "Fields(All)" directive so that it only will allow 200 records. The way around that is to write out the SOQL. Yes, it is cumbersome of the way they have configured their sql-like syntax.0 -
Dave Guderian you will have to basically do what you did for HubSpot.
- Flow 1: Get the list of Salesforce fields on the object you want. You can build a SOQL query to get that, but I've found the REST api is even better because it gives you more data about the fields and lets you know if they are queryable. Here is an article I made on how to describe an option via Salesforce REST api: https://docs.celigo.com/hc/en-us/articles/19906581226779-Connect-to-Salesforce-REST-relative-URIs-via-proxy. This flow will need to update the settings (or the SOQL query field directly) on flow #2 and your SOQL query will need to reference the settings.
- Flow 2 will run the SOQL query.
While the fields(all) method works, it only works for 200 results and it doesn't get the next 200-400 results, etc. You have to explicitly state all the fields you want which is why you need to make it like you have for HubSpot.
0 -
Thank you both for the feedback.
Tyler- I was able to follow the steps to get the information to flow 2, but I don't understand how to update the SOQL query with the fields (i.e. name) that I am fetching in flow 1.
Do I need to change the salesforce connection again in step 2, or can i use a standard export?
Export from Step 2 below, but not sure how to pass in the names from step 1.
Flow 1 Names:
Flow 1 Steps:
0 -
Dave Guderian I'll do you one better. Install this integration zip and it should create as many flows as you need in order to sync over all your Salesforce data to S3. Basically, it has a flow that creates other flows based on the integration settings you put in. It gives you a drop down of Salesforce objects to select to sync over and will automatically get all the fields for each object and then update each SOQL query. As for the scheduling, you just schedule the first flow to run and then all the others will run after it.
0 -
Tyler-
Thank you so much for taking the time to put this together. I am trying to get it up and running but am getting some errors. I'm not sure what the error is indicating or if perhaps one of the fields is not appropriately populating prior to hitting the branching?
Here is the step I am receiving the error on.
Here is the error (it looks like the SF ID isnt being populated):
I put a "stop" in just before the branching so I could catch the payload before the branch:
{
"object": "CONTACT",
"io_custom_trace_key": "CONTACT",
"_integrationId": "6603013086459fbe7173e9aa",
"_flowId": "660301984f60bcf1086f2df1",
"ioConnectionId": "65696293b489e6652ef0a0a2",
"salesforceConnectionId": "6537e90cbf1f317bc0f17b43",
"s3ConnectionId": "655e3fd30d546c156dae60eb",
"s3Bucket": "clearwater-sandbox-datalake",
"s3Path": "raw/source=salesforce-health-cloud",
}0 -
I think I may have figured it out... I changed "sandbox": true and that seemed to work. Any other changes you can think of off hand that I may need to make?
0 -
Dave Guderian I guess you probably installed this in sandbox? I hadn't thought about that, but it makes sense it failed since I made this in production. Nothing else I can think of.
0 -
Thanks Tyler. Looks like I am running into a similar issue on the second to last step on flow 1, but I'm not sure where to make the change on this step.
0 -
Dave Guderian I made some updates and the link should download you the new version. I should have mapped sandbox everywhere because by not mapping sandbox, it defaults to production. So there was a mismatch in what environments resources were being made.
0 -
Thanks Tyler. I just downloaded it again from the link and it now it only has the one flow in the .zip file. Is that correct? Also, the only connection was the integrator.io connection.
0 -
Dave Guderian 3rd time is the charm. Try the new version in the link above.
0 -
Tyler-
You're amazing! This is totally cool! I do have a couple of follow-up questions (as I'm sure you could have guessed)!
Question 1:
I see my file in AWS (so flow 1 worked), but I don't see that flow 2 actually ran (I set just to pull the ACCOUNT object). When would you expect to see flow 2 run?
Can you elaborate on what is actually driving the placement of the file in AWS. Is it the file key that is being set within flow 1, or is it the actual file key on the AWS connection? I am assuming that since flow 2 didn't run, it must be the file key in flow 1 that is driving the placement of the file in AWS.
Lastly, I need to get the files going to the right overall spot in AWS! Is this the correct syntax to use based on what we discussed in office hours?
raw/source=salesforce-health-cloud/table={{settings.flow.object}}/year={{dateFormat "YYYY" date}}/month={{dateFormat "MM" date}}/day={{dateFormat "DD" date}}/account_{{timestamp "YYYY-MM-DDThhmmss" "UTC"}}Z.json
Your the best Tyler. Thanks again for all the help on this!
0 -
Dave Guderian the all generated flows should run after the first flow. The last step in the first flow is to trigger the generated flows to run. Maybe run the first flow again and see if it goes now? I've been testing fresh installs and mine are running. Maybe debug that import step and see what the responses are.
The first flow is updating and creating the exports, imports, and flow so it is driving what the import resource looks like. All it's doing is populating the file key field that you see within the UI on the S3 import step.
For your file path, I would have something like this:
Update import:
{{record._PARENT.s3Path}}/table={{lowercase record._PARENT.object}}/year=\{{timestamp \"YYYY\"}}/month=\{{timestamp \"MM\"}}/day=\{{timestamp \"DD\"}}/{{lowercase record._PARENT.object}}_\{{timestamp \"YYYY-MM-DDThhmmss[Z]\"}}.json
Create import:
{{record.s3Path}}/table={{lowercase record.object}}/year=\{{timestamp \"YYYY\"}}/month=\{{timestamp \"MM\"}}/day=\{{timestamp \"DD\"}}/{{lowercase record.object}}_\{{timestamp \"YYYY-MM-DDThhmmss[Z]\"}}.json
0 -
Tyler- This did work correctly. I just didnt wait long enough apparently to see the additional flow come up (it created a new account flow that ran for flow 2). I also used the handlebars/syntax you suggested and the files are landing in the appropriate places in S3, so everything appears to be working correctly from that standpoint.
That said, I did have one error that popped up when flow 1 ran for one of the objects. Is this because there are too many fields being pulled in the SOQL, and if so is there a work-around for this?
0 -
Dave Guderian give the new version a shot.
0 -
Thanks Tyler. That worked great and the flow ran without any errors.
Last question (I think! 😁). What (if any) changes do I need to make to the flows for the initial batch upload (i.e. all current/historical data in the system) and then what (if any) changes do I need to make for the incremental nightly loads?
I cant say thanks enough for the time you put into this. I know other customers will benefit from this as well if they are moving data from SF to AWS!
0 -
Dave Guderian when the first flow runs, it will run the flows that it created and updated. Each export on each flow is analyzed to determine if the export should be a export type of delta or an export type of all. I determine if the export should be a delta if any of the following fields are on the Salesforce object: ["SystemModstamp","LastModifiedDate","CreatedDate","LoginTime","Timestamp","UpdatedDate"]. If none of those fields are on the Salesforce object, then it gets set to an export type = all and will not run on a delta. When a new flow is created for an object that supports a delta export, the flow will run and pull data back to 2000. On subsequent runs, it will run off of delta data. If you want to trigger a resync of all tables, you can use the new button I added into the integration settings. Clicking this checkbox and saving the form will trigger all flows to run and pull data again back to 2000. To get the new version, use the above link again.
0 -
Tyler-
I am consistently getting some errors with one of my objects. Is the integration looking at all fields on the object, or is there maybe a filter that would need to be applied to only pull a field if its active? I'm just not clear what this error is indicating (note its on the second flow that would import into AWS S3)?
0 -
Dave Guderian is that field deprecated in Salesforce? I don't have any field filters now, but it could be easily added. I'm curious what is different about that specific field that makes it ineligible to query.
0 -
Tyler - I believe its because of the child relationship. For example, in order to return values I have to query:
select id, (Select id from RenewedInsurancePolicies) from insurancepolicy
0 -
Dave Guderian the screenshot you sent doesn't match up to the error? The errors says it's an issue with a field named "CoverageCode".
0 -
Thats my fault Tyler. This is the correct error.
0 -
What's the api name for that field? I have a similar field setup and it works fine. Here is my field from the describe lookup call. Can you do the describe call for this object and send me what the metadata for that field looks like?
{
"aggregatable": true,
"aiPredictionField": false,
"autoNumber": false,
"byteLength": 18,
"calculated": false,
"calculatedFormula": null,
"cascadeDelete": false,
"caseSensitive": false,
"compoundFieldName": null,
"controllerName": null,
"createable": true,
"custom": true,
"defaultValue": null,
"defaultValueFormula": null,
"defaultedOnCreate": false,
"dependentPicklist": false,
"deprecatedAndHidden": false,
"digits": 0,
"displayLocationInDecimal": false,
"encrypted": false,
"externalId": false,
"extraTypeInfo": null,
"filterable": true,
"filteredLookupInfo": null,
"formulaTreatNullNumberAsZero": false,
"groupable": true,
"highScaleNumber": false,
"htmlFormatted": false,
"idLookup": false,
"inlineHelpText": null,
"label": "Contact",
"length": 18,
"mask": null,
"maskType": null,
"name": "ContactCustom__c",
"nameField": false,
"namePointing": false,
"nillable": true,
"permissionable": true,
"picklistValues": [],
"polymorphicForeignKey": false,
"precision": 0,
"queryByDistance": false,
"referenceTargetField": null,
"referenceTo": [
"Contact"
],
"relationshipName": "ContactCustom__r",
"relationshipOrder": null,
"restrictedDelete": false,
"restrictedPicklist": false,
"scale": 0,
"searchPrefilterable": true,
"soapType": "tns:ID",
"sortable": true,
"type": "reference",
"unique": false,
"updateable": true,
"writeRequiresMasterRead": false
}0 -
Will this work? I used a little different method.... not as pretty I know. Let me know, and I can do the describe call if not.
All available metadata for "RenewedFromPolicyId"
Key
Value
🔍 desc.aggregatable
true
🔍 desc.aiPredictionField
false
🔍 desc.autoNumber
false
🔍 desc.byteLength
18
🔍 desc.calculated
false
🔍 desc.calculatedFormula
(Blank)
🔍 desc.cascadeDelete
false
🔍 desc.caseSensitive
false
🔍 desc.compoundFieldName
(Blank)
🔍 desc.controllerName
(Blank)
🔍 desc.createable
true
🔍 desc.custom
false
🔍 desc.defaultValue
(Blank)
🔍 desc.defaultValueFormula
(Blank)
🔍 desc.defaultedOnCreate
false
🔍 desc.dependentPicklist
false
🔍 desc.deprecatedAndHidden
false
🔍 desc.digits
0
🔍 desc.displayLocationInDecimal
false
🔍 desc.encrypted
false
🔍 desc.externalId
false
🔍 desc.extraTypeInfo
(Blank)
🔍 desc.filterable
true
🔍 desc.filteredLookupInfo
(Blank)
🔍 desc.formulaTreatNullNumberAsZero
false
🔍 desc.groupable
true
🔍 desc.highScaleNumber
false
🔍 desc.htmlFormatted
false
🔍 desc.idLookup
false
🔍 desc.inlineHelpText
(Blank)
🔍 desc.label
Insurance Policy ID
🔍 desc.length
18
🔍 desc.mask
(Blank)
🔍 desc.maskType
(Blank)
🔍 desc.name
RenewedFromPolicyId
🔍 desc.nameField
false
🔍 desc.namePointing
false
🔍 desc.nillable
true
🔍 desc.permissionable
true
🔍 desc.polymorphicForeignKey
false
🔍 desc.precision
0
🔍 desc.queryByDistance
false
🔍 desc.referenceTargetField
(Blank)
🔍 desc.referenceTo.0
InsurancePolicy
🔍 desc.relationshipName
RenewedFromPolicy
🔍 desc.relationshipOrder
(Blank)
🔍 desc.restrictedDelete
false
🔍 desc.restrictedPicklist
false
🔍 desc.scale
0
🔍 desc.searchPrefilterable
true
🔍 desc.soapType
tns:ID
🔍 desc.sortable
true
🔍 desc.type
reference
🔍 desc.unique
false
🔍 desc.updateable
true
🔍 desc.writeRequiresMasterRead
false
🔍 part.attributes.type
EntityParticle
🔍 part.attributes.url
/services/data/v60.0/tooling/sobjects/EntityParticle/InsurancePolicy.RenewedFromPolicy
🔍 part.QualifiedApiName
RenewedFromPolicyId
🔍 part.Label
Insurance Policy ID
🔍 part.DataType
reference
🔍 part.ReferenceTo.referenceTo.0
InsurancePolicy
🔍 part.Length
18
🔍 part.Precision
0
🔍 part.Scale
0
🔍 part.IsAutonumber
false
🔍 part.IsCaseSensitive
false
🔍 part.IsDependentPicklist
false
🔍 part.IsEncrypted
false
🔍 part.IsIdLookup
false
🔍 part.IsHtmlFormatted
false
🔍 part.IsNillable
true
🔍 part.IsUnique
false
🔍 part.IsCalculated
false
🔍 part.InlineHelpText
(Blank)
🔍 part.FieldDefinition.attributes.type
FieldDefinition
🔍 part.FieldDefinition.attributes.url
/services/data/v60.0/tooling/sobjects/FieldDefinition/InsurancePolicy.RenewedFromPolicy
🔍 part.FieldDefinition.DurableId
InsurancePolicy.RenewedFromPolicy
🔍 part.EntityDefinition.attributes.type
EntityDefinition
🔍 part.EntityDefinition.attributes.url
/services/data/v60.0/tooling/sobjects/EntityDefinition/InsurancePolicy
🔍 part.EntityDefinition.DurableId
InsurancePolicy
0 -
Dave Guderian can we cover in office hours next week? I'm not too sure here.
0
Please sign in to leave a comment.
Comments
26 comments