20 Dec 2018 | Analyst Recipe

Use Lambda Functions to Move Data From Redshift

Charles Wang
Charles Wang
Use Lambda Functions to Move Data From Redshift
Learn how to incrementally migrate data from Redshift tables using AWS Lambda.

Use Lambda Functions to Move Data From Redshift

At Fivetran, we pipe data into several different data warehouses in order to accommodate a wide range of technology stacks and use cases. Different data warehouses come with different characteristics and tradeoffs, though they can also be broadly similar in performance.

Sometimes our customers need to move modest amounts of data from Redshift to Snowflake or BigQuery. Since Redshift is a part of the Amazon Web Services (AWS) cloud platform, anyone who uses Redshift can also access AWS Lambda.

Let’s walk through how, with some configuration in AWS and a bit of Python, you can use the Fivetran Lambda connector to stream data from your Redshift cluster into the data warehouse of your choice. Please note that this is meant specifically for smaller-scale, ad-hoc migrations. For full schema replications, you should consider more conventional approaches. This tutorial assumes that the reader has a working knowledge of Python and SQL.

The idea here is to demonstrate a lightweight process to incrementally add data from a small-ish (i.e. sub-gigabyte scale) table to your destination data warehouse. AWS Lambda has a 6MB payload limit, so it is not practical to migrate very large tables this way.

Designate Your Source Redshift Table

Given the 6MB payload limit applied to AWS Lambda, you want a relatively small table that can be migrated in a reasonable amount of time. Get a sense of how large your table is and how many sub-6MB increments it will take to load it. Using your SQL query tool (I use SQL Workbench/J to access Redshift), run the following SQL, where ‘[table_name]’ is the table name enclosed in single quotes:

SELECT "table", size, tbl_rows FROM SVV_TABLE_INFO WHERE "table" = '[table_name]'

If you do not have permission to run this query, ask the designated superuser at your organization. This will tell you how many rows the table has and its size in megabytes. Later, you will use this to adjust the LIMIT clause of the SQL query in your script accordingly.

Make note of the following details:

  1. Database name
  2. Host
  3. Port
  4. User
  5. Password
  6. Table name
  7. Primary key(s) if you’re migrating to Snowflake; BigQuery doesn’t care about primary keys
  8. Cursor column(s) – identify a column or set of columns with unique values that you can order sequentially and use to bookmark your progress
  9. LIMIT increment

Retain these parameters for your script later on. You can either store them in a separate file or, more practically, enter them as “Secret” parameters on Fivetran and as test events on AWS Lambda. The advantage of doing the latter is that you can reuse the very same Lambda function without modification (only different Secrets) in multiple connectors.

Write a Python Script

from datetime import datetime, date
from time import struct_time, mktime
import decimal
import pg8000


# Connect via pg8000
def get_connection(database, host, port, user, password):
    conn = None
    try:
        conn = pg8000.connect(database=database, host=host, port=port, user=user, password=password, ssl=True)
    except Exception as err:
        print(err)
    return conn


# Handle Python data types such as datetime and decimal
def encode_json(data):
    if isinstance(data, datetime):
        return str(data)
    if isinstance(data, date):
        return str(data)
    if isinstance(data, decimal.Decimal):
        return float(data)
    if isinstance(data, struct_time):
        return datetime.fromtimestamp(mktime(data))
    return data


# Handler function
def lambda_handler(request, context):
    # 1. Import AWS and database credentials
    # These and other parameters should be wrapped up in 'request,' which is relayed from the connector's 'secrets'
    dbname = request['secrets']['dbname']
    host = request['secrets']['host']
    port = int(request['secrets']['port'])
    user = request['secrets']['user']
    password = request['secrets']['password']

Check out the full python script here.

The script I’ve provided is intended for Python 3.7, although it is backward compatible with Python 3.6 and Python 2.7 as well. You will also need pg8000 for accessing redshift. The advantage of pg8000 over the more popular psycopg2 is that it isn’t compiled, which makes installing the dependencies in AWS Lambda easier

