Here we demonstrate a simple implementation of the Medallion Architecture. The Medallion Architecture provides a structured and robust approach to building a data lakehouse. By progressively refining data through the Bronze, Silver, and Gold layers, organizations can ensure data quality, improve governance, and ultimately derive more valuable insights for their business.
Python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_json, to_json, lit, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# --- Configuration ---
# Every location below is a placeholder: point it at your own storage.

# Landing zone holding the raw JSON events read by the bronze layer.
RAW_DATA_PATH = "s3://your-raw-data-bucket/events/"

# Catalog table names and storage locations, layer by layer.
BRONZE_TABLE = "bronze_events"
SILVER_TABLE = "silver_events"
SILVER_PATH = "s3://your-curated-data-bucket/silver/events/"
GOLD_TABLE = "gold_user_activity"
GOLD_PATH = "s3://your-refined-data-bucket/gold/user_activity/"

# Structured Streaming checkpoint directories for the bronze and silver streams.
CHECKPOINT_LOCATION_BRONZE = "s3://your-checkpoint-bucket/bronze_events/"
CHECKPOINT_LOCATION_SILVER = "s3://your-checkpoint-bucket/silver_events/"
# --- Example schemas (adapt these to your data) ---
# Top-level shape of each raw JSON event: four nullable string fields.
# `timestamp` is cast and `details` is parsed later, in the silver layer.
raw_event_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("user_id", "event_type", "timestamp", "details")
])

# Shape of the JSON document carried inside the raw `details` string.
details_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("duration", IntegerType(), True),
])
# --- Spark session ---
# Hive support is enabled for managing table metadata (saveAsTable / spark.table).
spark = (
    SparkSession.builder
    .appName("MedallionArchitecture")
    .enableHiveSupport()
    .getOrCreate()
)
# --- Bronze Layer (Ingest Raw Data) ---
def create_bronze_table(mode="overwrite"):
    """Batch-ingest the raw JSON events into the bronze Delta table.

    Reads the JSON files under RAW_DATA_PATH with ``raw_event_schema``, adds an
    ``ingest_timestamp`` column (``current_timestamp()``) and saves the result
    as the catalog table BRONZE_TABLE.

    Args:
        mode: Save mode for ``DataFrameWriter.mode``. ``"overwrite"`` (the
            default) replaces the table; ``"append"`` adds to it.
    """
    raw_df = spark.read.format("json").schema(raw_event_schema).load(RAW_DATA_PATH)
    (raw_df.withColumn("ingest_timestamp", current_timestamp())
        # Explicit Delta format: without it OSS Spark writes Parquet, which is
        # inconsistent with silver/gold and cannot be read as a stream.
        .write.format("delta")
        .mode(mode)
        .saveAsTable(BRONZE_TABLE))
    print(f"Bronze table '{BRONZE_TABLE}' created.")
def create_bronze_stream():
    """Build (but do not start) the streaming writer for the bronze Delta table.

    Incrementally reads raw JSON files from RAW_DATA_PATH with the Databricks
    Auto Loader source (``cloudFiles``) and stamps each row with
    ``ingest_timestamp``. Returns a ``DataStreamWriter`` configured for Delta
    output in append mode.

    Start the writer with ``.toTable(BRONZE_TABLE)`` so the table is registered
    in the catalog; the silver stream reads it by name.

    Returns:
        pyspark.sql.streaming.DataStreamWriter: the unstarted writer.

    Raises:
        ValueError: if the derived bronze path lies inside RAW_DATA_PATH,
            which would make the stream re-read its own output.
    """
    # Store bronze data separately from the raw landing zone.
    bronze_path = RAW_DATA_PATH.replace("events/", "bronze/events/")
    if bronze_path.startswith(RAW_DATA_PATH):
        raise ValueError(
            f"Derived bronze path {bronze_path!r} lies inside RAW_DATA_PATH "
            f"{RAW_DATA_PATH!r}; expected RAW_DATA_PATH to contain 'events/'."
        )

    streaming_raw_df = (spark.readStream.format("cloudFiles")
                        .option("cloudFiles.format", "json")
                        .option("cloudFiles.schemaLocation", CHECKPOINT_LOCATION_BRONZE)
                        .schema(raw_event_schema)
                        .load(RAW_DATA_PATH))

    # DataStreamWriter has no .table(); return the writer unstarted and let the
    # caller start it with .toTable(BRONZE_TABLE) (optionally after .trigger()).
    bronze_stream_writer = (streaming_raw_df.withColumn("ingest_timestamp", current_timestamp())
                            .writeStream.format("delta")
                            .outputMode("append")
                            .option("checkpointLocation", CHECKPOINT_LOCATION_BRONZE)
                            .option("path", bronze_path))
    print(f"Bronze stream configured for Delta table '{BRONZE_TABLE}' at '{bronze_path}'.")
    return bronze_stream_writer
