Fivetran Platform Connector Sample Queries
NOTE: The sample queries below use the destination name specified as <Fivetran_destination_database> and the schema name of a Fivetran Platform Connector (FPC) specified as <FPC_schema_name>. In your queries, replace <Fivetran_destination_database> with your destination name and <FPC_schema_name> with the schema name of your Fivetran Platform connection. For an automatically-created Fivetran Platform connection (when you create a destination), the default schema name is fivetran_metadata. For a manually-created Fivetran Platform connection, the default schema name is fivetran_log.
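As an illustration of the substitution, the sketch below assumes a destination named analytics (a hypothetical name) and the default fivetran_metadata schema of an automatically-created connection. The columns are ones used by the LOG table queries on this page.

```sql
-- Hypothetical destination name: analytics
-- Default schema for an automatically-created connection: fivetran_metadata
SELECT connector_id,
       message_event,
       time_stamp
FROM analytics.fivetran_metadata.log
ORDER BY time_stamp DESC
LIMIT 10;
```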
Calculate monthly active rows (MAR) per connector
The Monthly Active Rows (MAR) count sent through the Fivetran Platform Connector includes the active rows recorded up to the current date. The count resets at the beginning of each month. For example, if your MAR count for schema_A.table_A is 5500 on January 1st but is 5800 on January 31st, your January MAR for schema_A.table_A is 5800. Your MAR count then drops to 0 on February 1st. To learn more about MAR, see our pricing documentation.
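To look only at the month in progress, one option is to filter on the first day of the current month. The sketch below is an untested variant in Redshift and Snowflake syntax that reuses the incremental_mar columns from the queries in this section; mar_to_date is an illustrative alias, not a Fivetran column.

```sql
-- Month-to-date paid MAR per schema and destination (Redshift / Snowflake syntax).
SELECT schema_name,
       destination_id,
       SUM(incremental_rows) AS mar_to_date   -- illustrative alias
FROM <Fivetran_destination_database>.<FPC_schema_name>.incremental_mar
WHERE free_type = 'PAID'
  AND measured_date >= date_trunc('month', current_date)
GROUP BY schema_name, destination_id
ORDER BY mar_to_date DESC;
```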
NOTE: If you use these queries during your free trial, we recommend that you add every data source that you plan to use with Fivetran and let each connection run for 7 days under a typical load. This will give you a more accurate idea of how well Fivetran meets your business needs.
Calculate MAR grouped by schema (connection), destination, and month
Redshift and Snowflake MAR
We have tested the following query for Redshift and Snowflake destinations:
SELECT schema_name,
destination_id,
date_trunc('month', measured_date) AS measured_month,
SUM(incremental_rows) AS MAR
FROM <Fivetran_destination_database>.<FPC_schema_name>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY schema_name, destination_id, measured_month
ORDER BY measured_month, schema_name
This query depends on the INCREMENTAL_MAR table.
BigQuery MAR
We have tested the following query for BigQuery destinations:
SELECT schema_name,
destination_id,
date_trunc(measured_date, month) AS measured_month,
SUM(incremental_rows) AS MAR
FROM <Fivetran_destination_database>.<FPC_schema_name>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY schema_name, destination_id, measured_month
ORDER BY measured_month, schema_name
This query depends on the INCREMENTAL_MAR table.
Calculate MAR by table
SELECT schema_name,
destination_id,
table_name,
connector_id,
date_trunc('month', measured_date) AS measured_month,
SUM(incremental_rows) AS incremental_rows
FROM <Fivetran_destination_database>.<FPC_schema_name>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY schema_name, destination_id, measured_month, table_name, connector_id
ORDER BY measured_month, schema_name, table_name;
This query depends on the INCREMENTAL_MAR table.
Calculate monthly transformation model runs
The Fivetran Platform Connector logs the number of times transformation models have run each month. This count includes all successful model runs up to the current date and resets to zero at the beginning of each month. For example, if your count for the Unified RAG job is 5500 on January 21 and 5800 on January 31, then the total number of successful model runs for January is 5800. On February 1st, the count resets to 0.
Calculate model runs grouped by month, destination, and job name
Redshift and Snowflake
Run the following query with a Redshift or Snowflake destination:
SELECT date_trunc('month', measured_date) AS measured_month,
destination_id,
job_name,
SUM(model_runs) AS model_runs
FROM <Fivetran_destination_database>.<FPC_schema_name>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY measured_month, destination_id, job_name
ORDER BY measured_month, destination_id, job_name;
This query depends on the TRANSFORMATION_RUNS table.
BigQuery
Run the following query with a BigQuery destination:
SELECT date_trunc(measured_date, month) AS measured_month,
destination_id,
job_name,
SUM(model_runs) AS model_runs
FROM <Fivetran_destination_database>.<FPC_schema_name>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY measured_month, destination_id, job_name
ORDER BY measured_month, destination_id, job_name;
This query depends on the TRANSFORMATION_RUNS table.
Calculate model runs grouped by destination, project type, and month
SELECT destination_id,
project_type,
date_trunc(measured_date, month) AS measured_month,
SUM(model_runs) AS model_runs
FROM <Fivetran_destination_database>.<FPC_schema_name>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY destination_id, project_type, measured_month
ORDER BY destination_id, project_type, measured_month;
This query depends on the TRANSFORMATION_RUNS table.
Check connection status
Check sync start and end times
SELECT connector_id,
message_event,
time_stamp AS process_start_time
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event = 'sync_start' OR message_event = 'sync_end'
ORDER BY time_stamp DESC;
This query depends on the LOG table.
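If you would rather see one row per sync than one row per event, the start and end events can be paired on sync_id. The following is a minimal sketch that assumes sync_id is populated on both the sync_start and sync_end events; sync_started_at and sync_ended_at are illustrative aliases.

```sql
-- One row per sync with its start and end timestamps.
-- Assumption: both events of a sync carry the same sync_id.
SELECT connector_id,
       sync_id,
       MIN(CASE WHEN message_event = 'sync_start' THEN time_stamp END) AS sync_started_at,
       MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) AS sync_ended_at
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event IN ('sync_start', 'sync_end')
GROUP BY connector_id, sync_id
ORDER BY sync_started_at DESC;
```

A row with a start time but no end time may correspond to a sync that is still running or that did not log a sync_end event.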
Troubleshoot errors and warnings
SELECT connector_id, time_stamp, event, message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE event = 'WARNING' OR event = 'SEVERE'
ORDER BY time_stamp DESC;
This query depends on the LOG table.
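To get a quick sense of which connections produce the most warnings and errors before reading individual messages, the same filter can be aggregated. This is a sketch using only the columns from the query above; event_count and last_seen_at are illustrative aliases.

```sql
-- Number of WARNING and SEVERE events per connection, with the most recent occurrence.
SELECT connector_id,
       event,
       COUNT(*) AS event_count,
       MAX(time_stamp) AS last_seen_at
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE event IN ('WARNING', 'SEVERE')
GROUP BY connector_id, event
ORDER BY event_count DESC;
```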
Check records modified since last sync
The sample queries below return the volume of data that has been inserted, updated, or deleted since your last successful sync. They also return the timestamp of your connection's last record modification. Query results are at the connection level.
Use the sample query for your destination:
NOTE: If you want to filter your results based on data modification type (for example, view inserts only), use the operationType field in the message_data JSON object.
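Before filtering on operationType, it can help to see which values actually occur in your logs. The sketch below uses Snowflake syntax and makes no assumption about the specific values; operation_type and row_volume are illustrative aliases.

```sql
-- List the operationType values present in records_modified events,
-- with the total row count reported for each (Snowflake syntax).
WITH parse_json AS (
    SELECT PARSE_JSON(message_data) AS message_data
    FROM <Fivetran_destination_database>.<FPC_schema_name>.log
    WHERE message_event = 'records_modified'
)
SELECT message_data:operationType::string AS operation_type,
       SUM(message_data:count::integer) AS row_volume
FROM parse_json
GROUP BY operation_type
ORDER BY row_volume DESC;
```

Once you know the value you are interested in, you can add a condition on message_data:operationType::string to the Snowflake queries in this section.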
BigQuery
WITH parse_json AS (
SELECT
time_stamp,
JSON_EXTRACT(message_data, '$.schema') AS connector_schema,
CAST(JSON_EXTRACT(message_data, '$.count') AS int64) AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
SELECT
connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
This query depends on the LOG table.
Redshift
WITH parse_json AS (
SELECT
time_stamp,
json_extract_path_text(message_data, 'schema') AS connector_schema,
json_extract_path_text(message_data, 'count')::integer AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
SELECT
connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
This query depends on the LOG table.
Snowflake
WITH parse_json AS (
SELECT
time_stamp,
PARSE_JSON(message_data) AS message_data,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connector_id) AS last_sync_completed_at
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
SELECT
message_data:schema AS connector_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN message_data:count::integer ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connector_schema
ORDER BY row_volume_since_last_sync DESC
;
This query depends on the LOG table.
Check daily modified records
The sample queries below return the volume of data that has been inserted, updated, or deleted each day. Query results are at the table level.
Use the sample query for your destination:
NOTE: If you want to filter your results based on data modification type (for example, view inserts only), use the operationType field in the message_data JSON object.
BigQuery daily records
SELECT
DATE_TRUNC(CAST(time_stamp AS date), day) AS date_day,
JSON_EXTRACT(message_data, '$.schema') AS schema,
JSON_EXTRACT(message_data, '$.table') AS table,
SUM(CAST(JSON_EXTRACT(message_data, '$.count') AS int64)) AS row_volume
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE DATE_DIFF(CAST(CURRENT_DATE() AS date), CAST(time_stamp AS date), DAY) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, table
ORDER BY date_day DESC;
This query depends on the LOG table.
Redshift daily records
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
JSON_EXTRACT_PATH_TEXT(message_data, 'schema') AS schema,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS "table",
SUM(JSON_EXTRACT_PATH_TEXT(message_data, 'count')::integer) AS row_volume
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE
DATEDIFF(day, time_stamp, current_date) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, "table"
ORDER BY date_day DESC;
This query depends on the LOG table.
Snowflake daily records
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE DATEDIFF(DAY, time_stamp, current_date) < 30
AND message_event = 'records_modified'
)
SELECT
date_day,
message_data:schema AS "schema",
message_data:table AS "table",
SUM(message_data:count::integer) AS row_volume
FROM parse_json
GROUP BY date_day, "schema", "table"
ORDER BY date_day DESC;
This query depends on the LOG table.
Audit user actions within connection
The following sample query returns various user actions that have been made within a connector for audit-trail purposes. This can be helpful when trying to trace a user action to a log event such as a schema change, sync frequency update, manual update, broken connection, and more.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM
<Fivetran_destination_database>.<FPC_schema_name>.log
),
t AS (
SELECT
id,
event_time,
dn AS weekday,
dow,
message_event,
connector_id,
message_data:actor AS acting_user
FROM parse_json
)
SELECT
*
FROM t
WHERE acting_user IS NOT NULL
This query depends on the LOG table.
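For a summary rather than a row-level trail, the same actor field can be aggregated per user and event type. This is a sketch in Snowflake syntax, like the query above; action_count and last_action_at are illustrative aliases.

```sql
-- Number of logged actions per user and event type (Snowflake syntax).
WITH parse_json AS (
    SELECT time_stamp,
           message_event,
           PARSE_JSON(message_data) AS message_data
    FROM <Fivetran_destination_database>.<FPC_schema_name>.log
)
SELECT message_data:actor::string AS acting_user,
       message_event,
       COUNT(*) AS action_count,
       MAX(time_stamp) AS last_action_at
FROM parse_json
WHERE message_data:actor IS NOT NULL
GROUP BY acting_user, message_event
ORDER BY action_count DESC;
```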
Overview of event averages by day
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connector_id,
COUNT(id) AS event_count
FROM parse_json
GROUP BY date_day, dn, dow, connector_id, message_event
),
ev AS (
SELECT
t.connector_id,
t.message_event,
t.dn AS weekday,
ROUND(AVG(t.event_count)) AS av_event_count,
ROUND(ROUND(AVG(t.event_count)) + ROUND(AVG(t.event_count)) * .2) AS high_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) - ROUND(AVG(t.event_count)) * .2) AS low_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) * .2) AS event_var_increment,
ROUND(STDDEV(t.event_count)) AS standard_deviation
FROM t
GROUP BY t.connector_id, t.message_event,t.dow, t.dn
ORDER BY t.connector_id, t.message_event,t.dow
)
SELECT * FROM ev
This query depends on the LOG table.
Assign your own variance logic and monitor your environment at event level
NOTE: This requires that the fivetran_log_event_averages table be present.
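This page does not say how the fivetran_log_event_averages table is created. Its column names match the output of the "Overview of event averages by day" query, so one plausible approach is to materialize that query's result. The sketch below does this in Snowflake syntax, under that assumption, with a condensed form of the query; adjust the target schema if you prefer not to write into the Fivetran Platform Connector schema.

```sql
-- Assumption: the averages table is a materialized copy of the
-- "Overview of event averages by day" result (Snowflake syntax).
CREATE OR REPLACE TABLE <Fivetran_destination_database>.<FPC_schema_name>.fivetran_log_event_averages AS
WITH t AS (
    -- Daily event counts per connection and event type.
    SELECT
        DATE_TRUNC('DAY', time_stamp) AS date_day,
        DAYNAME(time_stamp) AS dn,
        message_event,
        connector_id,
        COUNT(id) AS event_count
    FROM <Fivetran_destination_database>.<FPC_schema_name>.log
    GROUP BY date_day, dn, connector_id, message_event
)
SELECT
    connector_id,
    message_event,
    dn AS weekday,
    ROUND(AVG(event_count)) AS av_event_count,
    ROUND(ROUND(AVG(event_count)) + ROUND(AVG(event_count)) * .2) AS high_event_variance_value,
    ROUND(ROUND(AVG(event_count)) - ROUND(AVG(event_count)) * .2) AS low_event_variance_value,
    ROUND(ROUND(AVG(event_count)) * .2) AS event_var_increment,
    ROUND(STDDEV(event_count)) AS standard_deviation
FROM t
GROUP BY connector_id, message_event, dn;
```

The 20% band (the .2 factor) comes from the overview query; change it here if you want a wider or narrower variance threshold.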
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
),
ec AS (
SELECT
COUNT(id) AS event_count,
date_day,
dn,
dow,
message_event,
connector_id
FROM parse_json
GROUP BY date_day,dn,dow,connector_id,message_event
ORDER BY connector_id, message_event ASC
)
, av AS (
SELECT
ev.connector_id,
ev.weekday,
ev.message_event,
ev.av_event_count,
ev.high_event_variance_value,
ev.low_event_variance_value,
ev.event_var_increment,
ev.standard_deviation
FROM <Fivetran_destination_database>.<FPC_schema_name>.fivetran_log_event_averages ev
)
SELECT
ec.date_day,
ec.dn AS weekday,
ec.connector_id,
ec.message_event,
ec.event_count AS total_events,
av.av_event_count,
av.high_event_variance_value,
av.low_event_variance_value,
av.standard_deviation,
av.event_var_increment,
CASE WHEN ec.event_count > av.high_event_variance_value THEN 'Event_Variance'
WHEN ec.event_count < av.low_event_variance_value THEN 'Event_Variance'
ELSE 'Standard'
END AS event_variance_flag
FROM ec
INNER JOIN av ON av.connector_id = ec.connector_id AND av.message_event = ec.message_event AND av.weekday = ec.dn
ORDER BY
ec.date_day,
ec.dow,
ec.connector_id,
ec.message_event
This query depends on the LOG table.
Review difference in seconds between write_to_table_start and write_to_table_end events
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE connector_id = ''
), t AS (
SELECT
id,
event_time,
message_event,
connector_id,
message_data:table AS "table",
RANK() OVER ( ORDER BY connector_id,"table",event_time ASC) AS rn ,
DATEDIFF(second,lag(event_time,1) OVER (ORDER BY connector_id,"table",event_time ASC),event_time) AS seconds_diff
FROM parse_json
WHERE message_event IN ('write_to_table_start','write_to_table_end')
GROUP BY id,connector_id,event_time,message_event,"table"
ORDER BY connector_id,"table",event_time ASC
)
SELECT
t.id,
t.event_time,
t.message_event,
t.connector_id,
t."table",
CASE WHEN t.message_event = 'write_to_table_start'
AND t.seconds_diff > 0
THEN 0 ELSE t.seconds_diff
END AS diff
FROM t
;
This query depends on the LOG table.
Review modified record count data by table
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
), t AS (
SELECT
id,
event_time,
dn AS weekday,
dow,
message_event,
connector_id,
message_data:operationType AS optype,
message_data:table AS logtable,
message_data:count AS rowsimpacted
FROM parse_json
WHERE message_event = 'records_modified'
AND logtable <> 'fivetran_audit'
GROUP BY id,connector_id,event_time,dow,weekday,message_event,logtable,optype,rowsimpacted
ORDER BY connector_id,logtable ASC
)
SELECT
connector_id,
message_event,
weekday,
optype,
logtable AS avtable,
CAST(ROUND(AVG(t.rowsimpacted)) AS int) AS avgrow,
ROUND(ROUND(AVG(t.rowsimpacted)) + ROUND(AVG(t.rowsimpacted)) * .2) AS high_variance_value,
ROUND(ROUND(AVG(t.rowsimpacted)) - ROUND(AVG(t.rowsimpacted)) * .2) AS low_variance_value,
ROUND(ROUND(AVG(t.rowsimpacted)) * .2 ) AS var_increment,
IFNULL(ROUND(stddev(t.rowsimpacted)),0) AS standard_deviation
FROM t
GROUP BY connector_id,message_event,dow,weekday,avtable,optype
ORDER BY connector_id,avtable,dow,optype
This query depends on the LOG table.
Assign your own variance logic and monitor your environment at table level
NOTE: This requires that the fivetran_records_modified_averages table be present.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
), t AS (
SELECT
id,
date_day,
dn,
dow,
event_time,
message_event,
connector_id,
message_data:operationType AS optype,
message_data:table AS logtable,
message_data:count AS rowsimpacted
FROM parse_json
WHERE message_event = 'records_modified'
AND logtable <> 'fivetran_audit'
GROUP BY id,connector_id,date_day,dn,dow,event_time,message_event,logtable,optype,rowsimpacted
ORDER BY connector_id,logtable ASC
)
, av AS (
SELECT
rma.weekday,
rma.connector_id,
rma.optype,
rma.avtable,
rma.avgrow,
rma.high_variance_value,
rma.low_variance_value,
rma.var_increment,
rma.standard_deviation
FROM <Fivetran_destination_database>.<FPC_schema_name>.fivetran_records_modified_averages rma
)
SELECT
t.id,
t.dn AS weekday,
t.date_day,
t.event_time,
t.message_event,
t.optype,
t.connector_id,
t.logtable,
CAST(t.rowsimpacted AS int) AS rowsimpacted,
av.avgrow,
av.high_variance_value,
av.low_variance_value,
av.var_increment,
av.standard_deviation,
CASE WHEN t.rowsimpacted > av.high_variance_value THEN 'Variance'
WHEN t.rowsimpacted < av.low_variance_value THEN 'Variance'
ELSE 'Standard'
END AS varianceflag
FROM t
INNER JOIN av ON t.connector_id = av.connector_id AND t.dn = av.weekday AND t.logtable = av.avtable AND t.optype = av.optype
ORDER BY t.connector_id,t.dow,t.event_time,t.logtable ASC
This query depends on the LOG table.
dbt transformation data
The following sample query returns dbt transformation data for a given event.
WITH a AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
sync_id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event IN ('dbt_run_start','dbt_run_succeeded','dbt_run_failed')
)
SELECT
message_data:dbtJobId::string AS dbtJobId,
message_data:dbtJobName::string AS dbtJobName,
message_data:dbtJobType::string AS dbtJobType,
message_data:startTime::timestamp AS startTime,
message_data:endTime::timestamp AS endTime,
message_data:result:stepResults[0]:success::boolean AS success,
message_data:models AS models,
message_data:result:stepResults AS stepResults,
message_data:startupDetails AS startupDetails,
message_data:result:stepResults[0]:knownFailedModels AS knownFailedModels,
message_data:result:stepResults[0]:knownSuccessfulModels AS knownSuccessfulModels,
message_data
FROM a
This query depends on the LOG table.
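To focus on failures only, the same fields can be read from dbt_run_failed events. This is a sketch in Snowflake syntax that uses only the JSON paths shown in the query above.

```sql
-- Most recent failed dbt runs with the models reported as failed (Snowflake syntax).
WITH a AS (
    SELECT
        time_stamp AS event_time,
        PARSE_JSON(message_data) AS message_data
    FROM <Fivetran_destination_database>.<FPC_schema_name>.log
    WHERE message_event = 'dbt_run_failed'
)
SELECT
    event_time,
    message_data:dbtJobId::string AS dbtJobId,
    message_data:dbtJobName::string AS dbtJobName,
    message_data:result:stepResults[0]:knownFailedModels AS knownFailedModels
FROM a
ORDER BY event_time DESC;
```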
Transformation data
The following sample query returns transformation data for a given event.
WITH a AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
sync_id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event IN ('transformation_start','transformation_succeeded','transformation_failed')
)
SELECT
message_data:id::string AS jobId,
message_data:name::string AS jobName,
message_data:transformationType::string AS transformationType,
message_data:startTime::timestamp AS startTime,
message_data:endTime::timestamp AS endTime,
message_data:schedule AS schedule,
message_data:result:stepResults AS stepResults,
message_data:description AS resultsSummary,
message_data
FROM a
This query depends on the LOG table.
Sync events
The following sample query returns all events from a given sync.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
sync_id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE sync_id = ''
), t AS (
SELECT
id,
sync_id,
event_time,
message_event,
connector_id,
message_data,
message_data:table AS "table",
message_data:query AS query,
RANK() OVER ( ORDER BY sync_id, connector_id,event_time ASC) AS rn ,
DATEDIFF(second,lag(event_time,1) over (ORDER BY sync_id, connector_id,event_time ASC),event_time) AS seconds_diff
FROM parse_json
GROUP BY id,sync_id,connector_id,event_time,message_event,message_data,"table"
ORDER BY connector_id,"table",event_time ASC
)
SELECT
t.id,
t.sync_id,
t.event_time,
t.message_event,
t.message_data,
t.connector_id,
t.query,
t."table",
CASE WHEN t.message_event = 'write_to_table_start'
AND t.seconds_diff > 0
THEN 0 ELSE t.seconds_diff
END AS diff,
t.rn
FROM t
ORDER BY t.sync_id, t.event_time ASC
This query depends on the LOG table.
Sync statistics
The following sample query returns sync statistics for PostgreSQL, Oracle, MySQL, and SQL Server connections.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connector_id,
PARSE_JSON(message_data) AS message_data
FROM <Fivetran_destination_database>.<FPC_schema_name>.log
WHERE message_event = 'sync_stats'
),t AS (
SELECT
id,
event_time,
dn AS weekday,
dow,
message_event,
connector_id,
message_data:extract_time_s AS extract_time_s,
message_data:extract_volume_mb AS extract_volume_mb,
message_data:load_time_s AS load_time_s,
message_data:load_volume_mb AS load_volume_mb,
message_data:process_time_s AS process_time_s,
message_data:process_volume_mb AS process_volume_mb,
message_data:total_time_s AS total_time_s
FROM parse_json
)
SELECT * FROM t ORDER BY extract_time_s DESC
This query depends on the LOG table.
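To compare connections rather than individual syncs, the sync_stats fields can be averaged per connection. This is a sketch in Snowflake syntax that assumes the fields are numeric and casts them to float; the avg_ aliases and sync_count are illustrative.

```sql
-- Average extract, load, and total time per connection (Snowflake syntax).
-- Assumption: the sync_stats fields hold numeric values.
WITH parse_json AS (
    SELECT
        connector_id,
        PARSE_JSON(message_data) AS message_data
    FROM <Fivetran_destination_database>.<FPC_schema_name>.log
    WHERE message_event = 'sync_stats'
)
SELECT
    connector_id,
    COUNT(*) AS sync_count,
    AVG(message_data:extract_time_s::float) AS avg_extract_time_s,
    AVG(message_data:load_time_s::float) AS avg_load_time_s,
    AVG(message_data:total_time_s::float) AS avg_total_time_s
FROM parse_json
GROUP BY connector_id
ORDER BY avg_total_time_s DESC;
```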
API extract_summary data
The following sample query returns extract summary log data.
WITH es as(
SELECT
c.connector_name,
l.time_stamp,
PARSE_JSON(message_data) as md
FROM <Fivetran_destination_database>.<FPC_schema_name>.log l
INNER JOIN <Fivetran_destination_database>.<FPC_schema_name>.connector c ON l.connector_id = c.connector_id
WHERE message_event = 'extract_summary'
ORDER BY l._fivetran_synced DESC
)
SELECT
connector_name,
time_stamp,
md:status,
md:total_queries,
md:total_rows,
md:total_size,
md:rounded_total_size,
md:objects
FROM es
This query depends on the LOG and CONNECTOR tables.
API extract_summary object data
The following sample query returns extract summary log data for API objects.
WITH es as(
SELECT
c.connector_name,
l.time_stamp,
PARSE_JSON(message_data) as md
FROM <Fivetran_destination_database>.<FPC_schema_name>.log l
INNER JOIN <Fivetran_destination_database>.<FPC_schema_name>.connector c ON l.connector_id = c.connector_id
WHERE message_event = 'extract_summary'
ORDER BY l._fivetran_synced DESC
)
,eso
AS(
SELECT
connector_name,
time_stamp,
md:status,
md:total_queries,
md:total_rows,
md:total_size,
md:rounded_total_size,
PARSE_JSON(md:objects) as o
FROM es
)
SELECT
eso.connector_name,
value:name AS name,
value:queries AS queries
FROM eso,
LATERAL FLATTEN(input => PARSE_JSON(o))
This query depends on the LOG and CONNECTOR tables.
Check metadata
IMPORTANT: You must have an Enterprise plan or higher to query metadata.
The Fivetran Platform Connector provides access to metadata for data synced by Fivetran, which helps you understand the mapping between the source and destination. The data retrieved can be easily consumed in BI tools, data catalogs, or through direct SQL queries.
The data retrieved helps organizations:
- Understand data synced by Fivetran
- Audit and enforce access control
- Retrieve metadata changes
The following queries return normalized tables with information on source and destination connections, schemas, tables, and columns.
Check which data moved through Fivetran
The query includes source/destination mapping so that it can be filtered by source.connectorId.
SELECT * FROM <Fivetran_destination_database>.<FPC_schema_name>.connector c
JOIN <Fivetran_destination_database>.<FPC_schema_name>.source_schema_metadata ssm
ON c.connector_id = ssm.connector_id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.source_table_metadata stm
ON stm.schema_id = ssm.id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.source_column_metadata scm
ON scm.table_id = stm.id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.schema_lineage sl
ON ssm.id = sl.source_schema_id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.table_lineage tl
ON stm.id = tl.source_table_id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.column_lineage cl
ON scm.id = cl.source_column_id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.destination_schema_metadata dsm
ON sl.destination_schema_id = dsm.id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.destination_table_metadata dtm
ON tl.destination_table_id = dtm.id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.destination_column_metadata dcm
ON cl.destination_column_id = dcm.id
LEFT JOIN <Fivetran_destination_database>.<FPC_schema_name>.source_foreign_key_metadata sfkm
ON sfkm.column_id = scm.id
This query depends on the following tables:
- SOURCE_SCHEMA_METADATA table
- SOURCE_TABLE_METADATA table
- SOURCE_COLUMN_METADATA table
- SCHEMA_LINEAGE table
- TABLE_LINEAGE table
- COLUMN_LINEAGE table
- DESTINATION_SCHEMA_METADATA table
- DESTINATION_TABLE_METADATA table
- DESTINATION_COLUMN_METADATA table
- SOURCE_FOREIGN_KEY_METADATA table
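As an example of filtering by connection, the sketch below trims the query above to table-level lineage for a single connection. It uses only the join columns shown above; <connector_id> is a placeholder for your own value, and source_table_id and destination_table_id are illustrative aliases.

```sql
-- Source-to-destination table mapping for one connection.
-- Replace <connector_id> with the ID of the connection you want to inspect.
SELECT c.connector_id,
       stm.id AS source_table_id,
       dtm.id AS destination_table_id
FROM <Fivetran_destination_database>.<FPC_schema_name>.connector c
JOIN <Fivetran_destination_database>.<FPC_schema_name>.source_schema_metadata ssm
  ON c.connector_id = ssm.connector_id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.source_table_metadata stm
  ON stm.schema_id = ssm.id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.table_lineage tl
  ON stm.id = tl.source_table_id
JOIN <Fivetran_destination_database>.<FPC_schema_name>.destination_table_metadata dtm
  ON tl.destination_table_id = dtm.id
WHERE c.connector_id = '<connector_id>';
```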
What is this data a reference to
SELECT * FROM <Fivetran_destination_database>.<FPC_schema_name>.source_column_metadata scm
JOIN <Fivetran_destination_database>.<FPC_schema_name>.column_lineage cl
ON scm.id = cl.source_column_id
WHERE cl.destination_column_id = %column_id%
This query depends on the following tables:
- SOURCE_COLUMN_METADATA table
- COLUMN_LINEAGE table
What downstream assets are impacted by this data
SELECT * FROM <Fivetran_destination_database>.<FPC_schema_name>.destination_column_metadata dcm
JOIN <Fivetran_destination_database>.<FPC_schema_name>.column_lineage cl
ON dcm.id = cl.destination_column_id
WHERE cl.source_column_id = %column_id%
This query depends on the following tables:
- DESTINATION_COLUMN_METADATA table
- COLUMN_LINEAGE table
Check severe messages of SDK connections
SELECT message, event_time, connector_id, sync_id
FROM <Fivetran_destination_database>.<FPC_schema_name>.connector_sdk_log
WHERE level = 'SEVERE' AND message_origin = 'connector_sdk'
ORDER BY event_time DESC;
This query depends on the CONNECTOR_SDK_LOG table.
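To see which SDK connections fail most often, the same filter can be aggregated per connection. This is a sketch using only the columns from the query above; severe_count and last_severe_at are illustrative aliases.

```sql
-- Count of SEVERE messages per SDK connection, with the most recent occurrence.
SELECT connector_id,
       COUNT(*) AS severe_count,
       MAX(event_time) AS last_severe_at
FROM <Fivetran_destination_database>.<FPC_schema_name>.connector_sdk_log
WHERE level = 'SEVERE' AND message_origin = 'connector_sdk'
GROUP BY connector_id
ORDER BY severe_count DESC;
```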