Understanding State Management
The state object is a crucial mechanism for maintaining sync progress and enabling incremental data synchronization. It lets your connector save cursors, timestamps, and other progress indicators so that a sync can resume from where the previous one left off, rather than starting from the beginning.
How state works
Fivetran sends and receives state as a JSON string. When decoded in Python, the top-level value must be a JSON object (a dictionary). In your connector.py code, you:
- Use the deserialized state provided in the update() method to retrieve previous sync progress.
- Serialize cursor values into the state object with checkpoint() operations to store them. A cursor is the column or object you use to track sync progress.
- Use cursor values from state to determine what data has already been processed.
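The round trip described above can be sketched in plain Python. This is an illustration only, not the SDK's internal code: `decode_state` is a hypothetical helper, and treating an empty string as an empty state is an assumption.

```python
import json


def decode_state(raw: str) -> dict:
    """Decode a JSON state string and require a JSON object at the top level."""
    # Assumption: an empty string is treated like an empty state (first sync).
    value = json.loads(raw) if raw else {}
    if not isinstance(value, dict):
        raise ValueError("state must be a JSON object, got " + type(value).__name__)
    return value


# A state string as it might be stored between syncs.
raw_state = '{"last_sync_timestamp": "2024-01-15T10:30:00Z"}'
state = decode_state(raw_state)

# Read the cursor, falling back to a default on the first sync.
cursor = state.get("last_sync_timestamp", "1970-01-01T00:00:00Z")

# Store a new cursor value and serialize the state back to a JSON string.
state["last_sync_timestamp"] = "2024-01-16T08:00:00Z"
encoded = json.dumps(state)
```

A top-level list or scalar, such as `[1, 2]`, is rejected by this helper because the state must decode to a dictionary.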
State structure and examples
The state object should contain meaningful progress indicators. Here are common patterns:
Simple timestamp cursor
{
  "last_sync_timestamp": "2024-01-15T10:30:00Z"
}
Multiple table cursors
{
  "users_cursor": "2024-01-15T10:30:00Z",
  "orders_cursor": "2024-01-15T09:45:00Z",
  "products_cursor": "2024-01-15T11:15:00Z"
}
Complex state with pagination
{
  "company_cursor": "2024-08-14T02:01:00Z",
  "department_cursor": {
    "1": "2024-08-14T03:00:00Z",
    "2": "2024-08-14T06:00:00Z"
  },
  "page_offset": "eyJwYWdlIjoxMH0="
}
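Reading a nested state like the one above might look as follows. This is a minimal sketch: `read_complex_state` and `DEFAULT_CURSOR` are hypothetical names, and decoding `page_offset` as base64-encoded JSON is an assumption that happens to hold for the sample token. Real pagination tokens are often opaque and should be passed back to the API unchanged.

```python
import base64
import json

DEFAULT_CURSOR = "1970-01-01T00:00:00Z"  # hypothetical first-sync default


def read_complex_state(state: dict) -> dict:
    """Pull cursors and pagination info out of state, with first-sync defaults."""
    company_cursor = state.get("company_cursor", DEFAULT_CURSOR)
    # Per-department cursors are keyed by department ID.
    department_cursors = state.get("department_cursor", {})
    token = state.get("page_offset")
    # Assumption: the token is base64-encoded JSON, as in the sample state.
    page = json.loads(base64.b64decode(token)) if token else {"page": 0}
    return {
        "company_cursor": company_cursor,
        "department_cursors": department_cursors,
        "page": page,
    }


sample_state = {
    "company_cursor": "2024-08-14T02:01:00Z",
    "department_cursor": {
        "1": "2024-08-14T03:00:00Z",
        "2": "2024-08-14T06:00:00Z",
    },
    "page_offset": "eyJwYWdlIjoxMH0=",
}
result = read_complex_state(sample_state)
# result["page"] is {"page": 10} for the sample token
```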
Implementing state in your connector
Basic state usage
from fivetran_connector_sdk import Operations as op

def update(configuration: dict, state: dict):
    # Retrieve cursor from state, with fallback for first sync
    cursor = state.get('last_sync_timestamp', '1970-01-01T00:00:00Z')
    # Process data from cursor onwards
    # (fetch_data_since and get_current_timestamp are placeholders for your own helpers)
    for record in fetch_data_since(cursor):
        op.upsert(table="my_table", data=record)
    # Save progress with checkpoint
    op.checkpoint(state={
        "last_sync_timestamp": get_current_timestamp()
    })
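An alternative to saving the current time is to derive the cursor from the records you actually processed, so that rows modified while the sync was running are not skipped. The sketch below assumes each record carries a hypothetical `updated_at` field holding a UTC timestamp in the same fixed-width ISO 8601 format as the cursor, which is what makes plain string comparison valid. `advance_cursor` is an illustrative helper, not part of the SDK.

```python
def advance_cursor(cursor: str, records: list) -> str:
    """Return the newest 'updated_at' value seen, or the old cursor if none is newer."""
    newest = cursor
    for record in records:
        # Assumption: fixed-width UTC ISO 8601 strings sort chronologically.
        if record["updated_at"] > newest:
            newest = record["updated_at"]
    return newest


batch = [
    {"id": 1, "updated_at": "2024-01-15T10:31:00Z"},
    {"id": 2, "updated_at": "2024-01-15T10:45:00Z"},
    {"id": 3, "updated_at": "2024-01-15T10:40:00Z"},
]
new_cursor = advance_cursor("2024-01-15T10:30:00Z", batch)
# new_cursor is "2024-01-15T10:45:00Z"
```

The returned value would then be the one passed to the checkpoint. If your source mixes time zones or timestamp precisions, parse the values into datetime objects before comparing them.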
Multi-table state management
from fivetran_connector_sdk import Operations as op

def update(configuration: dict, state: dict):
    # Initialize cursors for multiple tables from the saved state
    cursors = {
        # Use an older date as the default, or set to your API's earliest supported date
        'users': state.get('users_cursor', '2000-01-01T00:00:00Z'),
        'orders': state.get('orders_cursor', '2000-01-01T00:00:00Z')
    }
    # Process each table
    for table_name, cursor in cursors.items():
        for record in fetch_table_data_since(table_name, cursor):
            op.upsert(table=table_name, data=record)
        # Update cursor for this table
        cursors[table_name] = get_current_timestamp()
    # Checkpoint all cursors under the same keys that are read from state above
    op.checkpoint(state={
        f"{table_name}_cursor": cursor
        for table_name, cursor in cursors.items()
    })
Local development with state
When running a connector locally with fivetran debug, a state.json file is created in <project_directory>/files/state.json. This file:
- Stores the current state during and between debug runs
- Can be manually edited to test different starting points
- Can be manually created to start debugging from a specific state
- Is automatically updated after each checkpoint operation
Use the fivetran reset command to delete both state.json and warehouse.db. If you delete or clear state.json, the next fivetran debug run receives an empty state, {}. Your connector then behaves as if it's running for the first time and re-fetches all data from the source. This simulates an initial sync, also known as a historical sync.
Example local state file
{
  "last_sync_timestamp": "2024-01-15T10:30:00Z",
  "processed_records": 1250
}
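If you prefer to create the local state file from a script rather than by hand, a small helper along these lines could do it. This is a sketch only: `write_debug_state` is a hypothetical name, and the only detail taken from this page is the `files/state.json` location under the project directory.

```python
import json
from pathlib import Path


def write_debug_state(project_dir: str, state: dict) -> Path:
    """Write state to <project_dir>/files/state.json for the next debug run."""
    path = Path(project_dir) / "files" / "state.json"
    # Create the files/ directory if no debug run has created it yet.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(state, indent=2), encoding="utf-8")
    return path
```

For example, `write_debug_state(".", {"last_sync_timestamp": "2024-01-15T10:30:00Z"})` run from the project root would prepare the next fivetran debug run to resume from that timestamp.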
Production state management
For connectors deployed to Fivetran, you can manage state directly using the Fivetran REST API:
- Retrieve Connection State - Get the current state.
- Update Connection State - Modify state for any reason, for example, reset a single table's cursor rather than clearing all state.
Certain actions in the Fivetran dashboard also affect state. When you select Resync all historical data on the Setup tab of your connection, we clear the stored state. The next sync receives an empty state, {}, so your connection re-fetches all data from the source as if it is running for the first time.
State is not visible directly in the Fivetran dashboard, but you can log it at specific points during the sync, such as after a checkpoint, and it appears in the sync logs.
API example: partial re-sync
# Retrieve current state
curl -X GET "https://api.fivetran.com/v1/connections/{connection_id}/state" \
  -H "Authorization: Basic {api_key}"

# Update state to re-sync specific data
curl -X PATCH "https://api.fivetran.com/v1/connections/{connection_id}/state" \
  -H "Authorization: Basic {api_key}" \
  -H "Content-Type: application/json" \
  -d '{
    "state": {
      "users_cursor": "2024-01-01T00:00:00Z"
    }
  }'
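The same two requests can be prepared from Python with the standard library. This sketch only builds the request objects and does not send them. `build_state_request` is a hypothetical helper, and it assumes the Basic credential is the base64 encoding of `api_key:api_secret`, as in standard HTTP Basic authentication. Check the REST API reference for the exact credential format before relying on it.

```python
import base64
import json
import urllib.request

API_BASE = "https://api.fivetran.com/v1"


def build_state_request(connection_id, api_key, api_secret, new_state=None):
    """Build a GET (read state) or PATCH (update state) request without sending it."""
    # Assumption: standard HTTP Basic credentials, base64("key:secret").
    credentials = f"{api_key}:{api_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii")
    }
    url = f"{API_BASE}/connections/{connection_id}/state"
    if new_state is None:
        return urllib.request.Request(url, headers=headers, method="GET")
    headers["Content-Type"] = "application/json"
    body = json.dumps({"state": new_state}).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="PATCH")
```

Passing the result to `urllib.request.urlopen` would send the request. Keeping construction separate makes it easy to inspect the URL and body first.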
Security considerations
Never store sensitive information in state such as:
- API tokens or passwords
- Encryption keys
- Personally identifiable information (PII)
State is not encrypted and may appear in logs during troubleshooting. Use the configuration JSON file for sensitive parameters.
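One way to guard against accidentally checkpointing secrets is a name-based check before each checkpoint. The sketch below is a heuristic only: `find_sensitive_keys` and the `SUSPICIOUS_KEY_PARTS` list are hypothetical, and the check inspects key names, so it cannot detect PII or secrets stored under innocuous keys.

```python
SUSPICIOUS_KEY_PARTS = ("token", "password", "secret", "api_key", "private_key")


def find_sensitive_keys(state: dict, prefix: str = "") -> list:
    """Return dotted paths of state keys whose names suggest sensitive content."""
    found = []
    for key, value in state.items():
        path = f"{prefix}{key}"
        if any(part in key.lower() for part in SUSPICIOUS_KEY_PARTS):
            found.append(path)
        # Recurse into nested objects such as per-table cursor maps.
        if isinstance(value, dict):
            found.extend(find_sensitive_keys(value, prefix=path + "."))
    return found


flagged = find_sensitive_keys(
    {"users_cursor": "2024-01-15T10:30:00Z", "auth": {"access_token": "abc"}}
)
# flagged is ["auth.access_token"]
```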
State size limits
State object size limit: 10 MB
If you exceed this limit, your connection will fail with an error. Monitor state size and optimize if necessary.
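To monitor state size, you could measure the serialized state before each checkpoint. This sketch makes two assumptions that this page does not confirm: that the limit applies to the UTF-8 length of the JSON string, and that 10 MB means 10 × 1024 × 1024 bytes. The function names and the 80% warning threshold are illustrative.

```python
import json

# Assumption: 10 MB is interpreted as 10 * 1024 * 1024 bytes.
STATE_SIZE_LIMIT_BYTES = 10 * 1024 * 1024


def state_size_bytes(state: dict) -> int:
    """Approximate the stored size as the UTF-8 length of the JSON encoding."""
    return len(json.dumps(state).encode("utf-8"))


def check_state_size(state: dict, warn_ratio: float = 0.8) -> str:
    """Classify state as 'ok', 'warning' (near the limit), or 'over'."""
    size = state_size_bytes(state)
    if size > STATE_SIZE_LIMIT_BYTES:
        return "over"
    if size > STATE_SIZE_LIMIT_BYTES * warn_ratio:
        return "warning"
    return "ok"
```

A connector could log a warning when the result is `"warning"`, which leaves time to trim the state before the connection fails.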
Debugging state issues
- Reset local state for debugging by executing the following in your project root folder:
fivetran reset
- Debug with a specific state to test incremental sync:
Manually create files/state.json first. The fivetran debug command automatically reads this file.
If you create state.json in another location, specify its path with the --state flag.
fivetran debug
After the command finishes, state.json is updated based on the checkpoints in your code.
Common state problems
- State not persisting - Check for checkpoint() calls. Long syncs should periodically call checkpoint() to persist state.
- Incorrect resume point - Verify state structure
- State corruption - Use API to reset state
- Large state objects - Optimize state structure
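The periodic checkpointing recommended for long syncs can be sketched as below. To keep the example self-contained, the upsert and checkpoint operations are passed in as callables. In a connector they would wrap the SDK operations, for example `lambda s: op.checkpoint(state=s)`. The function name, the `updated_at` field, and the assumption that records arrive in ascending `updated_at` order are all illustrative.

```python
from typing import Callable, Iterable


def sync_with_periodic_checkpoints(
    records: Iterable[dict],
    upsert: Callable[[dict], None],
    checkpoint: Callable[[dict], None],
    state: dict,
    every: int = 1000,
) -> dict:
    """Upsert records, checkpointing every `every` records and once at the end."""
    processed = 0
    for record in records:
        upsert(record)
        # Assumption: records arrive ordered by 'updated_at', so the latest
        # record seen is a safe resume point.
        state["last_sync_timestamp"] = record["updated_at"]
        processed += 1
        if processed % every == 0:
            checkpoint(dict(state))
    # Final checkpoint covers the records after the last periodic one.
    checkpoint(dict(state))
    return state
```

If a sync fails partway through, the next run resumes from the most recent checkpoint instead of the start of the previous sync, at the cost of re-processing at most `every` records.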