Let’s walk through a sample Databricks Workflow using the Workflows UI. This example will demonstrate a simple ETL (Extract, Transform, Load) pipeline:
Scenario:
- Extract: Read raw customer data from a CSV file in cloud storage (e.g., S3, ADLS Gen2).
- Transform: Clean and transform the data using a Databricks notebook (e.g., filter out invalid records, standardize date formats).
- Load: Write the cleaned and transformed data into a Delta Lake table in your Databricks workspace.
Steps to Create the Workflow:
- Navigate to Workflows: In your Databricks workspace, click on the “Workflows” icon in the left sidebar.
- Create a New Workflow: Click the “Create Job” button. This will open the job creation interface.
- Configure the First Task (Extract – Reading the CSV):
- Task Name: Give this task a descriptive name, like “ExtractRawData”.
- Type: Select “Notebook”.
- Notebook Path: Browse to or enter the path of a Databricks notebook that contains the PySpark code to read the CSV file from your cloud storage.
Example extract_raw_data.py notebook content:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Configure the path to your raw CSV data in cloud storage
raw_data_path = "s3://your-bucket/raw_customer_data.csv"

# Define the schema (optional, but recommended for production)
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("signup_date", StringType(), True)
])

# Read the CSV data into a DataFrame
raw_df = spark.read.csv(raw_data_path, header=True, schema=schema)

# Log the number of records read
print(f"Number of raw records read: {raw_df.count()}")

# Make the DataFrame available for the next task. Note that local temporary views
# are scoped to a Spark session and may not be visible to other notebook tasks,
# so in production you would typically persist intermediate results to a table
# or storage path instead.
raw_df.createOrReplaceTempView("raw_customer_data")
```
- Cluster: Choose the cluster where this task will run. You can select an existing all-purpose cluster or configure a new job cluster that will be created just for this run. For a workflow, using a job cluster is often preferred as it’s more resource-efficient.
- Parameters (Optional): If your notebook needs parameters (e.g., the input file path), you can configure them here; a short sketch of how a notebook reads such a parameter follows this task's steps.
- Click “Create Task”.
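For completeness, here is a minimal sketch of how a notebook can read such a task parameter, assuming a hypothetical parameter named raw_data_path (not part of the original walkthrough):

```python
# Task parameters passed to a notebook task are exposed as widgets.
# "raw_data_path" is a hypothetical parameter name used for illustration;
# the default value is only used when the notebook is run interactively.
dbutils.widgets.text("raw_data_path", "s3://your-bucket/raw_customer_data.csv")
raw_data_path = dbutils.widgets.get("raw_data_path")

print(f"Reading raw data from: {raw_data_path}")
```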
- Configure the Second Task (Transform – Cleaning and Transforming Data):
- Click the “+” icon next to the “ExtractRawData” task and select “Notebook”. This indicates that this task will run after the first one completes successfully.
- Task Name: Give it a name like “TransformData”.
- Notebook Path: Browse to or enter the path of a Databricks notebook that contains the PySpark code to transform the raw data.
Example transform_data.py notebook content:
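A minimal sketch of what this notebook might contain (this snippet is not part of the original post), assuming the raw data is available as the raw_customer_data temporary view created by the extract task, that signup_date arrives as a yyyy-MM-dd string, and that the result is exposed as the cleaned_customer_data view expected by the load step:

```python
from pyspark.sql.functions import col, to_date

# Read the raw data registered by the extract task
raw_df = spark.table("raw_customer_data")

# Filter out invalid records (e.g., missing customer_id or email)
cleaned_df = raw_df.filter(col("customer_id").isNotNull() & col("email").isNotNull())

# Standardize the signup_date column to a proper date type
# (the "yyyy-MM-dd" input format is an assumption for illustration)
cleaned_df = cleaned_df.withColumn("signup_date", to_date(col("signup_date"), "yyyy-MM-dd"))

print(f"Number of cleaned records: {cleaned_df.count()}")

# Make the cleaned DataFrame available for the load task
cleaned_df.createOrReplaceTempView("cleaned_customer_data")
```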
- Cluster: You can choose to run this task on the same cluster as the previous task or a different one.
- Click “Create Task”.
- Configure the Third Task (Load – Writing to Delta Lake):
- Click the “+” icon next to the “TransformData” task and select “Notebook”.
- Task Name: Give it a name like “LoadToDeltaLake”.
- Notebook Path: Browse to or enter the path of a Databricks notebook that contains the PySpark code to write the transformed data to a Delta Lake table.
Example load_to_delta.py notebook content:

```python
# Assuming the cleaned data is available in the temporary view "cleaned_customer_data"
cleaned_df = spark.sql("SELECT * FROM cleaned_customer_data")

# Define the path to your Delta Lake table
delta_table_path = "dbfs:/user/hive/warehouse/customer_data_delta"
delta_table_name = "customer_data_delta"  # If you want to register it in the metastore

# Write the DataFrame to Delta Lake
cleaned_df.write.format("delta").mode("overwrite").save(delta_table_path)

# Optionally, create or replace a Delta Lake table in the metastore
# cleaned_df.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)

print(f"Data successfully loaded to Delta Lake at: {delta_table_path}")
```
- Cluster: Choose the appropriate cluster.
- Click “Create Task”.
- Configure Workflow Settings:
- Job Name: Give your workflow a meaningful name (e.g., “CustomerDataETL”).
- Schedule (Optional): You can set up a schedule to run this workflow automatically at specific intervals (e.g., daily, hourly); a sketch of these settings in JSON form follows this list.
- Triggers (Optional): You can configure triggers based on events (e.g., when a file arrives in a specific location).
- Notifications (Optional): Configure email alerts for job success or failure.
- Tags (Optional): Add tags for better organization and tracking.
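For reference, these settings can also be expressed in the job's JSON definition (Jobs API 2.1). A minimal sketch follows as a Python dict; the cron expression, timezone, email address, and tag values are illustrative placeholders, not values from the original walkthrough:

```python
# Sketch of how these job-level settings might look in the job's JSON
# definition (Jobs API 2.1). All concrete values below are placeholders.
job_settings = {
    "name": "CustomerDataETL",
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",  # every day at 02:00
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
    "email_notifications": {
        "on_failure": ["data-team@example.com"],
        "on_success": ["data-team@example.com"],
    },
    "tags": {"project": "customer-data", "env": "prod"},
}
```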
- Run the Workflow:
- Click the “Run now” button in the top right corner to start a manual run of your workflow (an API-based alternative is sketched after this list).
- You can monitor the progress and status of each task in the workflow UI. You’ll see when each task starts, finishes, and whether it succeeded or failed.
- You can view the logs for each task to see the output of your notebooks.
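If you prefer to trigger the job programmatically, the Jobs API 2.1 run-now endpoint does the same thing as the “Run now” button. A minimal sketch, assuming a placeholder workspace URL, personal access token, and job ID:

```python
import requests

# Trigger the job via the Jobs API 2.1 (equivalent to clicking "Run now").
# The workspace URL, token, and job_id below are placeholders.
host = "https://<your-workspace>.cloud.databricks.com"
token = "<personal-access-token>"
job_id = 123

resp = requests.post(
    f"{host}/api/2.1/jobs/run-now",
    headers={"Authorization": f"Bearer {token}"},
    json={"job_id": job_id},
)
resp.raise_for_status()
print(f"Triggered run_id: {resp.json()['run_id']}")
```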
Key Concepts Illustrated:
- Task Dependencies: The workflow defines the order in which the tasks are executed. The “TransformData” task only starts after “ExtractRawData” completes successfully, and “LoadToDeltaLake” starts after “TransformData” (see the sketch after this list for how these dependencies can be expressed in the job’s JSON definition).
- Notebook as a Unit of Work: Each step in the ETL process is encapsulated within a Databricks notebook.
- Workflow UI for Orchestration: The Workflows UI provides a visual way to define, schedule, and monitor your data pipelines.
- Job Clusters: Using a job cluster ensures that the necessary compute resources are provisioned for the workflow run and terminated afterward, optimizing costs.
- Integration with Databricks Features: The workflow seamlessly integrates with other Databricks features like reading from cloud storage and writing to Delta Lake.
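To make the dependency chain and the shared job cluster concrete, here is a minimal sketch of how the task graph might look in the job’s JSON definition (Jobs API 2.1); the notebook paths, Spark version, and node type are illustrative placeholders:

```python
# Sketch of the task graph and a shared job cluster in the job's JSON
# definition (Jobs API 2.1). Paths and cluster settings are placeholders.
tasks_and_clusters = {
    "job_clusters": [
        {
            "job_cluster_key": "etl_cluster",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
        }
    ],
    "tasks": [
        {
            "task_key": "ExtractRawData",
            "notebook_task": {"notebook_path": "/Repos/etl/extract_raw_data"},
            "job_cluster_key": "etl_cluster",
        },
        {
            "task_key": "TransformData",
            "depends_on": [{"task_key": "ExtractRawData"}],
            "notebook_task": {"notebook_path": "/Repos/etl/transform_data"},
            "job_cluster_key": "etl_cluster",
        },
        {
            "task_key": "LoadToDeltaLake",
            "depends_on": [{"task_key": "TransformData"}],
            "notebook_task": {"notebook_path": "/Repos/etl/load_to_delta"},
            "job_cluster_key": "etl_cluster",
        },
    ],
}
```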
This is a basic example, but Databricks Workflows can handle much more complex pipelines with branching, conditional logic, different task types (e.g., Spark JAR, Python script, SQL task, Delta Live Tables task), and integration with other services. The Workflows UI provides a powerful and user-friendly way to orchestrate your data and AI processes in Databricks.
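As a small illustration of those other task types, the following sketch shows how a Python script task and a Delta Live Tables pipeline task might appear in a job definition (Jobs API 2.1); the file path and pipeline ID are placeholders:

```python
# Sketch of alternative task types in a job definition (Jobs API 2.1).
# The script path and pipeline ID are placeholders.
other_task_examples = [
    {
        "task_key": "RunPythonScript",
        "spark_python_task": {"python_file": "dbfs:/scripts/cleanup.py"},
        "job_cluster_key": "etl_cluster",
    },
    {
        "task_key": "RefreshDLTPipeline",
        "pipeline_task": {"pipeline_id": "<your-dlt-pipeline-id>"},
    },
]
```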