Technical Reference
This section documents methods and operations used by Connector SDK as well as the Connector object that needs to be declared in your connector.
Technical details - required imports
Before you start implementing your connector, ensure you include the following imports at the top of your connector.py file:
from fivetran_connector_sdk import Connector # For supporting Connector operations like Update() and Schema()
from fivetran_connector_sdk import Operations as op # For supporting Data operations like Upsert(), Update(), Delete() and checkpoint()
from fivetran_connector_sdk import Logging as log # For enabling Logs in your connector code
Technical details - methods
Our Connector SDK supports the following methods.
Update()
This is a required method.
update(configuration: dict, state: dict) must contain the yield statement with operations to send your data to Fivetran.
Update() is called when the sync starts. Fivetran passes two dictionaries to the method:
- The configuration dictionary contains any secrets or payloads you configure when deploying the connector.
- The state dictionary is empty for the first sync or for any full re-sync. In all other cases, it contains whatever state you have chosen to checkpoint during the prior sync. Some of our more complex examples, e.g., weather, show how this is used to track state for your data connector and achieve incremental syncs efficiently.
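The following is a minimal sketch of an incremental update() function, not a reference implementation. The op object is a stand-in so the sketch runs without the SDK installed; in a real connector it would come from the Operations import shown earlier. The fetch_rows helper, the "events" table, and the "cursor" state key are hypothetical names chosen for illustration.

```python
from types import SimpleNamespace

# Stand-in for `from fivetran_connector_sdk import Operations as op`, so this
# sketch runs without the SDK installed. It only records what was requested.
op = SimpleNamespace(
    upsert=lambda table, data: ("upsert", table, data),
    checkpoint=lambda state: ("checkpoint", state),
)

# Hypothetical source data; a real connector would call an API or a database.
SOURCE_ROWS = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-02"},
    {"id": 3, "updated_at": "2024-01-03"},
]


def fetch_rows(since):
    """Return rows newer than the cursor (all rows when the cursor is None)."""
    return [r for r in SOURCE_ROWS if since is None or r["updated_at"] > since]


def update(configuration: dict, state: dict):
    # state is empty on the first sync or after a full re-sync.
    cursor = state.get("cursor")
    for row in fetch_rows(cursor):
        yield op.upsert(table="events", data=row)
        cursor = row["updated_at"]
    # Save the cursor so the next sync starts where this one stopped.
    yield op.checkpoint(state={"cursor": cursor})


first_sync = list(update({}, {}))
next_sync = list(update({}, {"cursor": "2024-01-02"}))
```

With an empty state the generator yields every row; with a saved cursor it yields only rows newer than that cursor, which is the incremental behavior described above.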
Schema()
This is an optional method.
schema(configuration: dict) must return a JSON object containing the following keys:
- The table key is required and provides the name of the table.
- The primary_key key is optional but recommended. The value is a list of one or more column names. A single entry defines a simple primary key, while multiple entries are combined into a composite primary key for the table. If you don't provide primary keys, we use all columns to generate a unique hash that serves as the primary key.
- The columns key is optional. It contains a dictionary of column names and data types.
The Schema() method lets you configure the schema your connector delivers. We infer the schema for data you send us if you do not define it. However, if you want to set a primary key for a table or configure columns to have specific data types, then use this method.
If you don't provide the primary key to use in a table, Fivetran creates a surrogate primary key column named _fivetran_id, which is a hashed value generated based on the row's set of values. See our system columns documentation for more details.
NOTE: If a new row is received with a different set of columns, we calculate the hash from the new row's values, including values from any new columns. This can lead to duplicate rows or data integrity issues in the destination. In this case, you may have to drop and re-sync the connector to preserve data integrity. We therefore recommend that you define primary keys for your tables to avoid unexpected behavior.
IMPORTANT: If you need to change primary key selections for a table, drop the table in your destination and then select Resync all historical data on the connector's Setup tab in your dashboard. Doing so maintains data integrity across all records.
Example of data duplication
Assume Fivetran receives the following row for a table not defined in the schema or defined without a primary key:
_id | foo | name | _fivetran_id
1 | abc | John Doe | 96DE69AE1728658394E4EAE664431F1A4E7857E4
The hashed value is generated from the values of the three columns.
Now suppose we receive the same row with an additional column:
_id | foo | name | bar | _fivetran_id
1 | abc | John Doe | | 96DE69AE1728658394E4EAE664431F1A4E7857E4
1 | abc | John Doe | xyz | 2AC47E18D9FCBC35B6DB94EA4FE4227A3A67A7F8
The generated hashed value would differ from the first row as the hashed value is calculated from the values of all the columns. This would cause the same row to be duplicated in the destination.
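To make the effect concrete, here is a small sketch that hashes all column values of a row. The use of SHA-1 over a pipe-joined string of values ordered by column name is an assumption made purely for illustration; the actual hashing scheme is not described in this section, so the digests will not match the values in the tables above.

```python
import hashlib


def surrogate_id(row: dict) -> str:
    """Illustrative surrogate key: hash every column value in the row.

    Assumption: SHA-1 over the values ordered by column name. This is NOT
    the documented algorithm, only a model of "hash of all columns".
    """
    joined = "|".join(str(row[col]) for col in sorted(row))
    return hashlib.sha1(joined.encode("utf-8")).hexdigest().upper()


original = {"_id": 1, "foo": "abc", "name": "John Doe"}
with_new_column = {**original, "bar": "xyz"}

id_before = surrogate_id(original)
id_after = surrogate_id(with_new_column)

# Same logical record, but the extra column changes the hash, so a destination
# keyed on this value would store two rows instead of one.
print(id_before == id_after)
```

Declaring a primary key such as _id removes the dependency on the full set of columns, so the second row would update the first instead of duplicating it.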
The Schema() method must return a JSON dictionary containing a list of dictionary objects. Each object represents one table.
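As a rough sketch of the shape described above, the following schema() declares three tables. The table and column names are hypothetical. The key name columns is used for the column dictionary; treat the exact spelling as an assumption and verify it against the SDK version you use.

```python
def schema(configuration: dict):
    """Return one dictionary per table."""
    return [
        {
            # Simple primary key: a single column.
            "table": "users",
            "primary_key": ["id"],
            "columns": {"id": "LONG", "email": "STRING", "signed_up": "UTC_DATETIME"},
        },
        {
            # Composite primary key: both columns together identify a row.
            "table": "order_items",
            "primary_key": ["order_id", "line_number"],
            "columns": {"order_id": "LONG", "line_number": "INT", "price": "DOUBLE"},
        },
        {
            # Only the required key: column types are inferred, and a
            # surrogate _fivetran_id is generated in place of a primary key.
            "table": "raw_events",
        },
    ]


tables = schema({})
```

Only columns whose types need to be fixed have to be listed; types for any other columns are inferred from the data.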
Syncing empty tables and columns
Fivetran creates tables and columns in your destination for any column declared in the schema() method, even if there is no data sent for that column.
For more information, see our Features documentation.
Supported data types
The following data types are supported in the Fivetran Connector SDK:
- BOOLEAN
- SHORT
- INT
- LONG
- DECIMAL
- FLOAT
- DOUBLE
- NAIVE_DATE
- NAIVE_DATETIME
- UTC_DATETIME
- BINARY
- XML
- STRING
- JSON
If unspecified, Fivetran infers the data type automatically based on the data values.
NOTE: We cannot implicitly infer list objects as JSON. You must explicitly declare them as JSON in the Schema() method.
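A local check along the following lines can catch unsupported type names and undeclared list columns before deployment. Both helper functions are hypothetical and not part of the SDK, and the columns key name is assumed for the column dictionary.

```python
SUPPORTED_TYPES = {
    "BOOLEAN", "SHORT", "INT", "LONG", "DECIMAL", "FLOAT", "DOUBLE",
    "NAIVE_DATE", "NAIVE_DATETIME", "UTC_DATETIME", "BINARY", "XML",
    "STRING", "JSON",
}


def unsupported_columns(table_schema: dict) -> dict:
    """Return {column: declared_type} for types not in the supported list."""
    columns = table_schema.get("columns", {})
    return {name: t for name, t in columns.items() if t not in SUPPORTED_TYPES}


def undeclared_list_columns(table_schema: dict, row: dict) -> list:
    """Return names of list-valued fields in row not declared as JSON."""
    columns = table_schema.get("columns", {})
    return [
        name for name, value in row.items()
        if isinstance(value, list) and columns.get(name) != "JSON"
    ]


# Hypothetical table definition and sample row.
table_schema = {
    "table": "products",
    "primary_key": ["id"],
    "columns": {"id": "LONG", "tags": "JSON", "price": "MONEY"},
}
row = {"id": 1, "tags": ["a", "b"], "sizes": [1, 2], "price": 9.5}

bad_types = unsupported_columns(table_schema)
missing_json = undeclared_list_columns(table_schema, row)
```

Here price is flagged because MONEY is not in the supported list, and sizes is flagged because it holds a list but is not declared as JSON.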
Technical details - required object connector
Our Connector SDK requires the following object to be declared in your code.
Your connector.py file must include an initialization of the Connector object as follows:
If you implement both the Update() and Schema() methods:
connector = Connector(update=update, schema=schema)
If you implement only the Update() method:
connector = Connector(update=update)
Technical details - operations
Our Connector SDK offers the following operations to deliver data to Fivetran:
Upsert()
upsert(table="three", data=data)
Writes data to the target table, using the defined primary keys of the table to either create a new row or update an existing row. Columns present in your table and not present in the data passed in the method will be populated with NULL.
Update()
update(table="three", data=data)
Writes data to the table using the primary keys to identify which row to update. This operation does not write data with new primary keys to your destination. Columns present in your table and not present in the data passed in the method will be left unchanged.
Delete()
delete(table="three", keys=data)
Sets the _fivetran_deleted column value to TRUE for rows with the provided primary keys in the target table.
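The difference between the three operations can be modeled with a small in-memory table. This is only an illustrative model of the semantics described above, not SDK code: the apply_* helpers are hypothetical, the table is a plain dictionary keyed by primary key, and the soft-delete flag is modeled as a _fivetran_deleted key.

```python
def apply_upsert(table: dict, columns: list, pk: str, data: dict) -> None:
    """Create or replace the row; columns missing from data become None (NULL)."""
    table[data[pk]] = {col: data.get(col) for col in columns}


def apply_update(table: dict, pk: str, data: dict) -> None:
    """Change only the supplied columns of an existing row; skip unknown keys."""
    if data[pk] in table:
        table[data[pk]].update(data)


def apply_delete(table: dict, pk: str, keys: dict) -> None:
    """Soft delete: flag the row instead of removing it."""
    if keys[pk] in table:
        table[keys[pk]]["_fivetran_deleted"] = True


COLUMNS = ["id", "name", "email"]
three = {}

apply_upsert(three, COLUMNS, "id", {"id": 1, "name": "Ann", "email": "ann@example.com"})
apply_update(three, "id", {"id": 1, "name": "Anna"})  # email is left unchanged
apply_update(three, "id", {"id": 2, "name": "Bob"})   # no such row: nothing written
row_after_update = dict(three[1])

apply_upsert(three, COLUMNS, "id", {"id": 1, "name": "Anna"})  # email becomes None
row_after_upsert = dict(three[1])

apply_delete(three, "id", {"id": 1})
```

The practical consequence is that upsert expects the full row on every call, while update suits partial changes to rows that already exist.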
Checkpoint()
checkpoint(state=new_state)
Updates state: dict with new_state and tells Fivetran that the data sent up until this point can be safely written to your destination. This enables incremental syncs and lets you safely break up large syncs so that data is delivered to the destination periodically. Fivetran does not save any values in state automatically; only the contents of new_state are applied as they are passed.
NOTE: All but the simplest connectors should call checkpoint() periodically so that data does not have to be reprocessed after an interrupted sync.
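One way to checkpoint periodically is to count records and emit a checkpoint after every batch. The sketch below assumes a hypothetical batch size and a "last_id" state key, and uses a stand-in for op so that it runs without the SDK installed.

```python
from types import SimpleNamespace

# Stand-in for `from fivetran_connector_sdk import Operations as op`.
op = SimpleNamespace(
    upsert=lambda table, data: ("upsert", table, data),
    checkpoint=lambda state: ("checkpoint", state),
)

CHECKPOINT_EVERY = 2  # hypothetical batch size; tune it for your source


def update(configuration: dict, state: dict):
    last_id = state.get("last_id", 0)
    pending = 0
    # Pretend the source holds records with ids 1..5.
    for record_id in range(last_id + 1, 6):
        yield op.upsert(table="three", data={"id": record_id})
        last_id = record_id
        pending += 1
        if pending == CHECKPOINT_EVERY:
            # Everything up to last_id can now be written to the destination.
            yield op.checkpoint(state={"last_id": last_id})
            pending = 0
    if pending:
        # Cover the final partial batch.
        yield op.checkpoint(state={"last_id": last_id})


ops = list(update({}, {}))
```

If a sync is interrupted after the second checkpoint, the next run receives {"last_id": 4} and only has to process the remaining record.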
Re-sync connector
You can run a full connector re-sync in your Fivetran dashboard.
If you want to re-sync just the affected table(s), use the REST API to modify the connector's state. Make sure you build your SDK connector in such a way that you can modify the state to re-sync particular table(s).
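One possible state layout that supports per-table re-syncs keeps a separate cursor under each table name. The layout and the helper names below are assumptions for illustration, not an SDK requirement.

```python
def cursor_for(state: dict, table: str):
    """Return the saved cursor for table, or None to signal a full re-sync."""
    return state.get(table, {}).get("cursor")


def with_cursor(state: dict, table: str, cursor) -> dict:
    """Return a copy of state with the cursor for table updated."""
    new_state = dict(state)
    new_state[table] = {"cursor": cursor}
    return new_state


state = {
    "users": {"cursor": "2024-03-01"},
    "orders": {"cursor": "2024-03-05"},
}

# Simulate editing the state (for example through the REST API) so that the
# entry for "orders" is removed and every other table keeps its cursor.
edited = {name: value for name, value in state.items() if name != "orders"}

orders_cursor = cursor_for(edited, "orders")  # None: re-sync from scratch
users_cursor = cursor_for(edited, "users")    # unchanged: stay incremental
```

Because each table reads only its own entry, removing one entry from the state triggers a full re-sync of that table alone.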
Technical details - logging
We recommend using logging in your connector code, as it can help in debugging and observability. Your connector.py file must include logging as follows:
from fivetran_connector_sdk import Logging as log
Logging levels
We support logs at the following three levels in production:
- INFO - for all informational logs such as status, start, pause, exit, etc.
- WARNING - for less severe error conditions that could degrade the flow in the future if not addressed.
- SEVERE - for error conditions and failures that cause significant issues to current flows and execution.
We additionally support one more level for debugging your code locally:
- FINE - for detailed low-level logs needed while testing and building your code.
Logging syntax - examples
Each logging method accepts a single message argument. The last example below additionally passes an exception keyword argument to log.severe(). See the following examples:
log.fine("Debugging the data transformation process.")
log.info("Connector started successfully.")
log.info("Initial state:" + repr(state))
log.warning("Data source response time is slower than expected.")
log.severe("Failed to connect to the data source.")
log.severe("Failed to connect to the data source.", exception=ConnectionError("Unable to reach the data source server."))
You can check our weather example, which uses info and fine-level logging in the update() method, for reference.
NOTE: Ensure you are not adding excessive logs by accident. For example, avoid placing a log after each record, as it can increase the log volume and cause logs to be discarded by our system due to logging rate limits.
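A simple way to keep log volume bounded is to log progress once per batch of records instead of once per record. In the sketch below, the interval is a hypothetical value and log is a stand-in that records messages so the code runs without the SDK installed.

```python
class _RecordingLog:
    """Stand-in for `from fivetran_connector_sdk import Logging as log`."""

    def __init__(self):
        self.messages = []

    def info(self, message):
        self.messages.append(message)


log = _RecordingLog()
LOG_EVERY = 1000  # hypothetical interval; choose one that suits your volume


def process(records):
    """Consume records, logging progress once per LOG_EVERY records."""
    count = 0
    for _ in records:
        count += 1
        if count % LOG_EVERY == 0:
            log.info("Processed " + str(count) + " records so far.")
    log.info("Finished processing " + str(count) + " records.")
    return count


total = process(range(2500))
```

For 2,500 records this produces three log lines instead of 2,500.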
Logs in CONNECTOR_SDK_LOG
The Fivetran Platform Connector syncs your Connector SDK connection log events into your destination. The logs are available in the CONNECTOR_SDK_LOG table.