Functions
A Function connector allows you to code a custom data connector as an extension of Fivetran. For example, if you have a custom data source or a private API that we don't support, you can develop a serverless ELT data pipeline using our Function connectors.
Building a custom data pipeline from scratch is complicated. It’s even harder to maintain. When you use a Function connector with your custom function, you only have to write the cloud function to extract the data from your source. We will load and transform the data in your destination. You can use our Function connectors and serverless platforms to build robust data pipelines faster than traditional monolithic orchestration platforms.
A developer can use our templates to create data pipelines that deploy and manage themselves. Cloud functions allow Fivetran to work with code to support custom data sources. Fivetran executes the code in your account on the cloud platform and syncs the returned data to your destination.
TIP: Depending on your needs and requirements, you may prefer to use the Fivetran Connector SDK to build your custom connectors instead of using Functions. See how Connector SDK and Functions differ before you decide.
Our Function connectors have the same intrinsic capabilities as standard Fivetran connectors:
- Incremental updates
- Source data type inference
- Automatic schema updates
- Data deduplication
- Destination ingestion optimizations
- Logs and alerts to monitor events and troubleshoot issues
- Soft delete
Use Fivetran's Function connectors if:
- Fivetran doesn't have a connector for your source
- You are using private APIs or custom applications
- You are using a source or API that Fivetran is unlikely to support in the near future
- You want to sync unsupported file formats that require pre-processing
- You have sensitive data that needs filtering or anonymizing before entering the destination
Additional benefits of using a cloud function with Fivetran:
- Your function runs within your Virtual Private Cloud
- Unified logging and alerting system
- Multi-language support
- Built-in setup tests
IMPORTANT: Function connectors count towards your Monthly Active Rows (MAR). You also pay your infrastructure provider for running the function.
Supported cloud function services
Fivetran integrates with the following cloud function services:
- AWS Lambda
- Azure Functions
- Google Cloud Functions
Architecture
The following diagram provides a high-level overview of how Fivetran executes a cloud function:
Sync overview
Authorize: Authorize Fivetran to the cloud service using the IAM role authorization mechanism.
Initial sync: Fivetran begins the initial sync of data by supplying an empty state ({}). Your function responds with an initial set of data and a set of cursors.
Parse: Fivetran parses the data returned by your function. The data must be in the standard response format.
Process: Fivetran creates a table in the destination for each entity. Fivetran typecasts and maps the elements in the entity to a column in the table. Fivetran also transforms data types that are not natively supported in the destination into data types that are supported by the destination.
Load: Fivetran automatically creates schemas (one per connector). Each mapped source object can translate to more than one normalized table. Fivetran creates the tables based on what you specify in the function’s response. Fivetran populates these tables with the initial dump of data.
Update: Fivetran periodically calls your function to pull the next set of data from it. Fivetran creates new tables if you add them in your function response.
Soft delete: Fivetran marks all the records of specified destination tables as deleted by setting the _fivetran_deleted column to true. We recommend that you use this option only during full re-syncs of specified tables and not during incremental syncs.
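The call sequence above can be tried locally before deploying. The following is a minimal sketch of such a driver, not Fivetran's implementation: `run_sync`, `sample_function`, the `item` entity, and the `offset` cursor are all hypothetical names chosen for illustration. It starts from an empty state and keeps calling the function until `hasMore` is false.

```python
from typing import Any, Callable, Dict, List

Request = Dict[str, Any]
Response = Dict[str, Any]


def run_sync(function: Callable[[Request], Response],
             state: Dict[str, Any],
             secrets: Dict[str, Any],
             max_calls: int = 100) -> Dict[str, Any]:
    """Call `function` repeatedly until it returns hasMore = false.

    Hypothetical local driver: it only collects inserted records per entity
    and carries the returned state into the next call.
    """
    tables: Dict[str, List[Dict[str, Any]]] = {}
    for _ in range(max_calls):
        response = function({"state": state, "secrets": secrets})
        for entity, records in response.get("insert", {}).items():
            tables.setdefault(entity, []).extend(records)
        state = response["state"]  # checkpoint returned by the function
        if not response.get("hasMore", False):
            return {"state": state, "tables": tables}
    raise RuntimeError("function never returned hasMore = false")


def sample_function(request: Request) -> Response:
    """Hypothetical source with three records, served one per call."""
    source = [{"id": 1}, {"id": 2}, {"id": 3}]
    offset = request["state"].get("offset", 0)  # 0 on the initial sync
    page = source[offset:offset + 1]
    new_offset = offset + len(page)
    return {
        "state": {"offset": new_offset},
        "insert": {"item": page},
        "hasMore": new_offset < len(source),
    }


# Initial sync: the state is an empty object.
result = run_sync(sample_function, state={}, secrets={})
```

Because the function advances the cursor on every call, the loop ends after the third call. A function that returned `hasMore` as true without updating its state would hit the `max_calls` guard instead.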
Request format
Fivetran's request has a standard format. It is a JSON object with the following fields:
- agent is an informational object.
- state is a JSON object that contains cursors from the previous successful function execution. It is key to performing incremental updates. A cursor is a bookmark that marks the data Fivetran has already synced (for example, a timestamp, ID, or index). For the initial sync, state is an empty JSON object {}. Fivetran expects an updated state object in every response.
  IMPORTANT: The state object can't be NULL. If you perform a full re-sync, the state resets to {}. If there is an error during a full re-sync, Fivetran doesn't save intermediate data. To ensure data integrity, you must trigger the full re-sync again.
- secrets (optional) is a JSON object that contains access keys or API keys for the upstream APIs. Secrets allow you to store information (API tokens or database passwords) that you don’t want to check in to your code. We use encryption at rest to store the secrets. We pass the secrets into your function every time we call the function. Enter the secrets in JSON format in the connector setup form.
- customPayload (optional) is a JSON object of key-value pairs that you can use to specify custom information. We pass the custom payload into your function every time we call the function.
- setup_test (optional) is a boolean that lets the function know that Fivetran has invoked the function for the setup tests. The function runs a lightweight job to test the connectivity and returns a JSON object with the hasMore field set to false.
  NOTE: We don't add this field to the request during syncs.
- sync_id (optional) is the Fivetran sync identifier (UUID). You can find the sync_id in your connector's dashboard logs and use it to debug and link function logs with connector logs.
  IMPORTANT: When the setup_test field is set to true, we add the setup-test value to the sync_id field in the request. We call the function once with the sync_id, and if we get an error, we then call the function without the sync_id.
Example request
{
    "agent": "<function_connector_name>/<external_id>/<schema>",
    "state": {
        "cursor": "2018-01-01T00:00:00Z"
    },
    "secrets": {
        "apiToken": "abcdefghijklmnopqrstuvwxyz_0123456789"
    },
    "sync_id": "468b681-c376-4117-bbc0-25d8ae02ace1"
}
In this example:
- function_connector_name is the connector name. It can be one of the following:
  - Fivetran AWS Lambda Connector
  - Fivetran Azure Functions Connector
  - Fivetran Google Cloud Functions Connector
- external_id is the unique ID tied to your connector. You can find the ID in your connector setup form.
- schema is the destination schema name you enter when you first set up your connector.
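A minimal sketch of how a function might read the request fields. The `handler` name and its single-argument signature are assumptions made for illustration (each cloud platform has its own entry point convention), and returning `state` from the setup test branch is also an assumption. The optional fields are read with defaults because Fivetran may omit them.

```python
from typing import Any, Dict


def handler(request: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical entry point that receives the parsed request body."""
    state = request.get("state") or {}                # {} on the initial sync
    secrets = request.get("secrets", {})              # optional
    custom_payload = request.get("customPayload", {})  # optional
    sync_id = request.get("sync_id")                  # optional, useful in logs

    if request.get("setup_test"):
        # Setup test: do a lightweight connectivity check here (for example,
        # one cheap authenticated call with secrets["apiToken"]) and report
        # that there is nothing more to fetch.
        return {"state": state, "hasMore": False}

    # Regular sync: fetch records newer than the cursor. This sketch fetches
    # nothing and hands the cursor back unchanged.
    cursor = state.get("cursor")
    return {
        "state": {"cursor": cursor},
        "insert": {},
        "hasMore": False,
    }


example_request = {
    "agent": "<function_connector_name>/<external_id>/<schema>",
    "state": {"cursor": "2018-01-01T00:00:00Z"},
    "secrets": {"apiToken": "abcdefghijklmnopqrstuvwxyz_0123456789"},
    "sync_id": "468b681-c376-4117-bbc0-25d8ae02ace1",
}
sync_response = handler(example_request)
setup_response = handler({"state": {}, "setup_test": True, "sync_id": "setup-test"})
```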
Response format
The response is a JSON object with the following root fields:
- state contains the updated state value(s). Your response must always return an updated state to checkpoint the data fetched in the request.
- insert specifies the entities and records to be inserted. Fivetran reads the data and infers the data type and the number of columns.
- delete (optional) specifies the entities and records to be deleted. Use this field to mark records as deleted. Fivetran doesn't delete the record; instead, it marks the record as deleted by setting the _fivetran_deleted column value to true. If you specify the delete field, you must also specify the schema field. Fivetran creates the _fivetran_deleted column in the destination table only if your function response has the delete field.
- schema (optional) specifies primary key columns for each entity. You must be very consistent with the schema field and the primary key columns to avoid any unwanted behavior. If you don’t specify the schema, Fivetran appends the data.
- hasMore is a boolean indicator for Fivetran to make a follow-up call to fetch the next set of data. Fivetran keeps making calls while hasMore = true until it receives hasMore = false. In either case, your response should return an updated state to checkpoint the data fetched in that request and avoid an infinite loop.
- softDelete (optional) specifies the list of entities to be soft deleted. Fivetran marks the records of these entities as deleted by setting the value of the _fivetran_deleted column to true. We recommend that you use this field if you do not want to specify the individual records to be deleted in the delete field. If a table is specified in both delete and softDelete, the delete section has no effect. The softDelete parameter must be included in the function response payload even if your data is being ingested through S3 Files.
NOTE: If your connector continuously runs for six hours, we will reschedule the sync and save the intermediate progress.
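The rules for the response fields can be checked before a function is deployed. The sketch below is a hypothetical helper, not a Fivetran tool: `validate_response` and its messages are invented for illustration. The check that every record carries its primary key columns is an assumption about what being consistent with the schema means in practice.

```python
from typing import Any, Dict, List


def validate_response(response: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a data response (empty if none)."""
    problems: List[str] = []

    if not isinstance(response.get("state"), dict):
        problems.append("state must be a JSON object and can't be null")
    if not isinstance(response.get("hasMore"), bool):
        problems.append("hasMore must be a boolean")

    deletes = response.get("delete", {})
    schema = response.get("schema", {})
    if deletes and not schema:
        problems.append("delete is specified without schema")

    for entity in response.get("softDelete", []):
        if entity in deletes:
            problems.append(
                f"{entity}: in both delete and softDelete, so delete has no effect"
            )

    # Assumption: each record should contain every declared primary key column.
    for entity, definition in schema.items():
        keys = definition.get("primary_key", [])
        for section in ("insert", "delete"):
            for record in response.get(section, {}).get(entity, []):
                missing = [key for key in keys if key not in record]
                if missing:
                    problems.append(f"{entity}: {section} record {record} lacks {missing}")
    return problems


bad_response = {
    "state": None,
    "delete": {"transaction": [{"id": 3}]},
    "hasMore": False,
}
problems = validate_response(bad_response)
```

Here `problems` lists the null state and the delete field that is used without a schema.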
Example response
{
    "state": {
        "transaction": "2018-01-02T00:00:00Z",
        "campaign": "2018-01-02T00:00:01Z"
    },
    "insert": {
        "transaction": [
            {"id": 1, "amount": 100},
            {"id": 2, "amount": 50}
        ],
        "campaign": [
            {"id": 101, "name": "Christmas"},
            {"id": 102, "name": "New Year"}
        ]
    },
    "delete": {
        "transaction": [
            {"id": 3},
            {"id": 4}
        ],
        "campaign": [
            {"id": 103},
            {"id": 104}
        ]
    },
    "schema": {
        "transaction": {
            "primary_key": ["id"]
        },
        "campaign": {
            "primary_key": ["id"]
        }
    },
    "hasMore": true,
    "softDelete": ["transaction"]
}
In this example:
- state contains the transaction and campaign cursors
- transaction and campaign are entities. Fivetran creates two tables:
  - TRANSACTION with columns id and amount
  - CAMPAIGN with columns id and name
- The function inserts:
  - records 1 and 2 into the TRANSACTION table
  - records 101 and 102 into the CAMPAIGN table
- The function marks:
  - records 3 and 4 as deleted from the TRANSACTION table
  - records 103 and 104 as deleted from the CAMPAIGN table
- hasMore is set to true to indicate that there are more records. Fivetran immediately makes a follow-up call to fetch the next set of values.
- softDelete marks all the records of the TRANSACTION table as deleted by setting the value of its _fivetran_deleted column to true
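A response of this shape can be assembled from a list of source changes. The sketch below assumes a hypothetical source that flags removed rows with a `deleted` field. `build_response` and that flag are illustrative names, not part of the Fivetran format. Only the `transaction` entity is shown.

```python
from typing import Any, Dict, List


def build_response(changes: List[Dict[str, Any]],
                   cursor: str,
                   has_more: bool) -> Dict[str, Any]:
    """Split source changes into insert and delete records for one entity."""
    inserts = [
        {key: value for key, value in change.items() if key != "deleted"}
        for change in changes
        if not change.get("deleted")
    ]
    # Deleted records only need their primary key columns.
    deletes = [{"id": change["id"]} for change in changes if change.get("deleted")]

    response: Dict[str, Any] = {
        "state": {"transaction": cursor},
        "insert": {"transaction": inserts},
        # schema is required whenever delete is used.
        "schema": {"transaction": {"primary_key": ["id"]}},
        "hasMore": has_more,
    }
    if deletes:
        response["delete"] = {"transaction": deletes}
    return response


changes = [
    {"id": 1, "amount": 100},
    {"id": 2, "amount": 50},
    {"id": 3, "deleted": True},
    {"id": 4, "deleted": True},
]
response = build_response(changes, cursor="2018-01-02T00:00:00Z", has_more=True)
```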
Idempotence
Fivetran expects the cloud function you write to be idempotent: repeated calls with the same state return (roughly) the same data. An idempotent function doesn't track state internally. If an update fails at any point, Fivetran repeats the function execution with the previous state. Fivetran only saves the new state returned by the function after the destination is successfully updated.
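A minimal sketch of an idempotent function, assuming a hypothetical in-memory `SOURCE` in place of a real API and a single timestamp cursor named `cursor`. The response depends only on the incoming state, so a retry with the previous state returns the same records.

```python
from typing import Any, Dict

# Hypothetical source data; a real function would query an API or database.
SOURCE = [
    {"id": 1, "updated_at": "2018-01-01T00:00:00Z"},
    {"id": 2, "updated_at": "2018-01-02T00:00:00Z"},
    {"id": 3, "updated_at": "2018-01-03T00:00:00Z"},
]


def handler(request: Dict[str, Any]) -> Dict[str, Any]:
    # The cursor comes only from the request; nothing is remembered between
    # calls. Timestamps in this fixed ISO 8601 format compare correctly as
    # strings, and every timestamp is greater than the empty string.
    cursor = request["state"].get("cursor", "")
    rows = [row for row in SOURCE if row["updated_at"] > cursor]
    new_cursor = max((row["updated_at"] for row in rows), default=cursor)
    return {
        "state": {"cursor": new_cursor},
        "insert": {"event": rows},
        "schema": {"event": {"primary_key": ["id"]}},
        "hasMore": False,
    }


previous_state = {"cursor": "2018-01-01T00:00:00Z"}
first = handler({"state": previous_state})
# Simulate a failed destination update: the call is repeated with the
# previous state, and the function returns the same data again.
retry = handler({"state": previous_state})
```

Declaring a primary key complements this design: if the same records are delivered twice, they can be merged rather than appended.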
Custom error handling
Cloud functions may fail due to various reasons, including code execution errors, runtime issues, or internal errors. You can add an error handling mechanism in your function response to report an error on your Fivetran dashboard.
Design your function to report an error:
- Use the errorMessage field in your response to indicate function execution errors. Fivetran creates an Error in the connector dashboard with your custom error message. For example, for the following response, Fivetran creates a "This is an error" error in your dashboard:
  { "errorMessage": "This is an error" }
- Use the errorType and stackTrace fields to pass additional information about the error. You must specify the value of the errorMessage field to use the errorType and stackTrace fields. For example, for the following response, Fivetran creates an Error alert on your dashboard with the error type and stack trace details:
  { "errorMessage": "name 'response' is not defined", "errorType": "NameError", "stackTrace": [ [ "/var/task/sample_function.py", 35, "function_handler", "response['errorMessage'] = \"This is an error\"" ] ] }
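One way to produce such a response is to wrap the handler so that any uncaught exception is converted into the error fields. This is a sketch under assumptions: `report_errors` and `handler` are hypothetical names, and the stack trace entries copy the file, line number, function, and source line layout of the example above rather than a documented requirement.

```python
import functools
import traceback
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


def report_errors(func: Handler) -> Handler:
    """Convert uncaught exceptions into an error response."""
    @functools.wraps(func)
    def wrapper(request: Dict[str, Any]) -> Dict[str, Any]:
        try:
            return func(request)
        except Exception as error:
            frames = traceback.extract_tb(error.__traceback__)
            return {
                "errorMessage": str(error),
                "errorType": type(error).__name__,
                # One [file, line number, function, source line] entry per frame.
                "stackTrace": [
                    [frame.filename, frame.lineno, frame.name, frame.line]
                    for frame in frames
                ],
            }
    return wrapper


@report_errors
def handler(request: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical failure while calling the upstream API.
    raise ValueError("upstream API returned HTTP 500")


error_response = handler({"state": {}})
```

`error_response` carries the exception message and type. The last stack trace entry points at the line in `handler` that raised.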
Schema information
Fivetran creates a schema for the Function connector. You can customize your schema name when you first set up your connector. All tables for this connector are created in this schema. The destination schema name becomes your connector’s name and must be unique.
NOTE: You can only use lowercase letters (a-z), numbers (0-9), and underscores (_) to name your destination schema. Do not begin names with a number.
Fivetran also creates the _fivetran_batch and _fivetran_index columns:
- _fivetran_batch is incremented every time Fivetran fetches data from the function. Its initial value is 0.
- _fivetran_index is the index at which a record is inserted for each batch.
Fivetran uses the _fivetran_batch and _fivetran_index columns as the primary key if:
- your function doesn't specify the schema, or
- your function specifies a schema but doesn't specify the primary_key in the schema
NOTE: If your function doesn't specify a primary_key, the data will be inserted as new rows in your destination and will count towards your Monthly Active Rows (MAR). Otherwise, Fivetran will use the specified primary_key to perform a merge and update the data in your destination.
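The difference between appending and merging can be illustrated with a small in-memory model. This is an illustration of the behavior described above, not Fivetran's implementation: `load_batches` is a hypothetical name, starting `_fivetran_index` at 0 is an assumption, and the sketch takes no position on whether the two columns are also present when a primary key is specified.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def load_batches(batches: List[List[Row]],
                 primary_key: Optional[List[str]] = None) -> List[Row]:
    """Model the destination table after loading each batch in order."""
    table: Dict[Tuple[Any, ...], Row] = {}
    for batch_number, records in enumerate(batches):  # first batch is 0
        for index, record in enumerate(records):
            if primary_key:
                # Merge: a record with a known key updates the existing row.
                key = tuple(record[column] for column in primary_key)
                table[key] = {**table.get(key, {}), **record}
            else:
                # Append: (batch, index) is the key, so every record is a new row.
                key = (batch_number, index)
                table[key] = {
                    **record,
                    "_fivetran_batch": batch_number,
                    "_fivetran_index": index,
                }
    return list(table.values())


# The same source record is delivered twice with a changed amount.
batches = [[{"id": 1, "amount": 100}], [{"id": 1, "amount": 120}]]
appended = load_batches(batches)
merged = load_batches(batches, primary_key=["id"])
```

Without a primary key, the table ends up with two rows for the same source record. With `primary_key` set to `id`, it holds a single row with the latest amount.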
NOTE: Each function connector loads data into a unique schema to ensure one connector never accidentally overwrites data from another. You can use one function connector to load data into multiple tables within one schema.
Schema definition and data types
Your cloud function defines the destination schema, and Fivetran is not responsible for maintaining the schema. You must ensure that your cloud function consistently delivers data to the destination schema in the same format.
Fivetran supports a standard list of data types for all our destinations. We select an appropriate data type for the data stored in that column before writing to your destination. For more information about data types and type inference, see our Data types documentation.
Naming conventions
We follow our standard schema naming conventions.
Syncing empty tables and columns
Function connectors don't support the creation of empty tables and columns in your destination.
Connector state management (Beta)
You can manage the state of your function connector by using our API endpoints:
Sample functions
See the Fivetran Cloud Functions GitHub repo for sample functions you can use to build your own cloud functions. In the GitHub repo, we have included sample functions to fetch data from the following data sources:
Transport API - Download zip file
You can download our sample Transport API (London Tube) function. Use your cloud platform's zip deployment feature to deploy the pre-built function and sync London Tube data to your Fivetran destination.
Select your language to download the sample function (.zip file):
Custom development service
If you need assistance with developing a customized function connector, our Services team may be able to help. Contact your Account Representative for more information.