Automate Connector Management With Python
Use Case
You want to get started writing programmatic scripts in Python that interact with the Fivetran REST API. A common use case for this is to automate processes around creating, updating, or running connectors on a custom schedule. You are looking for an example of how to get started.
Recommendation
Fivetran recommends starting with a simple example to familiarize yourself with how to interact with our API using Python. This code example walks through manually syncing a connector using the API (find the group, find the connector within that group, then trigger the sync), which follows this diagram.
Code example
import requests
from requests.auth import HTTPBasicAuth
import json
# Sync A Fivetran Connector
# Documentation: https://fivetran.com/docs/rest-api/connectors
# Replace with your API key and Secret
api_key = "Your API Key"
api_secret = "Your API Secret"
# HTTPBasicAuth builds the Base64-encoded "Authorization: Basic ..." header
# from the key/secret pair for every request it is passed to via auth=.
auth = HTTPBasicAuth(api_key, api_secret)
# Do not hand-build an Authorization header here: 'Basic ' + api_key is not a
# valid Basic credential (it omits the secret and the Base64 encoding), and
# the auth object above already supplies the correct header.
headers = {
    'Content-Type': 'application/json',
}
# Step 1: Find the group/destination the connector is in
# Create Query Parameters For Pagination
# Documentation: https://fivetran.com/docs/rest-api/getting-started/pagination
limit = 1000
params = {"limit": limit}
url = "https://api.fivetran.com/v1/groups"

# Fetch the first page. Fail fast on an HTTP error (bad credentials, etc.)
# rather than hitting a confusing KeyError on "data" further down.
http_response = requests.get(url=url, auth=auth, params=params)
http_response.raise_for_status()
response = http_response.json()
group_list = response["data"]["items"]

# Keep following "next_cursor" until the API stops returning one.
while "next_cursor" in response["data"]:
    params = {"limit": limit, "cursor": response["data"]["next_cursor"]}
    http_response = requests.get(url=url, auth=auth, params=params)
    http_response.raise_for_status()
    response = http_response.json()
    group_list.extend(response["data"]["items"])

my_group = input("Enter the group or destination name that your connector is in: ")

# Compare against the "name" field only. Scanning every value of the group
# record could match an unrelated field that happens to equal the input.
group_id = None
for group in group_list:
    if group.get("name") == my_group:
        group_id = group["id"]
        break
# Kept as an alias so code referring to either name keeps working.
my_group_id = group_id

if group_id is None:
    raise Exception("The group {} was not found.".format(my_group))
print("{} Group ID = ".format(my_group) + group_id)
# Step 2: Find the connector in your group
# Create Query Parameters For Pagination
# Documentation: https://fivetran.com/docs/rest-api/getting-started/pagination
limit = 1000
params = {"limit": limit}
url = "https://api.fivetran.com/v1/groups/{}/connectors".format(group_id)

# Fetch the first page. Fail fast on an HTTP error rather than hitting a
# confusing KeyError on "data" further down.
http_response = requests.get(url=url, auth=auth, params=params)
http_response.raise_for_status()
response = http_response.json()
conn_list = response["data"]["items"]

# Keep following "next_cursor" until the API stops returning one.
while "next_cursor" in response["data"]:
    params = {"limit": limit, "cursor": response["data"]["next_cursor"]}
    http_response = requests.get(url=url, auth=auth, params=params)
    http_response.raise_for_status()
    response = http_response.json()
    conn_list.extend(response["data"]["items"])

my_connector = input("Enter the connector/schema name that you want to sync: ")

# Compare against the "schema" field only. Scanning every value of the
# connector record could select the wrong connector, e.g. when the input
# equals another connector's service name.
my_connector_id = None
for connector in conn_list:
    if connector.get("schema") == my_connector:
        my_connector_id = connector["id"]
        break

if my_connector_id is None:
    raise Exception("The connector {} was not found.".format(my_connector))
print("{} Connector ID = ".format(my_connector) + my_connector_id)
# Step 3: Sync the connector
# POST to the connector's "force" endpoint, then echo the API's JSON reply.
url = "https://api.fivetran.com/v1/connectors/{}/force".format(my_connector_id)
sync_reply = requests.post(url=url, auth=auth)
response = sync_reply.json()
print(json.dumps(response))