A Simple Implementation of the Medallion Architecture (With Code)

Here we walk through a simple implementation of the Medallion Architecture. The Medallion Architecture provides a structured and robust approach to building a data lakehouse: by progressively refining data through the Bronze, Silver, and Gold layers, organizations can ensure data quality, improve governance, and ultimately derive more valuable insights for their business.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# --- Configuration ---
RAW_DATA_PATH = "s3://your-raw-data-bucket/events/"  # Replace with your raw data source
BRONZE_TABLE = "bronze_events"
SILVER_PATH = "s3://your-curated-data-bucket/silver/events/"  # Replace with your silver layer path
SILVER_TABLE = "silver_events"
GOLD_PATH = "s3://your-refined-data-bucket/gold/user_activity/"  # Replace with your gold layer path
GOLD_TABLE = "gold_user_activity"
CHECKPOINT_LOCATION_BRONZE = "s3://your-checkpoint-bucket/bronze_events/"  # For structured streaming
CHECKPOINT_LOCATION_SILVER = "s3://your-checkpoint-bucket/silver_events/"  # For structured streaming

# --- Define Schema for Raw Data (Example) ---
raw_event_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("details", StringType(), True)  # Could be JSON string
])

# --- Define Schema for Details (Example) ---
details_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("duration", IntegerType(), True)
])

# --- Initialize Spark Session ---
spark = SparkSession.builder.appName("MedallionArchitecture").enableHiveSupport().getOrCreate()

# --- Bronze Layer (Ingest Raw Data) ---
def create_bronze_table():
    """Ingests raw data and creates the bronze table."""
    raw_df = spark.read.format("json").schema(raw_event_schema).load(RAW_DATA_PATH)
    (raw_df.withColumn("ingest_timestamp", current_timestamp())
             .write.mode("overwrite")  # Or "append" depending on your needs
             .saveAsTable(BRONZE_TABLE))
    print(f"Bronze table '{BRONZE_TABLE}' created.")

def create_bronze_stream():
    """Ingests raw data as a stream and creates the bronze table (Delta Lake)."""
    streaming_raw_df = (spark.readStream.format("cloudFiles")
                        .option("cloudFiles.format", "json")
                        .schema(raw_event_schema)
                        .option("path", RAW_DATA_PATH)
                        .option("cloudFiles.schemaLocation", CHECKPOINT_LOCATION_BRONZE)
                        .load())

    bronze_stream_writer = (streaming_raw_df.withColumn("ingest_timestamp", current_timestamp())
                            .writeStream.format("delta")
                            .outputMode("append")
                            .option("checkpointLocation", CHECKPOINT_LOCATION_BRONZE)
                            .option("path", RAW_DATA_PATH.replace("events/", "bronze/events/")))  # Store bronze data separately
    print(f"Bronze stream writer configured for Delta table '{BRONZE_TABLE}'.")
    return bronze_stream_writer

# --- Silver Layer (Cleanse and Conform Data) ---
def create_silver_table():
    """Reads from the bronze table, cleanses, and creates the silver table (Delta Lake)."""
    bronze_df = spark.table(BRONZE_TABLE)
    silver_df = (bronze_df.filter(col("user_id").isNotNull())
                          .withColumn("event_timestamp", col("timestamp").cast(TimestampType()))
                          .withColumn("details_json", from_json(col("details"), details_schema))
                          .withColumn("product_id", col("details_json.product_id"))
                          .withColumn("category", col("details_json.category"))
                          .withColumn("duration", col("details_json.duration"))
                          .drop("details", "details_json", "timestamp"))

    (silver_df.write.format("delta")
               .mode("overwrite")  # Or "append"
               .option("path", SILVER_PATH)
               .saveAsTable(SILVER_TABLE))
    print(f"Silver table '{SILVER_TABLE}' created at '{SILVER_PATH}'.")

def create_silver_stream():
    """Reads from the bronze stream, cleanses, and creates the silver table (Delta Lake)."""
    bronze_stream_df = spark.readStream.table(BRONZE_TABLE)
    silver_stream_df = (bronze_stream_df.filter(col("user_id").isNotNull())
                             .withColumn("event_timestamp", col("timestamp").cast(TimestampType()))
                             .withColumn("details_json", from_json(col("details"), details_schema))
                             .withColumn("product_id", col("details_json.product_id"))
                             .withColumn("category", col("details_json.category"))
                             .withColumn("duration", col("details_json.duration"))
                             .drop("details", "details_json", "timestamp"))

    silver_stream_writer = (silver_stream_df.writeStream.format("delta")
                            .outputMode("append")
                            .option("checkpointLocation", CHECKPOINT_LOCATION_SILVER)
                            .option("path", SILVER_PATH))
    print(f"Silver stream writer configured for Delta table '{SILVER_TABLE}' at '{SILVER_PATH}'.")
    return silver_stream_writer

# --- Gold Layer (Refine and Aggregate Data) ---
def create_gold_table():
    """Reads from the silver table, aggregates, and creates the gold table (Delta Lake)."""
    silver_df = spark.table(SILVER_TABLE)
    gold_df = (silver_df.groupBy("user_id", "category")
                        .agg({"duration": "sum", "event_timestamp": "max"})
                        .withColumnRenamed("sum(duration)", "total_duration")
                        .withColumnRenamed("max(event_timestamp)", "last_activity_time"))

    (gold_df.write.format("delta")
             .mode("overwrite")
             .option("path", GOLD_PATH)
             .saveAsTable(GOLD_TABLE))
    print(f"Gold table '{GOLD_TABLE}' created at '{GOLD_PATH}'.")

def create_gold_stream():
    """Reads from the silver stream, aggregates, and creates the gold table (Delta Lake)."""
    silver_stream_df = spark.readStream.table(SILVER_TABLE)
    # Streaming aggregations need an event-time window to bound the grouping.
    gold_stream_df = (silver_stream_df.groupBy("user_id", "category",
                                               window(col("event_timestamp"), "1 minute"))
                               .agg({"duration": "sum", "event_timestamp": "max"})
                               .withColumnRenamed("sum(duration)", "total_duration")
                               .withColumnRenamed("max(event_timestamp)", "last_activity_time"))

    gold_stream_writer = (gold_stream_df.writeStream.format("delta")
                            .outputMode("complete")  # Or "update", depending on the aggregation
                            .option("checkpointLocation", GOLD_PATH + "_checkpoints/")
                            .option("path", GOLD_PATH))
    print(f"Gold stream writer configured for Delta table '{GOLD_TABLE}' at '{GOLD_PATH}'.")
    return gold_stream_writer

# --- Main Execution ---
if __name__ == "__main__":
    # Batch processing (comment out these calls if you are only running the streaming pipeline)
    create_bronze_table()
    create_silver_table()
    create_gold_table()

    # Structured Streaming (Uncomment if you are processing continuous data)
    # bronze_stream_query = create_bronze_stream().toTable(BRONZE_TABLE)
    # silver_stream_query = create_silver_stream().toTable(SILVER_TABLE)
    # gold_stream_query = (create_gold_stream()
    #                      .trigger(processingTime='1 minute')  # Adjust trigger interval
    #                      .toTable(GOLD_TABLE))
    #
    # spark.streams.awaitAnyTermination()

    spark.stop()

Explanation of the Sample Code (using PySpark and Delta Lake):

  1. Configuration: Defines paths for raw data, bronze, silver, and gold layers (ideally on cloud storage like S3 or ADLS). Also defines table names and checkpoint locations for streaming.
  2. Schema Definition: Example schemas are defined for the raw JSON events and the nested details field. You’ll need to adapt these to the actual structure of your data.
  3. Spark Session Initialization: Creates a SparkSession with Hive support (useful for managing metadata).
  4. Bronze Layer (create_bronze_table and create_bronze_stream):
    • Batch: Reads raw JSON data from the specified path and writes it to a managed Spark table (bronze_events). An ingest_timestamp is added.
    • Streaming: Reads a stream of JSON data using the cloudFiles format (Databricks Auto Loader, optimized for incremental ingestion from cloud object storage) and writes it to a Delta Lake table, which provides transactional guarantees and schema evolution. A checkpoint location is crucial for fault tolerance in streaming.
  5. Silver Layer (create_silver_table and create_silver_stream):
    • Batch: Reads from the bronze_events table. It then performs basic data quality steps:
      • Filters out records with a null user_id.
      • Casts the timestamp string to a TimestampType.
      • Parses the details JSON string into a structured format using from_json.
      • Extracts individual fields (product_id, category, duration) from the parsed JSON.
      • Drops the raw details and original timestamp columns.
      • Writes the cleaned and conformed data to a Delta Lake table (silver_events) at the specified path.
    • Streaming: Similar to the batch process but operates on the streaming bronze_events DataFrame and writes to a streaming Delta Lake table.
  6. Gold Layer (create_gold_table and create_gold_stream):
    • Batch: Reads from the silver_events table. It then performs business-level transformations:
      • Groups the data by user_id and category.
      • Aggregates the duration (calculating the sum) and finds the max event_timestamp.
      • Renames the aggregated columns to be more business-friendly.
      • Writes the refined data to a Delta Lake table (gold_user_activity) at the specified path.
    • Streaming: Reads from the streaming silver_events DataFrame. For streaming aggregations you typically need to define a window (e.g., the tumbling 1-minute window in the code above) to group events within a specific time frame. The outputMode is set to complete (all aggregated results are rewritten on each trigger) or update (only the changes are written), depending on the specific aggregation requirements; see the sketch after this list for a windowed aggregation with a watermark.
  7. Main Execution: The if __name__ == "__main__": block shows how to run either the batch processing or the structured streaming pipelines by uncommenting the relevant sections.
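
To make item 6 concrete, here is a minimal sketch of a windowed streaming aggregation that also adds a watermark, which is what makes the append output mode possible for aggregations. The 1-minute window and 10-minute watermark are assumed values, and silver_stream_df refers to the streaming DataFrame read in create_gold_stream above; tune both to your latency and lateness requirements.

from pyspark.sql.functions import col, max as max_, sum as sum_, window

# Sketch only: aggregate per user and category over 1-minute tumbling event-time windows.
# The watermark tells Spark how late events may arrive, so old window state can be dropped.
windowed_gold_df = (silver_stream_df
                    .withWatermark("event_timestamp", "10 minutes")
                    .groupBy("user_id", "category", window(col("event_timestamp"), "1 minute"))
                    .agg(sum_("duration").alias("total_duration"),
                         max_("event_timestamp").alias("last_activity_time")))

With a watermark in place, each window can be emitted once Spark knows it will receive no more late data, so the stream can run in append mode instead of rewriting the full result on every trigger.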

Key Concepts Used:

  • PySpark: The Python API for Apache Spark, a distributed processing engine.
  • Spark SQL: Spark’s module for structured data processing using SQL and DataFrames.
  • Delta Lake: An open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It is highly recommended for building reliable data lakehouses (see the configuration sketch after this list if you are not running on Databricks).
  • Structured Streaming: Spark’s scalable and fault-tolerant stream processing engine.
  • Schema Definition: Explicitly defining schemas ensures data consistency and allows Spark to optimize processing.
  • Data Transformations: Using Spark SQL functions (filter, withColumn, from_json, groupBy, agg, drop, withColumnRenamed, cast) to cleanse, conform, and refine data.
  • Write Modes: overwrite replaces existing data, while append adds new data. For streaming, append is common for bronze and silver, while gold might use complete or update for aggregations.
  • Checkpointing: Essential for fault tolerance in structured streaming, storing the state of the processing.
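
The sample session above assumes an environment where Delta Lake is already available (for example, Databricks). On a self-managed Spark cluster, the Delta Lake package and catalog extensions have to be added when the SparkSession is built. A minimal sketch follows; the package version is an assumption, so match it to your Spark release.

from pyspark.sql import SparkSession

# Sketch: enabling Delta Lake on open-source Spark. Adjust the package version to your Spark release.
spark = (SparkSession.builder
         .appName("MedallionArchitecture")
         .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())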

To Adapt This Code:

  1. Replace Placeholder Paths: Update the S3 paths (or your storage system paths) for raw data, bronze, silver, gold layers, and checkpoint locations.
  2. Define Your Actual Schema: Modify raw_event_schema and details_schema to match the exact structure of your incoming data.
  3. Implement Your Data Quality Rules: Adjust the filtering and cleansing logic in the create_silver_table/stream functions based on your specific data quality requirements (a sketch of what this might look like follows this list).
  4. Implement Your Business Logic: Modify the aggregation and transformation logic in the create_gold_table/stream functions to create the business-ready views you need.
  5. Choose Batch or Streaming: Decide whether you are processing static data (batch) or continuous data streams and uncomment the appropriate sections in the if __name__ == "__main__": block.
  6. Configure Streaming Trigger: Adjust the processingTime in the gold_stream_query if you are using streaming.

This sample code provides a foundational structure for implementing the Medallion Architecture using PySpark and Delta Lake. You’ll need to customize it extensively to fit your specific data sources, data structures, and business requirements. Remember to consider error handling, logging, and more robust configuration management for production environments.
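
As a starting point for that hardening, the batch entry point could be wrapped with basic logging and error handling, for example:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("medallion_pipeline")

def run_batch_pipeline():
    """Runs the batch layers in order and logs a failure before re-raising it."""
    try:
        create_bronze_table()
        create_silver_table()
        create_gold_table()
        logger.info("Medallion batch pipeline completed successfully.")
    except Exception:
        logger.exception("Medallion batch pipeline failed.")
        raise
    finally:
        spark.stop()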