Amazon Kinesis Firehose Setup Guide
Follow our setup guide to connect Amazon Kinesis Firehose to Fivetran.
Prerequisites
To connect Amazon Kinesis Firehose to Fivetran, you need:
- An Amazon Kinesis Firehose delivery stream outputting to an S3 bucket
- An S3 bucket containing JSON files with supported encodings
- An AWS account with the ability to grant Fivetran permission to read from that S3 bucket
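For reference, a file delivered by Firehose typically contains one JSON object per record, along the lines of the illustrative sample below (the field names are not prescribed by Fivetran and are only an assumption for this example):

```json
{"ticker_symbol": "ABC", "change": -0.02, "price": 27.75}
{"ticker_symbol": "XYZ", "change": 0.13, "price": 84.10}
```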
Setup instructions
Find External ID
In the connector setup form, find the automatically-generated External ID and make a note of it. You will need it to configure AWS to connect with Fivetran.
NOTE: The automatically-generated External ID is tied to your account. If you close and re-open the setup form, the ID will remain the same. For convenience, you can keep the tab open in the background while you configure your source, but it is also fine to close it.
Create IAM Policy and Role
In AWS, follow steps 1 and 2 of our S3 setup guide to create a Fivetran-specific IAM policy and role.
If your Kinesis stream output is JSON, skip ahead to Step 5 (Finish Fivetran configuration) to complete your setup. If your Kinesis stream output is plain text or comma-separated values, proceed to Step 3 (Create AWS Lambda function).
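If you prefer to script the policy and role creation rather than use the AWS console, the following is a minimal boto3 sketch. The bucket name, policy and role names, Fivetran principal ARN, and External ID are placeholders you must replace with the values from the S3 setup guide and your connector setup form; refer to that guide for the exact permissions Fivetran requires.

```python
import json
import boto3

iam = boto3.client("iam")

BUCKET = "your-firehose-bucket"                      # assumption: your S3 bucket name
EXTERNAL_ID = "<EXTERNAL_ID_FROM_SETUP_FORM>"        # from the Find External ID step
FIVETRAN_PRINCIPAL = "<FIVETRAN_PRINCIPAL_ARN>"      # see the S3 setup guide

# Read-only access to the bucket, per the S3 setup guide
policy = iam.create_policy(
    PolicyName="fivetran-s3-read",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{BUCKET}", f"arn:aws:s3:::{BUCKET}/*"],
        }],
    }),
)

# Trust policy that lets Fivetran assume the role, scoped by your External ID
role = iam.create_role(
    RoleName="fivetran-kinesis-firehose",
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": FIVETRAN_PRINCIPAL},
            "Action": "sts:AssumeRole",
            "Condition": {"StringEquals": {"sts:ExternalId": EXTERNAL_ID}},
        }],
    }),
)

iam.attach_role_policy(
    RoleName="fivetran-kinesis-firehose",
    PolicyArn=policy["Policy"]["Arn"],
)

# This Role ARN is the value you enter in the Fivetran setup form later
print(role["Role"]["Arn"])
```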
Create AWS Lambda function
If your Kinesis stream output is plain text or comma-separated values, you must convert it into JSON before writing it to your S3 bucket. Do this by sending your Kinesis stream through an AWS Lambda function, which transforms the data into JSON and streams it into your S3 bucket.
To create an AWS Lambda function, log in to your AWS Lambda console.
Go to Functions, then click Create a Lambda function.
Select Author from scratch.
Enter a name for your function. Set the runtime to Python 3.9.
Set the Code entry type to Edit code inline.
Modify the following example Lambda function as needed and paste it in the box. This function matches the records in the incoming stream against a regular expression. On a match, it parses the JSON record, filters out the "sector" key-value pair, adds a timestamp, and passes the record back into the stream for delivery.
```python
from __future__ import print_function
import base64
import datetime
import json
import re

# Pattern matching the Kinesis demo stock-ticker records
parser = r'\{"ticker_symbol":"[A-Z]+","sector":"[A-Z]+","change":[-.0-9]+,"price":[-.0-9]+\}'


def lambda_handler(event, context):
    output = []
    for record in event['records']:
        data = base64.b64decode(record['data']).decode('utf-8')
        match_json = re.match(parser, data, re.I)
        if match_json:
            serialized_data = json.loads(data)
            # Filters out 'sector' and its value from the record
            serialized_data.pop('sector')
            # Adds a timestamp to the record
            serialized_data['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            deserialized_data = json.dumps(serialized_data)
            # Logs result
            print(deserialized_data)
            output_record = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(deserialized_data.encode('utf-8')).decode('utf-8')
            }
            output.append(output_record)
        else:
            # Failed event. Log the error and pass the record on intact
            output_record = {
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data']
            }
            print('Data does not match pattern: ' + parser)
            output.append(output_record)
    return {'records': output}
```
Set the Handler to "lambda_function.lambda_handler".
Click Create a custom role.
Give your IAM role a name, use the default policy document, and select Allow.
Under Advanced Settings, change the Timeout to 5 minutes.
Click Next.
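Before wiring the function into Firehose, you can sanity-check the transformation locally. Here is a minimal sketch, assuming you saved the example function above as lambda_function.py; it builds a fake Firehose event shaped like the demo stock-ticker data and calls the handler directly:

```python
import base64
import json

from lambda_function import lambda_handler  # assumption: the example function saved locally

# One record matching the demo stock-ticker format (compact separators so the regex matches)
sample = {"ticker_symbol": "ABC", "sector": "TECH", "change": -0.02, "price": 27.75}
event = {
    "records": [
        {
            "recordId": "1",
            "data": base64.b64encode(
                json.dumps(sample, separators=(",", ":")).encode("utf-8")
            ).decode("utf-8"),
        }
    ]
}

result = lambda_handler(event, None)
for record in result["records"]:
    # Expect result 'Ok' and a record with 'sector' removed and a 'timestamp' added
    print(record["result"], base64.b64decode(record["data"]).decode("utf-8"))
```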
Configure Kinesis delivery stream
Next, configure your Kinesis Firehose delivery stream to work with your Lambda function.
Log in to your Amazon Kinesis Firehose console.
Click Create Delivery Stream.
Give your stream a name.
Click Next.
Set the Record transformation to Enabled. Select either the Lambda function you created in Step 3 or your own custom function.
Click Next.
Choose Amazon S3 as your destination.
Enter your S3 bucket and specify a prefix if you'd like.
Click Next.
Set the S3 buffer conditions and the Compression and encryption options as desired.
Under IAM role, click Create new, or Choose.
In the IAM role drop-down menu, select Create a new IAM Role. Enter a name for your IAM role. Use the default policy document.
Click Allow.
When returned to the Firehose console, click Next.
Review your configuration details and select Create delivery stream.
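As an alternative to the console walkthrough above, a delivery stream with Lambda transformation and an S3 destination can also be created programmatically. The following is a minimal boto3 sketch; the stream name, role ARN, bucket ARN, prefix, and Lambda ARN are placeholders you must replace with your own values:

```python
import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="fivetran-demo-stream",        # assumption: your stream name
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleArn": "<FIREHOSE_DELIVERY_ROLE_ARN>",    # IAM role created for Firehose in this step
        "BucketARN": "arn:aws:s3:::your-firehose-bucket",
        "Prefix": "kinesis/",                         # optional prefix
        "BufferingHints": {"SizeInMBs": 5, "IntervalInSeconds": 300},
        "CompressionFormat": "UNCOMPRESSED",
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {
                            "ParameterName": "LambdaArn",
                            "ParameterValue": "<LAMBDA_FUNCTION_ARN>",  # function from Step 3
                        }
                    ],
                }
            ],
        },
    },
)
```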
If you selected the Lambda provided above when setting up your delivery stream, send test data by clicking Start sending demo data on your Firehose dashboard.
The demo data will be transformed by the Lambda function, appearing in your designated S3 bucket in the following form:
{"timestamp": "2017-07-20 21:55:05", "price": 27.75, "ticker_symbol": "ABC", "change":--0.02}
If you would like to learn more about Lambda functions for Firehose, see AWS' tutorial. For additional help setting up Firehose delivery streams, see AWS' delivery stream documentation.
Finish Fivetran configuration
Return to the tab with your Fivetran dashboard.
Enter your chosen destination schema name.
Enter your chosen destination table name.
Enter your S3 bucket.
Enter the Role ARN you found in Step 2.
(Optional) Enter the folder path. The folder path is used to specify a portion of the bucket where you'd like Fivetran to look for files. Any files under the specified folder and all of its nested sub-folders will be examined for files we can upload. If no prefix is supplied, we'll look through the entire bucket for files to sync.
(Optional) Select the JSON Delivery Mode option to choose how Fivetran handles your JSON data. Options are:
- Packed: In this mode, we load all your JSON data to the _data column without flattening it.
- Unpacked: In this mode, we flatten one level of columns and infer their data types.
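To make the difference concrete, here is a rough sketch of how a single transformed record might land in your destination under each mode; the record and the inferred column types are illustrative assumptions, not actual Fivetran output:

```python
# Example transformed record delivered by Firehose to S3
record = {"timestamp": "2017-07-20 21:55:05", "price": 27.75, "ticker_symbol": "ABC", "change": -0.02}

# Packed: the destination table gets a single _data column holding the whole object, e.g.
#   _data -> {"timestamp": "2017-07-20 21:55:05", "price": 27.75, "ticker_symbol": "ABC", "change": -0.02}

# Unpacked: one level of keys is flattened into typed columns, e.g.
#   timestamp (string), price (float), ticker_symbol (string), change (float)
```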
Related articles
- Connector Overview
- API Connector Configuration