Fivetran users who signed up on or after July 22, 2025, no longer have access to Function connections. Older Fivetran users can continue to create and use Function connections as before. All our users can use Connector SDK to build custom connectors. Connector SDK provides a streamlined development experience, and we'll host your custom connector for you.
The following code samples help you build your Lambda function. They show what an AWS Lambda function implementation looks like in different programming languages.
We have included sample functions for you to use to accelerate your function development:
A sample function request and the generated response to help you understand Fivetran's request and response format and how to use the different nodes to write your cloud function.
// Main.java
package com.fivetran.function;
import com.amazonaws.SdkClientException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Main implements RequestHandler {
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
public Response handleRequest(final Request request, final Context context) {
return this.apiResponse(request);
}
Response apiResponse(Request request) {
Record record1 = new Record(Instant.parse("2017-12-31T05:12:05Z"), 1001L, "$1200", "$12");
Record record2 = new Record(Instant.parse("2017-12-31T06:12:04Z"), 1001L, "$1200", "$12");
Record record3 = new Record(Instant.parse("2017-12-31T05:12:05Z"), 1000L, "$1200", "$12");
Record record4 = new Record(Instant.parse("2017-12-31T06:12:04Z"), 1000L, "$1200", "$12");
HashMap newState = new HashMap<>();
newState.put("transactionCursor", Instant.parse("2018-01-01T00:00:00Z"));
request.state = newState;
//insert records
List records = new ArrayList<>();
records.add(record1);
records.add(record2);
Map insert = new HashMap<>();
insert.put("transactions", records);
//delete records
records = new ArrayList<>();
records.add(record3);
records.add(record4);
Map delete = new HashMap<>();
delete.put("transactions", records);
// schema
List primaryKey = new ArrayList<>();
primaryKey.add("order_id");
primaryKey.add("date");
Map> transactionsSchema = new HashMap<>();
transactionsSchema.put("primary_key", primaryKey);
Map schema = new HashMap<>();
schema.put("transactions", transactionsSchema);
this.pushToS3(request.bucket, request.file, this.jsonString(insert, delete));
return new Response(request.state, schema, true);
}
private String jsonString(final Map insert, final Map delete) {
try {
return Main.MAPPER.writeValueAsString(Map.of("insert", insert, "delete", delete));
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed while generating json", e);
}
}
private void pushToS3(final String bucket, final String objKey, final String json) {
try {
//we need to specify the bucket region here if bucket is not global & `lambda` in not the same region as s3 bucket
AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(Regions.AP_SOUTH_1).build();
s3Client.putObject(bucket, objKey, json);
} catch (SdkClientException e) {
throw new RuntimeException(String.format("Failed while pushing %s object to %s bucket", objKey, bucket), e);
}
}
}
// Record.java
package com.fivetran.function;
import java.time.Instant;
/**
 * One row of the sample {@code transactions} table.
 *
 * <p>Fields are public and keep their snake_case names because {@code Main} serializes
 * instances of this class directly to JSON.
 */
public class Record {
    /** Timestamp of the row. */
    public Instant date;

    /** Order identifier. */
    public Long order_id;

    /** Amount, kept as formatted text (for example {@code "$1200"}). */
    public String amount;

    /** Discount, kept as formatted text (for example {@code "$12"}). */
    public String discount;

    /**
     * Creates a fully populated record.
     *
     * @param date     timestamp of the row
     * @param order_id order identifier
     * @param amount   formatted amount
     * @param discount formatted discount
     */
    public Record(Instant date, Long order_id, String amount, String discount) {
        this.order_id = order_id;
        this.date = date;
        this.discount = discount;
        this.amount = amount;
    }
}
// Request.java
package com.fivetran.function;
/**
 * Payload that the Lambda handler receives on each invocation.
 *
 * <p>Plain data holder with public fields and the implicit no-argument constructor.
 */
public class Request {
    /** State passed in with the request. */
    public Object state;

    /** Secrets passed in with the request. */
    public Object secrets;

    /** Name of the S3 bucket the handler writes the records to. */
    public String bucket;

    /** Key of the S3 object the handler writes the records to. */
    public String file;
}
// Response.java
package com.fivetran.function;
import java.util.Map;
/**
 * Value returned by the Lambda handler: the updated state, the table schema and the
 * {@code hasMore} flag.
 */
public class Response {
    /** State to hand back with the response. */
    public Object state;

    /** Schema description, keyed by table name. */
    public Map schema;

    /** Whether a follow-up invocation is requested. */
    public boolean hasMore;

    /**
     * Creates a response.
     *
     * @param state   state to hand back
     * @param schema  schema description, keyed by table name
     * @param hasMore whether a follow-up invocation is requested
     */
    public Response(Object state, Map schema, boolean hasMore) {
        this.hasMore = hasMore;
        this.schema = schema;
        this.state = state;
    }
}
// pom.xml
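<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.fivetran.function</groupId>
  <artifactId>sample</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>sample</name>
  <url>http://maven.apache.org</url>
  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.2</version>
        <configuration>
          <createDependencyReducedPom>false</createDependencyReducedPom>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>9</source>
          <target>9</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-lambda-java-core</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-lambda-java-events</artifactId>
      <version>3.11.0</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-lambda-java-log4j2</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.13.3</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>2.13.3</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-s3</artifactId>
      <version>1.12.264</version>
    </dependency>
  </dependencies>
</project>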
Node.js
exports.handler = (request, context, callback) => {
callback(null, update(request.state, request.secrets,request.bucket,request.file));
};
function update(state, secrets,bucket,key) {
// Fetch records using api calls
let [insertTransactions, deleteTransactions, newTransactionsCursor] = apiResponse(state, secrets);
// Populate insert and delete in records
var records=JSON.stringify({
insert: {
transactions: insertTransactions
},
delete: {
transactions: deleteTransactions
}
})
// Store records in s3 bucket
putObjectToS3(bucket,key,records)
// Return response
return ({
state: {
transactionsCursor: newTransactionsCursor
},
schema : {
transactions : {
primary_key : ['order_id', 'date']
}
},
hasMore : false
});
}
function apiResponse(state, secrets) {
var insertTransactions = [
{"date":'2017-12-31T05:12:05Z', "order_id":1001, "amount":'$1200', "discount":'$12'},
{"date":'2017-12-31T06:12:04Z', "order_id":1001, "amount":'$1200', "discount":'$12'},
];
var deleteTransactions = [
{"date":'2017-12-31T05:12:05Z', "order_id":1000, "amount":'$1200', "discount":'$12'},
{"date":'2017-12-31T06:12:04Z', "order_id":1000, "amount":'$1200', "discount":'$12'},
];
return [insertTransactions, deleteTransactions, '2018-01-01T00:00:00Z'];
}
// Function to store data in s3 bucket
var AWS = require('aws-sdk');
function putObjectToS3(bucket, key, data){
var s3 = new AWS.S3();
var params = {
Bucket : bucket,
Key : key,
Body : data
}
s3.putObject(params, function(err, data) {
if (err) console.log(err, err.stack); // an error occurred
else console.log(data); // successful response
});
}
Python
import json
import boto3
def lambda_handler(request, context):
    """Lambda entry point for the sample function.

    Fetches the records, writes the insert/delete payload as UTF-8 JSON to the S3
    object named by request["bucket"] / request["file"], and returns the response
    (state, schema, hasMore).
    """
    # Fetch records using api calls
    inserted, deleted, cursor = api_response(request['state'], request['secrets'])

    # Records to insert and to mark as deleted go to S3, not into the response
    s3_payload = {
        'insert': {'transactions': inserted},
        'delete': {'transactions': deleted},
    }

    # Push data in s3
    push_data_s3(request, json.dumps(s3_payload).encode("utf-8"))

    # The response carries the updated state, the schema and the hasMore flag
    return {
        'state': {'transactionsCursor': cursor},
        'schema': {'transactions': {'primary_key': ['order_id', 'date']}},
        'hasMore': False,
    }
def api_response(state, secrets):
    """Return the sample data as (records to insert, records to delete, new cursor).

    Both arguments are unused in this sample.
    """
    # your api call goes here
    timestamps = ('2017-12-31T05:12:05Z', '2017-12-31T06:12:04Z')

    def rows_for(order_id):
        # One row per timestamp; every row shares the same amount and discount.
        return [
            {"date": ts, "order_id": order_id, "amount": '$1200', "discount": '$12'}
            for ts in timestamps
        ]

    return (rows_for(1001), rows_for(1000), '2018-01-01T00:00:00Z')
#Function to store data in s3
def push_data_s3(request, json_str):
    """Upload json_str to the S3 object named by request["bucket"] / request["file"]."""
    s3 = boto3.client('s3')
    target_bucket = request["bucket"]
    target_key = request["file"]
    s3.put_object(Bucket=target_bucket, Key=target_key, Body=json_str)
From the main.py file, create a .zip file and then upload it to the Lambda console. For more information, see Lambda deployment packages. For more information about the Marketstack API, see our README.