Install the dependencies locally using pip install per usual.

The handler that performs the actual migration is called lambda_handler in this script. Your script will perform the following actions (also thusly numbered in the script):

  1. Import AWS and database credentials
  2. Set your state
  3. Connect to Redshift
  4. Query Redshift. You will ORDER BY your cursor and apply the appropriate LIMIT increment. The pg8000 package we are using is a wrapper for SQL, so there will be SQL embedded in your Python code.
  5. Generate the JSON response and save your state. The response is a JSON object in the format described here. You won’t need to return a 200 status, unlike with Google Cloud Functions.

You should run the script a few times locally to see what happens. Note that you will have to temporarily add lambda_handler() to the bottom of your script to run it directly as the main program, as when uploaded the script is just a collection of functions.

Set Up an AWS Lambda Function

Make sure you have access to the following in AWS:

  1. IAM policies with access to Redshift and Lambda
  2. An IAM role with the above policies attached

If you don’t, then get access. You will need to create a Lambda function as well. Detailed instructions can be found in our documentation here.

Once you create the Lambda, choose the IAM role with Redshift, and Lambda access as the “Execution role.” In “Basic Settings,” you should set the timeout to the maximum possible: 15 minutes. You’re only billed for actual time run, so there is no reason to set a low time limit.

Set the runtime to Python 3.7 (or 3.6 or 2.7), and make sure handler is in the format [name_of_file].[name_of_handler_function].

Return to the directory where you have your Python script. Enter your command line, and use the following command to place your Python dependencies into the same directory as your script:

pip install pg8000 --target .

Make sure to include the period at the end – it is not a typo. In our example, we only need to install pg8000. The boto3 package comes pre-installed on the AWS Lambda Python runtimes.

Now, you will zip up your script and the pg8000 folder (you don’t need the folder ending in “dist-info”).

I zipped them by highlighting, right-clicking, and using the “Compress” action on a Mac; on a Windows machine, right-clicking and using “Send to” and “Compressed (zipped) folder” will accomplish the same. You can also use the command line per instructions here.

Upload the zipped archive containing these items via the drop-down menu under Code entry type on the page where you are configuring your lambda.

Set Up Fivetran Lambda Function Connector

Create an AWS Lambda connector as you would any other connector. Enter the ARN from the role you’re using, and enter the name of the lambda function into the respective text boxes. Make note of the “External ID” as you will need it shortly.

You will have to revisit the IAM role you are using. Go to Trust relationships and click on Edit trust relationship. You will need to add the following JSON to the Statement key:

{
  "Effect": "Allow",
  "Principal": {
    "Service": "lambda.amazonaws.com"
  },
  "Action": "sts:AssumeRole"
}

Furthermore, make sure that the value for Statement.Condition.StringEquals.sts:ExternalId is your External Id, in double quotes. My External Id was “accompanying_legged,” so my Policy Document looked like this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::834469178297:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "accompanying_legged"
        }
      }
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

Now you should be all set! You can run the syncs as often as you need and halt them when they are no longer needed.

As noted before, the full Python source code can be found here. I have taken pains to thoroughly document every function and hope that you find it helpful; feel free to get in touch with me at charles@fivetran.com with any questions. If you don’t use Fivetran at present but are interested, feel free to request a demo here.

Are You A Data Expert?

Start a free trial today.

Discover the smartest solution for data-driven results.
We have detecting that you are using an adblocking plugin in your browser. We don't show ads, but we rely on advertising services, so it might restrict you from completing important functions or seeing important content. Please make sure you whitelist our website in your adblocking plugin.
Fivetran uses cookies to enhance your user experience and improve the quality of our website. Unless you disable cookies, you consent to the placement and use of cookies as described in our Privacy Policy by continuing to use this website.
Adblock Detection