Fivetran Platform Connector Sample Queries
Updated November 21, 2023
Calculate monthly active rows (MAR) per connector
The Monthly Active Rows (MAR) count sent through the Fivetran Platform Connector includes the active rows recorded up to the current date. The count resets at the beginning of each month. For example, if your MAR count for schema_A.table_A is 5500 on January 1st but is 5800 on January 31st, your January MAR for schema_A.table_A is 5800. Your MAR count then drops to 0 on February 1st. To learn more about MAR, see our pricing documentation.
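The incremental_mar table referenced in the queries below stores these counts as daily increments, so summing incremental_rows over a month reproduces the end-of-month MAR. The following sketch, assuming a Redshift or Snowflake destination, shows the cumulative count building up within each month and resetting when a new month begins:
select schema_name,
       destination_id,
       measured_date,
       -- running total within each month; starts over when a new month begins
       sum(incremental_rows) over (
           partition by schema_name, destination_id, date_trunc('month', measured_date)
           order by measured_date
           rows between unbounded preceding and current row
       ) as cumulative_mar
from incremental_mar
where free_type = 'PAID'
order by schema_name, destination_id, measured_date;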
NOTE: If you use these queries during your free trial, we recommend that you add every data source that you plan to use with Fivetran and let each connector run for 7 days under a typical load. This will give you a more accurate idea of how well Fivetran meets your business needs.
In the queries below, replace <fivetran_destination_database> with the name of your destination database. Replace <fivetran_log_schema> with the schema name of your Fivetran Platform connector.
Calculate MAR grouped by schema (connector), destination, and month
Redshift and Snowflake MAR
We have tested the following query for Redshift and Snowflake destinations:
select schema_name,
destination_id,
date_trunc('month', measured_date) as measured_month,
sum(incremental_rows) as MAR
from incremental_mar
where free_type = 'PAID'
group by schema_name, destination_id, measured_month
order by measured_month, schema_name
BigQuery MAR
We have tested the following query for BigQuery destinations:
select schema_name,
destination_id,
date_trunc(measured_date, month) as measured_month,
sum(incremental_rows) as MAR
from fivetran_log.incremental_mar
where free_type = 'PAID'
group by schema_name, destination_id, measured_month
order by measured_month, schema_name
Calculate MAR by table
select schema_name,
destination_id,
table_name,
connector_id,
date_trunc('month', measured_date) as measured_month,
sum(incremental_rows) as incremental_rows
from incremental_mar
where free_type = 'PAID'
group by schema_name, destination_id, measured_month, table_name, connector_id
order by measured_month, schema_name, table_name;
Check connector status
Check sync start and end times
select connector_id,
message_event,
time_stamp as event_time
from <fivetran_destination_database>.<fivetran_log_schema>.log
where message_event in ('sync_start', 'sync_end')
order by time_stamp desc;
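To measure how long each sync took, pair every sync_end event with the most recent preceding sync_start. The following is a sketch, assuming a Snowflake or Redshift destination (both support LAG ... IGNORE NULLS and DATEDIFF):
with sync_events as (
    select connector_id,
           message_event,
           time_stamp,
           -- carry the latest sync_start timestamp forward to each later event
           lag(case when message_event = 'sync_start' then time_stamp end) ignore nulls
               over (partition by connector_id order by time_stamp) as last_sync_start
    from <fivetran_destination_database>.<fivetran_log_schema>.log
    where message_event in ('sync_start', 'sync_end')
)
select connector_id,
       last_sync_start as sync_started_at,
       time_stamp as sync_ended_at,
       datediff(second, last_sync_start, time_stamp) as sync_duration_seconds
from sync_events
where message_event = 'sync_end'
order by sync_ended_at desc;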
Troubleshoot errors and warnings
select connector_id, time_stamp, event, message_data
from <fivetran_destination_database>.<fivetran_log_schema>.log
where event = 'WARNING' or event = 'SEVERE'
order by time_stamp desc;
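To see which connectors fail most often, you can roll the same events up by day. This is a sketch using Snowflake/Redshift syntax; for BigQuery, use date_trunc(cast(time_stamp as date), day) instead:
select connector_id,
       date_trunc('day', time_stamp) as date_day,
       count(*) as error_count
from <fivetran_destination_database>.<fivetran_log_schema>.log
where event = 'WARNING' or event = 'SEVERE'
group by connector_id, date_day
order by error_count desc;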
Check records modified since last sync
The sample queries below return the volume of data that has been inserted, updated, or deleted since your last successful sync. They also return the timestamp of your connector's last record modification. Query results are at the connector level.
Use the sample query for your destination:
NOTE: If you want to filter your results based on data modification type (for example, view inserts only), use the operationType field in the message_data JSON object.
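For example, in a Redshift destination you could keep only delete operations with a query like the one below. This is a sketch; the operationType value 'DELETED' is an assumption to verify against the values that actually appear in your logs:
select connector_id,
       time_stamp,
       message_data
from <fivetran_destination_database>.<fivetran_log_schema>.log
where message_event = 'records_modified'
-- 'DELETED' is an assumed operationType value; check your own logs for the exact values
and json_extract_path_text(message_data, 'operationType') = 'DELETED'
order by time_stamp desc;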
BigQuery
with parse_json as (
SELECT
time_stamp,
JSON_EXTRACT(message_data, '$.schema') AS connector_schema,
CAST(JSON_EXTRACT(message_data, '$.count') AS int64) AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM fivetran_log.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
select
connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
Redshift
with parse_json as (
SELECT
time_stamp,
json_extract_path_text(message_data, 'schema') AS connector_schema,
json_extract_path_text(message_data, 'count')::integer AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM fivetran_log.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
select
connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
Snowflake
WITH parse_json AS (
SELECT
time_stamp,
PARSE_JSON(message_data) AS message_data,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM fivetran_log.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
select
message_data:schema as connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN message_data:count::integer ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
Check daily modified records
The sample queries below return the volume of data that has been inserted, updated, or deleted each day. Query results are at the table level.
Use the sample query for your destination:
NOTE: If you want to filter your results based on data modification type (for example, view inserts only), use the operationType field in the message_data JSON object.
BigQuery daily records
SELECT
DATE_TRUNC(cast(time_stamp AS date), day) AS date_day,
JSON_EXTRACT(message_data, '$.schema') AS schema,
JSON_EXTRACT(message_data, '$.table') AS table,
SUM(cast(JSON_EXTRACT(message_data, '$.count') AS int64)) AS row_volume
FROM fivetran_log.log
WHERE DATE_DIFF(CURRENT_DATE(), cast(time_stamp AS date), DAY) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, table
ORDER BY date_day DESC;
Redshift daily records
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
JSON_EXTRACT_PATH_TEXT(message_data, 'schema') AS schema,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS "table",
SUM(JSON_EXTRACT_PATH_TEXT(message_data, 'count')::integer) AS row_volume
FROM fivetran_log.log
WHERE
DATEDIFF(day, time_stamp, current_date) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, "table"
ORDER BY date_day DESC;
Snowflake daily records
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
PARSE_JSON(message_data) AS message_data
FROM fivetran_log.log
WHERE DATEDIFF(DAY, time_stamp, current_date) < 30
AND message_event = 'records_modified'
)
SELECT
date_day,
message_data:schema AS "schema",
message_data:table AS "table",
SUM(message_data:count::integer) AS row_volume
FROM parse_json
GROUP BY date_day, "schema", "table"
ORDER BY date_day DESC;
Audit user actions within a connector
The following sample query returns various user actions made within a connector for audit-trail purposes. This can be helpful when you need to trace a user action to a log event, such as a schema change, sync frequency update, manual update, or broken connection.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
dayname(time_stamp) as dn,
DAYOFWEEK(time_stamp) as dow,
id,
time_stamp as event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM
<fivetran_destination_database>.<fivetran_log_schema>.log
),
t AS (
SELECT
id,
event_time,
dn as weekday,
dow,
message_event,
connector_id,
message_data:actor as acting_user
FROM parse_json
)
SELECT
*
FROM t
WHERE acting_user IS NOT NULL
Overview of event averages by day
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
dayname(time_stamp) as dn,
DAYOFWEEK(time_stamp) as dow,
id,
time_stamp as event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <fivetran_destination_database>.<fivetran_log_schema>.log
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connector_id,
COUNT(id) AS event_count
FROM parse_json
GROUP BY date_day, dn, dow, connector_id, message_event
),
ev AS (
SELECT
t.connector_id,
t.message_event,
t.dn AS weekday,
ROUND(AVG(t.event_count)) AS av_event_count,
ROUND(ROUND(AVG(t.event_count)) + ROUND(AVG(t.event_count)) * .2) AS high_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) - ROUND(AVG(t.event_count)) * .2) AS low_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) * .2) AS event_var_increment,
ROUND(STDDEV(t.event_count)) AS standard_deviation
FROM t
GROUP BY t.connector_id, t.message_event,t.dow, t.dn
ORDER BY t.connector_id, t.message_event,t.dow
)
SELECT * FROM ev
Assign your own variance logic and monitor your environment at event level
NOTE: This requires that the fivetran_log_event_averages table be present.
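Fivetran does not create this table for you. One way to materialize it, assuming a Snowflake destination, is to wrap the "Overview of event averages by day" query above in a CREATE TABLE ... AS statement:
create or replace table <fivetran_destination_database>.<fivetran_log_schema>.fivetran_log_event_averages as
-- paste the full "Overview of event averages by day" query here,
-- from "WITH parse_json AS (" through "SELECT * FROM ev"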
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
dayname(time_stamp) as dn,
DAYOFWEEK(time_stamp) as dow,
id,
time_stamp as event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <fivetran_destination_database>.<fivetran_log_schema>.log
),
ec as (
SELECT
count(id) as event_count,
date_day,
dn,
dow,
message_event,
connector_id
FROM parse_json
GROUP BY date_day,dn,dow,connector_id,message_event
ORDER BY connector_id, message_event asc
)
, av as (
select
ev.connector_id,
ev.weekday,
ev.message_event,
ev.av_event_count,
ev.high_event_variance_value,
ev.low_event_variance_value,
ev.event_var_increment,
ev.standard_deviation
from <fivetran_destination_database>.<fivetran_log_schema>.fivetran_log_event_averages ev
)
select
ec.date_day,
ec.dn as weekday,
ec.connector_id,
ec.message_event,
ec.event_count as total_events,
av.av_event_count,
av.high_event_variance_value,
av.low_event_variance_value,
av.standard_deviation,
av.event_var_increment,
case when ec.event_count > av.high_event_variance_value then 'Event_Variance'
when ec.event_count < av.low_event_variance_value then 'Event_Variance'
else 'Standard'
end as event_variance_flag
from ec
inner join av on av.connector_id = ec.connector_id and av.message_event = ec.message_event and av.weekday = ec.dn
order by
ec.date_day,
ec.dow,
ec.connector_id,
ec.message_event
Review difference in seconds between write_to_table_start and write_to_table_end events
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
time_stamp as event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <fivetran_destination_database>.<fivetran_log_schema>.log
WHERE connector_id = '' -- enter the ID of the connector you want to inspect
), t as (
SELECT
id,
event_time,
message_event,
connector_id,
message_data:table AS "table",
RANK() OVER (ORDER BY connector_id, "table", event_time ASC) as rn,
datediff(second, lag(event_time, 1) over (order by connector_id, "table", event_time asc), event_time) as seconds_diff
FROM parse_json
where message_event in ('write_to_table_start','write_to_table_end')
GROUP BY id,connector_id,event_time,message_event,"table"
ORDER BY connector_id,"table",event_time asc
)
select
t.id,
t.event_time,
t.message_event,
t.connector_id,
t."table",
case when t.message_event = 'write_to_table_start'
and t.seconds_diff > 0
then 0 else t.seconds_diff
end as diff
from t
;
Review records_modified count data by table
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
dayname(time_stamp) as dn,
DAYOFWEEK(time_stamp) as dow,
id,
time_stamp as event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <fivetran_destination_database>.<fivetran_log_schema>.log
), t as (
SELECT
id,
event_time,
dn as weekday,
dow,
message_event,
connector_id,
message_data:operationType as optype,
message_data:table AS logtable,
message_data:count as rowsimpacted
FROM parse_json
where message_event = 'records_modified'
and logtable <> 'fivetran_audit'
GROUP BY id,connector_id,event_time,dow,weekday,message_event,logtable,optype,rowsimpacted
ORDER BY connector_id,logtable asc
)
select
connector_id,
message_event,
weekday,
optype,
logtable as avtable,
cast(round(avg(t.rowsimpacted)) as int) as avgrow,
round(round(avg(t.rowsimpacted)) + round(avg(t.rowsimpacted)) * .2) as high_variance_value,
round(round(avg(t.rowsimpacted)) - round(avg(t.rowsimpacted)) * .2) as low_variance_value,
round(round(avg(t.rowsimpacted)) * .2 ) as var_increment,
ifnull(round(stddev(t.rowsimpacted)),0) as standard_deviation
from t
group by connector_id,message_event,dow,weekday,avtable,optype
order by connector_id,avtable,dow,optype
Assign your own variance logic and monitor your environment at table level
NOTE: This requires that the fivetran_records_modified_averages table be present.
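As with the event averages, Fivetran does not create this table; one way to materialize it, assuming a Snowflake destination, is from the preceding "Review records_modified count data by table" query:
create or replace table <fivetran_destination_database>.<fivetran_log_schema>.fivetran_records_modified_averages as
-- paste the full "Review records_modified count data by table" query here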
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
dayname(time_stamp) as dn,
DAYOFWEEK(time_stamp) as dow,
id,
time_stamp as event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <fivetran_destination_database>.<fivetran_log_schema>.log
), t as (
SELECT
id,
date_day,
dn,
dow,
event_time,
message_event,
connector_id,
message_data:operationType as optype,
message_data:table AS logtable,
message_data:count as rowsimpacted
FROM parse_json
where message_event = 'records_modified'
and logtable <> 'fivetran_audit'
GROUP BY id,connector_id,date_day,dn,dow,event_time,message_event,logtable,optype,rowsimpacted
ORDER BY connector_id,logtable asc
)
, av as (
select
rma.weekday,
rma.connector_id,
rma.optype,
rma.avtable,
rma.avgrow,
rma.high_variance_value,
rma.low_variance_value,
rma.var_increment,
rma.standard_deviation
from <fivetran_destination_database>.<fivetran_log_schema>.fivetran_records_modified_averages rma
)
select
t.id,
t.dn as weekday,
t.date_day,
t.event_time,
t.message_event,
t.optype,
t.connector_id,
t.logtable,
cast(t.rowsimpacted as int) as rowsimpacted,
av.avgrow,
av.high_variance_value,
av.low_variance_value,
av.var_increment,
av.standard_deviation,
case when t.rowsimpacted > av.high_variance_value then 'Variance'
when t.rowsimpacted < av.low_variance_value then 'Variance'
else 'Standard'
end as varianceflag
from t
inner join av on t.connector_id = av.connector_id and t.dn = av.weekday and t.logtable = av.avtable and t.optype = av.optype
order by t.connector_id,t.dow,t.event_time,t.logtable asc
Check metadata
IMPORTANT: You must have an Enterprise plan or higher to query metadata.
The Fivetran Platform Connector provides access to metadata for data synced by Fivetran, which helps you understand the mapping between the source and destination. The data retrieved can be easily consumed in BI tools, data catalogs, or through direct SQL queries.
The data retrieved helps organizations:
- Understand data synced by Fivetran
- Audit and enforce access control
- Retrieve metadata changes
The following queries return normalized tables with information on source and destination connections, schemas, tables, and columns.
Check which data moved through Fivetran
The query includes source/destination mapping, so it can be filtered by source.connectorId.
SELECT * FROM fivetran_log.connector c
JOIN fivetran_log.source_schema_metadata ssm
ON c.connector_id = ssm.connector_id
JOIN fivetran_log.source_table_metadata stm
ON stm.schema_id = ssm.id
JOIN fivetran_log.source_column_metadata scm
ON scm.table_id = stm.id
JOIN fivetran_log.schema_lineage sl
ON ssm.id = sl.source_schema_id
JOIN fivetran_log.table_lineage tl
ON stm.id = tl.source_table_id
JOIN fivetran_log.column_lineage cl
ON scm.id = cl.source_column_id
JOIN fivetran_log.destination_schema_metadata dsm
ON sl.destination_schema_id = dsm.id
JOIN fivetran_log.destination_table_metadata dtm
ON tl.destination_table_id = dtm.id
JOIN fivetran_log.destination_column_metadata dcm
ON cl.destination_column_id = dcm.id
LEFT JOIN fivetran_log.source_foreign_key_metadata sfkm
ON sfkm.column_id = scm.id
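The source.connectorId filter mentioned above corresponds to the connector_id column on the connector table (aliased c), so you can restrict the output by appending, for example:
WHERE c.connector_id = '<connector_id>' -- placeholder; use the ID of the connector you want to trace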
What is this data a reference to
SELECT * FROM fivetran_log.source_column_metadata scm
JOIN fivetran_log.column_lineage cl
ON scm.id = cl.source_column_id
WHERE cl.destination_column_id = %column_id%
What downstream assets are impacted by this data
SELECT * FROM fivetran_log.destination_column_metadata dcm
JOIN fivetran_log.column_lineage cl
ON dcm.id = cl.destination_column_id
WHERE cl.source_column_id = %column_id%