# --- Silver Layer (Cleanse and Conform Data) ---
def _cleanse_events(bronze_df):
    """Apply the silver-layer cleansing rules shared by the batch and streaming paths.

    Drops rows whose ``user_id`` is null, and casts the string ``timestamp`` into
    ``event_timestamp``. Parses the JSON ``details`` string with
    ``details_schema`` and flattens it into ``product_id``, ``category`` and
    ``duration``. Finally drops the raw ``details``, the intermediate
    ``details_json`` and the original ``timestamp`` columns.
    """
    return (bronze_df.filter(col("user_id").isNotNull())
            .withColumn("event_timestamp", col("timestamp").cast(TimestampType()))
            .withColumn("details_json", from_json(col("details"), details_schema))
            .withColumn("product_id", col("details_json.product_id"))
            .withColumn("category", col("details_json.category"))
            .withColumn("duration", col("details_json.duration"))
            .drop("details", "details_json", "timestamp"))


def create_silver_table(mode="overwrite"):
    """Batch-cleanse BRONZE_TABLE into the silver Delta table stored at SILVER_PATH.

    Args:
        mode: Save mode for ``DataFrameWriter.mode``. ``"overwrite"`` (the
            default) replaces the table; ``"append"`` adds to it.
    """
    silver_df = _cleanse_events(spark.table(BRONZE_TABLE))
    (silver_df.write.format("delta")
        .mode(mode)
        .option("path", SILVER_PATH)
        .saveAsTable(SILVER_TABLE))
    print(f"Silver table '{SILVER_TABLE}' created at '{SILVER_PATH}'.")


def create_silver_stream():
    """Build (but do not start) the streaming writer for the silver Delta table.

    Streams BRONZE_TABLE, applies the same cleansing as the batch path and
    returns a ``DataStreamWriter`` (Delta, append mode, checkpointed at
    CHECKPOINT_LOCATION_SILVER, stored at SILVER_PATH).

    Start the writer with ``.toTable(SILVER_TABLE)`` so the gold stream can
    read the table by name.

    Returns:
        pyspark.sql.streaming.DataStreamWriter: the unstarted writer.
    """
    silver_stream_df = _cleanse_events(spark.readStream.table(BRONZE_TABLE))
    # DataStreamWriter has no .table(); the caller starts it with .toTable().
    silver_stream_writer = (silver_stream_df.writeStream.format("delta")
                            .outputMode("append")
                            .option("checkpointLocation", CHECKPOINT_LOCATION_SILVER)
                            .option("path", SILVER_PATH))
    print(f"Silver stream configured for Delta table '{SILVER_TABLE}' at '{SILVER_PATH}'.")
    return silver_stream_writer
# --- Gold Layer (Refine and Aggregate Data) ---
def _aggregate_user_activity(df, *extra_group_cols):
    """Aggregate silver events into per-user, per-category activity.

    Groups ``df`` by ``user_id``, ``category`` and any ``extra_group_cols``
    (e.g. a time-window column). It then computes ``total_duration`` (the sum
    of ``duration``) and ``last_activity_time`` (the latest ``event_timestamp``).

    Aliased Column aggregates fix the output names and order, instead of
    renaming Spark's generated ``sum(duration)`` / ``max(event_timestamp)``
    names.
    """
    return (df.groupBy("user_id", "category", *extra_group_cols)
            .agg(F.sum("duration").alias("total_duration"),
                 F.max("event_timestamp").alias("last_activity_time")))


