Databricks Workflow Sample: Simple ETL Pipeline

Let’s walk through a sample Workflow using the Workflows UI. This example will demonstrate a simple ETL (Extract, Transform, Load) pipeline:

Scenario:

  1. Extract: Read raw customer data from a CSV file in storage (e.g., S3, ADLS Gen2).
  2. Transform: Clean and transform the data using a Databricks notebook (e.g., filter out invalid records, standardize date formats).
  3. Load: Write the cleaned and transformed data into a Delta Lake table in your Databricks workspace.

Steps to Create the Workflow:

  1. Navigate to Workflows: In your Databricks workspace, click on the “Workflows” icon in the left sidebar.
  2. Create a New Workflow: Click the “Create Job” button. This will open the job creation interface.
  3. Configure the First Task (Extract – Reading the CSV):
    • Task Name: Give this task a descriptive name, like “ExtractRawData”.
    • Type: Select “Notebook”.
    • Notebook Path: Browse to or enter the path of a Databricks notebook that contains the PySpark code to read the CSV file from your cloud storage.

    Example extract_raw_data.py notebook content:

    
    from pyspark.sql.functions import col, to_date
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    # Configure the path to your raw CSV data in cloud storage
    raw_data_path = "s3://your-bucket/raw_customer_data.csv"
    
    # Define the schema (optional, but recommended for production)
    schema = StructType([
        StructField("customer_id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("signup_date", StringType(), True)
    ])
    
    # Read the CSV data into a DataFrame
    raw_df = spark.read.csv(raw_data_path, header=True, schema=schema)
    
    # You might want to log the number of records read
    print(f"Number of raw records read: {raw_df.count()}")
    
    # Make the DataFrame available to the next task. Temporary views are session-scoped,
    # so this only works if the downstream task runs in the same Spark session; otherwise,
    # persist the intermediate data to a table or path instead.
    raw_df.createOrReplaceTempView("raw_customer_data")
            
    • Cluster: Choose the cluster where this task will run. You can select an existing all-purpose cluster or configure a new job cluster that will be created just for this run. For a workflow, using a job cluster is often preferred as it’s more resource-efficient.
    • Parameters (Optional): If your notebook needs parameters (e.g., the input file path), you can configure them here (see the sketch just after this step for how the notebook reads them).
    • Click “Create Task”.
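    If you do pass parameters, the notebook can read them with Databricks widgets. A minimal sketch, where raw_data_path is an illustrative parameter name and schema is the schema defined earlier in the extract notebook:

    # In extract_raw_data.py: declare the widget with a default, then read the
    # value the job run passes in, instead of hard-coding the path
    dbutils.widgets.text("raw_data_path", "s3://your-bucket/raw_customer_data.csv")
    raw_data_path = dbutils.widgets.get("raw_data_path")

    raw_df = spark.read.csv(raw_data_path, header=True, schema=schema)
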
  4. Configure the Second Task (Transform – Cleaning and Transforming Data):
    • Click the “+” icon next to the “ExtractRawData” task and select “Notebook”. This indicates that this task will run after the first one completes successfully.
    • Task Name: Give it a name like “TransformData”.
    • Notebook Path: Browse to or enter the path of a Databricks notebook that contains the PySpark code to transform the raw data.

    Example transform_data.py notebook content:
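
    A minimal sketch of what this notebook might contain, assuming the raw data is reachable as the raw_customer_data temporary view created by the previous task and that dates arrive as yyyy-MM-dd strings (if your tasks run in separate Spark sessions, persist the intermediate data to a staging table or path and read it back here instead):

    from pyspark.sql.functions import col, to_date

    # Read the raw data registered by the extract task
    raw_df = spark.sql("SELECT * FROM raw_customer_data")

    # Filter out invalid records (missing customer IDs or emails)
    cleaned_df = raw_df.filter(col("customer_id").isNotNull() & col("email").isNotNull())

    # Standardize the signup date into a proper date column
    # (assumes yyyy-MM-dd input; adjust the format to match your source data)
    cleaned_df = cleaned_df.withColumn("signup_date", to_date(col("signup_date"), "yyyy-MM-dd"))

    # Log how many records survived the cleaning step
    print(f"Number of cleaned records: {cleaned_df.count()}")

    # Make the cleaned data available to the next task
    cleaned_df.createOrReplaceTempView("cleaned_customer_data")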

    • Cluster: You can choose to run this task on the same cluster as the previous task or a different one.
    • Click “Create Task”.
  5. Configure the Third Task (Load – Writing to Delta Lake):
    • Click the “+” icon next to the “TransformData” task and select “Notebook”.
    • Task Name: Give it a name like “LoadToDeltaLake”.
    • Notebook Path: Browse to or enter the path of a Databricks notebook that contains the PySpark code to write the transformed data to a Delta Lake table.

    Example load_to_delta.py notebook content:

    
    # Assuming the cleaned data is available in the temporary view "cleaned_customer_data"
    cleaned_df = spark.sql("SELECT * FROM cleaned_customer_data")
    
    # Define the path to your Delta Lake table
    delta_table_path = "dbfs:/user/hive/warehouse/customer_data_delta"
    delta_table_name = "customer_data_delta" # If you want to register it in the metastore
    
    # Write the DataFrame to Delta Lake
    cleaned_df.write.format("delta").mode("overwrite").save(delta_table_path)
    
    # Optionally, create or replace a Delta Lake table in the metastore
    # cleaned_df.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)
    
    print(f"Data successfully loaded to Delta Lake at: {delta_table_path}")
            
    • Cluster: Choose the appropriate cluster.
    • Click “Create Task”.
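    Once this task completes, you can sanity-check the load from any notebook. A minimal sketch, assuming the path-based write above (query the table by name instead if you registered it with saveAsTable):

    # Read the Delta table back by path and confirm the record count
    loaded_df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/customer_data_delta")
    print(f"Records in Delta table: {loaded_df.count()}")

    # If the table was registered in the metastore, you can query it by name instead
    # display(spark.sql("SELECT * FROM customer_data_delta LIMIT 10"))
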
  6. Configure Workflow Settings:
    • Job Name: Give your workflow a meaningful name (e.g., “CustomerDataETL”).
    • Schedule (Optional): You can set up a schedule to run this workflow automatically at specific intervals (e.g., daily, hourly).
    • Triggers (Optional): You can configure triggers based on events (e.g., when a file arrives in a specific location).
    • Notifications (Optional): Configure email alerts for job success or failure.
    • Tags (Optional): Add tags for better organization and tracking.
  7. Run the Workflow:
    • Click the “Run now” button in the top right corner to start a manual run of your workflow.
    • You can monitor the progress and status of each task in the workflow UI. You’ll see when each task starts, finishes, and whether it succeeded or failed.
    • You can view the logs for each task to see the output of your notebooks.
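    If you want a task’s result to stand out in the run output, the notebook can return a short summary when it finishes. A minimal sketch that could go at the end of load_to_delta.py (the message format is just an example):

    # Ends the notebook run and surfaces this string as the task's exit value in the run UI
    dbutils.notebook.exit(f"Loaded {cleaned_df.count()} records to {delta_table_path}")
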

Key Concepts Illustrated:

  • Task Dependencies: The workflow defines the order in which the tasks are executed. The “TransformData” task only starts after “ExtractRawData” completes successfully, and “LoadToDeltaLake” starts after “TransformData”. (The sketch after this list shows the same dependencies expressed through the Jobs API.)
  • Notebook as a Unit of Work: Each step in the ETL process is encapsulated within a Databricks notebook.
  • Workflow UI for Orchestration: The Workflows UI provides a visual way to define, schedule, and monitor your data pipelines.
  • Job Clusters: Using a job cluster ensures that the necessary compute resources are provisioned for the workflow run and terminated afterward, optimizing costs.
  • Integration with Databricks Features: The workflow seamlessly integrates with other Databricks features like reading from cloud storage and writing to Delta Lake.
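
For reference, the same three-task workflow (dependencies, a shared job cluster, a daily schedule, and failure notifications) can also be defined programmatically. Below is a minimal sketch of a Jobs API 2.1 payload; the notebook paths, cluster settings, cron expression, and email address are illustrative placeholders:

    import requests

    job_payload = {
        "name": "CustomerDataETL",
        "tasks": [
            {
                "task_key": "ExtractRawData",
                "notebook_task": {"notebook_path": "/Workflows/extract_raw_data"},
                "job_cluster_key": "etl_cluster",
            },
            {
                "task_key": "TransformData",
                "depends_on": [{"task_key": "ExtractRawData"}],
                "notebook_task": {"notebook_path": "/Workflows/transform_data"},
                "job_cluster_key": "etl_cluster",
            },
            {
                "task_key": "LoadToDeltaLake",
                "depends_on": [{"task_key": "TransformData"}],
                "notebook_task": {"notebook_path": "/Workflows/load_to_delta"},
                "job_cluster_key": "etl_cluster",
            },
        ],
        "job_clusters": [
            {
                "job_cluster_key": "etl_cluster",
                "new_cluster": {
                    "spark_version": "13.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                },
            }
        ],
        "schedule": {
            "quartz_cron_expression": "0 0 2 * * ?",  # daily at 02:00
            "timezone_id": "UTC",
        },
        "email_notifications": {"on_failure": ["data-team@example.com"]},
    }

    # Create the job via the Jobs API (replace the workspace URL and token with your own)
    resp = requests.post(
        "https://<your-workspace-url>/api/2.1/jobs/create",
        headers={"Authorization": "Bearer <personal-access-token>"},
        json=job_payload,
    )
    print(resp.json())  # returns {"job_id": ...} on success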

This is a basic example, but Databricks Workflows can handle much more complex pipelines with branching, conditional logic, different task types (e.g., Spark JAR, script, SQL task, Delta Live Tables task), and integration with other services. The Workflows UI provides a powerful and user-friendly way to orchestrate your data and processes in Databricks.
