Let’s explore some common Databricks data ingestion scenarios with code samples in PySpark (which is the primary language for data manipulation in Databricks notebooks).
Before You Begin
- Set up your environment: Ensure you have a Databricks workspace and have attached a notebook to a running cluster.
- Configure access: Depending on the data source, you might need to configure access credentials (e.g., AWS keys, Azure Storage connection strings, database credentials). Databricks Secrets can be used to manage sensitive information securely.
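For example, credentials stored in Databricks Secrets can be read at runtime with dbutils.secrets.get instead of being hardcoded in a notebook. This is a minimal sketch; the scope and key names ("my-scope", "aws-access-key", "aws-secret-key") are placeholders for whatever you have configured:
# Retrieve credentials from a Databricks secret scope (scope/key names are placeholders)
aws_access_key = dbutils.secrets.get(scope="my-scope", key="aws-access-key")
aws_secret_key = dbutils.secrets.get(scope="my-scope", key="aws-secret-key")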
Common Ingestion Examples
1. Reading from Cloud Storage (e.g., AWS S3, Azure Blob Storage, ADLS Gen2, GCS)
Scenario: You have data files (CSV, JSON, Parquet, etc.) stored in cloud storage and want to load them into a Databricks DataFrame.
Configuration (Example for AWS S3 – adjust for other cloud providers):
You can configure access using instance profiles (recommended for production) or by providing access keys directly (less secure, suitable for development).
# Using instance profiles (no explicit credentials in code)
s3_path = "s3://your-bucket/path/to/your/data.csv"
# If you need to provide access keys (not recommended for production)
# spark.conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
# spark.conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
Reading CSV:
csv_df = spark.read.csv(s3_path, header=True, inferSchema=True)
csv_df.display()
Reading JSON:
json_path = "s3://your-bucket/path/to/your/data.json"
json_df = spark.read.json(json_path, multiLine=True) # Use multiLine=True if each JSON record spans multiple lines
json_df.display()
Reading Parquet (highly recommended for performance):
parquet_path = "s3://your-bucket/path/to/your/data.parquet"
parquet_df = spark.read.parquet(parquet_path)
parquet_df.display()
2. Reading from Relational Databases (JDBC)
Scenario: You want to load data from a relational database like PostgreSQL, MySQL, SQL Server, etc.
Configuration:
You’ll need the JDBC driver for your database and the connection details.
jdbc_url = "jdbc:postgresql://your_host:5432/your_database"
table_name = "your_table"
user = "your_user"
password = "your_password"
driver_class = "org.postgresql.Driver" # Example for PostgreSQL
Reading data:
jdbc_df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("user", user) \
.option("password", password) \
.option("driver", driver_class) \
.load()
jdbc_df.display()
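For larger tables, Spark's JDBC source can read in parallel if you give it a numeric partition column and bounds; all four partitioning options must be supplied together. A hedged sketch, assuming your table has a numeric id column (the column name, bounds, and partition count below are made up):
parallel_jdbc_df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", table_name) \
.option("user", user) \
.option("password", password) \
.option("driver", driver_class) \
.option("partitionColumn", "id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.option("numPartitions", "8") \
.load()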
3. Reading from Streaming Sources (e.g., Apache Kafka)
Scenario: You want to ingest real-time data streams.
Configuration:
You’ll need the Kafka broker details and the topic name.
kafka_brokers = "your_kafka_broker1:9092,your_kafka_broker2:9092"
kafka_topic = "your_topic"
Reading the stream:
# Use "earliest" instead of "latest" to consume the topic from the beginning
streaming_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_brokers) \
.option("subscribe", kafka_topic) \
.option("startingOffsets", "latest") \
.load()
# Streaming DataFrames need to be processed with a writeStream
query = streaming_df.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("live_data") \
.start()
# To view the data (for testing purposes only, not for production)
spark.sql("SELECT * FROM live_data").show()
# To stop the stream
# query.stop()
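In a real pipeline you would typically cast the Kafka key/value bytes to strings (or parse JSON) and write to a durable sink such as a Delta table rather than the memory sink. A rough sketch, assuming a target table name and checkpoint location of your choosing:
# Cast the raw Kafka bytes to strings
parsed_df = streaming_df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
# Write the stream to a Delta table; table name and checkpoint path are placeholders
delta_query = parsed_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "dbfs:/tmp/checkpoints/live_data") \
.toTable("your_catalog.your_schema.kafka_events")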
4. Reading from Delta Lake
Scenario: You want to load data from tables stored in Delta Lake format.
Configuration:
If the Delta table is registered in Unity Catalog or the Hive metastore, you can query it directly by name. Otherwise, you can load it by path.
# If the Delta Lake table is registered
delta_table_name = "your_catalog.your_schema.your_delta_table"
delta_df = spark.table(delta_table_name)
delta_df.display()
# If you have the path to the Delta Lake table
delta_path = "dbfs:/user/hive/warehouse/your_delta_table" # Example DBFS path
delta_df_path = spark.read.format("delta").load(delta_path)
delta_df_path.display()
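Delta Lake also supports time travel, so you can read an older snapshot of a table by version (or timestamp). A small sketch; the version number here is arbitrary:
# Read a previous version of the Delta table (time travel)
old_delta_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
old_delta_df.display()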
5. Reading from Databricks File System (DBFS)
Scenario: You have files stored in the Databricks File System (DBFS).
dbfs_path = "dbfs:/FileStore/your_data.csv"
dbfs_df = spark.read.csv(dbfs_path, header=True, inferSchema=True)
dbfs_df.display()
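If you're not sure what a DBFS directory contains, dbutils.fs.ls is a quick way to inspect it before reading (the path below is just an example):
# List the contents of a DBFS directory
display(dbutils.fs.ls("dbfs:/FileStore/"))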
Best Practices for Data Ingestion in Databricks
- Use Parquet or Delta Lake format: These formats are highly optimized for Spark performance.
- Schema Inference vs. Explicit Schema: For development, inferSchema=True can be convenient, but for production it's recommended to define the schema explicitly for better performance and type safety (see the sketch after this list).
- Partitioning: If your data is large and frequently queried on certain columns, consider partitioning it in the storage layer (e.g., by date). Spark can then prune partitions and read only the data a query needs.
- File Handling: Directories containing many small files add listing and task-scheduling overhead; consider compacting small files or using an incremental ingestion tool such as Databricks Auto Loader.
- Error Handling: Implement robust error handling mechanisms, especially for production pipelines.
- Security: Manage access credentials securely using Databricks Secrets or instance profiles.
- Data Validation: Implement data validation steps after ingestion to ensure data quality.
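As an illustration of the explicit-schema recommendation above, here is a minimal sketch that defines a schema with StructType instead of relying on inferSchema; the column names and types are placeholders for your own data:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define the expected schema up front (columns here are hypothetical)
explicit_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("amount", DoubleType(), True),
])
typed_df = spark.read.csv(s3_path, header=True, schema=explicit_schema)
Reading with an explicit schema skips the extra pass over the data that schema inference requires and fails fast when the files don't match what you expect.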
These examples cover some of the most common data ingestion scenarios in Databricks. Remember to adapt the paths, connection details, and configurations to your specific data sources and environment.