Connector Memory Management
Each Connector SDK connection runs inside a container with a memory limit. When connector code accumulates large amounts of data in Python objects (lists, DataFrames, and dicts) before delivering them to Fivetran, Python memory usage grows proportionally to the dataset size. For large sources, this can exceed the allowed memory threshold. Understanding common memory patterns and how to measure them locally helps you write connectors that stay within these limits.
Hybrid Deployment (HD) connections run on your own infrastructure and are not subject to the memory constraints of the Connector SDK.
Common causes of high memory usage
- Collecting all data before upserting — Fetching all pages, rows, or records into a list or dict before processing means the entire dataset lives in memory simultaneously.
- Loading large files into memory — Reading a full CSV, JSON, or Parquet file into a DataFrame in one shot can consume memory proportional to the file size.
- Unbatched database queries — Using fetchall() loads the complete result set on the client side. Large tables can exhaust available memory before a single row is delivered to Fivetran.
- In-memory joins or aggregations — Performing large joins, GROUP BY operations, or deduplication logic entirely in Python, rather than pushing it down to the source, can cause temporary memory spikes proportional to the data involved.
- Caching or memoizing large responses — Storing entire API responses, decoded blobs, or lookup tables in module-level variables or caches that persist for the lifetime of the sync.
- Parallel fetching without backpressure — Spawning many threads or async tasks that each fetch and buffer data independently can multiply memory usage by the number of concurrent workers.
- Large binary or nested objects — Decoding large binary fields (such as images, documents, or encoded payloads) or deeply nested JSON structures into Python objects can use significantly more memory than the raw payload size suggests.
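The backpressure point above can be sketched with a bounded window of in-flight futures: new page fetches are only submitted once older ones have been drained, so at most a fixed number of pages are buffered at a time. The names fetch_page, process, and the window size here are illustrative stand-ins, not SDK APIs.

```python
import concurrent.futures
from collections import deque

MAX_IN_FLIGHT = 4  # cap on concurrently buffered pages (illustrative value)

def fetch_page(page):
    # stand-in for a real API call; returns a small list of records
    return [{"page": page, "i": i} for i in range(3)]

def process(record):
    # stand-in for per-record work (e.g. an upsert)
    pass

def fetch_with_backpressure(num_pages, max_in_flight=MAX_IN_FLIGHT):
    processed = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        in_flight = deque()
        for page in range(num_pages):
            in_flight.append(pool.submit(fetch_page, page))
            # once the window is full, drain the oldest future before
            # submitting more, so at most max_in_flight pages are buffered
            if len(in_flight) >= max_in_flight:
                for record in in_flight.popleft().result():
                    process(record)
                    processed += 1
        # drain whatever remains
        while in_flight:
            for record in in_flight.popleft().result():
                process(record)
                processed += 1
    return processed
```

Without the drain step inside the loop, all num_pages results could sit in memory simultaneously; with it, memory is bounded by the window size regardless of how many pages the source has.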
How to measure memory usage
Instrument your connector and run it locally with fivetran debug to identify where memory is growing. There are three complementary approaches: use Option 1 to pinpoint which lines are allocating the most memory using tracemalloc, Option 2 to confirm the overall memory spike at key checkpoints using psutil, and Option 3 to generate a CPU flamegraph with py-spy to identify which functions are driving the most work.
Option 1: Pinpoint allocations with tracemalloc
tracemalloc is built into Python — no installation required. It traces memory allocations and shows you exactly which lines of code are responsible.
Add the following helper function near the top of your connector file
import tracemalloc
from fivetran_connector_sdk import Logging as log
def print_top_memory(snapshot, label: str, limit: int = 10):
stats = snapshot.statistics("lineno")
log.info(f"--- Top {limit} memory consumers [{label}] ---")
for stat in stats[:limit]:
log.info(str(stat))
Start tracing and take snapshots at key points inside your update function — before and after the operations you suspect are using the most memory
def update(configuration: dict, state: dict):
tracemalloc.start()
data = fetch_data(configuration)
print_top_memory(tracemalloc.take_snapshot(), "after fetch")
for record in data:
op.upsert("my_table", record)
print_top_memory(tracemalloc.take_snapshot(), "after upsert")
tracemalloc.stop()
op.checkpoint(state)
Run your connector locally
fivetran debug
Sample output
INFO --- Top 10 memory consumers [after upsert] ---
INFO connector.py:146: size=10.0 GiB, count=10, average=1024 MiB
INFO connector.py:145: size=576 B, count=8, average=72 B
INFO .../fivetran_connector_sdk/logger.py:35: size=1365 B, count=27, average=51 B
INFO .../python3.14/tracemalloc.py:534: size=56 B, count=1, average=56 B
INFO .../python3.14/threading.py:362: size=56 B, count=1, average=56 B
In this example, connector.py:146 is allocating 10 GiB across 10 objects at an average of 1024 MiB each — a clear sign that line 146 is the source of the memory spike. Each field in the output tells you something different:
- size — total memory held by all objects allocated at that line
- count — number of objects allocated at that line still alive in memory
- average — mean size per object (size / count); a high average points to large individual objects, while a high count with a low average points to many small objects accumulating
Lines from the SDK and Python standard library (fivetran_connector_sdk, tracemalloc, threading) are expected and can be ignored.
Lines with a large size value at each snapshot are your biggest allocators. A line that grows significantly between two snapshots points to the operation responsible for the spike. Refer to the resolution steps outlined in the How to reduce memory usage section.
Make sure you remove the tracemalloc calls before deploying your connector to production.
If you need to isolate exactly how much memory a specific operation added between two snapshots, tracemalloc also supports comparing snapshots directly using snapshot2.compare_to(snapshot1, "lineno").
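As a minimal, self-contained sketch of that comparison (the allocation here is synthetic, purely to produce a visible diff between the two snapshots):

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# synthetic allocation of roughly 1 MB so the diff has something to show
data = [bytes(1000) for _ in range(1000)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# compare_to returns StatisticDiff objects, largest differences first;
# the top entry points at the line responsible for the growth
diffs = after.compare_to(before, "lineno")
for diff in diffs[:3]:
    print(diff)
```

In a connector, `before` and `after` would bracket the operation you suspect, and the top diff entries isolate exactly what that operation added.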
Option 2: Measure process memory with psutil
psutil gives you a coarse, process-level reading of how much memory your connector is consuming at a given point. This is useful once you have a hypothesis and want to confirm the size of a spike at a specific checkpoint.
Install the psutil library if you don't already have it
pip install psutil
Add the following helper function near the top of your connector file
import os
import psutil
from fivetran_connector_sdk import Logging as log
def log_memory_usage(label: str):
process = psutil.Process(os.getpid())
memory_gb = process.memory_info().rss / (1024 ** 3)
log.info(f"Memory usage [{label}]: {memory_gb:.2f} GB")
Call it at key points inside your update function — before and after the operations you suspect are using the most memory
def update(configuration: dict, state: dict):
log_memory_usage("before upsert")
for record in fetch_data(configuration):
op.upsert("my_table", record)
log_memory_usage("after upsert")
op.checkpoint(state)
Run your connector locally
fivetran debug
Sample output
INFO Memory usage [before upsert]: 0.05 GB
... (debug logs) ...
INFO Memory usage [after upsert]: 6.66 GB
In this example, memory jumped from 0.05 GB to 6.66 GB during the upsert operation — a clear sign that data is being accumulated in memory before being delivered to Fivetran. Refer to the resolution steps outlined in the How to reduce memory usage section.
Make sure you remove the log_memory_usage calls before deploying your connector to production.
Option 3: Visualize execution with py-spy
py-spy is a sampling profiler that generates a CPU flamegraph of your connector's execution. While it profiles CPU time rather than memory directly, it can help identify functions that are doing the most work — such as tight processing loops or repeated parsing — which are often the same hotspots driving high memory usage.
For setup instructions and how to run py-spy locally, see Local Performance Profiling.
Once you have the flamegraph, wide boxes in the run_update section point to the functions consuming the most CPU time. Cross-reference these with the memory snapshots from Options 1 and 2 to confirm where the problem lies.
How to reduce memory usage
The most common cause is accumulating data in memory before delivering it to Fivetran. Restructure your connector to process and upsert records in small batches as you fetch them, and checkpoint after each batch.
The general pattern to follow is:
- Fetch a small chunk of data from the source.
- Process and upsert each record in the chunk immediately.
- Checkpoint to save sync progress.
- Move on to the next chunk.
Use the findings from tracemalloc and psutil to identify which operations are responsible for the memory spike, then apply the relevant fix from the following sections.
Accumulating records before upserting
Anti-pattern: Collecting all records into a list before calling op.upsert().
# BAD: entire dataset is loaded into memory at once
def update(configuration: dict, state: dict):
all_records = []
page = 1
while True:
response = fetch_page(configuration, page)
if not response["items"]:
break
all_records.extend(response["items"]) # memory keeps growing
page += 1
for record in all_records:
op.upsert("my_table", record)
op.checkpoint(state)
Fix: Upsert and checkpoint as you go.
# GOOD: process and upsert each page immediately
def update(configuration: dict, state: dict):
page = 1
while True:
response = fetch_page(configuration, page)
if not response["items"]:
break
for record in response["items"]:
op.upsert("my_table", record)
state["last_page"] = page
op.checkpoint(state) # checkpoint after each page
page += 1
Loading large files entirely into memory
Anti-pattern: Reading a full CSV or JSON file into a pandas DataFrame before processing.
# BAD: entire file is read into memory
import pandas as pd
def update(configuration: dict, state: dict):
df = pd.read_csv("large_file.csv") # can consume several GBs for large files
for _, row in df.iterrows():
op.upsert("my_table", row.to_dict())
op.checkpoint(state)
Fix: Use batched reading with Polars or Dask to process the file in chunks.
# GOOD: read and process the CSV in batches using Polars
import polars as pl
def update(configuration: dict, state: dict):
csv_reader = pl.read_csv_batched("large_file.csv", low_memory=True)
while True:
batches = csv_reader.next_batches(1)
if not batches:
break
for row in batches[0].iter_rows(named=True):
op.upsert("my_table", row)
op.checkpoint(state)
For Dask and pandas/PyArrow alternatives, see the high-volume CSV example.
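The pandas route can be sketched with read_csv's chunksize parameter, which turns the call into an iterator of DataFrames so only one chunk is in memory at a time. The in-memory sample data and chunk size here are illustrative.

```python
import io
import pandas as pd

# a small in-memory CSV stands in for the real large file
csv_text = "id,value\n" + "\n".join(f"{i},{i * 2}" for i in range(10))

total_rows = 0
# chunksize makes read_csv return an iterator of DataFrames instead of
# loading the whole file at once
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4):
    for row in chunk.to_dict(orient="records"):
        total_rows += 1  # in a connector, op.upsert("my_table", row) would go here
```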
Fetching all database rows without batching
Anti-pattern: Using fetchall() on a database cursor, which loads the entire result set into memory.
# BAD: fetches all rows at once
def update(configuration: dict, state: dict):
cursor = connection.cursor()
cursor.execute("SELECT * FROM large_table")
rows = cursor.fetchall() # entire table in memory
for row in rows:
op.upsert("my_table", dict(zip([d[0] for d in cursor.description], row)))
op.checkpoint(state)
Fix: Use a server-side (named) cursor with fetchmany() to stream rows in batches.
# GOOD: stream rows in batches using a named server-side cursor
BATCH_SIZE = 5000
def update(configuration: dict, state: dict):
with connection.cursor("ft_server_side_cursor") as cursor:
cursor.execute("SELECT * FROM large_table")
while True:
rows = cursor.fetchmany(BATCH_SIZE)
if not rows:
break
col_names = [desc[0] for desc in cursor.description]
for row in rows:
op.upsert("my_table", dict(zip(col_names, row)))
op.checkpoint(state)
For a complete implementation, see the server-side cursors example.
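To exercise the fetchmany() batching loop end to end without a live database, here is a sketch using the standard library's sqlite3. Note that sqlite3 cursors are client-side, so this demonstrates the loop shape only, not a true server-side cursor; the batch size is deliberately tiny for illustration.

```python
import sqlite3

BATCH_SIZE = 2  # illustrative; real connectors would use thousands

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE large_table (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO large_table VALUES (?, ?)",
    [(i, f"row{i}") for i in range(7)],
)

rows_seen = 0
cursor = conn.cursor()
cursor.execute("SELECT * FROM large_table")
while True:
    batch = cursor.fetchmany(BATCH_SIZE)  # only BATCH_SIZE rows materialized per call
    if not batch:
        break
    col_names = [desc[0] for desc in cursor.description]
    for row in batch:
        record = dict(zip(col_names, row))  # op.upsert("my_table", record) in a connector
        rows_seen += 1
conn.close()
```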
For additional recommendations, see our best practices for handling large datasets.