def create_gold_table():
    """Batch-aggregate SILVER_TABLE into the gold Delta table at GOLD_PATH, overwriting it."""
    gold_df = _aggregate_user_activity(spark.table(SILVER_TABLE))
    (gold_df.write.format("delta")
        .mode("overwrite")
        .option("path", GOLD_PATH)
        .saveAsTable(GOLD_TABLE))
    print(f"Gold table '{GOLD_TABLE}' created at '{GOLD_PATH}'.")


def create_gold_stream(window_duration="1 minute", watermark_delay="10 minutes",
                       output_mode="complete"):
    """Build (but do not start) the streaming writer for the gold Delta table.

    Streaming aggregations need an explicit time dimension. Events from
    SILVER_TABLE are bucketed into tumbling windows of ``window_duration`` over
    ``event_timestamp``, which produces an extra ``window`` struct column.
    A watermark of ``watermark_delay`` is applied first.

    Args:
        window_duration: Tumbling window length, e.g. ``"1 minute"``.
        watermark_delay: How late an event may arrive and still be counted.
            Spark uses the watermark to drop old window state in append and
            update modes; in complete mode all windows are kept.
        output_mode: ``"complete"`` (default; every window is rewritten each
            trigger) or ``"append"`` (each window is written once, after the
            watermark passes it). The Delta sink does not support ``"update"``.

    Returns:
        pyspark.sql.streaming.DataStreamWriter: the unstarted writer. Start it
        with ``.toTable(GOLD_TABLE)`` (optionally after ``.trigger(...)``).
        Because of the ``window`` column, this output's schema differs from the
        batch gold table written to the same location.
    """
    windowed_df = (spark.readStream.table(SILVER_TABLE)
                   .withWatermark("event_timestamp", watermark_delay))
    # groupBy("window") referenced a non-existent column; build the window explicitly.
    gold_stream_df = _aggregate_user_activity(
        windowed_df, F.window("event_timestamp", window_duration))
    # DataStreamWriter has no .table(); the caller starts it with .toTable().
    gold_stream_writer = (gold_stream_df.writeStream.format("delta")
                          .outputMode(output_mode)
                          .option("checkpointLocation", GOLD_PATH + "/_checkpoints")
                          .option("path", GOLD_PATH))
    print(f"Gold stream configured for Delta table '{GOLD_TABLE}' at '{GOLD_PATH}'.")
    return gold_stream_writer
# --- Main Execution ---
if __name__ == "__main__":
    try:
        # Batch processing (runs by default). Comment these three calls out
        # when running the streaming pipeline below instead.
        create_bronze_table()
        create_silver_table()
        create_gold_table()

        # Structured Streaming (uncomment to process continuous data).
        # DataStreamWriter.toTable() starts each query and registers its table,
        # which the next layer reads with spark.readStream.table().
        # bronze_stream_query = create_bronze_stream().toTable(BRONZE_TABLE)
        # silver_stream_query = create_silver_stream().toTable(SILVER_TABLE)
        # gold_stream_query = (create_gold_stream()
        #                      .trigger(processingTime="1 minute")  # Adjust trigger interval
        #                      .toTable(GOLD_TABLE))
        #
        # spark.streams.awaitAnyTermination()
    finally:
        # Stop the session even if a layer raises, so its resources are released.
        spark.stop()
Explanation of the Sample Code (using PySpark and Delta Lake):
- Configuration: Defines paths for raw data, bronze, silver, and gold layers (ideally on cloud storage like S3 or ADLS). Also defines table names and checkpoint locations for streaming.
- Schema Definition: Example schemas are defined for the raw JSON events and the nested `details` field. You’ll need to adapt these to the actual structure of your data.
- Spark Session Initialization: Creates a SparkSession with Hive support (useful for managing metadata).
- Bronze Layer (`create_bronze_table` and `create_bronze_stream`):
  - Batch: Reads raw JSON data from the specified path and writes it to a managed Spark table (`bronze_events`). An `ingest_timestamp` column is added.
  - Streaming: Reads a stream of JSON data using the `cloudFiles` format (Databricks Auto Loader, optimized for cloud object storage). It writes the data to a Delta Lake table, providing transactional guarantees and schema evolution. A checkpoint location is crucial for fault tolerance in streaming.
