Implementing a real-time recommendation engine AI agent on AWS requires a robust and scalable architecture. Here are implementation examples for key services in the tech stack:
1. Real-time Data Ingestion (Amazon Kinesis Data Streams):
You would use the AWS SDK (Boto3 in Python) in your application backend to put records into the Kinesis stream whenever a user interaction occurs.
import boto3
import json
import datetime

kinesis_client = boto3.client('kinesis', region_name='your-aws-region')  # Replace region
stream_name = 'user-interaction-stream'

def put_user_interaction(user_id, item_id, interaction_type):
    try:
        payload = {
            'user_id': user_id,
            'item_id': item_id,
            'type': interaction_type,
            'timestamp': datetime.datetime.now().isoformat()
        }
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(payload).encode('utf-8'),
            PartitionKey=user_id  # Using the user ID for partitioning
        )
        print(f"Put record to Kinesis: {response}")
    except Exception as e:
        print(f"Error putting record to Kinesis: {e}")

# Example usage in your application
# put_user_interaction('user123', 'item456', 'view')
# put_user_interaction('user123', 'item789', 'click')
Example of putting user interaction data into a Kinesis stream.
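If the stream does not exist yet, it can be created with Boto3 (or the console). A minimal sketch, assuming on-demand capacity mode is acceptable; the stream name matches the snippet above:

import boto3

kinesis_client = boto3.client('kinesis', region_name='your-aws-region')  # Replace region

# Create the stream used above; ON_DEMAND capacity avoids choosing a shard count up front (an assumption)
kinesis_client.create_stream(
    StreamName='user-interaction-stream',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)

# Wait until the stream is ACTIVE before writing records to it
waiter = kinesis_client.get_waiter('stream_exists')
waiter.wait(StreamName='user-interaction-stream')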
2. Real-time Data Processing & Feature Engineering (AWS Lambda):
A Lambda function consumes this Kinesis stream, logs each interaction to DynamoDB, and updates real-time features in ElastiCache (Redis). Note that the redis client library must be bundled with the function package or provided via a Lambda layer:
import base64
import json
import time

import boto3
import redis  # Bundled with the deployment package or provided via a Lambda layer

dynamodb = boto3.resource('dynamodb')
user_interactions_table = dynamodb.Table('user_interactions')

redis_host = 'your-elasticache-redis-endpoint'  # Replace with your Redis endpoint
redis_port = 6379
r = redis.Redis(host=redis_host, port=redis_port)  # Reused across warm invocations

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis record payloads arrive base64-encoded
        kinesis_data = json.loads(base64.b64decode(record['kinesis']['data']))
        user_id = kinesis_data.get('user_id')
        item_id = kinesis_data.get('item_id')
        interaction_type = kinesis_data.get('type')
        timestamp = int(time.time())
        if user_id and item_id and interaction_type:
            try:
                user_interactions_table.put_item(
                    Item={
                        'user_id': user_id,
                        'timestamp': timestamp,
                        'item_id': item_id,
                        'type': interaction_type
                    }
                )
                print(f"Logged interaction: User {user_id}, Item {item_id}, Type {interaction_type}")
                # Example: Update recently viewed items in ElastiCache (Redis)
                recent_views_key = f"recent_views:{user_id}"
                r.lpush(recent_views_key, item_id)
                r.ltrim(recent_views_key, 0, 9)  # Keep only the last 10 views
                print(f"Updated recent views for user {user_id} in ElastiCache.")
            except Exception as e:
                print(f"Error processing record: {e}")
    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed Kinesis records!')
    }
Lambda function consuming Kinesis and interacting with DynamoDB and ElastiCache (Redis).
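To wire the stream to this Lambda, an event source mapping is required. A conceptual sketch using Boto3; the function name and stream ARN below are placeholders:

import boto3

lambda_client = boto3.client('lambda', region_name='your-aws-region')  # Replace region

# Connect the Kinesis stream to the processing Lambda (hypothetical function name and ARN)
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:your-aws-region:your-aws-account-id:stream/user-interaction-stream',
    FunctionName='user-interaction-processor',
    StartingPosition='LATEST',
    BatchSize=100
)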
3. Low-Latency Data Store (Amazon ElastiCache – Redis):
The recommendation service (running on SageMaker or Lambda) would fetch real-time context from ElastiCache. Here’s a conceptual example within a Lambda function serving recommendations:
import boto3
import json
import redis

elasticache_host = 'your-elasticache-redis-endpoint'  # Replace with your Redis endpoint
elasticache_port = 6379

dynamodb = boto3.resource('dynamodb')
user_profiles_table = dynamodb.Table('user_profiles')

# Assume a pre-trained recommendation model is available

def get_recommendations(user_id):
    r = redis.Redis(host=elasticache_host, port=elasticache_port)
    recent_views = r.lrange(f"recent_views:{user_id}", 0, -1)
    recent_views = [item.decode('utf-8') for item in recent_views]
    try:
        user_profile = user_profiles_table.get_item(Key={'user_id': user_id}).get('Item', {})
    except Exception as e:
        print(f"Error fetching user profile: {e}")
        user_profile = {}
    # Logic to generate recommendations based on recent views and the user profile.
    # This would involve calling your trained model or applying business rules.
    recommendations = generate_recommendations_logic(user_id, recent_views, user_profile)
    return recommendations

def generate_recommendations_logic(user_id, recent_views, user_profile):
    # Placeholder for your recommendation algorithm.
    # This could involve model inference via a SageMaker endpoint.
    if recent_views:
        return [f"Recommended item based on recent view: {recent_views[0]}", "Another recommendation"]
    else:
        return ["Popular item 1", "Popular item 2"]

def lambda_handler(event, context):
    user_id = event.get('user_id')
    if not user_id:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing user_id'})
        }
    recommendations = get_recommendations(user_id)
    return {
        'statusCode': 200,
        'body': json.dumps({'recommendations': recommendations})
    }
Lambda function serving real-time recommendations, fetching recent views from ElastiCache (Redis) and user profiles from DynamoDB.
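For a quick test, this serving Lambda can be invoked directly with Boto3 (in production it would typically sit behind an API layer). A sketch, assuming a hypothetical function name of recommendation-service:

import boto3
import json

lambda_client = boto3.client('lambda', region_name='your-aws-region')  # Replace region

# Invoke the recommendation Lambda synchronously with a user_id payload (function name is a placeholder)
response = lambda_client.invoke(
    FunctionName='recommendation-service',
    InvocationType='RequestResponse',
    Payload=json.dumps({'user_id': 'user123'}).encode('utf-8')
)
result = json.loads(response['Payload'].read())
print(result)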
4. Scalable Data Store (Amazon DynamoDB):
DynamoDB is used to store long-term data like user profiles and item metadata. Here’s a basic example of interacting with a user profiles table:
import boto3

dynamodb = boto3.resource('dynamodb')
user_profiles_table = dynamodb.Table('user_profiles')

def get_user_profile(user_id):
    try:
        response = user_profiles_table.get_item(Key={'user_id': user_id})
        return response.get('Item')
    except Exception as e:
        print(f"Error fetching user profile: {e}")
        return None

def update_user_preferences(user_id, preferences):
    try:
        response = user_profiles_table.update_item(
            Key={'user_id': user_id},
            UpdateExpression="set preferences = :p",
            ExpressionAttributeValues={':p': preferences},
            ReturnValues="UPDATED_NEW"
        )
        print(f"Updated user preferences: {response}")
    except Exception as e:
        print(f"Error updating user preferences: {e}")

# Example usage
# profile = get_user_profile('user123')
# if profile:
#     print(f"User profile: {profile}")
# update_user_preferences('user123', {'category_interests': ['fiction', 'science']})
Example of interacting with a DynamoDB table for user profiles.
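The two tables referenced above can be created with Boto3. A minimal sketch using the key names from the earlier snippets; the on-demand billing mode is an assumption:

import boto3

dynamodb_client = boto3.client('dynamodb', region_name='your-aws-region')  # Replace region

# Interaction log: partition key user_id, sort key timestamp (matches the Lambda consumer above)
dynamodb_client.create_table(
    TableName='user_interactions',
    KeySchema=[
        {'AttributeName': 'user_id', 'KeyType': 'HASH'},
        {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}
    ],
    AttributeDefinitions=[
        {'AttributeName': 'user_id', 'AttributeType': 'S'},
        {'AttributeName': 'timestamp', 'AttributeType': 'N'}
    ],
    BillingMode='PAY_PER_REQUEST'
)

# User profiles: simple partition key on user_id
dynamodb_client.create_table(
    TableName='user_profiles',
    KeySchema=[{'AttributeName': 'user_id', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'user_id', 'AttributeType': 'S'}],
    BillingMode='PAY_PER_REQUEST'
)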
5. Recommendation Model Hosting & Inference (Amazon SageMaker Real-time Inference):
For production, hosting your trained model on a SageMaker endpoint provides scalability and managed infrastructure. You would then invoke this endpoint from your recommendation service (e.g., the Lambda function shown earlier).
import boto3
import json

sagemaker_runtime = boto3.client('sagemaker-runtime', region_name='your-aws-region')  # Replace region
endpoint_name = 'your-recommendation-model-endpoint'  # Replace with your SageMaker endpoint name

def get_sagemaker_recommendations(user_id, recent_views, user_profile):
    payload = {
        'user_id': user_id,
        'recent_views': recent_views,
        'user_profile': user_profile
    }
    try:
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName=endpoint_name,
            ContentType='application/json',
            Body=json.dumps(payload).encode('utf-8')
        )
        result = json.loads(response['Body'].read().decode('utf-8'))
        return result.get('recommendations', [])
    except Exception as e:
        print(f"Error invoking SageMaker endpoint: {e}")
        return []

# Example usage within your recommendation service
# recommendations = get_sagemaker_recommendations('user123', ['item456', 'item789'], {'age': 30, 'location': 'US'})
# print(f"SageMaker Recommendations: {recommendations}")
Example of invoking a SageMaker real-time inference endpoint to get recommendations.
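To tie this into the serving Lambda from section 3, the generate_recommendations_logic placeholder could simply delegate to this helper and fall back to popular items when the endpoint returns nothing. A sketch under that assumption:

def generate_recommendations_logic(user_id, recent_views, user_profile):
    # Delegate to the SageMaker endpoint; fall back to popular items if it returns nothing
    recommendations = get_sagemaker_recommendations(user_id, recent_views, user_profile)
    if not recommendations:
        recommendations = ["Popular item 1", "Popular item 2"]
    return recommendations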
These code snippets show how the key AWS services fit together in a real-time recommendation engine. A complete production-ready system would involve more sophisticated data processing, feature engineering, model training, and error handling.
Real-time Recommendation Engine AI Agent on AWS with SageMaker
Here’s a conceptual implementation of the recommendation model training and deployment using Amazon SageMaker.
6. Recommendation Model Training and Deployment (Amazon SageMaker):
Training Script (Conceptual – train.py):
This script would be executed within a SageMaker training job. It loads data from S3, preprocesses it, trains a recommendation model (e.g., using TensorFlow, PyTorch, or a library like LightGBM or implicit for collaborative filtering), and saves the model artifacts to S3.
import os
import argparse

import pandas as pd
# import tensorflow as tf  # Example for TensorFlow
# import torch  # Example for PyTorch
# import lightgbm as lgb  # Example for LightGBM
import joblib  # For saving the trained model

def parse_args():
    parser = argparse.ArgumentParser()
    # SageMaker script mode exposes these paths via environment variables
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR', '/opt/ml/model'))
    parser.add_argument('--train_data', type=str, default=os.environ.get('SM_CHANNEL_TRAIN', '/opt/ml/input/data/train'))
    # Add other hyperparameters as needed
    return parser.parse_args()

def load_data(train_data_path):
    # Load the training data from the channel directory (e.g., CSV files copied from S3)
    all_files = os.listdir(train_data_path)
    data_files = [os.path.join(train_data_path, f) for f in all_files if f.endswith('.csv')]
    df = pd.concat((pd.read_csv(f) for f in data_files), ignore_index=True)
    print(f"Loaded training data with shape: {df.shape}")
    return df

def preprocess_data(df):
    # Perform preprocessing steps such as encoding categorical features,
    # building interaction matrices, etc.
    # Example:
    # user_ids = df['user_id'].unique()
    # item_ids = df['item_id'].unique()
    # user_id_map = {user: i for i, user in enumerate(user_ids)}
    # item_id_map = {item: i for i, item in enumerate(item_ids)}
    # df['user_id_encoded'] = df['user_id'].map(user_id_map)
    # df['item_id_encoded'] = df['item_id'].map(item_id_map)
    return df

def train_model(df):
    # Train your recommendation model here.
    # Example using LightGBM for ranking:
    # lgb_data = lgb.Dataset(df[['user_id_encoded', 'item_id_encoded', ...]], label=df['target'])
    # params = {'objective': 'lambdarank', 'metric': 'ndcg', ...}
    # model = lgb.train(params, lgb_data, num_boost_round=100)
    #
    # Example using a simple collaborative filtering approach (implicit library):
    # from implicit.als import AlternatingLeastSquares
    # model = AlternatingLeastSquares(factors=50, regularization=0.1, iterations=15)
    # user_item_matrix = create_user_item_matrix(df)  # Implement this function
    # model.fit(user_item_matrix.T)
    #
    # Placeholder - replace with your actual model training
    print("Training recommendation model...")
    model = "your_trained_model"  # Replace with your trained model object
    return model

def save_model(model, model_dir):
    # Save the trained model artifact; SageMaker packages model_dir into model.tar.gz on S3
    model_path = os.path.join(model_dir, 'model.joblib')
    joblib.dump(model, model_path)
    print(f"Saved trained model to: {model_path}")

if __name__ == '__main__':
    args = parse_args()
    train_df = load_data(args.train_data)
    processed_df = preprocess_data(train_df)
    model = train_model(processed_df)
    save_model(model, args.model_dir)
Conceptual training script (train.py) for a SageMaker training job.
SageMaker Training Job (Conceptual Python using the SageMaker Python SDK):
import boto3
import sagemaker
from sagemaker.sklearn.estimator import SKLearn  # Framework estimator; use the TensorFlow/PyTorch estimators for those frameworks
from sagemaker.inputs import TrainingInput

region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
role = 'arn:aws:iam::your-aws-account-id:role/your-sagemaker-execution-role'  # Replace with your IAM role
bucket = sagemaker_session.default_bucket()
prefix = 'recommendation-engine'

# Define the path to your training data in S3
train_data_s3_uri = f's3://{bucket}/{prefix}/training-data'

# Define the SageMaker estimator (script mode with entry_point requires a framework estimator)
estimator = SKLearn(
    entry_point='train.py',  # Your training script
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',  # Choose an appropriate instance type
    framework_version='1.2-1',  # scikit-learn container version; adjust to your framework (e.g., '2.11' for TensorFlow)
    py_version='py3',
    output_path=f's3://{bucket}/{prefix}/model',
    sagemaker_session=sagemaker_session,
    hyperparameters={
        # Add any hyperparameters your training script needs
    }
)

# Configure the training input
train_input = TrainingInput(s3_data=train_data_s3_uri, content_type='text/csv')  # Adjust content type

# Run the training job
estimator.fit({'train': train_input})

training_job_name = estimator.latest_training_job.job_name
print(f"Training job name: {training_job_name}")
Conceptual Python code to launch a SageMaker training job.
SageMaker Model Deployment (Conceptual Python using the SageMaker Python SDK):
After training, you would deploy the model to a SageMaker real-time inference endpoint.
import boto3
import sagemaker
from sagemaker.sklearn.model import SKLearnModel  # Or TensorFlowModel / PyTorchModel to match your framework

region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
role = 'arn:aws:iam::your-aws-account-id:role/your-sagemaker-execution-role'  # Replace with your IAM role
bucket = sagemaker_session.default_bucket()
prefix = 'recommendation-engine'

# S3 location of the artifact produced by the training job (a model.tar.gz containing model.joblib)
model_data = estimator.model_data  # Or specify a literal s3:// URI to the model artifact

# Define the SageMaker model
model = SKLearnModel(
    model_data=model_data,
    role=role,
    entry_point='inference.py',  # Your inference script (see next section)
    framework_version='1.2-1',  # Match your training framework version
    py_version='py3',
    sagemaker_session=sagemaker_session
)

endpoint_name = 'your-recommendation-model-endpoint'  # Choose a unique endpoint name
instance_type = 'ml.t2.medium'  # Choose an appropriate instance type
initial_instance_count = 1

# Deploy the model to a real-time inference endpoint
predictor = model.deploy(
    endpoint_name=endpoint_name,
    instance_type=instance_type,
    initial_instance_count=initial_instance_count
)
print(f"SageMaker endpoint name: {endpoint_name}")
Conceptual Python code to deploy the trained model to a SageMaker real-time inference endpoint.
Inference Script (Conceptual – inference.py):
This script would be loaded by the SageMaker inference container. It defines how to load the trained model and how to handle prediction requests.
import os
import json

import joblib  # For loading the saved model
# import tensorflow as tf
# import torch

def model_fn(model_dir):
    # Load the trained model artifact that SageMaker unpacks into model_dir
    model_path = os.path.join(model_dir, 'model.joblib')
    model = joblib.load(model_path)
    return model

def input_fn(request_body, request_content_type):
    # Deserialize the incoming request
    if request_content_type == 'application/json':
        return json.loads(request_body)
    else:
        raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(data, model):
    # Process the input data and generate predictions using the loaded model
    user_id = data.get('user_id')
    recent_views = data.get('recent_views')
    user_profile = data.get('user_profile')
    # Preprocess the input data for the model if needed

    # Example using a loaded collaborative filtering model:
    # user_index = user_id_map.get(user_id)
    # if user_index is not None:
    #     n_recommendations = 10
    #     item_ids = model.recommend(user_index, user_item_matrix[user_index], N=n_recommendations)
    #     recommended_items = [item_id_reverse_map.get(item[0]) for item in item_ids]
    #     return {'recommendations': recommended_items}
    # else:
    #     return {'recommendations': ['fallback_item1', 'fallback_item2']}

    # Placeholder - replace with your actual inference logic
    print(f"Generating predictions for user: {user_id}, recent views: {recent_views}, profile: {user_profile}")
    recommendations = ["predicted_item_1", "predicted_item_2"]
    return {'recommendations': recommendations}

def output_fn(prediction, content_type):
    # Serialize the prediction result
    if content_type == 'application/json':
        return json.dumps(prediction)
    else:
        raise ValueError(f"Unsupported content type: {content_type}")
Conceptual inference script (inference.py) for the SageMaker endpoint.
These snippets illustrate the key steps involved in training and deploying a recommendation model using Amazon SageMaker. You would need to adapt the training and inference scripts to your specific model and data.
The following is a simplified example of training data for a recommendation engine; the actual schema and content will depend on your use case and the chosen recommendation algorithm. The sample covers user interactions (views, clicks, purchases) with items.
Sample User Interactions (CSV Format):
user_id,item_id,interaction_type,timestamp
user1,itemA,view,2025-04-29 10:00:00
user2,itemB,view,2025-04-29 10:05:00
user1,itemB,click,2025-04-29 10:10:00
user3,itemC,view,2025-04-29 10:15:00
user2,itemC,purchase,2025-04-29 10:20:00
user1,itemD,view,2025-04-29 10:25:00
user4,itemA,view,2025-04-29 10:30:00
user3,itemB,click,2025-04-29 10:35:00
user4,itemC,purchase,2025-04-29 10:40:00
user1,itemA,view,2025-04-29 10:45:00
user2,itemD,view,2025-04-29 10:50:00
user3,itemA,click,2025-04-29 10:55:00
user4,itemB,view,2025-04-29 11:00:00
user1,itemC,purchase,2025-04-29 11:05:00
user2,itemA,view,2025-04-29 11:10:00
user3,itemD,view,2025-04-29 11:15:00
user4,itemD,click,2025-04-29 11:20:00
user1,itemB,view,2025-04-29 11:25:00
user2,itemC,view,2025-04-29 11:30:00
user3,itemB,purchase,2025-04-29 11:35:00
Explanation of Columns:
- user_id: Unique identifier for a user.
- item_id: Unique identifier for an item.
- interaction_type: The type of interaction (e.g., 'view', 'click', 'purchase', 'add_to_cart'). You might have different types depending on your application.
- timestamp: The time when the interaction occurred.
Sample User Features (CSV Format – Optional):
Depending on your model, you might also include user features.
user_id,age,gender,location,interests
user1,30,male,USA,technology|books
user2,25,female,Canada,fashion|music
user3,40,male,UK,sports|news
user4,35,female,USA,cooking|travel
Explanation of Columns:
- user_id: Unique identifier for a user (must match the user IDs in the interactions data).
- age: User's age.
- gender: User's gender.
- location: User's location.
- interests: User's interests (can be a pipe-separated string or other format).
Sample Item Features (CSV Format – Optional):
Similarly, you might include features about the items.
item_id,category,brand,price
itemA,electronics,BrandX,100.00
itemB,books,PublisherY,15.00
itemC,electronics,BrandZ,250.00
itemD,clothing,BrandW,50.00
Explanation of Columns:
- item_id: Unique identifier for an item (must match the item IDs in the interactions data).
- category: Category of the item.
- brand: Brand of the item.
- price: Price of the item.
How to Use This Data:
- Save to CSV files: Save these sample data snippets into separate CSV files (e.g., user_interactions.csv, user_features.csv, item_features.csv).
- Upload to S3: Upload these CSV files to an S3 bucket that your SageMaker training job can access (as specified by train_data_s3_uri in your SageMaker training script).
- Adapt your training script: Modify your train.py script to load and process these CSV files. You'll need to implement the load_data and preprocess_data functions to correctly handle this data format.
- Feature Engineering: In your preprocessing step, you'll likely want to engineer features relevant to your chosen recommendation algorithm. For example, you might create user-item interaction matrices for collaborative filtering or encode categorical features for other models (see the sketch below).
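A minimal sketch of the upload and interaction-matrix steps, assuming the file names above, a placeholder bucket name, and illustrative interaction weights:

import boto3
import pandas as pd
from scipy.sparse import csr_matrix

# Upload the sample CSVs to S3 (bucket and prefix are placeholders; see train_data_s3_uri above)
s3 = boto3.client('s3')
bucket = 'your-training-data-bucket'
prefix = 'recommendation-engine/training-data'
for filename in ['user_interactions.csv', 'user_features.csv', 'item_features.csv']:
    s3.upload_file(filename, bucket, f'{prefix}/{filename}')

# Build a user-item interaction matrix for collaborative filtering (the weights are assumptions)
df = pd.read_csv('user_interactions.csv')
weights = {'view': 1.0, 'click': 2.0, 'purchase': 5.0}
df['weight'] = df['interaction_type'].map(weights).fillna(1.0)

user_codes = df['user_id'].astype('category')
item_codes = df['item_id'].astype('category')
user_item_matrix = csr_matrix(
    (df['weight'], (user_codes.cat.codes, item_codes.cat.codes)),
    shape=(user_codes.cat.categories.size, item_codes.cat.categories.size)
)
print(user_item_matrix.shape)  # (num_users, num_items); duplicate interactions are summed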
Remember that this is a very basic example. Real-world training datasets for recommendation systems can be much larger and more complex, potentially including more interaction types, user features, item features, and temporal information.