Apache Iceberg with PySpark
March 2025 update: now uses the latest Iceberg release, 1.8.1.
In a previous post I described how to use the Apache Iceberg table format with Apache Spark using Scala. I will now describe how to do the same with PySpark.
I will use the same testing environment based on MinIO and PostgreSQL, and work with the Covid vaccinations dataset.
The main difference with PySpark is the way dependencies are managed. In the Scala version I could bundle the Iceberg dependency, along with the AWS (MinIO) and PostgreSQL clients, directly into my Spark application. This is not possible with PySpark; instead, these dependencies have to be placed in the Spark installation itself, under the $SPARK_HOME/jars directory.
Local Setup
Dependencies
I use Docker and extend the Apache PySpark image, which ships with Spark already configured: https://hub.docker.com/r/apache/spark-py.
# Dockerfile
FROM apache/spark-py:v3.4.0
# switch to root user to be able to use apt
USER root
RUN apt update && apt install -y wget
ARG MAVEN_REPO=https://repo1.maven.org/maven2
ARG SPARK_VERSION=3.4
# apache/spark-py:v3.4.0 ships Spark built with Scala 2.12
ARG SCALA_VERSION=2.12
ARG ICEBERG_VERSION=1.8.1
ARG AWS_VERSION=2.31.1
ARG POSTGRES_VERSION=42.7.5
# Iceberg runtime for Spark
RUN wget $MAVEN_REPO/org/apache/iceberg/iceberg-spark-runtime-${SPARK_VERSION}_$SCALA_VERSION/$ICEBERG_VERSION/iceberg-spark-runtime-${SPARK_VERSION}_$SCALA_VERSION-$ICEBERG_VERSION.jar \
    -O $SPARK_HOME/jars/iceberg-spark-runtime-${SPARK_VERSION}_$SCALA_VERSION-$ICEBERG_VERSION.jar
# AWS SDK bundle and the URL-connection HTTP client used by Iceberg's S3FileIO (MinIO access)
RUN wget $MAVEN_REPO/software/amazon/awssdk/bundle/$AWS_VERSION/bundle-$AWS_VERSION.jar \
    -O $SPARK_HOME/jars/aws-bundle-$AWS_VERSION.jar
RUN wget $MAVEN_REPO/software/amazon/awssdk/url-connection-client/$AWS_VERSION/url-connection-client-$AWS_VERSION.jar \
    -O $SPARK_HOME/jars/aws-url-connection-client-$AWS_VERSION.jar
# PostgreSQL JDBC driver for the Iceberg JDBC catalog
RUN wget $MAVEN_REPO/org/postgresql/postgresql/$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar \
    -O $SPARK_HOME/jars/postgresql-$POSTGRES_VERSION.jar
ENV AWS_ACCESS_KEY_ID=minioadmin
ENV AWS_SECRET_ACCESS_KEY=minioadmin
ENV AWS_REGION=eu-west-1
WORKDIR /opt/spark/work-dir/
CMD ["/opt/spark/bin/spark-submit", "main.py"]
I use wget to fetch the required JARs from Maven Central and put them into the Spark JARs directory. Our Docker image is now ready, with all the necessary dependencies available to the Spark installation. We can get started on the PySpark code.
PySpark Job
The configuration is almost the same as in the Scala Spark application:
from pyspark import SparkConf
from pyspark.sql import SparkSession
def init_spark():
    conf = SparkConf() \
        .setAppName("Apache Iceberg with PySpark") \
        .setMaster("local[2]") \
        .setAll([
            ("spark.driver.memory", "1g"),
            ("spark.executor.memory", "2g"),
            ("spark.sql.shuffle.partitions", "40"),
            # Add Iceberg SQL extensions like UPDATE or DELETE in Spark
            ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
            # Register `my_iceberg_catalog`
            ("spark.sql.catalog.my_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog"),
            # Configure SQL connection to track tables inside `my_iceberg_catalog`
            ("spark.sql.catalog.my_iceberg_catalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog"),
            ("spark.sql.catalog.my_iceberg_catalog.uri", "jdbc:postgresql://postgres:5432/iceberg_db"),
            ("spark.sql.catalog.my_iceberg_catalog.jdbc.user", "postgres"),
            ("spark.sql.catalog.my_iceberg_catalog.jdbc.password", "postgres"),
            # Configure Warehouse on MinIO
            ("spark.sql.catalog.my_iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
            ("spark.sql.catalog.my_iceberg_catalog.s3.endpoint", "http://minio:9000"),
            ("spark.sql.catalog.my_iceberg_catalog.s3.path-style-access", "true"),
            ("spark.sql.catalog.my_iceberg_catalog.warehouse", "s3://warehouse"),
        ])

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark
Some notes:
- In my testing environment the PySpark application runs in a Docker container inside a docker-compose network: postgres:5432 points to the PostgreSQL container and http://minio:9000 to the MinIO container.
- I set s3.path-style-access to true so that S3FileIO queries against MinIO use path-style access (http://minio:9000/warehouse/{key-name}) instead of virtual-host style (http://warehouse.minio:9000/{key-name}).
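With this configuration in place, obtaining a session is a one-liner. A minimal sanity check could be to list the namespaces, which exercises the JDBC catalog connection:
spark = init_spark()

# should return an empty result on a fresh catalog, without errors
spark.sql("SHOW NAMESPACES IN my_iceberg_catalog").show()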
Write Data
import os
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
def create_table(spark: SparkSession):
    spark.sql("""
        CREATE TABLE IF NOT EXISTS my_iceberg_catalog.db.vaccinations (
            location string,
            date date,
            vaccine string,
            source_url string,
            total_vaccinations bigint,
            people_vaccinated bigint,
            people_fully_vaccinated bigint,
            total_boosters bigint
        ) USING iceberg PARTITIONED BY (location, date)
    """)

def write_data(spark: SparkSession):
    current_dir = os.path.realpath(os.path.dirname(__file__))
    path = os.path.join(current_dir, "covid19-vaccinations-country-data", "Belgium.csv")

    vaccinations: DataFrame = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(path)

    vaccinations \
        .withColumn("date", F.to_date(F.col("date"))) \
        .writeTo("my_iceberg_catalog.db.vaccinations") \
        .append()
I use the same approach as with Scala Spark: Spark SQL is used for table management operations, in my case creating the table, and the DataFrame API is used to write data into the table.
The table should now be registered in the PostgreSQL database and the data stored on MinIO.
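Before looking at the logs, the registration can also be checked from Spark directly; for example, listing the tables of the db namespace should show the new vaccinations table:
spark.sql("SHOW TABLES IN my_iceberg_catalog.db").show()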
We should observe these logs during table creation:
INFO CatalogUtil: Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
INFO BaseMetastoreTableOperations: Successfully committed to table db.vaccinations in 2294 ms
INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3://warehouse/db/vaccinations/metadata/00000...
And after writing data into the table:
INFO TaskSchedulerImpl: Killing all running tasks in stage 4: Stage finished
INFO AppendDataExec: Data source write support IcebergBatchWrite(table=my_iceberg_catalog.db.vaccinations, format=PARQUET) is committing.
INFO SparkWrite: Committing append with 527 new data files to table my_iceberg_catalog.db.vaccinations
INFO BaseMetastoreTableOperations: Successfully committed to table db.vaccinations in 14 ms
INFO SnapshotProducer: Committed snapshot 4190756679203259638 (MergeAppend)
INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: s3://warehouse....
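The snapshot mentioned in the last log line can also be inspected through Iceberg's metadata tables, for example:
# list the table's snapshots via the Iceberg `snapshots` metadata table
spark.table("my_iceberg_catalog.db.vaccinations.snapshots") \
    .select("committed_at", "snapshot_id", "operation") \
    .show(truncate=False)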
Read Data
from pyspark.sql import SparkSession
def read_data(spark: SparkSession):
    vaccinations = spark.table("my_iceberg_catalog.db.vaccinations")
    vaccinations.orderBy("date").show(3)
+--------+----------+---------------+--------------------+------------------+-----------------+-----------------------+--------------+
|location| date| vaccine| source_url|total_vaccinations|people_vaccinated|people_fully_vaccinated|total_boosters|
+--------+----------+---------------+--------------------+------------------+-----------------+-----------------------+--------------+
| Belgium|2020-12-28|Pfizer/BioNTech|https://epistat.w...| 340| 340| 0| 0|
| Belgium|2020-12-29|Pfizer/BioNTech|https://epistat.w...| 380| 379| 11| 0|
| Belgium|2020-12-30|Pfizer/BioNTech|https://epistat.w...| 911| 909| 21| 0|
+--------+----------+---------------+--------------------+------------------+-----------------+-----------------------+--------------+
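Since the IcebergSparkSessionExtensions are enabled in the configuration, row-level SQL commands such as DELETE, UPDATE or MERGE INTO also work on the table. A small sketch with an arbitrary filter:
# delete some rows; Iceberg rewrites the affected data files and commits a new snapshot
spark.sql("""
    DELETE FROM my_iceberg_catalog.db.vaccinations
    WHERE location = 'Belgium' AND date < DATE '2021-01-01'
""")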
Conclusion
There is no major difference between using Iceberg from PySpark and from Scala Spark.
The key thing is to ensure that the Iceberg dependencies are included in your Spark installation wherever it is deployed: locally, on premises, in cloud services, or on Kubernetes. Once that is done, you can benefit from the Iceberg table format in your PySpark applications!
Links
The code, data, and local Docker environment setup I used are available here: https://github.com/romibuzi/iceberg-pyspark-demo