Fivetran Log Connector Sample Queries
Calculate monthly active rows (MAR) per connector
The Monthly Active Rows (MAR) count sent through the Fivetran Log Connector includes the active rows recorded up to the current date. The count resets at the beginning of each month. For example, if your MAR count for schema_A.table_A is 5500 on January 1st but 5800 on January 31st, your January MAR for schema_A.table_A is 5800. Your MAR count then drops to 0 on February 1st. To learn more about MAR, see our consumption-based pricing documentation.
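This month-to-date behavior can be sketched in plain Python: the MAR for a month is the value of that month's latest measurement, which is the same "latest measurement per group" logic the SQL queries below implement with a self-join. The sample measurements here are hypothetical.

```python
from datetime import date

# Hypothetical daily MAR measurements for one table.
# Each tuple: (measured_at, monthly_active_rows). The count is
# cumulative within a month and resets on the 1st.
measurements = [
    (date(2024, 1, 1), 5500),
    (date(2024, 1, 15), 5650),
    (date(2024, 1, 31), 5800),
    (date(2024, 2, 1), 0),
    (date(2024, 2, 10), 1200),
]

# Keep only the latest measurement per month; its value is that month's MAR.
mar_by_month = {}
for measured_at, rows in measurements:
    month = (measured_at.year, measured_at.month)
    prev = mar_by_month.get(month)
    if prev is None or measured_at > prev[0]:
        mar_by_month[month] = (measured_at, rows)

january_mar = mar_by_month[(2024, 1)][1]   # 5800
february_mar = mar_by_month[(2024, 2)][1]  # 1200
```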
Note: If you use these queries during your free trial, we recommend that you add every data source that you plan to use with Fivetran and let each connector run for 7 days under a typical load. This will give you a more accurate idea of how well Fivetran meets your business needs.
In the queries below, replace <fivetran_load_database> (or <fivetran_destination_database>) with the name of the destination database that your Fivetran Log connector loads into, and replace <fivetran_log_schema_name> (or <fivetran_log_schema>) with the schema name of your Fivetran Log connector, as shown in the Fivetran dashboard.
Calculate MAR by schema (connector)
select
av.schema_name,
MAX(date(av.measured_at)) as last_measurement_date,
SUM(av.monthly_active_rows) as monthly_active_rows
from <fivetran_load_database>.<fivetran_log_schema_name>.active_volume av
inner join (
select
max(av.measured_at) max_measured_at,
av.schema_name,
av.table_name,
av.destination_id,
av.connector_id
from <fivetran_load_database>.<fivetran_log_schema_name>.active_volume av
group by 2,3,4,5) mav
on date(mav.max_measured_at) = date(av.measured_at)
and mav.schema_name = av.schema_name
and mav.table_name = av.table_name
and mav.connector_id = av.connector_id
and mav.destination_id = av.destination_id
and av.schema_name != '<fivetran_log_schema_name>'
group by av.schema_name;
Calculate MAR by table
select
av.schema_name,
av.table_name,
av.destination_id,
av.connector_id,
date(av.measured_at) as last_measurement_date,
date(date_trunc(month,av.measured_at)) as utc_month,
av.monthly_active_rows
from <fivetran_destination_database>.<fivetran_log_schema>.active_volume av
inner join (
select
max(av.measured_at) max_measured_at,
av.schema_name,
av.table_name,
av.destination_id,
av.connector_id
from <fivetran_destination_database>.<fivetran_log_schema>.active_volume av
group by 2,3,4,5) mav
on date(mav.max_measured_at) = date(av.measured_at)
and mav.schema_name = av.schema_name
and mav.table_name = av.table_name
and mav.connector_id = av.connector_id
and mav.destination_id = av.destination_id;
Check connector status
Check sync start and end times
select connector_id,
message_event,
time_stamp as event_time
from <fivetran_destination_database>.<fivetran_log_schema>.log
where message_event = 'sync_start' OR message_event = 'sync_end'
order by time_stamp desc;
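If you also want sync durations, you can pair each sync_end with the most recent preceding sync_start for the same connector. A minimal Python sketch over hypothetical log rows:

```python
from datetime import datetime

# Hypothetical rows from the log table: (connector_id, message_event, time_stamp).
log_rows = [
    ("connector_a", "sync_start", datetime(2024, 1, 1, 0, 0)),
    ("connector_a", "sync_end",   datetime(2024, 1, 1, 0, 12)),
    ("connector_b", "sync_start", datetime(2024, 1, 1, 0, 5)),
    ("connector_b", "sync_end",   datetime(2024, 1, 1, 0, 35)),
]

open_syncs = {}   # connector_id -> time of its most recent sync_start
durations = []    # (connector_id, start_time, duration in seconds)
for connector_id, event, ts in sorted(log_rows, key=lambda r: r[2]):
    if event == "sync_start":
        open_syncs[connector_id] = ts
    elif event == "sync_end" and connector_id in open_syncs:
        start = open_syncs.pop(connector_id)
        durations.append((connector_id, start, (ts - start).total_seconds()))
```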
Troubleshoot errors and warnings
select connector_id, time_stamp, event, message_data
from <fivetran_destination_database>.<fivetran_log_schema>.log
where event = 'WARNING' or event = 'SEVERE'
order by time_stamp desc;
Check records modified since last sync
The sample queries below return the volume of data that has been inserted, updated, or deleted since your last successful sync. They also return the timestamp of your connector's last record modification. Query results are at the connector level.
Use the sample query for your destination:
Note: If you want to filter your results based on data modification type (for example, to view inserts only), use the operationType field in the message_data JSON object.
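The window-function logic shared by the queries below (find each connector's last sync_end, then sum records_modified counts recorded after it) can be sketched in plain Python. The log rows here are hypothetical:

```python
import json
from datetime import datetime

# Hypothetical log rows: (connector_id, message_event, time_stamp, message_data).
log_rows = [
    ("connector_a", "records_modified", datetime(2024, 1, 1, 1, 0),
     json.dumps({"schema": "schema_a", "count": 100})),
    ("connector_a", "sync_end", datetime(2024, 1, 1, 2, 0), None),
    ("connector_a", "records_modified", datetime(2024, 1, 1, 3, 0),
     json.dumps({"schema": "schema_a", "count": 40})),
]

# Step 1: last successful sync per connector (the MAX ... OVER window).
last_sync = {}
for connector_id, event, ts, _ in log_rows:
    if event == "sync_end":
        last_sync[connector_id] = max(ts, last_sync.get(connector_id, ts))

# Step 2: sum counts recorded after that sync, or all counts if never synced.
volume_since_sync = {}
for connector_id, event, ts, payload in log_rows:
    if event != "records_modified":
        continue
    cutoff = last_sync.get(connector_id)
    if cutoff is None or ts > cutoff:
        data = json.loads(payload)
        schema = data["schema"]
        volume_since_sync[schema] = volume_since_sync.get(schema, 0) + data["count"]
```

Only the 40 rows modified after the 02:00 sync_end are counted; the 100 rows from before it are excluded.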
BigQuery
with parse_json as (
SELECT
time_stamp,
JSON_EXTRACT_SCALAR(message_data, '$.schema') AS connector_schema,
CAST(JSON_EXTRACT_SCALAR(message_data, '$.count') AS int64) AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM fivetran_log.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
select
connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
Redshift
with parse_json as (
SELECT
time_stamp,
json_extract_path_text(message_data, 'schema') AS connector_schema,
json_extract_path_text(message_data, 'count')::integer AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM fivetran_log.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
select
connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
Snowflake
WITH parse_json AS (
SELECT
time_stamp,
PARSE_JSON(message_data) AS message_data,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM fivetran_log.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
select
message_data:schema::string as connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN message_data:count::integer ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
Check daily modified records
The sample queries below return the volume of data that has been inserted, updated, or deleted each day. Query results are at the table level.
Use the sample query for your destination:
Note: If you want to filter your results based on data modification type (for example, to view inserts only), use the operationType field in the message_data JSON object.
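As a sketch of the operationType filter mentioned above, this Python snippet mirrors a WHERE filter on the extracted field. The payloads and the operationType values shown are hypothetical:

```python
import json

# Hypothetical message_data payloads from records_modified events.
payloads = [
    '{"schema": "schema_a", "table": "table_a", "count": 25, "operationType": "INSERT"}',
    '{"schema": "schema_a", "table": "table_a", "count": 10, "operationType": "DELETE"}',
]

# Keep only inserts, the way a WHERE clause on operationType would.
insert_volume = sum(
    row["count"]
    for row in map(json.loads, payloads)
    if row["operationType"] == "INSERT"  # value name is hypothetical
)
```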
BigQuery
SELECT
DATE_TRUNC(cast(time_stamp AS date), day) AS date_day,
JSON_EXTRACT_SCALAR(message_data, '$.schema') AS schema,
JSON_EXTRACT_SCALAR(message_data, '$.table') AS table,
SUM(cast(JSON_EXTRACT_SCALAR(message_data, '$.count') AS int64)) AS row_volume
FROM fivetran_log.log
WHERE DATE_DIFF(cast(CURRENT_DATE() AS date), cast(time_stamp AS date), DAY) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, table
ORDER BY date_day DESC;
Redshift
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
JSON_EXTRACT_PATH_TEXT(message_data, 'schema') AS schema,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS "table",
SUM(JSON_EXTRACT_PATH_TEXT(message_data, 'count')::integer) AS row_volume
FROM fivetran_log.log
WHERE
DATEDIFF(day, time_stamp, current_date) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, "table"
ORDER BY date_day DESC;
Snowflake
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
PARSE_JSON(message_data) AS message_data
FROM fivetran_log.log
WHERE DATEDIFF(DAY, time_stamp, current_date) < 30
AND message_event = 'records_modified'
)
SELECT
date_day,
message_data:schema::string AS "schema",
message_data:table::string AS "table",
SUM(message_data:count::integer) AS row_volume
FROM parse_json
GROUP BY date_day, "schema", "table"
ORDER BY date_day DESC;