Supported Operations
Fivetran Connector SDK supports the following data delivery operations for use within the update() method:
Each call to upsert(), update(), or delete() sends exactly one row through the SDK’s gRPC stream. Your connector is responsible for iterating over the records returned by the source system, deciding which operation makes sense for each record, and building the dictionary of column values that should be written. The SDK formats the dictionary and forwards it to Fivetran.
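A minimal sketch of that flow is shown below. The fetch_records() helper and the is_deleted flag are hypothetical stand-ins for your source API; only the op.upsert() and op.delete() calls are SDK operations.
from fivetran_connector_sdk import Operations as op

def update(configuration: dict, state: dict):
    # Iterate over source records and choose an operation per record.
    # fetch_records() and is_deleted are illustrative assumptions, not part of the SDK.
    for record in fetch_records(configuration):
        if record.get("is_deleted"):
            op.delete(table="users", keys={"id": record["id"]})
        else:
            op.upsert(table="users", data={"id": record["id"], "name": record["name"]})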
Destination naming and normalization when using upsert() and update()
The values you provide for the table and column keys are renamed according to our renaming rules, so the corresponding names in the destination may differ from how they are set in your code. This means the table and column names in your source may not exactly match the corresponding names in the destination.
If you want the table and column names in the destination to exactly match the names you set in your code, we recommend adhering to the renaming rules so that the names you pass already align with the pattern and character set of transformed names.
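As an illustrative sketch (the table and column names here are assumptions, and the renaming rules remain the authoritative reference), passing names that already follow the transformed pattern, such as lowercase snake_case identifiers, keeps code and destination aligned:
# "user_accounts" is already in a normalized form, so the destination name should match the code
op.upsert(table="user_accounts", data={"account_id": 1, "account_name": "Acme"})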
upsert()
Writes a single new or updated record to the target table by primary key.
Signature
op.upsert(table="name", data=data)
Parameters
| Name | Type | Description |
|---|---|---|
| table | str | Name of the target table. |
| data | dict | Dictionary containing key-value pairs of target table column names and their values. Values can be of any supported data type. |
Example
data = {"id": 1, "name": "John Doe"}
op.upsert(table="users", data=data)
Notes
- Columns that exist in the target table but are not present in the data passed to the operation are set to NULL for that row.
- A column passed in the row but not yet present in the target table is added to the table: the passed row gets the provided value, and all existing rows get NULL for that column. See the sketch after this list.
- Data types must match the schema or be inferable by the SDK.
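A minimal sketch of that column behavior, assuming an illustrative users table whose declared columns are id, name, and email:
# Row 1: "email" is omitted, so it is written as NULL for this row
op.upsert(table="users", data={"id": 1, "name": "John Doe"})

# Row 2: "signup_date" is not yet in the table; it is added, this row gets the value,
# and previously written rows get NULL in the new column
op.upsert(table="users", data={"id": 2, "name": "Jane Doe", "signup_date": "2024-08-14"})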
update()
Updates an existing row by primary key without creating a new record.
Signature
op.update(table="name", modified=data)
Parameters
| Name | Type | Description |
|---|---|---|
| table | str | Name of the target table. |
| modified | dict | Dictionary containing key-value pairs of target table column names and their values. Values can be of any supported data type. |
Example
modified = {"id": 1, "status": "inactive"}
op.update(table="users", modified=modified)
Notes
- All columns of a composite primary key must be included in the modified dictionary for the row to be updated correctly (see the sketch below).
- This operation does not write rows with new primary keys.
- Columns present in the target table but not included in the modified dictionary are left unchanged.
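A minimal sketch, assuming a hypothetical order_items table with a composite primary key of (order_id, line_number); both key columns must appear alongside the changed column:
# Both parts of the composite key identify the row; only "status" is changed
modified = {"order_id": 42, "line_number": 3, "status": "shipped"}
op.update(table="order_items", modified=modified)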
delete()
Marks a row as deleted by setting _fivetran_deleted = TRUE in the target table.
Signature
op.delete(table="name", keys=data)
Parameters
| Name | Type | Description |
|---|---|---|
| table | str | Name of the target table. |
| keys | dict | Dictionary containing key-value pairs of the target table's primary key column names and their values. All columns of a composite primary key must be included. Values can be of any supported data type. |
Example
op.delete(table="users", keys={"id": 1})
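A minimal sketch of a delete against a table with a composite primary key (the order_items table and its key columns are illustrative assumptions):
# Every column of the composite primary key must be supplied
op.delete(table="order_items", keys={"order_id": 42, "line_number": 3})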
checkpoint()
Saves the current sync state by updating the state dict with new_state, signaling that the data sent up to this point can be safely written to the destination.
Signature
op.checkpoint(state=new_state)
Example
new_state = {"table1_cursor": "2024-08-14T02:01:00Z"}
op.checkpoint(state=new_state)
Notes
- You must design the structure of your connector's state to suit your connector's needs and use it to prevent data reprocessing in failed/long syncs.
- To achieve incremental syncs, you must identify a column to use as a cursor for each table and store the latest value of the cursor for each table in your connector's state to keep track of what data has been sent to the destination. At the start of a sync, the connector needs to read the cursor for each table from the state and use it to fetch only data the connector hasn't previously synced.
- State values are not stored automatically; only the contents of new_state are applied, exactly as they are passed.
- State must be passed as a JSON string representing a single JSON object (i.e., the decoded result must be a dictionary, not an array, string, or number).
- All but the simplest connectors need to use checkpoint() so that the connection does not reprocess data, especially when a long sync fails for any underlying reason. See our recommendations on checkpointing for large data sets. A cursor-based checkpointing pattern is sketched after this list.
- A full connection re-sync can be run in the Fivetran dashboard. To re-sync only the affected table(s), use the Modify Connections State endpoint. Connectors must allow modifying the state to re-sync particular table(s).
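A minimal sketch of a cursor-based incremental sync, assuming a hypothetical fetch_users_since() helper and an updated_at cursor column; only the op.upsert() and op.checkpoint() calls are SDK operations.
from fivetran_connector_sdk import Operations as op

def update(configuration: dict, state: dict):
    # Read the cursor saved by the previous sync; fall back to a full load if absent
    cursor = state.get("users_cursor", "1970-01-01T00:00:00Z")

    # fetch_users_since() is a hypothetical helper returning rows newer than the cursor
    for row in fetch_users_since(configuration, cursor):
        op.upsert(table="users", data=row)
        cursor = max(cursor, row["updated_at"])

    # Persist the new cursor so a failed or future sync resumes from this point
    op.checkpoint(state={"users_cursor": cursor})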