How to Avoid Duplicate Data When Querying Iceberg Tables
Issue
I get duplicate data when querying Iceberg tables in my data lake.
Environment
Destination: S3 Data Lake
Resolution
Before querying Iceberg tables in your data lake, ensure that your catalog is properly configured. The catalog (AWS Glue or Fivetran Managed) tracks the specific files required to generate query results. Using a catalog is essential when working with Iceberg tables.
Querying the raw Parquet files directly, instead of going through the catalog, can return duplicate rows. Iceberg writes new data files for updates and deletes while keeping the files from older snapshots until they are expired, so a direct file scan reads both the current data and superseded copies of it. How long those stale files linger depends on your snapshot retention period.
To ensure accurate results, always query through the catalog. The examples below demonstrate the correct approach; your queries must not reference the underlying Parquet files.
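For contrast, the sketch below shows the anti-pattern: reading the table's data files directly. The S3 path is a placeholder, and the snippet assumes a Spark session configured as in the examples that follow.

# Anti-pattern: do NOT do this. Spark scans every Parquet file under the
# prefix, including files that belong to older, unexpired snapshots, so
# updated or deleted rows can appear more than once.
df = spark.read.parquet("s3://path_to_datalake/fivetran_log/connector/data/")
df.show()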
Examples in Python
Iceberg + AWS Glue + Spark. You must also install and configure the AWS CLI so that AWS credentials are available.
import pyspark
from pyspark.sql import SparkSession

conf = (
    pyspark.SparkConf()
    .setAppName('spark_app_glue')
    # Iceberg Spark runtime plus the AWS bundle (Glue catalog + S3FileIO)
    .set('spark.jars.packages',
         'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,'
         'org.apache.iceberg:iceberg-aws-bundle:1.7.1')
    .set('spark.sql.defaultCatalog', 'glue_catalog')
    .set('spark.sql.catalog.glue_catalog', 'org.apache.iceberg.spark.SparkCatalog')
    .set('spark.sql.catalog.glue_catalog.type', 'glue')
    .set('spark.sql.catalog.glue_catalog.warehouse', 's3://path_to_datalake')
    .set('spark.sql.catalog.glue_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
)

# Start the Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

# Query through the Glue catalog, never the raw files
spark.sql("select * from fivetran_log.connector").show()

spark.stop()
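To see how many snapshots the table currently carries (and therefore how many generations of data files are still on S3), you can query Iceberg's snapshots metadata table through the same catalog. A minimal sketch, assuming the session above is still open:

# Each row is one snapshot; the data files of every unexpired snapshot
# remain on S3, which is why a raw-file scan can double-count rows.
spark.sql("select snapshot_id, committed_at, operation from fivetran_log.connector.snapshots").show()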
Iceberg + Fivetran Managed + Spark
import pyspark
from pyspark.sql import SparkSession

conf = (
    pyspark.SparkConf()
    .setAppName('spark_app_fivetran')
    .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1')
    .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .set('spark.sql.defaultCatalog', 'fmc')
    .set('spark.sql.catalog.fmc', 'org.apache.iceberg.spark.SparkCatalog')
    # REST catalog hosted by Fivetran (Polaris)
    .set('spark.sql.catalog.fmc.type', 'rest')
    .set('spark.sql.catalog.fmc.uri', 'https://polaris.fivetran.com/api/catalog/')
    .set('spark.sql.catalog.fmc.warehouse', 'group_id')
    .set('spark.sql.catalog.fmc.region', 'your-aws-region')
    .set('spark.sql.catalog.fmc.header.X-Iceberg-Access-Delegation', 'vended-credentials')
    .set('spark.sql.catalog.fmc.oauth2-server-uri', 'https://polaris.fivetran.com/api/catalog/v1/oauth/tokens')
    .set('spark.sql.catalog.fmc.credential', 'clientId:clientSecret')
    .set('spark.sql.catalog.fmc.scope', 'PRINCIPAL_ROLE:ALL')
)

# Start the Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

# Query through the Fivetran Managed catalog, never the raw files
spark.sql("select * from fivetran_log.connector").show()

spark.stop()
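The same catalog-backed read also works through the DataFrame API instead of spark.sql. A small sketch:

# spark.table() resolves the name through the configured default catalog
# ('fmc'), so only files in the table's current snapshot are read.
df = spark.table("fivetran_log.connector")
df.show()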
Iceberg + AWS Glue + PyIceberg. PyIceberg can be installed via pip (for example, pip install "pyiceberg[glue]" to pull in the Glue dependencies) and its API differs slightly from PySpark's. You must also install and configure the AWS CLI. In this example, we have configured the ~/.pyiceberg.yaml file as follows:

catalog:
  my-glue:
    type: glue
    glue.id: awsAccountNumber
    s3.region: your-s3-region
    glue.region: your-glue-region
The Python file:
from pyiceberg.catalog import load_catalog

# This script needs a properly configured ~/.pyiceberg.yaml file (cat ~/.pyiceberg.yaml)
namespace = 'fivetran_log'
table = 'connector'

# Load the Glue catalog defined in ~/.pyiceberg.yaml
catalog = load_catalog('my-glue')
connector_table = catalog.load_table(f'{namespace}.{table}')

# scan() plans against the table's current snapshot, so superseded files are skipped
df = connector_table.scan().to_arrow().to_pandas()
print(df)
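PyIceberg can also push a filter and a column projection into the scan so that only matching files are read. A minimal sketch; the column names and predicate below are hypothetical and should be replaced with columns that exist in your table:

# Hypothetical columns and predicate -- adjust to your schema.
# row_filter uses Iceberg's expression string syntax.
scan = connector_table.scan(
    row_filter="connector_type = 'postgres'",
    selected_fields=("connector_id", "connector_type"),
    limit=10,
)
print(scan.to_arrow().to_pandas())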
Iceberg + Fivetran Managed + PyIceberg. PyIceberg can be installed via pip and its API differs slightly from PySpark's. In this example, we have configured the ~/.pyiceberg.yaml file as follows:

catalog:
  fmc:
    uri: "https://polaris.fivetran.com/api/catalog"
    credential: "clientId:clientSecret"
    warehouse: group_id
    scope: PRINCIPAL_ROLE:ALL
    oauth2-server-uri: "https://polaris.fivetran.com/api/catalog/v1/oauth/tokens"
The Python file:
from pyiceberg.catalog import load_catalog

# This script needs a properly configured ~/.pyiceberg.yaml file (cat ~/.pyiceberg.yaml)
namespace = 'fivetran_log'
table = 'connector'

# Load the Fivetran Managed (REST) catalog defined in ~/.pyiceberg.yaml
catalog = load_catalog('fmc')
connector_table = catalog.load_table(f'{namespace}.{table}')

# scan() plans against the table's current snapshot, so superseded files are skipped
df = connector_table.scan().to_arrow().to_pandas()
print(df)
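If you are unsure which namespaces and tables the catalog exposes, you can list them with the same catalog object before loading a table:

# Enumerate the namespaces in the catalog, then the tables in one of them.
print(catalog.list_namespaces())
print(catalog.list_tables('fivetran_log'))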