Using AWS Lambda functions with the Salesforce Bulk API

Posted by Johan on Tuesday, September 12, 2017

One common task when integrating Salesforce with a customer's systems is to import data, either as a one-time task or on a regular basis.

This can be done in several ways depending on the in-house technical expertise, and the simplest option might be the Import Wizard or the Data Loader. If you want to do it regularly in a batch fashion and are fortunate enough to have AWS infrastructure available, Lambda functions are an alternative.

Recently I did this as a prototype and I will share my findings here.

I will not go into detail about AWS and Lambda. I used this tutorial to get started with Lambda functions, but most of it concerned AWS specifics like IAM rather than the Salesforce parts.

I found this Heroku project for using the Bulk API.

The full Python code looks like this:

from __future__ import print_function
from base64 import b64decode
import boto3
import uuid
import csv
import os
from salesforce_bulk import SalesforceBulk, CsvDictsAdapter
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')
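# Salesforce credentials come from Lambda environment variables; the password and
# security token are stored KMS-encrypted and are decrypted once per container start.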
username = os.environ['SF_USERNAME']
encrypted_password = os.environ['SF_PASSWORD']
encrypted_security_token = os.environ['SF_SECURITYTOKEN']
password = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_password))['Plaintext'].decode('ascii')
security_token = boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_security_token))['Plaintext'].decode('ascii')
mapping_file_bucket = os.environ['MAPPING_FILE_BUCKET']
mapping_file_key = os.environ['MAPPING_FILE_KEY']

def bulk_upload(csv_path, mapping_file_path):
    with open(csv_path, mode='r') as infile:
        logger.info('Trying to login to SalesforceBulk')
        job = None
        try:
            bulk = SalesforceBulk(username=username, password=password, security_token=security_token)
            job = bulk.create_insert_job("Account", contentType='CSV')

            # Mapping file
            with open(mapping_file_path, 'rb') as mapping_file:
                bulk.post_mapping_file(job, mapping_file.read())

            accounts = csv.DictReader(infile)
            csv_iter = CsvDictsAdapter(iter(accounts))
            batch = bulk.post_batch(job, csv_iter)
            bulk.wait_for_batch(job, batch)
            bulk.close_job(job)
            logger.info('Done. Accounts uploaded.')
        except Exception as e:
            if job:
                bulk.abort_job(job)
            raise e
        return 'OK'

def handler(event, context):
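    # The event is an S3 put notification; each record points at an uploaded CSV file.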
    for record in event['Records']:
        # Incoming CSV file
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
        s3_client.download_file(bucket, key, download_path)

        # Mapping file
        mapping_file_path = '/tmp/{}{}'.format(uuid.uuid4(), mapping_file_key)
        s3_client.download_file(mapping_file_bucket, mapping_file_key, mapping_file_path)

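        # Note that returning here means only the first record in the event is processed.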
        return bulk_upload(download_path, mapping_file_path)

Make sure to add the following environment variables in Lambda before executing:

SF_USERNAME - your SF username
SF_PASSWORD - your SF password (encrypted)
SF_SECURITYTOKEN - your SF security token (encrypted)
MAPPING_FILE_BUCKET - the bucket in which to find the mapping file
MAPPING_FILE_KEY - the path to the mapping file in that bucket
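
If you haven't worked with encrypted environment variables before: the values for SF_PASSWORD and SF_SECURITYTOKEN are the base64-encoded output of a KMS encrypt call, which is exactly what the b64decode/decrypt calls at the top of the file undo. As a rough sketch (the key alias below is just an example, use your own key), the values can be produced with boto3 like this:

# One-off helper (run locally, not in the Lambda) to produce the encrypted values.
# The key alias is an example -- replace it with the alias or ARN of your own KMS key.
import base64
import boto3

kms = boto3.client('kms')
KEY_ID = 'alias/my-lambda-key'  # example alias

def encrypt_for_env(plaintext):
    ciphertext = kms.encrypt(KeyId=KEY_ID, Plaintext=plaintext.encode('ascii'))['CiphertextBlob']
    return base64.b64encode(ciphertext).decode('ascii')

print(encrypt_for_env('my-sf-password'))      # value for SF_PASSWORD
print(encrypt_for_env('my-security-token'))   # value for SF_SECURITYTOKEN

Also make sure the Lambda's execution role is allowed to call kms:Decrypt on the same key.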

I also added a method (in my own clone of the project here) to be able to provide the mapping file as part of the payload; I'll make sure to create a pull request for this later.

The nice thing about using the Bulk API is that you get monitoring directly in Salesforce: just go to the Bulk Data Load Jobs page in Setup to see the status of your job(s).

I haven't added the S3 trigger that listens for new files yet, but it's the next part of the tutorial so it shouldn't be a problem.
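
In the meantime, if you want to try the handler without the trigger in place, you can invoke it with a hand-crafted event that mimics an S3 put notification. A minimal sketch (the bucket and key names are made up):

# Minimal fake S3 put event for a manual test; bucket and key are placeholders.
test_event = {
    'Records': [
        {
            's3': {
                'bucket': {'name': 'my-import-bucket'},
                'object': {'key': 'accounts.csv'}
            }
        }
    ]
}

# The handler only reads the bucket name and object key, so this is enough for a smoke test.
print(handler(test_event, None))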

Cheers, Johan