Here we demonstrate a simple implementation of the Medallion Architecture. The Medallion Architecture provides a structured and robust approach to building a data lakehouse: by progressively refining data through the Bronze, Silver, and Gold layers, organizations can ensure data quality, improve governance, and ultimately derive more valuable insights for their business.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# --- Configuration ---
RAW_DATA_PATH = "s3://your-raw-data-bucket/events/" # Replace with your raw data source
BRONZE_TABLE = "bronze_events"
SILVER_PATH = "s3://your-curated-data-bucket/silver/events/" # Replace with your silver layer path
SILVER_TABLE = "silver_events"
GOLD_PATH = "s3://your-refined-data-bucket/gold/user_activity/" # Replace with your gold layer path
GOLD_TABLE = "gold_user_activity"
CHECKPOINT_LOCATION_BRONZE = "s3://your-checkpoint-bucket/bronze_events/" # For structured streaming
CHECKPOINT_LOCATION_SILVER = "s3://your-checkpoint-bucket/silver_events/" # For structured streaming
# --- Define Schema for Raw Data (Example) ---
raw_event_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("details", StringType(), True)  # Could be a JSON string
])
# --- Define Schema for Details (Example) ---
details_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("duration", IntegerType(), True)
])
# --- Initialize Spark Session ---
spark = SparkSession.builder.appName("MedallionArchitecture").enableHiveSupport().getOrCreate()
# --- Bronze Layer (Ingest Raw Data) ---
def create_bronze_table():
    """Ingests raw data and creates the bronze table."""
    raw_df = spark.read.format("json").schema(raw_event_schema).load(RAW_DATA_PATH)
    (raw_df.withColumn("ingest_timestamp", current_timestamp())
        .write.mode("overwrite")  # Or "append" depending on your needs
        .saveAsTable(BRONZE_TABLE))
    print(f"Bronze table '{BRONZE_TABLE}' created.")
def create_bronze_stream():
    """Ingests raw data as a stream and creates the bronze table (Delta Lake)."""
    # "cloudFiles" is the Databricks Auto Loader source; on open-source Spark,
    # a plain readStream with format("json") can be used instead.
    streaming_raw_df = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .schema(raw_event_schema)
        .option("path", RAW_DATA_PATH)
        .option("cloudFiles.schemaLocation", CHECKPOINT_LOCATION_BRONZE)
        .load())
    # toTable() starts the streaming query and returns a StreamingQuery handle.
    bronze_stream_query = (streaming_raw_df.withColumn("ingest_timestamp", current_timestamp())
        .writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", CHECKPOINT_LOCATION_BRONZE)
        .option("path", RAW_DATA_PATH.replace("events/", "bronze/events/"))  # Store bronze data separately
        .toTable(BRONZE_TABLE))
    print(f"Bronze stream writing to Delta table '{BRONZE_TABLE}'.")
    return bronze_stream_query
# --- Silver Layer (Cleanse and Conform Data) ---
def create_silver_table():
    """Reads from the bronze table, cleanses, and creates the silver table (Delta Lake)."""
    bronze_df = spark.table(BRONZE_TABLE)
    silver_df = (bronze_df.filter(col("user_id").isNotNull())
        .withColumn("event_timestamp", col("timestamp").cast(TimestampType()))
        .withColumn("details_json", from_json(col("details"), details_schema))
        .withColumn("product_id", col("details_json.product_id"))
        .withColumn("category", col("details_json.category"))
        .withColumn("duration", col("details_json.duration"))
        .drop("details", "details_json", "timestamp"))
    (silver_df.write.format("delta")
        .mode("overwrite")  # Or "append"
        .option("path", SILVER_PATH)
        .saveAsTable(SILVER_TABLE))
    print(f"Silver table '{SILVER_TABLE}' created at '{SILVER_PATH}'.")
def create_silver_stream():
    """Reads from the bronze stream, cleanses, and creates the silver table (Delta Lake)."""
    bronze_stream_df = spark.readStream.table(BRONZE_TABLE)
    silver_stream_df = (bronze_stream_df.filter(col("user_id").isNotNull())
        .withColumn("event_timestamp", col("timestamp").cast(TimestampType()))
        .withColumn("details_json", from_json(col("details"), details_schema))
        .withColumn("product_id", col("details_json.product_id"))
        .withColumn("category", col("details_json.category"))
        .withColumn("duration", col("details_json.duration"))
        .drop("details", "details_json", "timestamp"))
    # toTable() starts the streaming query and returns a StreamingQuery handle.
    silver_stream_query = (silver_stream_df.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", CHECKPOINT_LOCATION_SILVER)
        .option("path", SILVER_PATH)
        .toTable(SILVER_TABLE))
    print(f"Silver stream writing to Delta table '{SILVER_TABLE}' at '{SILVER_PATH}'.")
    return silver_stream_query
# --- Gold Layer (Refine and Aggregate Data) ---
def create_gold_table():
    """Reads from the silver table, aggregates, and creates the gold table (Delta Lake)."""
    silver_df = spark.table(SILVER_TABLE)
    gold_df = (silver_df.groupBy("user_id", "category")
        .agg({"duration": "sum", "event_timestamp": "max"})
        .withColumnRenamed("sum(duration)", "total_duration")
        .withColumnRenamed("max(event_timestamp)", "last_activity_time"))
    (gold_df.write.format("delta")
        .mode("overwrite")
        .option("path", GOLD_PATH)
        .saveAsTable(GOLD_TABLE))
    print(f"Gold table '{GOLD_TABLE}' created at '{GOLD_PATH}'.")
def create_gold_stream():
    """Reads from the silver stream, aggregates, and creates the gold table (Delta Lake)."""
    silver_stream_df = spark.readStream.table(SILVER_TABLE)
    # Streaming aggregations need a time window; a 1-hour tumbling window is used here as an example.
    gold_stream_df = (silver_stream_df
        .groupBy("user_id", "category", window(col("event_timestamp"), "1 hour"))
        .agg({"duration": "sum", "event_timestamp": "max"})
        .withColumnRenamed("sum(duration)", "total_duration")
        .withColumnRenamed("max(event_timestamp)", "last_activity_time"))
    # toTable() starts the streaming query and returns a StreamingQuery handle.
    gold_stream_query = (gold_stream_df.writeStream.format("delta")
        .outputMode("complete")  # Or "update" depending on the aggregation
        .trigger(processingTime="1 minute")  # Adjust trigger interval
        .option("checkpointLocation", GOLD_PATH + "/_checkpoints")
        .option("path", GOLD_PATH)
        .toTable(GOLD_TABLE))
    print(f"Gold stream writing to Delta table '{GOLD_TABLE}' at '{GOLD_PATH}'.")
    return gold_stream_query
# --- Main Execution ---
if __name__ == "__main__":
    # Batch processing (use when processing static data)
    create_bronze_table()
    create_silver_table()
    create_gold_table()

    # Structured Streaming (uncomment when processing continuous data).
    # Each create_*_stream() call starts its query and returns a StreamingQuery.
    # bronze_stream_query = create_bronze_stream()
    # silver_stream_query = create_silver_stream()
    # gold_stream_query = create_gold_stream()
    #
    # spark.streams.awaitAnyTermination()

    spark.stop()
Explanation of the Sample Code (using PySpark and Delta Lake):
- Configuration: Defines paths for raw data, bronze, silver, and gold layers (ideally on cloud storage like S3 or ADLS). Also defines table names and checkpoint locations for streaming.
- Schema Definition: Example schemas are defined for the raw JSON events and the nested `details` field. You'll need to adapt these to the actual structure of your data.
- Spark Session Initialization: Creates a SparkSession with Hive support (useful for managing table metadata).
- Bronze Layer (`create_bronze_table` and `create_bronze_stream`):
  - Batch: Reads raw JSON data from the specified path and writes it to a managed Spark table (`bronze_events`). An `ingest_timestamp` column is added.
  - Streaming: Reads a stream of JSON data using the `cloudFiles` format (Databricks Auto Loader, optimized for cloud object storage) and writes it to a Delta Lake table, providing transactional guarantees and schema evolution. A checkpoint location is crucial for fault tolerance in streaming.
- Silver Layer (`create_silver_table` and `create_silver_stream`):
  - Batch: Reads from the `bronze_events` table and performs basic data quality steps:
    - Filters out records with a null `user_id`.
    - Casts the `timestamp` string to a `TimestampType`.
    - Parses the `details` JSON string into a structured column using `from_json`.
    - Extracts the individual fields (`product_id`, `category`, `duration`) from the parsed JSON.
    - Drops the raw `details` and original `timestamp` columns.
    - Writes the cleaned and conformed data to a Delta Lake table (`silver_events`) at the specified path.
  - Streaming: Similar to the batch process, but operates on the streaming `bronze_events` DataFrame and writes to a streaming Delta Lake table.
- Gold Layer (`create_gold_table` and `create_gold_stream`):
  - Batch: Reads from the `silver_events` table and performs business-level transformations:
    - Groups the data by `user_id` and `category`.
    - Aggregates `duration` (calculating the sum) and finds the maximum `event_timestamp`.
    - Renames the aggregated columns to be more business-friendly.
    - Writes the refined data to a Delta Lake table (`gold_user_activity`) at the specified path.
  - Streaming: Reads from the streaming `silver_events` DataFrame. Streaming aggregations typically require a time window (e.g., the one-hour tumbling window used in `create_gold_stream`) to group events within a specific time frame. The `outputMode` is set to `complete` (all aggregated results are rewritten on each trigger) or `update` (only the changed rows are written), depending on the aggregation requirements; a windowed, watermarked variant that uses `append` mode is sketched after this list.
- Main Execution: The `if __name__ == "__main__":` block shows how to run either the batch pipeline or the structured streaming pipeline by uncommenting the relevant section.
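The streaming gold aggregation above keeps things simple by rewriting all aggregates in `complete` mode. A common alternative, referenced in the Gold streaming note above, is to watermark the event time and emit each finished window once in `append` mode. The sketch below illustrates that pattern; it assumes the `silver_events` table created earlier, and the one-hour window, two-hour watermark, checkpoint path, and `gold_user_activity_windowed` table name are illustrative placeholders rather than part of the pipeline above.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum as sum_, max as max_

spark = SparkSession.builder.appName("GoldWindowedActivity").getOrCreate()

silver_stream_df = spark.readStream.table("silver_events")

windowed_activity_df = (
    silver_stream_df
    # The watermark bounds how late events may arrive; windows older than the
    # watermark are finalized and their state is dropped.
    .withWatermark("event_timestamp", "2 hours")
    .groupBy("user_id", "category", window(col("event_timestamp"), "1 hour"))
    .agg(sum_("duration").alias("total_duration"),
         max_("event_timestamp").alias("last_activity_time")))

# In append mode each window is written exactly once, after the watermark has
# passed the end of that window.
query = (windowed_activity_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://your-checkpoint-bucket/gold_user_activity_windowed/")
    .toTable("gold_user_activity_windowed"))

query.awaitTermination()
```

Because finalized windows are dropped from state once the watermark passes them, this variant keeps streaming state bounded, unlike `complete` mode.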
Key Concepts Used:
- PySpark: A Python API for working with Apache Spark, a distributed processing engine.
- Spark SQL: Spark’s module for structured data processing using SQL and DataFrames.
- Delta Lake: An open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It’s highly recommended for building reliable data lakehouses.
- Structured Streaming: Spark’s scalable and fault-tolerant stream processing engine.
- Schema Definition: Explicitly defining schemas ensures data consistency and allows Spark to optimize processing.
- Data Transformations: Using Spark SQL functions (`filter`, `withColumn`, `from_json`, `groupBy`, `agg`, `drop`, `withColumnRenamed`, `cast`) to cleanse, conform, and refine data.
- Write Modes: `overwrite` replaces existing data, while `append` adds new data. For streaming, `append` is common for bronze and silver, while gold might use `complete` or `update` for aggregations.
- Checkpointing: Essential for fault tolerance in structured streaming; the checkpoint stores the state of the processing.
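Because every layer above is written through Delta Lake's transaction log, earlier versions of each table remain queryable (time travel). Here is a minimal sketch of what that looks like, assuming the silver layer path used earlier and that the table has at least one prior version; the version number is illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeltaTimeTravel").getOrCreate()

SILVER_PATH = "s3://your-curated-data-bucket/silver/events/"  # same placeholder as above

# Current state of the silver data.
current_df = spark.read.format("delta").load(SILVER_PATH)

# The same data as of an earlier table version (version 0 is illustrative).
previous_df = (spark.read.format("delta")
    .option("versionAsOf", 0)
    .load(SILVER_PATH))

print(f"Rows now: {current_df.count()}, rows at version 0: {previous_df.count()}")
```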
To Adapt This Code:
- Replace Placeholder Paths: Update the S3 paths (or your storage system paths) for raw data, bronze, silver, gold layers, and checkpoint locations.
- Define Your Actual Schema: Modify `raw_event_schema` and `details_schema` to match the exact structure of your incoming data.
- Implement Your Data Quality Rules: Adjust the filtering and cleansing logic in the `create_silver_table`/`create_silver_stream` functions based on your specific data quality requirements (see the sketch after this list).
- Implement Your Business Logic: Modify the aggregation and transformation logic in the `create_gold_table`/`create_gold_stream` functions to create the business-ready views you need.
- Choose Batch or Streaming: Decide whether you are processing static data (batch) or continuous data streams, and uncomment the appropriate section in the `if __name__ == "__main__":` block.
- Configure Streaming Trigger: Adjust the `processingTime` trigger in `create_gold_stream` if you are using streaming.
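As an example of the data quality rules mentioned above, the sketch below is a condensed variant of the silver-layer cleansing that adds an event-type allow-list, a timestamp validity check, and deduplication. The `VALID_EVENT_TYPES` values, the dedup key, and the `silver_events_clean` output table are assumptions made for illustration, not part of the original pipeline.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

spark = SparkSession.builder.appName("SilverQualityRules").getOrCreate()

# Hypothetical allow-list; replace with the event types valid in your domain.
VALID_EVENT_TYPES = ["page_view", "click", "purchase"]

bronze_df = spark.table("bronze_events")

silver_df = (bronze_df
    .filter(col("user_id").isNotNull())
    .filter(col("event_type").isin(VALID_EVENT_TYPES))               # drop unknown event types
    .withColumn("event_timestamp", col("timestamp").cast(TimestampType()))
    .filter(col("event_timestamp").isNotNull())                      # drop rows whose timestamp failed to parse
    .dropDuplicates(["user_id", "event_type", "event_timestamp"]))   # assumed dedup key

# Illustrative output table name; in practice this logic would replace the silver write above.
silver_df.write.format("delta").mode("overwrite").saveAsTable("silver_events_clean")
```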
This sample code provides a foundational structure for implementing the Medallion Architecture using PySpark and Delta Lake. You’ll need to customize it extensively to fit your specific data sources, data structures, and business requirements. Remember to consider error handling, logging, and more robust configuration management for production environments.
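As a starting point for the production concerns mentioned above, the sketch below wraps the batch pipeline with basic logging and fail-fast error handling. It assumes the `create_*_table` functions and `spark` session defined earlier are in scope; the logger name and exit behaviour are illustrative choices.

```python
import logging
import sys

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("medallion_pipeline")

def run_batch_pipeline():
    """Runs bronze -> silver -> gold in order, logging each step."""
    steps = [create_bronze_table, create_silver_table, create_gold_table]
    for step in steps:
        logger.info("Starting step: %s", step.__name__)
        step()
        logger.info("Finished step: %s", step.__name__)

if __name__ == "__main__":
    try:
        run_batch_pipeline()
    except Exception:
        # Log the full stack trace and fail the job so the scheduler can alert/retry.
        logger.exception("Medallion pipeline failed")
        sys.exit(1)
    finally:
        spark.stop()
```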