While a full-fledged MLOps pipeline integrates many tools and platforms, the following snippets illustrate key MLOps concepts using popular Python libraries and tools. Each example focuses on an individual stage; combined, they form the basis of a more comprehensive pipeline.
1. Data Versioning with DVC (Data Version Control):
This isn’t Python code, but DVC is a crucial MLOps tool for tracking data changes.
Bash
# Initialize DVC in your project (run inside an existing Git repository)
dvc init
# Track your raw data directory
dvc add data/raw
# Stage the DVC metadata and commit (similar to Git)
git add data/raw.dvc .gitignore
git commit -m "Track raw data with DVC"
# Make changes to your data...
# Update the DVC tracking
dvc add data/raw
# Commit the new data version
git add data/raw.dvc
git commit -m "Update raw data version"
# Add a default remote (e.g., S3, Google Cloud Storage, Azure Blob Storage) and push the data
dvc remote add -d myremote s3://your-s3-bucket/your-dvc-remote
dvc push
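Once data is tracked, you can also read a specific version of it from Python using DVC's built-in API. A minimal sketch, assuming data/raw/train.csv is a DVC-tracked CSV file and v1.0 is a Git tag in your repository (both are hypothetical placeholders):
Python
import dvc.api
import pandas as pd
from io import StringIO

# Read a specific version of a tracked file; rev can be any Git commit, branch, or tag
csv_text = dvc.api.read(
    "data/raw/train.csv",  # hypothetical path to a DVC-tracked file
    rev="v1.0",            # hypothetical Git tag marking the data version
)
df = pd.read_csv(StringIO(csv_text))
print(df.head())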
2. Experiment Tracking with MLflow:
Python
import mlflow
import mlflow.sklearn
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import numpy as np

# Set the MLflow tracking URI (can be a local directory or a remote server)
mlflow.set_tracking_uri("mlruns")

# Start an MLflow run
with mlflow.start_run():
    # Log parameters
    alpha = 0.1
    mlflow.log_param("alpha", alpha)

    # Load sample data (replace with your actual data loading)
    X = np.array([[1], [2], [3], [4], [5]])
    y = np.array([2, 4, 5, 4, 5])
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train a model (Ridge regression, so the logged alpha parameter is actually used)
    model = Ridge(alpha=alpha)
    model.fit(X_train, y_train)

    # Make predictions
    predictions = model.predict(X_test)

    # Log metrics
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    mlflow.log_metric("rmse", rmse)

    # Log the trained model
    mlflow.sklearn.log_model(model, "linear_regression_model")

    print(f"Logged experiment with alpha={alpha} and rmse={rmse}")
3. Model Serving with Flask (Basic Example):
This is a simple example for local testing. Production serving would typically use more robust tools like TensorFlow Serving, TorchServe, or a cloud-managed service.
Python
from flask import Flask, request, jsonify
import mlflow.pyfunc
import numpy as np

app = Flask(__name__)

# Load the MLflow model
model_path = "mlruns/0/your_run_id/artifacts/linear_regression_model"  # Replace with your actual run ID
loaded_model = mlflow.pyfunc.load_model(model_path)

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    if 'features' not in data:
        return jsonify({'error': 'Missing "features" in request'}), 400
    features = np.array(data['features']).reshape(1, -1)
    prediction = loaded_model.predict(features).tolist()
    return jsonify({'prediction': prediction})

if __name__ == '__main__':
    app.run(debug=True, port=5000)
To run this Flask app:
- Save the code as app.py.
- Replace "mlruns/0/your_run_id/artifacts/linear_regression_model" with the actual path to your saved MLflow model. You can find this in the MLflow UI.
- Run python app.py.
- Send a POST request with JSON data like {"features": [3.5]} to http://localhost:5000/predict.
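For example, with curl:
Bash
curl -X POST http://localhost:5000/predict \
  -H "Content-Type: application/json" \
  -d '{"features": [3.5]}'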
4. CI/CD with GitHub Actions (Example for Model Training):
This is a basic example of a GitHub Actions workflow that automatically trains a model when code is pushed to the main branch.
YAML
name: Train Model

on:
  push:
    branches: [ main ]

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.x'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Train model and log with MLflow
        run: python train.py  # Replace with your actual training script
        env:
          MLFLOW_TRACKING_URI: mlruns/
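Because the runner's filesystem is discarded when the job ends, you may want to preserve the local mlruns/ directory. One option is to append a step using the standard actions/upload-artifact action (the artifact name here is arbitrary):
YAML
      - name: Upload MLflow runs
        uses: actions/upload-artifact@v3
        with:
          name: mlflow-runs
          path: mlruns/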
5. Model Monitoring (Conceptual Snippet using a hypothetical monitoring library):
This is a conceptual example. Real monitoring would involve more sophisticated libraries and integration with monitoring dashboards.
Python
# Hypothetical monitoring library (illustrative only)
import monitoring_lib
import time
import random
import mlflow.pyfunc
import numpy as np

# Load the production model
model_path = "production_model"  # Replace with the actual path
production_model = mlflow.pyfunc.load_model(model_path)

monitoring_service = monitoring_lib.MonitoringService(endpoint="your_monitoring_dashboard")

while True:
    # Simulate receiving new data
    live_data = np.array([[random.uniform(1, 6)]])

    # Make a prediction
    prediction = production_model.predict(live_data)

    # Log prediction and features for monitoring
    monitoring_service.log_prediction(features=live_data.tolist(), prediction=prediction.tolist())

    # Simulate checking for drift (simplified)
    if random.random() < 0.01:
        monitoring_service.log_alert("Potential Data Drift Detected")

    time.sleep(5)
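In practice, a drift check is statistical rather than random. A minimal sketch of one common approach: compare a window of recent live feature values against a training-time baseline with a two-sample Kolmogorov-Smirnov test (this uses scipy, which is not in the example requirements.txt, and the 0.05 threshold is a typical but arbitrary choice):
Python
import numpy as np
from scipy.stats import ks_2samp

# Baseline: feature values seen during training (placeholder data)
baseline = np.random.uniform(1, 6, size=1000)
# Live window: recent feature values collected in production (placeholder data)
live_window = np.random.uniform(2, 7, size=200)

# The KS test compares the two empirical distributions
statistic, p_value = ks_2samp(baseline, live_window)
if p_value < 0.05:
    print(f"Potential data drift detected (KS statistic={statistic:.3f}, p={p_value:.3g})")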
requirements.txt (Example):
mlflow
scikit-learn
numpy
flask
train.py (Example – referenced in GitHub Actions):
Python
import os
import mlflow
import mlflow.sklearn
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import numpy as np

mlflow.set_tracking_uri(os.environ.get("MLFLOW_TRACKING_URI", "mlruns/"))

with mlflow.start_run():
    alpha = 0.2
    mlflow.log_param("alpha", alpha)

    X = np.array([[1], [2], [3], [4], [5]])
    y = np.array([2, 4, 5, 4, 5])
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Ridge regression accepts an alpha parameter (plain LinearRegression does not)
    model = Ridge(alpha=alpha)
    model.fit(X_train, y_train)

    predictions = model.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    mlflow.log_metric("rmse", rmse)
    mlflow.sklearn.log_model(model, "linear_regression_model")

    print(f"Trained model with alpha={alpha} and rmse={rmse}")
Important Notes:
- Replace Placeholders: Remember to replace placeholders like bucket names, remote URLs, run IDs, and file paths with your actual values.
- Simplified Examples: These are basic illustrations. Real-world MLOps pipelines are often much more complex and involve orchestrating these steps using tools like Apache Airflow, Kubeflow Pipelines, or cloud-managed MLOps platforms.
- Tool Integration: MLOps is about the integration of various tools to automate the ML lifecycle. These examples show individual tool usage.
- Environment Management: Consider using tools like Conda or virtualenv to manage dependencies (see the sketch after this list).
- Security: In production, ensure proper security measures are in place for accessing data, models, and infrastructure.
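For example, a minimal virtualenv setup:
Bash
# Create and activate an isolated environment, then install the pinned dependencies
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt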
These code samples provide a starting point for understanding how different MLOps concepts can be implemented using Python and related tools. As you delve deeper into MLOps, you’ll explore more advanced techniques and orchestrate these steps into automated and reliable pipelines.