How to Avoid Getting Duplicate Data When Querying Iceberg or Delta Tables?
Issue
I get duplicate data when querying Iceberg or Delta tables in my data lake.
Environment
Destination: Managed Data Lake Service
Resolution
Before querying Iceberg or Delta tables in your data lake, ensure that your catalog is properly configured. The catalog tracks the specific files required to generate query results. Using a catalog is essential when working with Iceberg or Delta tables.
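Querying the raw Parquet files directly, instead of going through the catalog, can return duplicate data. The storage location can still hold data files that belong to older snapshots and have not yet been expired, and how long those files remain depends on your snapshot retention period.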
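The effect can be illustrated with a toy model in plain Python. This is only a sketch and not real Iceberg or Delta code: the file names, rows, and snapshot IDs are all hypothetical, and the "rewrite" stands in for any operation that writes new data files while older snapshots are still retained.

```python
# Toy model of a table's storage location; not real Iceberg or Delta code.
# Every name and value here is hypothetical.

# Data files physically present in the table's storage location.
files_on_disk = {
    "data-001.parquet": [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}],
    # Written by a later rewrite of the same rows (for example, compaction).
    "data-002.parquet": [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}],
}

# Snapshots recorded in the table metadata: each lists the files it needs.
snapshots = {
    1: ["data-001.parquet"],  # older snapshot, retained but not current
    2: ["data-002.parquet"],  # current snapshot
}
current_snapshot_id = 2


def read_raw_files():
    """Read every file in the location, as a raw Parquet scan would."""
    return [row for rows in files_on_disk.values() for row in rows]


def read_via_catalog():
    """Read only the files referenced by the current snapshot."""
    return [
        row
        for name in snapshots[current_snapshot_id]
        for row in files_on_disk[name]
    ]


raw_rows = read_raw_files()
catalog_rows = read_via_catalog()
print(len(raw_rows))      # 4: each row appears twice
print(len(catalog_rows))  # 2: one copy of each row
```

In this model the raw scan picks up the file from the retained older snapshot as well as the current one, which is why the same rows show up twice.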
To ensure accurate results, always query through the catalog. The examples below demonstrate the correct approach: each one refers to the table by name through the catalog, and none references the underlying Parquet files.
Examples in Python
- Iceberg + AWS Glue + Spark. You must also install the AWS CLI.

      import pyspark
      from pyspark.sql import SparkSession
      import os

      conf = (
          pyspark.SparkConf()
          .setAppName('spark_app_glue')
          .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.iceberg:iceberg-aws-bundle:1.7.1')
          .set('spark.sql.defaultCatalog', 'glue_catalog')
          .set('spark.sql.catalog.glue_catalog.warehouse', 's3://path_to_datalake')
          .set('spark.sql.catalog.glue_catalog', 'org.apache.iceberg.spark.SparkCatalog')
          .set('spark.sql.catalog.glue_catalog.type', 'glue')
          .set('spark.sql.catalog.glue_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
          .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
      )

      ## Start Spark Session
      spark = SparkSession.builder.config(conf=conf).getOrCreate()
      print("Spark Running")

      spark.sql("select * from fivetran_log.connector").show()
      spark.stop()
- Iceberg + Fivetran Managed + Spark

      import pyspark
      from pyspark.sql import SparkSession
      import os

      conf = (
          pyspark.SparkConf()
          .setAppName('spark_app_fivetran')
          .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1')
          .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
          .set('spark.sql.defaultCatalog', 'fmc')
          .set('spark.sql.catalog.fmc', 'org.apache.iceberg.spark.SparkCatalog')
          .set('spark.sql.catalog.fmc.warehouse', 'group_id')
          .set('spark.sql.catalog.fmc.region', 'your-aws-region')
          .set('spark.sql.catalog.fmc.type', 'rest')
          .set('spark.sql.catalog.fmc.header.X-Iceberg-Access-Delegation', 'vended-credentials')
          .set('spark.sql.catalog.fmc.uri', 'https://polaris.fivetran.com/api/catalog/')
          .set('spark.sql.catalog.fmc.oauth2-server-uri', 'https://polaris.fivetran.com/api/catalog/v1/oauth/tokens')
          .set('spark.sql.catalog.fmc.credential', 'clientId:clientSecret')
          .set('spark.sql.catalog.fmc.scope', 'PRINCIPAL_ROLE:ALL')
      )

      ## Start Spark Session
      spark = SparkSession.builder.config(conf=conf).getOrCreate()
      print("Spark Running")

      spark.sql("select * from fivetran_log.connector").show()
      spark.stop()
- Iceberg + AWS Glue + PyIceberg. PyIceberg can be installed with pip and differs slightly from PySpark. You must also install the AWS CLI. In this example, the ~/.pyiceberg.yaml file is configured as follows:

      catalog:
        my-glue:
          type: glue
          glue.id: awsAccountNumber
          s3.region: your-s3-region
          glue.region: your-glue-region

  The Python file:

      import os
      import pyiceberg
      from pyiceberg.catalog import load_catalog

      # This script needs a properly configured .pyiceberg.yaml file (cat ~/.pyiceberg.yaml)
      namespace = 'fivetran_log'
      table = 'connector'

      # Load the table by name through the catalog, not by file path
      catalog = load_catalog('my-glue')
      connector_table_load = catalog.load_table(f'{namespace}.{table}')
      connector_table_scan = connector_table_load.scan()
      connector_table = connector_table_scan.to_arrow()

      df = connector_table.to_pandas()
      print(df)
- Iceberg + Fivetran Managed + PyIceberg. PyIceberg can be installed with pip and differs slightly from PySpark. In this example, the ~/.pyiceberg.yaml file is configured as follows:

      catalog:
        fmc:
          uri: "https://polaris.fivetran.com/api/catalog"
          credential: "clientId:clientSecret"
          warehouse: group_id
          scope: PRINCIPAL_ROLE:ALL
          oauth2-server-uri: "https://polaris.fivetran.com/api/catalog/v1/oauth/tokens"

  The Python file:

      import os
      import pyiceberg
      from pyiceberg.catalog import load_catalog

      # This script needs a properly configured .pyiceberg.yaml file (cat ~/.pyiceberg.yaml)
      namespace = 'fivetran_log'
      table = 'connector'

      # Load the table by name through the catalog, not by file path
      catalog = load_catalog('fmc')
      connector_table_load = catalog.load_table(f'{namespace}.{table}')
      connector_table_scan = connector_table_load.scan()
      connector_table = connector_table_scan.to_arrow()

      df = connector_table.to_pandas()
      print(df)
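After switching to catalog-based queries, one way to confirm that the duplicates are gone is to count repeated key values in the result. The sketch below uses only the standard library and operates on a list of row dictionaries (for a PyArrow table such as `connector_table` above, `to_pylist()` produces that shape). The helper name `find_duplicate_keys`, the key column `connector_id`, and the sample rows are hypothetical; substitute the primary key of your own table.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return {key_tuple: count} for every key that appears more than once."""
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


# Hypothetical sample rows; "connector_id" is an assumed key column.
rows = [
    {"connector_id": "a", "name": "first"},
    {"connector_id": "b", "name": "second"},
    {"connector_id": "a", "name": "first"},
]

duplicates = find_duplicate_keys(rows, ["connector_id"])
print(duplicates)  # {('a',): 2}
```

An empty dictionary means no key value is repeated in the rows that were checked.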