Implementation

1. Calculate daily/weekly/monthly sales trends.
This query calculates the total sales for each day, week, and month. It assumes you have an `orders` table with an `order_date` and a `total_amount` column.
```sql
-- Daily Sales Trend
SELECT
    order_date,
    SUM(total_amount) AS daily_sales
FROM orders
GROUP BY order_date
ORDER BY order_date;

-- Weekly Sales Trend
SELECT
    DATE_TRUNC('week', order_date) AS week_start_date,
    SUM(total_amount) AS weekly_sales
FROM orders
GROUP BY week_start_date
ORDER BY week_start_date;

-- Monthly Sales Trend
SELECT
    DATE_TRUNC('month', order_date) AS month_start_date,
    SUM(total_amount) AS monthly_sales
FROM orders
GROUP BY month_start_date
ORDER BY month_start_date;
```
2. Identify top-selling products by category.
This query identifies the top 5 selling products within each category. It assumes you have `orders`, `order_items`, and `products` tables, with a `category_id` column in the `products` table, plus a `categories` table for the category names. Because Spark SQL has no QUALIFY clause, the window rank is computed in a CTE and filtered in the outer query.
```sql
WITH ranked_products AS (
    SELECT
        p.category_id,
        c.name AS category_name,
        p.product_id,
        p.name AS product_name,
        SUM(oi.quantity) AS total_quantity_sold,
        RANK() OVER (PARTITION BY p.category_id ORDER BY SUM(oi.quantity) DESC) AS sales_rank
    FROM orders o
    JOIN order_items oi ON o.order_id = oi.order_id
    JOIN products p ON oi.product_id = p.product_id
    JOIN categories c ON p.category_id = c.category_id
    GROUP BY p.category_id, c.name, p.product_id, p.name
)
SELECT *
FROM ranked_products
WHERE sales_rank <= 5 -- Spark SQL has no QUALIFY clause, so filter the window rank here
ORDER BY category_id, total_quantity_sold DESC;
```
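If you prefer the DataFrame API over SQL strings, the same top-N-per-category pattern can be expressed with a window function. The following is a minimal sketch, not part of the original queries; it assumes the tables have already been loaded into the DataFrames used later in this article (`orders_df`, `order_items_df`, `products_df`, `categories_df`).

```python
# Sketch: top 5 products per category via the DataFrame API.
# Assumes orders_df, order_items_df, products_df and categories_df are loaded
# as in the PySpark script later in this article.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

sales = (
    orders_df.join(order_items_df, "order_id")
    .join(products_df, "product_id")
    .join(categories_df.withColumnRenamed("name", "category_name"), "category_id")
    .groupBy("category_id", "category_name", "product_id", "name")
    .agg(F.sum("quantity").alias("total_quantity_sold"))
)

window = Window.partitionBy("category_id").orderBy(F.desc("total_quantity_sold"))
top5 = (
    sales.withColumn("sales_rank", F.rank().over(window))
    .filter(F.col("sales_rank") <= 5)
    .orderBy("category_id", F.desc("total_quantity_sold"))
)
top5.show()
```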
3. Analyze customer purchase patterns (market basket analysis).
This query identifies frequently purchased product combinations (association rules). It’s a simplified version of market basket analysis. For a full implementation, you would typically use specialized algorithms (like Apriori or FP-Growth, available in MLlib) in conjunction with Spark SQL.
```sql
-- Co-occurrence of products within the same order.
-- Using product_id_1 < product_id_2 counts each unordered pair once.
SELECT
    oi1.product_id AS product_id_1,
    p1.name AS product_name_1,
    oi2.product_id AS product_id_2,
    p2.name AS product_name_2,
    COUNT(DISTINCT oi1.order_id) AS co_occurrence_count
FROM order_items oi1
JOIN order_items oi2
    ON oi1.order_id = oi2.order_id
    AND oi1.product_id < oi2.product_id
JOIN products p1 ON oi1.product_id = p1.product_id
JOIN products p2 ON oi2.product_id = p2.product_id
GROUP BY oi1.product_id, p1.name, oi2.product_id, p2.name
ORDER BY co_occurrence_count DESC
LIMIT 100; -- Limit to the top 100 co-occurrences
```
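For the fuller association-rule analysis mentioned above, MLlib's FP-Growth can be run on per-order baskets built with Spark SQL. The following is a minimal sketch, not part of the original article's code: the support and confidence thresholds are illustrative assumptions, and it presumes a SparkSession `spark` with the `order_items` and `products` views registered (as done in the PySpark script below).

```python
# Sketch: market basket analysis with MLlib's FP-Growth.
# minSupport/minConfidence are illustrative assumptions; tune them for your data.
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import functions as F

# One basket (set of product names) per order.
baskets = (
    spark.table("order_items")
    .join(spark.table("products"), "product_id")
    .groupBy("order_id")
    .agg(F.collect_set("name").alias("items"))
)

fp_growth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.2)
model = fp_growth.fit(baskets)

# Frequent item sets and the association rules derived from them.
model.freqItemsets.orderBy(F.desc("freq")).show(20, truncate=False)
model.associationRules.orderBy(F.desc("confidence")).show(20, truncate=False)
```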
4. Predict future sales using time series analysis.
Spark SQL itself doesn’t have built-in time series forecasting. However, you can use it to prepare your data for time series models. This example prepares data for a monthly sales forecast. You would then typically use a library like spark.ml
or Prophet
.
```sql
SELECT
    DATE_TRUNC('month', order_date) AS month_start_date,
    SUM(total_amount) AS monthly_sales,
    COUNT(DISTINCT order_id) AS order_count
FROM orders
GROUP BY month_start_date
ORDER BY month_start_date;
```
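As a hand-off sketch (not part of the original queries): assuming the aggregation above is available as the `monthly_sales_for_forecast_df` DataFrame from the PySpark script below, and that the `prophet` package is installed, the small monthly series can be collected to pandas and fitted with Prophet.

```python
# Sketch: forecast monthly sales with Prophet.
# Assumes the `prophet` package is installed and monthly_sales_for_forecast_df
# holds the aggregation above; both are assumptions, not part of the original script.
from prophet import Prophet

# A monthly series is small, so collecting it to the driver is safe.
pdf = monthly_sales_for_forecast_df.toPandas()
pdf = pdf.rename(columns={"month_start_date": "ds", "monthly_sales": "y"})

model = Prophet()
model.fit(pdf[["ds", "y"]])

# Forecast the next 6 months (month-start frequency).
future = model.make_future_dataframe(periods=6, freq="MS")
forecast = model.predict(future)
print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail(6))
```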
5. Calculate customer lifetime value (CLTV).
This query calculates CLTV using a simplified formula. Note that CLTV calculation can be complex and vary based on the business model.
```sql
WITH customer_lifetime_data AS (
    SELECT
        o.customer_id,
        c.first_name,
        c.last_name,
        MIN(o.order_date) AS first_purchase_date,
        MAX(o.order_date) AS last_purchase_date,
        COUNT(o.order_id) AS order_count,
        SUM(o.total_amount) AS total_revenue,
        -- Simplified lifetime in years; GREATEST(..., 1) keeps single-order
        -- customers from producing a zero lifetime (and a divide-by-zero below).
        GREATEST(DATEDIFF(MAX(o.order_date), MIN(o.order_date)), 1) / 365.0 AS customer_lifetime_years
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    GROUP BY o.customer_id, c.first_name, c.last_name
)
SELECT
    customer_id,
    first_name,
    last_name,
    order_count,
    total_revenue,
    customer_lifetime_years,
    (total_revenue / order_count) AS average_order_value,
    (order_count / customer_lifetime_years) AS purchase_frequency,
    (total_revenue / order_count) * (order_count / customer_lifetime_years) AS cltv -- Simplified CLTV
FROM customer_lifetime_data
ORDER BY cltv DESC;
```
Running Spark SQL Code
Here’s a sample Python script using PySpark that runs the Spark SQL queries above end to end: it creates a SparkSession, reads the RDBMS tables into DataFrames (e.g., `orders_df`, `products_df`, `customers_df`), registers them as temporary views, and executes each query.

PySpark Example
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
# 1. Create a SparkSession
spark = SparkSession.builder.appName("ECommerceAnalysis").getOrCreate()
# 2. Define the schema for the RDBMS tables (replace with your actual schema)
customer_schema = StructType([
StructField("customer_id", IntegerType(), True),
StructField("first_name", StringType(), True),
StructField("last_name", StringType(), True),
StructField("email", StringType(), True),
StructField("registration_date", DateType(), True)
])
product_schema = StructType([
StructField("product_id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("description", StringType(), True),
StructField("price", IntegerType(), True),
StructField("category_id", IntegerType(), True)
])
order_schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("customer_id", IntegerType(), True),
StructField("order_date", DateType(), True),
StructField("total_amount", IntegerType(), True)
])
order_item_schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("product_id", IntegerType(), True),
StructField("quantity", IntegerType(), True),
StructField("price", IntegerType(), True)
])
category_schema = StructType([
StructField("category_id", IntegerType(), True),
StructField("name", StringType(), True)
])
review_schema = StructType([
StructField("review_id", IntegerType(), True),
StructField("customer_id", IntegerType(), True),
StructField("product_id", IntegerType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("review_date", DateType(), True)
])
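# Note: the schemas above are not passed to the JDBC reader below (it infers the
# schema from the database). If you later load the same tables from files, they
# can be applied explicitly, e.g. (hypothetical path):
#   orders_df = spark.read.schema(order_schema).csv("/path/to/orders.csv", header=True)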
# 3. Read data from the RDBMS (replace the placeholders with your actual connection details)
rdbms_url = "jdbc:postgresql://<host>:<port>/<database>"
rdbms_user = "<username>"
rdbms_password = "<password>"
driver_class = "org.postgresql.Driver"

# Helper to read a table from the RDBMS.
# Note: spark.read.jdbc() does not accept a schema argument -- the JDBC source
# infers the schema from the database metadata.
def read_table(table_name):
    return spark.read.jdbc(
        url=rdbms_url,
        table=table_name,
        properties={
            "user": rdbms_user,
            "password": rdbms_password,
            "driver": driver_class,
        },
    )

# Read tables into Spark DataFrames
customers_df = read_table("customers")
products_df = read_table("products")
orders_df = read_table("orders")
order_items_df = read_table("order_items")
categories_df = read_table("categories")
reviews_df = read_table("reviews")
# Register DataFrames as temporary views, so we can use SQL
customers_df.createOrReplaceTempView("customers")
products_df.createOrReplaceTempView("products")
orders_df.createOrReplaceTempView("orders")
order_items_df.createOrReplaceTempView("order_items")
categories_df.createOrReplaceTempView("categories")
reviews_df.createOrReplaceTempView("reviews")
# 4. Execute Spark SQL Queries
# 4.1 Calculate daily/weekly/monthly sales trends.
daily_sales_df = spark.sql("""
SELECT
order_date,
SUM(total_amount) AS daily_sales
FROM orders
GROUP BY order_date
ORDER BY order_date
""")
daily_sales_df.show()
weekly_sales_df = spark.sql("""
SELECT
DATE_TRUNC('week', order_date) AS week_start_date,
SUM(total_amount) AS weekly_sales
FROM orders
GROUP BY week_start_date
ORDER BY week_start_date
""")
weekly_sales_df.show()
monthly_sales_df = spark.sql("""
SELECT
DATE_TRUNC('month', order_date) AS month_start_date,
SUM(total_amount) AS monthly_sales
FROM orders
GROUP BY month_start_date
ORDER BY month_start_date
""")
monthly_sales_df.show()
# 4.2 Identify top-selling products by category.
top_selling_products_df = spark.sql("""
    WITH ranked_products AS (
        SELECT
            p.category_id,
            c.name AS category_name,
            p.product_id,
            p.name AS product_name,
            SUM(oi.quantity) AS total_quantity_sold,
            RANK() OVER (PARTITION BY p.category_id ORDER BY SUM(oi.quantity) DESC) AS sales_rank
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        JOIN products p ON oi.product_id = p.product_id
        JOIN categories c ON p.category_id = c.category_id
        GROUP BY p.category_id, c.name, p.product_id, p.name
    )
    SELECT *
    FROM ranked_products
    WHERE sales_rank <= 5  -- Spark SQL has no QUALIFY clause
    ORDER BY category_id, total_quantity_sold DESC
""")
top_selling_products_df.show()
# 4.3 Analyze customer purchase patterns (market basket analysis).
market_basket_df = spark.sql("""
SELECT
oi1.product_id AS product_id_1,
p1.name AS product_name_1,
oi2.product_id AS product_id_2,
p2.name AS product_name_2,
COUNT(DISTINCT oi1.order_id) AS co_occurrence_count
FROM order_items oi1
JOIN order_items oi2 ON oi1.order_id = oi2.order_id AND oi1.product_id < oi2.product_id  -- '<' counts each pair once
JOIN products p1 ON oi1.product_id = p1.product_id
JOIN products p2 ON oi2.product_id = p2.product_id
GROUP BY oi1.product_id, p1.name, oi2.product_id, p2.name
ORDER BY co_occurrence_count DESC
LIMIT 100
""")
market_basket_df.show()
# 4.4 Predict future sales using time series analysis.
monthly_sales_for_forecast_df = spark.sql("""
SELECT
DATE_TRUNC('month', order_date) AS month_start_date,
SUM(total_amount) AS monthly_sales,
COUNT(DISTINCT order_id) as order_count
FROM orders
GROUP BY month_start_date
ORDER BY month_start_date
""")
monthly_sales_for_forecast_df.show()
# 4.5 Calculate customer lifetime value (CLTV).
cltv_df = spark.sql("""
WITH customer_lifetime_data AS (
SELECT
o.customer_id,
c.first_name,
c.last_name,
MIN(o.order_date) AS first_purchase_date,
MAX(o.order_date) AS last_purchase_date,
COUNT(o.order_id) AS order_count,
SUM(o.total_amount) AS total_revenue,
GREATEST(DATEDIFF(MAX(o.order_date), MIN(o.order_date)), 1) / 365.0 AS customer_lifetime_years  -- simplified lifetime in years; GREATEST avoids a zero lifetime
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
GROUP BY o.customer_id, c.first_name, c.last_name
)
SELECT
customer_id,
first_name,
last_name,
order_count,
total_revenue,
customer_lifetime_years,
(total_revenue / order_count) AS average_order_value,
(order_count / customer_lifetime_years) AS purchase_frequency,
(total_revenue / order_count) * (order_count / customer_lifetime_years) AS cltv
FROM customer_lifetime_data
ORDER BY cltv DESC
""")
cltv_df.show()
# 5. Stop the SparkSession
spark.stop()
```
Explanation:
- Import Libraries: Imports the SparkSession entry point and the data types used to define the table schemas.
- Create SparkSession: Initializes a SparkSession, which is the entry point to Spark.
- Define Schemas: Defines StructTypes describing the expected structure of the RDBMS tables. The JDBC reader infers the schema from the database itself, so these serve as documentation (and can be passed to spark.read.schema() if you load the same tables from files instead). Replace them with your actual table structures.
- Read from RDBMS: Reads data from your RDBMS tables into Spark DataFrames. You’ll need to configure the connection details (URL, user, password, driver) to match your database.
- Register as Temporary Views: Registers the DataFrames as temporary views. This allows you to query them using Spark SQL.
- Execute Queries: Runs the Spark SQL queries from the sections above. Each query is executed with `spark.sql()`, and the resulting DataFrame is displayed with `.show()`.
- Stop SparkSession: Cleanly stops the SparkSession.
Key Points:
- Replace Placeholders: Make sure to replace the placeholder values for the RDBMS connection details (`rdbms_url`, `rdbms_user`, `rdbms_password`, `driver_class`) and table names with your actual values.
- DataFrames: The data is loaded into DataFrames, which are Spark’s way of representing structured data.
- Spark SQL: The queries are written in Spark SQL, which is very similar to standard SQL.
- `.show()`: The `.show()` method displays the first 20 rows of a DataFrame in tabular form, which is handy for inspecting query results.
- Error Handling: The example omits explicit error handling; in production you would wrap the reads and writes so that failures are reported cleanly (see the sketch after this list).
- Optimization: For large datasets, consider partitioning your reads, using columnar storage formats such as Parquet, and tuning Spark configuration parameters; the same sketch below shows a partitioned JDBC read.
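To make the last two points concrete, here is a hedged sketch, not part of the original script: the partition bounds and output path are assumptions, and the error handling is deliberately minimal.

```python
# Sketch: partitioned JDBC read with basic error handling and a Parquet snapshot.
# numPartitions / lowerBound / upperBound and the output path are illustrative assumptions.
try:
    orders_df = spark.read.jdbc(
        url=rdbms_url,
        table="orders",
        column="order_id",   # numeric column used to split the read
        lowerBound=1,
        upperBound=1_000_000,
        numPartitions=8,     # up to 8 parallel JDBC connections
        properties={
            "user": rdbms_user,
            "password": rdbms_password,
            "driver": driver_class,
        },
    )
    # Persist a columnar snapshot so repeated analyses don't hit the RDBMS.
    orders_df.write.mode("overwrite").parquet("/tmp/ecommerce/orders_parquet")
except Exception as exc:
    print(f"Failed to load and snapshot the orders table: {exc}")
    raise
```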