Apache Iceberg with PySpark

In a previous post I described how to use the Apache Iceberg table format with Apache Spark using Scala. I will now describe how to do the same with PySpark.

I will use the same testing environment, based on MinIO and PostgreSQL, and work with the Covid vaccinations dataset.

The main difference with PySpark is how dependencies are managed. In the Scala version I could bundle the Iceberg dependency, plus the AWS (MinIO) and PostgreSQL clients, directly into my Spark application. I cannot do this with PySpark; instead, these dependencies must be added to the Spark installation itself, under the $SPARK_HOME/jars directory.

Local Setup

Dependencies

I use Docker and extend the Apache PySpark image, where Spark is already configured: https://hub.docker.com/r/apache/spark-py.

# Dockerfile

FROM apache/spark-py:v3.2.4

# switch to root user to be able to use apt
USER root

RUN apt update && apt install -y wget

ARG MAVEN_REPO=https://repo1.maven.org/maven2
ARG SPARK_VERSION=3.2
ARG SCALA_VERSION=2.12
ARG ICEBERG_VERSION=1.4.2

ARG AWS_VERSION=2.20.18
ARG POSTGRES_VERSION=42.5.1

RUN wget $MAVEN_REPO/org/apache/iceberg/iceberg-spark-runtime-${SPARK_VERSION}_$SCALA_VERSION/$ICEBERG_VERSION/iceberg-spark-runtime-${SPARK_VERSION}_$SCALA_VERSION-$ICEBERG_VERSION.jar \
  -O $SPARK_HOME/jars/iceberg-spark-runtime-${SPARK_VERSION}_$SCALA_VERSION-$ICEBERG_VERSION.jar

RUN wget $MAVEN_REPO/software/amazon/awssdk/bundle/$AWS_VERSION/bundle-$AWS_VERSION.jar \
  -O $SPARK_HOME/jars/aws-bundle-$AWS_VERSION.jar

RUN wget $MAVEN_REPO/software/amazon/awssdk/url-connection-client/$AWS_VERSION/url-connection-client-$AWS_VERSION.jar \
  -O $SPARK_HOME/jars/aws-url-connection-client-$AWS_VERSION.jar

RUN wget $MAVEN_REPO/org/postgresql/postgresql/$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar \
  -O $SPARK_HOME/jars/postgresql-$POSTGRES_VERSION.jar
  
ENV AWS_ACCESS_KEY_ID=minioadmin
ENV AWS_SECRET_ACCESS_KEY=minioadmin
ENV AWS_REGION=eu-west-1

WORKDIR /opt/spark/work-dir/

CMD ["/opt/spark/bin/spark-submit", "main.py"]

I use wget to fetch the required JARs from Maven Central and put them into the Spark jars directory. The Docker image now contains all the necessary dependencies in the Spark installation, so we can get started on the PySpark code.

PySpark Job

The configuration is almost the same as in the Scala Spark application:

from pyspark import SparkConf
from pyspark.sql import SparkSession

def init_spark():
    conf = SparkConf() \
        .setAppName("Apache Iceberg with PySpark") \
        .setMaster("local[2]") \
        .setAll([
            ("spark.driver.memory", "1g"),
            ("spark.executor.memory", "2g"),
            ("spark.sql.shuffle.partitions", "40"),

            # Add Iceberg SQL extensions like UPDATE or DELETE in Spark
            ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),

            # Register `my_iceberg_catalog`
            ("spark.sql.catalog.my_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog"),

            # Configure SQL connection to track tables inside `my_iceberg_catalog`
            ("spark.sql.catalog.my_iceberg_catalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog"),
            ("spark.sql.catalog.my_iceberg_catalog.uri", "jdbc:postgresql://postgres:5432/iceberg_db"),
            ("spark.sql.catalog.my_iceberg_catalog.jdbc.user", "postgres"),
            ("spark.sql.catalog.my_iceberg_catalog.jdbc.password", "postgres"),

            # Configure Warehouse on MinIO
            ("spark.sql.catalog.my_iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
            ("spark.sql.catalog.my_iceberg_catalog.s3.endpoint", "http://minio:9000"),
            ("spark.sql.catalog.my_iceberg_catalog.s3.path-style-access", "true"),
            ("spark.sql.catalog.my_iceberg_catalog.warehouse", "s3://warehouse"),
        ])
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    return spark

Some notes: the Iceberg SQL extensions enable Iceberg-specific commands such as UPDATE and DELETE; the catalog named my_iceberg_catalog tracks its tables through a JDBC catalog backed by PostgreSQL; and the warehouse itself lives on MinIO, accessed through S3FileIO with path-style access (the AWS credentials are provided as environment variables in the Dockerfile).

Write Data

import os
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

def create_table(spark: SparkSession):
    spark.sql("""
      CREATE TABLE IF NOT EXISTS my_iceberg_catalog.db.vaccinations (
        location string,
        date date,
        vaccine string,
        source_url string,
        total_vaccinations bigint,
        people_vaccinated bigint,
        people_fully_vaccinated bigint,
        total_boosters bigint
      ) USING iceberg PARTITIONED BY (location, date)
    """)
    
def write_data(spark: SparkSession):
    current_dir = os.path.realpath(os.path.dirname(__file__))
    path = os.path.join(current_dir, "covid19-vaccinations-country-data", "Belgium.csv")

    vaccinations: DataFrame = spark.read \
      .option("header", "true") \
      .option("inferSchema", "true") \
      .csv(path)

    vaccinations \
      .withColumn("date", F.to_date(F.col("date"))) \
      .writeTo("my_iceberg_catalog.db.vaccinations") \
      .append()

I use the same approach as with Scala Spark: Spark SQL for table management operations, in this case creating the table, and the DataFrame API for writing data into the table.

The table should now be registered in the PostgreSQL database and its data stored on MinIO.
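
If you want to double-check this registration from Spark itself, you can read the catalog's backing table through the JDBC data source. This is only a small sketch: iceberg_tables is the internal table name used by Iceberg's JdbcCatalog, so treat it as an implementation detail.

from pyspark.sql import SparkSession

def show_catalog_entries(spark: SparkSession):
    # Sketch: peek at the JDBC catalog's backing store in PostgreSQL.
    # `iceberg_tables` is the internal table used by Iceberg's JdbcCatalog
    # to store metadata pointers (implementation detail, may change).
    spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/iceberg_db") \
        .option("dbtable", "iceberg_tables") \
        .option("user", "postgres") \
        .option("password", "postgres") \
        .load() \
        .show(truncate=False)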

We should observe these logs during table creation:

INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
INFO BaseMetastoreTableOperations: Successfully committed to table db.vaccinations in 2294 ms
INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3://warehouse/db/vaccinations/metadata/00000...

And after writing data to the table:

INFO TaskSchedulerImpl: Killing all running tasks in stage 4: Stage finished
INFO AppendDataExec: Data source write support IcebergBatchWrite(table=my_iceberg_catalog.db.vaccinations, format=PARQUET) is committing.
INFO SparkWrite: Committing append with 527 new data files to table my_iceberg_catalog.db.vaccinations
INFO BaseMetastoreTableOperations: Successfully committed to table db.vaccinations in 14 ms
INFO SnapshotProducer: Committed snapshot 4190756679203259638 (MergeAppend)
INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3://warehouse....
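
The snapshot committed above can also be inspected from Spark: Iceberg exposes metadata tables alongside the data table, for example snapshots. A minimal sketch:

from pyspark.sql import SparkSession

def show_snapshots(spark: SparkSession):
    # Iceberg exposes metadata tables next to the data table;
    # `snapshots` lists each commit with its timestamp, id and operation
    spark.table("my_iceberg_catalog.db.vaccinations.snapshots") \
        .select("committed_at", "snapshot_id", "operation") \
        .show(truncate=False)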

Read Data

from pyspark.sql import SparkSession

def read_data(spark: SparkSession):
    vaccinations = spark.table("my_iceberg_catalog.db.vaccinations")
    vaccinations.orderBy("date").show(3)
    
+--------+----------+---------------+--------------------+------------------+-----------------+-----------------------+--------------+
|location|      date|        vaccine|          source_url|total_vaccinations|people_vaccinated|people_fully_vaccinated|total_boosters|
+--------+----------+---------------+--------------------+------------------+-----------------+-----------------------+--------------+
| Belgium|2020-12-28|Pfizer/BioNTech|https://epistat.w...|               340|              340|                      0|             0|
| Belgium|2020-12-29|Pfizer/BioNTech|https://epistat.w...|               380|              379|                     11|             0|
| Belgium|2020-12-30|Pfizer/BioNTech|https://epistat.w...|               911|              909|                     21|             0|
+--------+----------+---------------+--------------------+------------------+-----------------+-----------------------+--------------+
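
Putting it all together, the main.py that spark-submit runs in the Dockerfile could look something like this (a sketch wiring up the functions defined above):

# Sketch of main.py: ties together the functions defined above
# (init_spark, create_table, write_data, read_data)

if __name__ == "__main__":
    spark = init_spark()

    create_table(spark)
    write_data(spark)
    read_data(spark)

    spark.stop()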

Conclusion

There is no major difference in using Iceberg from PySpark compared to Scala Spark.

The key thing is to ensure that the Iceberg dependencies are included in your Spark installation wherever it is deployed: locally, on-premises, in cloud services, or on Kubernetes. Once that is done, you can benefit from the Iceberg table format in your PySpark applications!

The code, data, and local Docker environment setup I used are available here: https://github.com/romibuzi/iceberg-pyspark-demo