- Silver Layer (`create_silver_table` and `create_silver_stream`):
  - Batch: Reads from the `bronze_events` table. It then performs basic data quality steps:
    - Filters out records with a null `user_id`.
    - Casts the `timestamp` string to a `TimestampType` (stored as `event_timestamp`).
    - Parses the `details` JSON string into a structured format using `from_json`.
    - Extracts individual fields (`product_id`, `category`, `duration`) from the parsed JSON.
    - Drops the raw `details` and original `timestamp` columns.
    - Writes the cleaned and conformed data to a Delta Lake table (`silver_events`) at the specified path.
  - Streaming: Similar to the batch process, but it reads `bronze_events` as a streaming DataFrame and writes to a Delta Lake table through a streaming query.
- Gold Layer (`create_gold_table` and `create_gold_stream`):
  - Batch: Reads from the `silver_events` table. It then performs business-level transformations:
    - Groups the data by `user_id` and `category`.
    - Aggregates `duration` (calculating the sum) and finds the maximum `event_timestamp`.
    - Renames the aggregated columns to be more business-friendly (`total_duration`, `last_activity_time`).
    - Writes the refined data to a Delta Lake table (`gold_user_activity`) at the specified path.
  - Streaming: Reads `silver_events` as a streaming DataFrame. For streaming aggregations, you typically need to group events into a time window built from the event timestamp. For example, `window(event_timestamp, "1 minute")` creates a tumbling window of 1 minute, and a watermark bounds how late data may arrive.
    - The `outputMode` is `complete` (all aggregated results are rewritten on each trigger) or `append` (each window is written once, after the watermark passes it).
    - The Delta Lake sink does not support `update` mode. To upsert only the changed rows, use `foreachBatch` with a Delta `MERGE`.
- Main Execution: The `if __name__ == "__main__":` block runs the batch pipeline by default. To run the structured streaming pipelines instead, comment out the batch calls and uncomment the streaming section.
Key Concepts Used:
- PySpark: A Python API for working with Apache Spark, a distributed processing engine.
- Spark SQL: Spark’s module for structured data processing using SQL and DataFrames.
- Delta Lake: An open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It’s highly recommended for building reliable data lakehouses.
- Structured Streaming: Spark’s scalable and fault-tolerant stream processing engine.
- Schema Definition: Explicitly defining schemas ensures data consistency and allows Spark to optimize processing.
- Data Transformations: Using Spark SQL functions (`filter`, `withColumn`, `from_json`, `groupBy`, `agg`, `drop`, `withColumnRenamed`, `cast`) to cleanse, conform, and refine data.
- Write Modes: `overwrite` replaces existing data, while `append` adds new data. For streaming, `append` is common for bronze and silver. Gold aggregations typically use `complete`, or `append` together with a watermark. The Delta sink does not support `update`.
- Checkpointing: Essential for fault tolerance in structured streaming; the checkpoint stores the query's progress and state.
To Adapt This Code:
- Replace Placeholder Paths: Update the S3 paths (or your storage system paths) for raw data, bronze, silver, gold layers, and checkpoint locations.
- Define Your Actual Schema: Modify `raw_event_schema` and `details_schema` to match the exact structure of your incoming data.
- Implement Your Data Quality Rules: Adjust the filtering and cleansing logic used by the `create_silver_table`/`create_silver_stream` functions based on your specific data quality requirements.
- Implement Your Business Logic: Modify the aggregation and transformation logic in the `create_gold_table`/`create_gold_stream` functions to create the business-ready views you need.
- Choose Batch or Streaming: Decide whether you are processing static data (batch) or continuous data streams (streaming), and comment or uncomment the appropriate sections in the `if __name__ == "__main__":` block.
- Configure Streaming Trigger: Adjust the `processingTime` trigger used for `gold_stream_query` if you are using streaming.
This sample code provides a foundational structure for implementing the Medallion Architecture using PySpark and Delta Lake. You’ll need to customize it extensively to fit your specific data sources, data structures, and business requirements. Remember to consider error handling, logging, and more robust configuration management for production environments.
