Estimated reading time: 10 minutes

Implementing a few e-commerce queries in Spark SQL

Spark SQL Implementation – E-commerce & Retail (First 5)

1. Calculate daily/weekly/monthly sales trends.

These queries calculate total sales per day, week, and month. They assume an orders table with order_date and total_amount columns.


    -- Daily Sales Trend
    SELECT
        order_date,
        SUM(total_amount) AS daily_sales
    FROM orders
    GROUP BY order_date
    ORDER BY order_date;

    -- Weekly Sales Trend
    SELECT
        DATE_TRUNC('week', order_date) AS week_start_date,
        SUM(total_amount) AS weekly_sales
    FROM orders
    GROUP BY week_start_date
    ORDER BY week_start_date;

    -- Monthly Sales Trend
    SELECT
        DATE_TRUNC('month', order_date) AS month_start_date,
        SUM(total_amount) AS monthly_sales
    FROM orders
    GROUP BY month_start_date
    ORDER BY month_start_date;
    
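If you prefer the DataFrame API over SQL strings, the daily trend can be expressed equivalently as in the sketch below. It assumes an orders_df DataFrame like the one loaded from JDBC in the PySpark script later in this post.

    from pyspark.sql import functions as F

    # Daily sales trend with the DataFrame API; assumes orders_df has
    # order_date and total_amount columns (as in the PySpark script below).
    daily_sales_df = (
        orders_df
        .groupBy("order_date")
        .agg(F.sum("total_amount").alias("daily_sales"))
        .orderBy("order_date")
    )
    daily_sales_df.show()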

2. Identify top-selling products by category.

This query identifies the top 5 selling products within each category. It assumes orders, order_items, products, and categories tables, with a category_id column in the products table.


    WITH ranked_products AS (
        SELECT
            p.category_id,
            c.name AS category_name,
            p.product_id,
            p.name AS product_name,
            SUM(oi.quantity) AS total_quantity_sold,
            RANK() OVER (PARTITION BY p.category_id ORDER BY SUM(oi.quantity) DESC) AS sales_rank
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        JOIN products p ON oi.product_id = p.product_id
        JOIN categories c ON p.category_id = c.category_id
        GROUP BY p.category_id, c.name, p.product_id, p.name
    )
    SELECT
        category_id,
        category_name,
        product_id,
        product_name,
        total_quantity_sold
    FROM ranked_products
    WHERE sales_rank <= 5
    ORDER BY category_id, total_quantity_sold DESC;
    
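The same ranking can be done with the DataFrame API using a window function. The sketch below assumes the order_items_df and products_df DataFrames loaded in the PySpark script later in this post.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Total quantity sold per product within each category.
    sales_by_product = (
        order_items_df
        .join(products_df.select("product_id", "category_id"), "product_id")
        .groupBy("category_id", "product_id")
        .agg(F.sum("quantity").alias("total_quantity_sold"))
    )

    # Rank products within each category and keep the top 5.
    category_window = Window.partitionBy("category_id").orderBy(F.desc("total_quantity_sold"))
    top_products_df = (
        sales_by_product
        .withColumn("sales_rank", F.rank().over(category_window))
        .filter(F.col("sales_rank") <= 5)
        .orderBy("category_id", F.desc("total_quantity_sold"))
    )
    top_products_df.show()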

3. Analyze customer purchase patterns (market basket analysis).

This query identifies frequently purchased product combinations, a simplified form of market basket analysis. For a full implementation you would typically use a specialized algorithm such as Apriori or FP-Growth (the latter is available in Spark MLlib) in conjunction with Spark SQL for data preparation; a sketch follows the query below.


    -- This gives the co-occurrence of products within orders.
    SELECT
        oi1.product_id AS product_id_1,
        p1.name AS product_name_1,
        oi2.product_id AS product_id_2,
        p2.name AS product_name_2,
        COUNT(DISTINCT oi1.order_id) AS co_occurrence_count
    FROM order_items oi1
    JOIN order_items oi2 ON oi1.order_id = oi2.order_id AND oi1.product_id < oi2.product_id -- < avoids counting each pair twice (A,B and B,A)
    JOIN products p1 ON oi1.product_id = p1.product_id
    JOIN products p2 ON oi2.product_id = p2.product_id
    GROUP BY oi1.product_id, p1.name, oi2.product_id, p2.name
    ORDER BY co_occurrence_count DESC
    LIMIT 100; --  Limit to the top 100 co-occurrences
    
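For the full association-rule workflow (rather than raw pair counts), a minimal sketch with Spark MLlib's FP-Growth could look like the following. It assumes the order_items_df DataFrame loaded in the PySpark script later in this post; the minSupport and minConfidence values are illustrative and would need tuning.

    from pyspark.sql import functions as F
    from pyspark.ml.fpm import FPGrowth

    # One row per order, with the set of distinct products in that basket.
    baskets_df = (
        order_items_df
        .groupBy("order_id")
        .agg(F.collect_set("product_id").alias("items"))
    )

    # Fit FP-Growth on the baskets; thresholds here are illustrative.
    fp_growth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.2)
    model = fp_growth.fit(baskets_df)

    model.freqItemsets.show()       # frequent product combinations
    model.associationRules.show()   # rules of the form antecedent => consequent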

4. Predict future sales using time series analysis.

Spark SQL itself doesn’t have built-in time series forecasting, but you can use it to prepare data for forecasting models. This example aggregates monthly sales as input for a forecast; you would then typically hand the result to a library such as Spark MLlib or Prophet (a sketch follows the query below).


    SELECT
        DATE_TRUNC('month', order_date) AS month_start_date,
        SUM(total_amount) AS monthly_sales,
        COUNT(DISTINCT order_id) as order_count
    FROM orders
    GROUP BY month_start_date
    ORDER BY month_start_date;
    
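As a sketch of that next step (assuming the Prophet library is installed, and that the monthly aggregate has been loaded into a DataFrame such as monthly_sales_for_forecast_df in the PySpark script later in this post), the data could be handed to Prophet like this:

    import pandas as pd
    from prophet import Prophet

    # Pull the monthly aggregate to the driver and rename to Prophet's expected
    # columns: ds (timestamp) and y (value to forecast).
    monthly_pdf = monthly_sales_for_forecast_df.toPandas()
    history = pd.DataFrame({
        "ds": pd.to_datetime(monthly_pdf["month_start_date"]),
        "y": monthly_pdf["monthly_sales"],
    })

    model = Prophet()
    model.fit(history)

    # Forecast the next 6 months (month-start frequency) and inspect the tail.
    future = model.make_future_dataframe(periods=6, freq="MS")
    forecast = model.predict(future)
    print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail(6))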

5. Calculate customer lifetime value (CLTV).

This query calculates CLTV using a simplified formula: average order value × purchase frequency over the customer’s observed lifetime. For example, a customer with 10 orders totaling $500 over 2 years has an average order value of $50 and a purchase frequency of 5 orders/year, giving a CLTV of about $250 per year. Real CLTV calculations can be more complex and vary with the business model.


    WITH customer_lifetime_data AS (
        SELECT
            o.customer_id,
            c.first_name,
            c.last_name,
            MIN(o.order_date) AS first_purchase_date,
            MAX(o.order_date) AS last_purchase_date,
            COUNT(o.order_id) AS order_count,
            SUM(o.total_amount) AS total_revenue,
            GREATEST(DATEDIFF(MAX(o.order_date), MIN(o.order_date)), 1) / 365.0 AS customer_lifetime_years -- Simplified lifetime; GREATEST avoids division by zero for single-order customers
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        GROUP BY o.customer_id, c.first_name, c.last_name
    )
    SELECT
        customer_id,
        first_name,
        last_name,
        order_count,
        total_revenue,
        customer_lifetime_years,
        (total_revenue / order_count) AS average_order_value,
        (order_count / customer_lifetime_years) AS purchase_frequency,
        (total_revenue / order_count) * (order_count / customer_lifetime_years) AS cltv -- Simplified CLTV
    FROM customer_lifetime_data
    ORDER BY cltv DESC;
    
Running Spark SQL Code

Here’s a sample PySpark script that runs the Spark SQL queries shown above. It creates a SparkSession, reads the tables from an RDBMS over JDBC into DataFrames (orders_df, products_df, customers_df, and so on), registers them as temporary views, and executes each query.

PySpark Example


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# 1.  Create a SparkSession
spark = SparkSession.builder.appName("ECommerceAnalysis").getOrCreate()

# 2.  Define the schema for the RDBMS tables (replace with your actual schema)
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("registration_date", DateType(), True)
])

product_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("description", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("category_id", IntegerType(), True)
])

order_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", DateType(), True),
    StructField("total_amount", IntegerType(), True)
])

order_item_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", IntegerType(), True)
])

category_schema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("name", StringType(), True)
])

review_schema = StructType([
    StructField("review_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("comment", StringType(), True),
    StructField("review_date", DateType(), True)
])
# 3.  Read data from the RDBMS (replace the placeholders below with your actual connection details)
rdbms_url = "jdbc:postgresql://<host>:<port>/<database>"
rdbms_user = "<username>"
rdbms_password = "<password>"
driver_class = "org.postgresql.Driver"

# Function to read a table from the RDBMS.
# Note: spark.read.jdbc() has no schema argument; the JDBC source infers column
# types from the database. The customSchema option lets us override those types,
# so we build a DDL string ("col1 int, col2 string, ...") from the StructType.
def read_table(table_name, schema):
    ddl = ", ".join(f"{field.name} {field.dataType.simpleString()}" for field in schema.fields)
    return (
        spark.read.format("jdbc")
        .option("url", rdbms_url)
        .option("dbtable", table_name)
        .option("user", rdbms_user)
        .option("password", rdbms_password)
        .option("driver", driver_class)
        .option("customSchema", ddl)
        .load()
    )
# Read tables into Spark DataFrames
customers_df = read_table("customers", customer_schema)
products_df = read_table("products", product_schema)
orders_df = read_table("orders", order_schema)
order_items_df = read_table("order_items", order_item_schema)
categories_df = read_table("categories", category_schema)
reviews_df = read_table("reviews", review_schema)


# Register DataFrames as temporary views, so we can use SQL
customers_df.createOrReplaceTempView("customers")
products_df.createOrReplaceTempView("products")
orders_df.createOrReplaceTempView("orders")
order_items_df.createOrReplaceTempView("order_items")
categories_df.createOrReplaceTempView("categories")
reviews_df.createOrReplaceTempView("reviews")


# 4.  Execute Spark SQL Queries

# 4.1 Calculate daily/weekly/monthly sales trends.
daily_sales_df = spark.sql("""
    SELECT
        order_date,
        SUM(total_amount) AS daily_sales
    FROM orders
    GROUP BY order_date
    ORDER BY order_date
""")
daily_sales_df.show()

weekly_sales_df = spark.sql("""
    SELECT
        DATE_TRUNC('week', order_date) AS week_start_date,
        SUM(total_amount) AS weekly_sales
    FROM orders
    GROUP BY week_start_date
    ORDER BY week_start_date
""")
weekly_sales_df.show()

monthly_sales_df = spark.sql("""
    SELECT
        DATE_TRUNC('month', order_date) AS month_start_date,
        SUM(total_amount) AS monthly_sales
    FROM orders
    GROUP BY month_start_date
    ORDER BY month_start_date
""")
monthly_sales_df.show()


# 4.2 Identify top-selling products by category.
top_selling_products_df = spark.sql("""
    WITH ranked_products AS (
        SELECT
            p.category_id,
            c.name AS category_name,
            p.product_id,
            p.name AS product_name,
            SUM(oi.quantity) AS total_quantity_sold,
            RANK() OVER (PARTITION BY p.category_id ORDER BY SUM(oi.quantity) DESC) AS sales_rank
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        JOIN products p ON oi.product_id = p.product_id
        JOIN categories c ON p.category_id = c.category_id
        GROUP BY p.category_id, c.name, p.product_id, p.name
    )
    SELECT
        category_id,
        category_name,
        product_id,
        product_name,
        total_quantity_sold
    FROM ranked_products
    WHERE sales_rank <= 5
    ORDER BY category_id, total_quantity_sold DESC
""")
top_selling_products_df.show()



# 4.3 Analyze customer purchase patterns (market basket analysis).
market_basket_df = spark.sql("""
    SELECT
        oi1.product_id AS product_id_1,
        p1.name AS product_name_1,
        oi2.product_id AS product_id_2,
        p2.name AS product_name_2,
        COUNT(DISTINCT oi1.order_id) AS co_occurrence_count
    FROM order_items oi1
    JOIN order_items oi2 ON oi1.order_id = oi2.order_id AND oi1.product_id < oi2.product_id -- < avoids counting each pair twice
    JOIN products p1 ON oi1.product_id = p1.product_id
    JOIN products p2 ON oi2.product_id = p2.product_id
    GROUP BY oi1.product_id, p1.name, oi2.product_id, p2.name
    ORDER BY co_occurrence_count DESC
    LIMIT 100
""")
market_basket_df.show()


# 4.4 Predict future sales using time series analysis.
monthly_sales_for_forecast_df = spark.sql("""
    SELECT
        DATE_TRUNC('month', order_date) AS month_start_date,
        SUM(total_amount) AS monthly_sales,
        COUNT(DISTINCT order_id) as order_count
    FROM orders
    GROUP BY month_start_date
    ORDER BY month_start_date
""")
monthly_sales_for_forecast_df.show()



# 4.5 Calculate customer lifetime value (CLTV).
cltv_df = spark.sql("""
    WITH customer_lifetime_data AS (
        SELECT
            o.customer_id,
            c.first_name,
            c.last_name,
            MIN(o.order_date) AS first_purchase_date,
            MAX(o.order_date) AS last_purchase_date,
            COUNT(o.order_id) AS order_count,
            SUM(o.total_amount) AS total_revenue,
            GREATEST(DATEDIFF(MAX(o.order_date), MIN(o.order_date)), 1) / 365.0 AS customer_lifetime_years -- avoids division by zero for single-order customers
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        GROUP BY o.customer_id, c.first_name, c.last_name
    )
    SELECT
        customer_id,
        first_name,
        last_name,
        order_count,
        total_revenue,
        customer_lifetime_years,
        (total_revenue / order_count) AS average_order_value,
        (order_count / customer_lifetime_years) AS purchase_frequency,
        (total_revenue / order_count) * (order_count / customer_lifetime_years) AS cltv
    FROM customer_lifetime_data
    ORDER BY cltv DESC
""")
cltv_df.show()


# 5. Stop the SparkSession
spark.stop()
    

Explanation:

  1. Import Libraries: Imports necessary Spark SQL functions and data types.
  2. Create SparkSession: Initializes a SparkSession, which is the entry point to Spark.
  3. Define Schemas: Defines the expected schema for each RDBMS table. The read_table helper converts these StructTypes to DDL strings and applies them via the JDBC customSchema option so Spark interprets the column types consistently. Replace them with your actual table structures.
  4. Read from RDBMS: Reads data from your RDBMS tables into Spark DataFrames. You’ll need to configure the connection details (URL, user, password, driver) to match your environment.
  5. Register as Temporary Views: Registers the DataFrames as temporary views. This allows you to query them using Spark SQL.
  6. Execute Queries: Executes the Spark SQL queries from the earlier sections. Each query runs through spark.sql(), and the resulting DataFrame is displayed with .show().
  7. Stop SparkSession: Cleanly stops the SparkSession.

Key Points:

  • Replace Placeholders: Make sure to replace the placeholder values for your RDBMS connection details (rdbms_url, rdbms_user, rdbms_password, driver_class) and table names with your actual values.
  • DataFrames: The data is loaded into DataFrames, which are Spark’s way of representing structured data.
  • Spark SQL: The queries are written in Spark SQL, which is very similar to standard SQL.
  • .show(): The .show() method displays the first 20 rows of the DataFrame in a tabular format. This is useful for seeing the results of your queries.
  • Error Handling: This example doesn’t include explicit error handling; in a production environment you would add it to make the pipeline more robust (see the sketch after this list).
  • Performance: For large datasets, consider optimizing your Spark jobs for performance (e.g., by partitioning your data, using appropriate storage formats, and tuning Spark configuration parameters).

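As mentioned under Error Handling above, a production pipeline should fail gracefully. A minimal sketch, reusing the read_table helper and view names from the script above, might look like this:

    from pyspark.sql.utils import AnalysisException

    try:
        orders_df = read_table("orders", order_schema)
        orders_df.createOrReplaceTempView("orders")
        spark.sql(
            "SELECT order_date, SUM(total_amount) AS daily_sales "
            "FROM orders GROUP BY order_date ORDER BY order_date"
        ).show()
    except AnalysisException as e:
        # Missing tables/columns or malformed SQL.
        print(f"Spark SQL analysis error: {e}")
    except Exception as e:
        # JDBC connection failures, driver problems, etc.
        print(f"Failed to load or query data: {e}")
    finally:
        spark.stop()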