Analyst Recipe

How to use cloud functions to ETL custom data sources into your warehouse

For a demo of using a cloud function to sync Justin Bieber's Twitter feed into a data warehouse, check out this webinar.

At Fivetran, our core product is a set of automated connectors that sync all your company's data into your data warehouse with zero configuration and minimal setup. This strategy works well for standard data sources: everything from Salesforce to MySQL to S3 can be synced to your data warehouse in one click. We've spent years building connectors that capture incremental changes accurately from each source and deliver well-designed schemas in your data warehouse.

But there are some data sources where our strategy of building standardized connectors doesn't work:

  • Custom APIs
  • Obscure APIs that have only a few users
  • Data formats that are not self-describing, like Protobuf

How can Fivetran support custom data sources like these? Our answer is our cloud-functions connector. Here's how it works:

  • You write a tiny function that fetches data from the custom source
  • You host this function on a serverless computing platform: Google Cloud Functions, AWS Lambda, or Azure Functions
  • Fivetran calls your function every 15 minutes to fetch new data
  • Fivetran deduplicates this data and merges it into your warehouse

To see how this works, let's walk through a simple example. We're going to pull data from Justin Bieber's Twitter feed. This example is a little silly, but it contains just enough complexity to make a good demo.

Request format

Every Fivetran request follows this format:

{
    "secrets": {...},
    "state": {...}
}

The secrets object allows you to store secrets like database passwords and API keys. In this example, we're going to store our Twitter consumerKey and consumerSecret:

[Image: setup form]

The state is how you implement incremental updates. The first time Fivetran calls your function, state will be {}. Your function will include the new state in its response alongside the new data. The next time we call your function, we'll pass back this new state. In this example, we'll use state to keep track of since_id, an integer representing the ID of the most recent tweet we've synced.
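The state round trip can be sketched in a few lines. This is a minimal illustration under stated assumptions, not Fivetran's implementation: `SOURCE`, `fakeHandler`, and `simulateSync` are hypothetical names, and the small integer ids stand in for a real API.

```javascript
// Hypothetical stand-in for a data source: rows with increasing ids.
const SOURCE = [{ id: 1 }, { id: 2 }, { id: 3 }];

// A function in the style described above: it takes { secrets, state } and
// returns the new rows plus the state to pass back on the next call.
function fakeHandler(request) {
    const sinceId = request.state.since_id; // undefined on the first call
    const rows = SOURCE.filter(r => sinceId == null || r.id > sinceId);
    // If nothing is new, keep the old cursor instead of losing it.
    const newest = rows.length > 0 ? rows[rows.length - 1].id : sinceId;
    return {
        state: newest == null ? {} : { since_id: newest },
        insert: { rows }
    };
}

// Simulates the caller: each response's state becomes the next request's state.
function simulateSync(handler, secrets, state) {
    const response = handler({ secrets, state });
    return { rows: response.insert.rows, nextState: response.state };
}

const first = simulateSync(fakeHandler, {}, {});   // initial sync, state is {}
SOURCE.push({ id: 4 });                            // new data arrives
const second = simulateSync(fakeHandler, {}, first.nextState);
console.log(first.nextState, second.rows);
```

The first call returns every row and a cursor; the second call returns only the row added since then.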

The code for this example is pretty simple. It starts with the standard wrapper defined by Google Cloud Functions:

/**
 * @param {!Object} req Cloud Function request context.
 * @param {!Object} res Cloud Function response context.
 */
exports.handler = (req, res) => {
    ...
}

AWS Lambda and Azure Functions are similar. Our first step is to get a bearer token from api.twitter.com/oauth2/token using the secrets from req.body.secrets:

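// Exchange the consumer key and secret for a bearer token
const credentials = `${req.body.secrets.consumerKey}:${req.body.secrets.consumerSecret}`
let post = https.request({
    method: 'POST',
    hostname: 'api.twitter.com',
    path: '/oauth2/token',
    headers: {
        'Authorization': 'Basic ' + Buffer.from(credentials, 'utf8').toString('base64'),
        'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8'
    }
}, postRes => {
    /* parse response, call next step */
})
// end() writes the form body and finishes the request
post.end('grant_type=client_credentials')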

The second step is to use this token and req.body.state.since_id to fetch Justin's latest tweets:

// On the first sync state is {}, so we fetch the timeline without since_id
let path = '/1.1/statuses/user_timeline.json?screen_name=justinbieber'
if (req.body.state.since_id != null) {
    path += '&since_id=' + req.body.state.since_id
}
// auth is the parsed response from the token request in the previous step
let get = https.get({
    hostname: 'api.twitter.com',
    path: path,
    headers: {
        'Authorization': 'Bearer ' + auth.access_token
    }
}, getRes => {
    /* parse response and call next step */
})
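The "parse response" placeholders in both steps can be filled in with a small helper. The sketch below is one possible way to do it; `collectJson` is a hypothetical name, not part of the original example or of any library.

```javascript
// Collects a streamed response body and parses it as JSON.
// `res` is any object that emits 'data', 'end' and 'error' events, such as
// the http.IncomingMessage that Node passes to the https.get callback.
function collectJson(res, callback) {
    const chunks = [];
    res.on('data', chunk => chunks.push(Buffer.from(chunk)));
    res.on('error', err => callback(err));
    res.on('end', () => {
        let parsed;
        try {
            parsed = JSON.parse(Buffer.concat(chunks).toString('utf8'));
        } catch (err) {
            return callback(err); // the body was not valid JSON
        }
        callback(null, parsed);
    });
}
```

Inside the callback above, this would be used as collectJson(getRes, (err, timeline) => { ... }). A production function would probably also check getRes.statusCode before trusting the body.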

The last step is to construct a response in the format Fivetran expects. The response format looks like this:

{
    "state": {...},
    "insert": {...},
    "schema": {...}
}

insert contains the rows you want to add to your data warehouse. Fivetran infers column names and types from the data you send us. If you specify a primary key in schema, we'll use it to deduplicate the data. Here's how we construct the response for Justin's tweets:

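// Split each tweet into a row for the tweets table and a row for the users table
let tweets = []
let users = []
for (let t of timeline) {
    tweets.push({
        id: t.id,
        user_id: t.user.id,
        text: t.text
    })
    users.push({
        id: t.user.id,
        name: t.user.name,
        screen_name: t.user.screen_name
    })
}
res.status(200).send({
    state: {
        // The timeline is newest-first; if it is empty, keep the previous since_id
        since_id: timeline.length > 0 ? timeline[0].id : req.body.state.since_id
    },
    insert: {
        tweets: tweets,
        users: users
    },
    schema: {
        tweets: {
            primary_key: ['id']
        },
        users: {
            primary_key: ['id']
        }
    }
})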

An example response looks like this:

{
    "state": {
        "since_id": 1004056816274190300
    },
    "insert": {
        "tweets": [
            {
                "id": 1004056816274190300,
                "user_id": 27260086,
                "text": "#cupidmovie https://t.co/11sV1zrpYa"
            },
            ...many more tweets...
        ],
        "users": [
            {
                "id": 27260086,
                "name": "Justin Bieber",
                "screen_name": "justinbieber"
            },
            ...many duplicates of Justin...
        ]
    },
    "schema": {
        "tweets": {
            "primary_key": ["id"]
        },
        "users": {
            "primary_key": ["id"]
        }
    }
}
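One caveat with numeric IDs in JavaScript: tweet IDs are 64-bit integers, larger than Number.MAX_SAFE_INTEGER, so JSON.parse rounds them. That is likely why the IDs above end in 00. The Twitter API also returns each ID as a string in the id_str field. The sketch below shows the difference; the ID digits are made up for illustration.

```javascript
// Hypothetical API payload: the same made-up ID as a number and as a string.
const payload = '{"id": 1004056816274190337, "id_str": "1004056816274190337"}';
const tweet = JSON.parse(payload);

// The numeric field is beyond the range where doubles represent every integer.
const numericIsSafe = Number.isSafeInteger(tweet.id);

// The string field survives parsing unchanged and converts exactly to a BigInt.
const exactId = BigInt(tweet.id_str);
const roundTrips = exactId.toString() === tweet.id_str;

console.log(numericIsSafe, roundTrips); // prints: false true
```

If exact IDs matter for your cursor or primary key, one option is to send id_str instead of id, at the cost of storing the column as a string.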

Fivetran will deduplicate this response and ingest it into your warehouse using a merge operation on the primary key, and you'll end up with two tables in your warehouse:

TWEETS

id                   user_id   text
1004056816274190300  27260086  #cupidmovie https://t.co/11sV1zrpYa
...                  ...       ...

USERS

id        name           screen_name
27260086  Justin Bieber  justinbieber
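As a rough illustration of what a merge on the primary key means, here is a small in-memory sketch. It is not the actual warehouse implementation; `dedupeByPrimaryKey` and `mergeInto` are hypothetical names, and the "later row wins" rule is an assumption.

```javascript
// Keeps one row per primary-key value; a later row replaces an earlier one.
function dedupeByPrimaryKey(rows, primaryKey) {
    const byKey = new Map();
    for (const row of rows) {
        // Serialize the key columns so composite keys compare by value.
        const key = JSON.stringify(primaryKey.map(column => row[column]));
        byKey.set(key, row);
    }
    return Array.from(byKey.values());
}

// Merges a batch of rows into an existing table, represented as an array.
function mergeInto(table, batch, primaryKey) {
    return dedupeByPrimaryKey(table.concat(batch), primaryKey);
}

const users = [
    { id: 27260086, name: 'Justin Bieber', screen_name: 'justinbieber' },
    { id: 27260086, name: 'Justin Bieber', screen_name: 'justinbieber' }
];
console.log(dedupeByPrimaryKey(users, ['id']).length); // prints 1
```

This is why the many duplicates of Justin in the response collapse into a single row in USERS, and why re-sending a tweet with the same id would update its row rather than add a second one.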

For the complete code of this example, check out our GitHub repository. And if you're looking to bring your company's data together in a data warehouse with minimal fuss, check out Fivetran.
