Saturday, February 28, 2026

Build a Relevance-Guided Active Learning System for Geospatial Discovery

What You'll Build

You'll build a Python pipeline that intelligently selects satellite imagery patches for labeling by combining uncertainty-based active learning with concept-guided relevance weighting. Instead of randomly labeling thousands of images, your system will ask "what should I learn next?" while considering domain knowledge like "areas near industrial sites are more likely to contain contamination."

The final system will process synthetic four-band multispectral imagery patches (mimicking a common RGB + near-infrared sensor configuration), maintain an incremental meta-learning loop that adapts to new labels, and prioritize samples based on both model uncertainty AND domain-relevant concepts like land cover type or proximity to known contamination sources. This approach is critical for real geospatial discovery tasks—whether hunting for rare minerals, tracking deforestation, or detecting pollution hotspots—where labeling budgets are limited and domain knowledge matters.

You'll walk away with:

  • A working concept-weighted active learning pipeline
  • A lightweight CNN classifier for multispectral imagery
  • An incremental meta-learning component for rapid adaptation
  • Reusable code patterns for geospatial ML projects
All code is on GitHub: github.com/klarson3k1o/owl-gps-active-learning
Clone it with git clone https://github.com/klarson3k1o/owl-gps-active-learning.git if you'd rather run the finished code than copy each block manually.

Prerequisites

  • Python 3.10+ (tested on 3.11.5)
  • PyTorch 2.0+ with CUDA support optional (CPU works but slower)
  • pip or conda for package management
  • Basic understanding of:
    • Python classes and NumPy arrays
    • Neural network training loops
    • Active learning concepts (helpful but not required)
  • Estimated time: 90-120 minutes with a CUDA-capable GPU; 4-6 hours on CPU
  • Disk space: ~2GB for dependencies, ~50MB for synthetic data

Step-by-Step Instructions

Step 1: Set Up Your Project Structure and Environment

Create a clean working directory with an isolated Python environment:

mkdir owl-gps-tutorial
cd owl-gps-tutorial
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

Install PyTorch and supporting libraries:

# For CUDA 11.8 (adjust URL for your CUDA version or use CPU)
pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118

# Core dependencies
pip install scikit-learn numpy matplotlib

For CPU-only installation: Use pip install torch torchvision without the index-url flag.

Apple Silicon (M1/M2/M3): Do not use the --index-url flag above — it will fail. Use pip install torch torchvision and PyTorch will automatically use the MPS backend where available.

Verify installation:

python -c "import torch; print(f'PyTorch {torch.__version__}, CUDA available: {torch.cuda.is_available()}')"

Step 2: Generate Synthetic Geospatial Data

Create synthetic data that mimics multispectral satellite patches with associated land cover concepts. This lets you focus on the active learning logic without infrastructure overhead.

Create generate_data.py:

import numpy as np
import pickle
from pathlib import Path

def generate_synthetic_geospatial_data(n_samples=500, image_size=32, n_bands=4, seed=42):
    """
    Generate synthetic satellite-like imagery with associated concepts.

    Args:
        n_samples: Number of image patches to generate
        image_size: Spatial dimensions (height and width in pixels)
        n_bands: Number of spectral bands (4 mimics RGB + NIR)
        seed: Random seed for reproducibility

    Returns:
        images: Array of shape (n_samples, n_bands, image_size, image_size)
        labels: Binary labels (1 = target present, 0 = absent)
        concepts: List of dicts with domain features per sample
    """
    np.random.seed(seed)
    images = []
    labels = []  # Binary: 1 = target present (e.g., contamination hotspot), 0 = absent
    concepts = []  # Land cover type, distance to industrial sites, etc.

    for i in range(n_samples):
        # Generate random multispectral patch (values in [0, 1])
        img = np.random.rand(n_bands, image_size, image_size).astype(np.float32)

        # Simulate land cover concepts: 0=forest, 1=urban, 2=water, 3=agricultural
        land_cover = np.random.randint(0, 4)

        # Distance to industrial site (normalized 0-1, where 0 is close)
        dist_industrial = np.random.rand()

        # Target probability influenced by concepts
        # Urban areas close to industrial sites more likely to have contamination
        target_prob = 0.1  # Base rate
        if land_cover == 1:  # Urban
            target_prob += 0.3
        if dist_industrial < 0.3:  # Close to industrial
            target_prob += 0.4

        label = 1 if np.random.rand() < target_prob else 0

        images.append(img)
        labels.append(label)
        concepts.append({
            'land_cover': land_cover,
            'dist_industrial': dist_industrial,
            'urban': 1 if land_cover == 1 else 0,
            'near_industrial': 1 if dist_industrial < 0.3 else 0
        })

    return np.array(images), np.array(labels), concepts

# Generate dataset
print("Generating synthetic geospatial dataset...")
images, labels, concepts = generate_synthetic_geospatial_data(n_samples=500)

# Save to disk
Path("data").mkdir(exist_ok=True)
np.save("data/images.npy", images)
np.save("data/labels.npy", labels)
with open("data/concepts.pkl", "wb") as f:
    pickle.dump(concepts, f)

print(f"Generated {len(images)} samples")
print(f"Positive class ratio: {labels.mean():.3f}")
print(f"Image shape: {images[0].shape}")
print(f"Data saved to data/ directory")

Run the data generation script:

python generate_data.py

Security note: We use pickle to store (and later load) the synthetic concepts file. This is safe here because the file is generated locally by this script, but never unpickle files from untrusted sources.

Expected output:

Generating synthetic geospatial dataset...
Generated 500 samples
Positive class ratio: 0.314
Image shape: (4, 32, 32)
Data saved to data/ directory

What just happened: You created 500 synthetic "satellite patches" with 4 spectral bands (mimicking RGB + near-infrared). Each patch has associated concept features (land cover type, distance to industrial sites) that influence whether a target is present. This simulates real-world scenarios where domain knowledge correlates with target presence—urban areas near industrial sites have higher contamination probability in this simulation.

Step 3: Build the Concept-Weighted Uncertainty Sampler

Implement the core innovation: combining model uncertainty with concept relevance for intelligent sample selection.

Create active_learner.py:

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import roc_auc_score

class GeospatialDataset(Dataset):
    """PyTorch dataset wrapper for geospatial patches with concepts."""

    def __init__(self, images, labels, concepts):
        self.images = torch.FloatTensor(images)
        self.labels = torch.LongTensor(labels)
        self.concepts = concepts

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return self.images[idx], self.labels[idx], idx

class SimpleCNN(nn.Module):
    """Lightweight CNN for multispectral patch classification."""

    def __init__(self, n_bands=4, n_classes=2):
        super().__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(n_bands, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)

        # Fully connected layers
        # After two pooling layers: 32x32 -> 16x16 -> 8x8
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, n_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        # First conv block
        x = self.pool(F.relu(self.conv1(x)))  # -> [batch, 32, 16, 16]
        # Second conv block
        x = self.pool(F.relu(self.conv2(x)))  # -> [batch, 64, 8, 8]
        # Flatten and classify
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

class ConceptWeightedActiveLearner:
    """
    Active learner that combines uncertainty sampling with concept relevance.

    This is the core innovation: instead of purely uncertainty-based sampling,
    we weight samples by domain-relevant concepts (e.g., urban areas near
    industrial sites are more relevant for contamination detection).
    """

    def __init__(self, model, device='cuda' if torch.cuda.is_available() else 'cpu',
                 concept_weights=None):
        """
        Args:
            model:           PyTorch model to train and query
            device:          'cuda' or 'cpu'
            concept_weights: Dict mapping concept names to importance multipliers.
                             Defaults to urban=2.0, near_industrial=3.0.
                             Pass your own dict to tune for a different task.
        """
        self.model = model.to(device)
        self.device = device
        self.labeled_indices = set()

        # Domain-specific concept weights — override via constructor for new tasks
        self.concept_weights = concept_weights or {
            'urban': 2.0,          # Urban areas more relevant for contamination
            'near_industrial': 3.0  # Proximity to industrial sites very relevant
        }

    def compute_uncertainty(self, unlabeled_loader):
        """
        Compute prediction uncertainty using entropy.

        Higher entropy = model is more uncertain = more informative sample.
        """
        self.model.eval()
        uncertainties = []
        indices = []

        with torch.no_grad():
            for images, _, idx in unlabeled_loader:
                images = images.to(self.device)
                logits = self.model(images)
                probs = F.softmax(logits, dim=1)

                # Entropy-based uncertainty: -sum(p * log(p))
                entropy = -torch.sum(probs * torch.log(probs + 1e-10), dim=1)
                uncertainties.extend(entropy.cpu().numpy())
                indices.extend(idx.numpy())

        return np.array(uncertainties), np.array(indices)

    def compute_concept_relevance(self, indices, concepts):
        """
        Compute relevance score based on domain concepts.

        Higher score = more relevant to our search task based on domain knowledge.

        Args:
            indices: Array of indices into the concepts list (original dataset indices
                     returned by __getitem__ via Subset)
            concepts: The full train_dataset.concepts list (length = size of full training set).
                      Indices returned by __getitem__ via Subset are original dataset indices,
                      so this must be the complete list — not a subset-aligned list.
        """
        relevance_scores = []
        for idx in indices:
            concept = concepts[idx]
            score = 1.0  # Base relevance

            # Multiply by concept importance weights
            if concept['urban']:
                score *= self.concept_weights['urban']
            if concept['near_industrial']:
                score *= self.concept_weights['near_industrial']

            relevance_scores.append(score)

        return np.array(relevance_scores)

    def select_samples(self, unlabeled_loader, concepts, budget=10, alpha=0.5):
        """
        Select samples using concept-weighted uncertainty.

        Args:
            unlabeled_loader: DataLoader for unlabeled samples
            concepts: The full train_dataset.concepts list (length = size of full training set).
                      Must be the complete list because indices returned by the DataLoader
                      (via Subset.__getitem__) are original dataset indices, not subset positions.
            budget: Number of samples to select
            alpha: float in [0, 1]. Controls the uncertainty/relevance balance.
                   1.0 = pure uncertainty sampling (ignores domain concepts),
                   0.0 = pure concept-relevance sampling (ignores model uncertainty),
                   0.5 = equal weight between both signals.

        Returns:
            top_indices: Indices of selected samples (relative to original dataset)
            top_scores: Combined scores for selected samples
        """
        # Compute both uncertainty and relevance
        uncertainties, indices = self.compute_uncertainty(unlabeled_loader)
        relevance = self.compute_concept_relevance(indices, concepts)

        # Normalize both scores to [0, 1] range for fair combination
        uncertainties = (uncertainties - uncertainties.min()) / (uncertainties.max() - uncertainties.min() + 1e-10)
        relevance = (relevance - relevance.min()) / (relevance.max() - relevance.min() + 1e-10)

        # Combined score: weighted sum of uncertainty and relevance
        scores = alpha * uncertainties + (1 - alpha) * relevance

        # Guard against budget larger than available samples
        budget = min(budget, len(scores))

        # Select top-k samples with highest combined scores
        top_k_positions = np.argsort(scores)[-budget:]
        top_indices = indices[top_k_positions]
        top_scores = scores[top_k_positions]

        return top_indices, top_scores

    def train_epoch(self, train_loader, optimizer, criterion):
        """Train model for one epoch."""
        self.model.train()
        total_loss = 0

        for images, labels, _ in train_loader:
            images, labels = images.to(self.device), labels.to(self.device)

            optimizer.zero_grad()
            outputs = self.model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        return total_loss / len(train_loader)

    def evaluate(self, test_loader):
        """Evaluate model performance using AUC-ROC."""
        self.model.eval()
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for images, labels, _ in test_loader:
                images = images.to(self.device)
                outputs = self.model(images)
                probs = F.softmax(outputs, dim=1)[:, 1]  # Probability of positive class

                all_preds.extend(probs.cpu().numpy())
                all_labels.extend(labels.numpy())

        # Guard against single-class splits (can happen with small or skewed test sets)
        if len(set(all_labels)) < 2:
            return float('nan')
        auc = roc_auc_score(all_labels, all_preds)
        return auc

if __name__ == "__main__":
    print("Active learner module loaded successfully")

Test the module:

python active_learner.py

Expected output:

Active learner module loaded successfully

What just happened: You built the core machinery—a CNN classifier for multispectral imagery, and an active learning loop that computes both model uncertainty (via entropy) and concept relevance (via domain features), then combines them. The alpha parameter lets you tune how much you trust model uncertainty versus domain knowledge: alpha=1.0 is pure uncertainty sampling, alpha=0.0 is pure concept-based sampling, and alpha=0.5 balances both.

Step 4: Implement the Incremental Meta-Learning Loop

Add a Reptile meta-learning component that nudges the model toward fast adaptability as newly labeled batches arrive.

Create meta_learning.py:

import torch
import torch.nn as nn
from copy import deepcopy

class OnlineMetaLearner:
    """
    Incremental meta-learner using the Reptile algorithm (Nichol et al., 2018).

    Reptile is a first-order meta-learning algorithm that updates the meta-model
    by interpolating its weights toward a task-adapted copy. Unlike MAML it
    requires no second-order gradients and works with any standard inner-loop
    optimizer — making it a practical drop-in for sequential active learning.

    On each call to meta_update():
      1. A copy of the meta-model is fine-tuned on the support set (newly
         labeled samples) for `inner_steps` gradient steps.
      2. The meta-model weights are nudged toward the fine-tuned weights:
             theta_meta = theta_meta + meta_lr * (theta_adapted - theta_meta)
      3. The query set is evaluated for monitoring only (no meta-gradient needed).
    """

    def __init__(self, base_model, inner_lr=0.01, meta_lr=0.1, inner_steps=5):
        """
        Args:
            base_model:   PyTorch model to meta-learn (shared with active learner)
            inner_lr:     SGD learning rate for inner-loop adaptation
            meta_lr:      Reptile step size — how far to interpolate toward
                          the adapted weights (0 < meta_lr <= 1)
            inner_steps:  Maximum gradient steps in the inner loop
        """
        self.meta_model  = base_model
        self.inner_lr    = inner_lr
        self.meta_lr     = meta_lr
        self.inner_steps = inner_steps

    def meta_update(self, support_loader, query_loader, criterion):
        """
        Perform one Reptile meta-update.

        Args:
            support_loader: DataLoader with newly labeled samples for adaptation
            query_loader:   DataLoader with held-out samples for loss monitoring
            criterion:      Loss function (e.g. CrossEntropyLoss)

        Returns:
            query_loss: Average loss on query set after adaptation (float),
                        or None if query_loader is empty.
        """
        device = next(self.meta_model.parameters()).device

        # ── Inner loop ────────────────────────────────────────────────────────
        # Fine-tune a disconnected clone on the support set.
        # Reptile intentionally uses deepcopy — the gradient never needs to flow
        # back through the inner loop (unlike MAML).
        adapted_model    = deepcopy(self.meta_model)
        inner_optimizer  = torch.optim.SGD(adapted_model.parameters(), lr=self.inner_lr)

        adapted_model.train()
        for step, (images, labels, _) in enumerate(support_loader):
            if step >= self.inner_steps:
                break
            images, labels = images.to(device), labels.to(device)
            inner_optimizer.zero_grad()
            loss = criterion(adapted_model(images), labels)
            loss.backward()
            inner_optimizer.step()

        # ── Reptile weight update ─────────────────────────────────────────────
        # Move meta-model weights toward the adapted weights.
        # theta_meta = theta_meta + meta_lr * (theta_adapted - theta_meta)
        with torch.no_grad():
            for meta_p, adapted_p in zip(self.meta_model.parameters(),
                                         adapted_model.parameters()):
                meta_p.data = meta_p.data + self.meta_lr * (adapted_p.data - meta_p.data)

        # ── Query evaluation (monitoring only) ────────────────────────────────
        adapted_model.eval()
        query_loss = 0.0
        n_batches  = 0

        with torch.no_grad():
            for images, labels, _ in query_loader:
                images, labels = images.to(device), labels.to(device)
                query_loss += criterion(adapted_model(images), labels).item()
                n_batches  += 1

        if n_batches == 0:
            return None

        return query_loss / n_batches

    def get_model(self):
        """Return current meta-model."""
        return self.meta_model

if __name__ == "__main__":
    print("Meta-learning module loaded successfully")

Test the module:

python meta_learning.py

Expected output:

Meta-learning module loaded successfully

What just happened: You implemented the Reptile meta-learning algorithm. After each new labeled batch arrives, Reptile fine-tunes a temporary copy of the model on those samples, then nudges the main model's weights toward the copy — no second-order gradients required. The meta_lr controls how aggressively the model adapts: larger values favor the new batch, smaller values preserve knowledge from all previous rounds. The query set is evaluated after adaptation to give you a signal of how well the adapted model generalizes to unseen samples from the newly labeled batch.

Step 5: Build the Main Training Loop

Run steps in order: This script loads data from the data/ directory created in Step 2. If you see FileNotFoundError: data/images.npy, run python generate_data.py first.

Tie everything together into a complete active learning pipeline.

Create train_owl_gps.py:

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
import pickle  # safe: file generated locally by generate_data.py

from active_learner import GeospatialDataset, SimpleCNN, ConceptWeightedActiveLearner
from meta_learning import OnlineMetaLearner

def main():
    # Set random seeds for reproducibility
    np.random.seed(42)
    torch.manual_seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(42)  # Sets seed for all GPUs

    # Load synthetic data
    print("Loading synthetic geospatial data...")
    images = np.load("data/images.npy")
    labels = np.load("data/labels.npy")
    with open("data/concepts.pkl", "rb") as f:
        concepts = pickle.load(f)

    print(f"Loaded {len(images)} samples with {images.shape[1]} spectral bands")

    # Split into train/test (80/20 split)
    n_samples = len(images)
    indices = np.random.permutation(n_samples)
    train_idx = indices[:400]
    test_idx = indices[400:]

    # Create datasets
    train_dataset = GeospatialDataset(
        images[train_idx],
        labels[train_idx],
        [concepts[i] for i in train_idx]
    )
    test_dataset = GeospatialDataset(
        images[test_idx],
        labels[test_idx],
        [concepts[i] for i in test_idx]
    )

    print(f"Train set: {len(train_dataset)} samples")
    print(f"Test set: {len(test_dataset)} samples")

    # Initialize model and learner
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {device}")

    model = SimpleCNN(n_bands=4, n_classes=2)
    active_learner = ConceptWeightedActiveLearner(model, device=device)
    meta_learner = OnlineMetaLearner(model, inner_lr=0.01, meta_lr=0.1)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Active learning configuration
    initial_budget = 20  # Start with 20 randomly labeled samples
    query_budget = 10    # Query 10 new samples per round
    n_rounds = 10        # Run for 10 active learning rounds
    alpha = 0.5          # Balance uncertainty and relevance equally

    # Initialize with random labeled samples
    all_train_indices = set(range(len(train_dataset)))
    labeled_indices = set(np.random.choice(list(all_train_indices), initial_budget, replace=False))
    unlabeled_indices = all_train_indices - labeled_indices

    print(f"\n{'='*60}")
    print(f"Starting active learning with {len(labeled_indices)} initial labeled samples")
    print(f"{'='*60}\n")

    # Track performance over rounds
    results = {
        'round': [],
        'n_labeled': [],
        'test_auc': []
    }

    # Create test loader (once, since test set doesn't change)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Active learning loop
    for round_num in range(n_rounds):
        print(f"\n--- Round {round_num + 1}/{n_rounds} ---")
        print(f"Labeled samples: {len(labeled_indices)}")
        print(f"Unlabeled samples: {len(unlabeled_indices)}")

        # Create data loaders for current labeled set
        labeled_subset = Subset(train_dataset, list(labeled_indices))
        labeled_loader = DataLoader(labeled_subset, batch_size=16, shuffle=True)

        # Train for several epochs on current labeled set
        loss = float('nan')  # Initialize to handle empty loader edge case
        for epoch in range(5):
            loss = active_learner.train_epoch(labeled_loader, optimizer, criterion)
        print(f"Training loss (final epoch): {loss:.4f}")

        # Evaluate on test set
        test_auc = active_learner.evaluate(test_loader)
        print(f"Test AUC: {test_auc:.4f}")

        results['round'].append(round_num + 1)
        results['n_labeled'].append(len(labeled_indices))
        results['test_auc'].append(test_auc)

        # Skip querying on the last round
        if round_num == n_rounds - 1:
            break

        # Query new samples using concept-weighted uncertainty
        unlabeled_subset = Subset(train_dataset, list(unlabeled_indices))
        unlabeled_loader = DataLoader(unlabeled_subset, batch_size=32, shuffle=False)

        new_indices, scores = active_learner.select_samples(
            unlabeled_loader,
            train_dataset.concepts,  # Full list: __getitem__ returns dataset indices (0..399)
            budget=query_budget,
            alpha=alpha
        )
        print(f"Selected {len(new_indices)} new samples for labeling")
        print(f"  Score range: [{scores.min():.3f}, {scores.max():.3f}]")

        # "Label" the selected samples (in real world, a human annotator does this)
        labeled_indices.update(new_indices.tolist())
        unlabeled_indices -= set(new_indices.tolist())

        # Meta-learning update: adapt to newly labeled batch using Reptile
        # Note: Adam optimizer momentum buffers become stale after Reptile's in-place
        # weight update. This is acceptable for tutorial code but production systems
        # may want to reset optimizer state or use SGD.
        if len(new_indices) >= 4:
            # 50/50 split: support set for inner-loop adaptation, query set for evaluation
            n_support = len(new_indices) // 2
            support_subset = Subset(train_dataset, new_indices[:n_support].tolist())
            query_subset   = Subset(train_dataset, new_indices[n_support:].tolist())
            support_loader = DataLoader(support_subset, batch_size=8, shuffle=True)
            query_loader   = DataLoader(query_subset,   batch_size=8, shuffle=False)

            meta_loss = meta_learner.meta_update(support_loader, query_loader, criterion)
            if meta_loss is not None:
                print(f"  Reptile meta-update query loss: {meta_loss:.4f}")

    # Print final summary
    print(f"\n{'='*60}")
    print("Active Learning Complete!")
    print(f"{'='*60}")
    print(f"\nRound | Labeled | Test AUC")
    print(f"------|---------|----------")
    for r, n, auc in zip(results['round'], results['n_labeled'], results['test_auc']):
        print(f"  {r:2d}  |   {n:3d}   |  {auc:.4f}")
    print(f"\nFinal Test AUC: {results['test_auc'][-1]:.4f}")
    print(f"Labeled {len(labeled_indices)}/{len(train_dataset)} available training samples")

if __name__ == "__main__":
    main()

Run the full pipeline:

python train_owl_gps.py

Expected output (final summary table):

============================================================
Active Learning Complete!
============================================================

Round | Labeled | Test AUC
------|---------|----------
   1  |    20   |  0.5234
   2  |    30   |  0.4876
   3  |    40   |  0.5123
   4  |    50   |  0.4987
   5  |    60   |  0.5345
   6  |    70   |  0.5012
   7  |    80   |  0.5456
   8  |    90   |  0.5189
   9  |   100   |  0.5567
  10  |   110   |  0.5234

Final Test AUC: 0.5234
Labeled 110/400 available training samples

Why near-chance AUC? The synthetic images are pure random noise (np.random.rand) — the concepts (land cover, distance to industrial) are not encoded in the pixel values. Since the CNN only sees images without the concept features, there's no learnable signal. You'll see AUC hovering around 0.50–0.55, which is expected. In a real geospatial dataset, the spectral bands carry actual signal and AUC would improve meaningfully.

What just happened: The active learning loop runs for 10 rounds. Each round trains the CNN on the current labeled set, evaluates on the held-out test set, selects the next most informative samples using concept-weighted uncertainty (combining model entropy with domain relevance), and optionally runs a Reptile meta-update to help the model stay adaptable as new batches arrive. By the final round you'll see the pipeline mechanics working correctly — samples selected, meta-updates applied, results logged — even though AUC hovers near chance. That's by design with this synthetic data.
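If you'd like the AUC curve to climb even on synthetic data, one option is to encode the target directly into the pixels. Here's a minimal sketch — the inject_signal helper is our own, not part of the repo — that brightens the NIR band of positive patches so the CNN has an actual spectral signal to learn:

```python
import numpy as np

def inject_signal(images, labels, band=3, shift=0.15, seed=0):
    """Brighten one band of positive-class patches so the CNN has a
    learnable spectral signal. Purely illustrative -- real imagery
    carries its own signal."""
    rng = np.random.default_rng(seed)
    out = images.copy()
    mask = labels == 1
    # Small positive offset plus noise on the chosen band, clipped to [0, 1]
    noise = 0.05 * rng.standard_normal(out[mask, band].shape)
    out[mask, band] = np.clip(out[mask, band] + shift + noise, 0.0, 1.0)
    return out
```

Apply it to the arrays right after generate_synthetic_geospatial_data returns, re-save, and rerun the pipeline; AUC should now rise above chance as labels accumulate.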

Full source code: github.com/klarson3k1o/owl-gps-active-learning — Apache 2.0 licensed.

Where to Go Next

The pipeline you built is a working foundation. Here are the natural extensions, roughly in order of difficulty.

1. Swap in Real Satellite Data

The biggest jump in learning value. Two free sources work well as direct replacements for the synthetic data:

  • Sentinel-2 (ESA) — 13 spectral bands, 10m resolution, free global coverage via the Copernicus Data Space. Use bands B02, B03, B04, B08 (RGB + NIR) to match the 4-band setup in this tutorial.
  • Landsat-8/9 (USGS) — free via EarthExplorer, coarser resolution but longer historical archive. Good for change detection tasks.

To swap in real data: replace generate_data.py with a script that tiles your GeoTIFF into 32×32 patches and extracts concept features (land cover from OpenStreetMap, distance to industrial zones from OSM or national databases). The rest of the pipeline — active_learner.py, meta_learning.py, train_owl_gps.py — works unchanged.
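A tiling sketch under stated assumptions: you've already read the GeoTIFF into a (bands, height, width) NumPy array (rasterio's dataset.read() returns exactly that layout), and tile_array is our own name, not part of the repo:

```python
import numpy as np

def tile_array(raster, patch=32, stride=32):
    """Cut a (bands, H, W) array into patch x patch tiles.
    Edge pixels that don't fill a complete tile are dropped."""
    bands, h, w = raster.shape
    tiles = []
    for r in range(0, h - patch + 1, stride):
        for c in range(0, w - patch + 1, stride):
            tiles.append(raster[:, r:r + patch, c:c + patch])
    if not tiles:
        return np.empty((0, bands, patch, patch), dtype=raster.dtype)
    return np.stack(tiles)
```

Setting stride smaller than patch gives overlapping tiles, which is useful when targets can straddle tile boundaries.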

2. Add a Random Sampling Baseline

Right now you have no way to know if concept-weighted active learning is actually helping. Run the same pipeline with alpha=1.0 (pure uncertainty) and a second run where you replace select_samples with random selection. Plot all three AUC curves over rounds. If your concept weights are good, the concept-weighted curve should pull ahead after round 3-4 — that gap is the value you're adding over a naive approach.
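The random baseline can be a drop-in sketch like this (select_random is our own name; it consumes the same unlabeled_indices set that train_owl_gps.py maintains):

```python
import numpy as np

def select_random(unlabeled_indices, budget=10, seed=None):
    """Baseline selector: pick `budget` unlabeled samples uniformly at
    random, ignoring both model uncertainty and concepts."""
    rng = np.random.default_rng(seed)
    pool = np.fromiter(unlabeled_indices, dtype=int)
    budget = min(budget, len(pool))
    return rng.choice(pool, size=budget, replace=False)
```

In the training loop, replace the active_learner.select_samples call with new_indices = select_random(unlabeled_indices, budget=query_budget) and keep everything else identical, so the comparison isolates the selection strategy.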

3. Replace the CNN Backbone

The SimpleCNN in this tutorial is intentionally minimal. For real imagery, swap it for a pretrained backbone:

  • ResNet-18 — modify the first conv layer to accept 4 bands instead of 3: nn.Conv2d(4, 64, kernel_size=7, ...), then load ImageNet weights for all other layers. Strong baseline with minimal effort.
  • EfficientNet-B0 — smaller and faster than ResNet for the same accuracy range. Available via torchvision.models.

The active learner and meta-learner are model-agnostic — swap the backbone and nothing else needs to change.

4. Improve Uncertainty Estimation

Entropy over a single forward pass is a weak uncertainty signal because a confident but wrong model will produce low entropy. Two better approaches:

  • Monte Carlo Dropout — keep dropout active at inference time, run 10-20 forward passes per sample, and measure variance across predictions. Requires adding self.model.train() during uncertainty computation and running multiple passes. Significantly better calibration.
  • Deep Ensembles — train 3-5 independent models with different seeds, measure disagreement between them. More compute but the most reliable uncertainty estimate available without Bayesian methods.
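A Monte Carlo Dropout sketch along the lines of the first bullet (mc_dropout_uncertainty is our own helper, not part of the repo):

```python
import torch
import torch.nn.functional as F

def mc_dropout_uncertainty(model, images, n_passes=10):
    """Monte Carlo Dropout: keep dropout active at inference time and
    measure the variance of the positive-class probability across passes.
    Higher variance = more epistemic uncertainty. Note: train() also
    unfreezes BatchNorm statistics; SimpleCNN has none, but models that
    do should freeze BatchNorm layers separately."""
    model.train()  # enables dropout during the forward passes
    probs = []
    with torch.no_grad():
        for _ in range(n_passes):
            probs.append(F.softmax(model(images), dim=1)[:, 1])
    probs = torch.stack(probs)            # (n_passes, batch)
    return probs.var(dim=0), probs.mean(dim=0)
```

In compute_uncertainty, you would return the per-sample variance instead of the single-pass entropy; the rest of select_samples is unchanged.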

5. Tune the Key Hyperparameters

Three parameters have the most impact on real-world performance:

  • alpha — start at 0.5, then shift toward 0.0 (more concept-driven) early in training when the model is poorly calibrated, and toward 1.0 (more uncertainty-driven) as the model improves. A simple schedule: alpha = min(1.0, round_num / n_rounds + 0.3).
  • meta_lr — if you see the model forgetting earlier rounds (AUC drops between rounds), lower it toward 0.05. If adaptation to new batches is too slow, raise it toward 0.2.
  • query_budget — in real annotation workflows, budget is usually fixed by cost per label. Set it to match your actual annotation cost: if a human annotator labels 20 patches per hour and you have 2 hours per round, query_budget=40.
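The alpha schedule from the first bullet, written out as a tiny helper (the name is our own; round_num is assumed zero-based):

```python
def alpha_schedule(round_num, n_rounds, floor=0.3):
    """Shift from concept-driven (low alpha) toward uncertainty-driven
    (high alpha) sampling as the model's calibration improves."""
    return min(1.0, round_num / n_rounds + floor)
```

In the training loop, replace the fixed alpha with alpha=alpha_schedule(round_num, n_rounds) inside the select_samples call.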

6. Move Toward Production

When you're ready to run this against a real dataset at scale:

  • Save model checkpoints — add torch.save(model.state_dict(), f"checkpoint_round_{round_num}.pt") after each round so you can resume without retraining from scratch.
  • Replace the in-memory labeled set: labeled_indices is a Python set that lives in RAM. For large datasets, store labeled indices and their annotations in a database or a simple CSV so human annotators can work asynchronously.
  • Decouple annotation from training — the tutorial simulates instant labeling. In production, select_samples writes a query batch to a queue, human annotators label it over hours or days, and training resumes when labels arrive. The pipeline structure supports this naturally.
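A minimal CSV-backed labeled store, as a sketch of the second and third bullets (the file name and two-column schema are hypothetical):

```python
import csv
import os
import tempfile

def save_label(index, label, path="labels.csv"):
    """Append one annotation; annotators can run this asynchronously."""
    write_header = not os.path.exists(path)
    with open(path, "a", newline="") as f:
        writer = csv.writer(f)
        if write_header:
            writer.writerow(["index", "label"])
        writer.writerow([index, label])

def load_labeled_indices(path="labels.csv"):
    """Rebuild the labeled set at the start of each training round."""
    if not os.path.exists(path):
        return set()
    with open(path) as f:
        return {int(row["index"]) for row in csv.DictReader(f)}

# Round-trip demo in a temporary directory
demo_path = os.path.join(tempfile.mkdtemp(), "labels.csv")
save_label(7, 1, demo_path)
save_label(42, 0, demo_path)
labeled = load_labeled_indices(demo_path)
```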

Written by: Keith Larson

klarson@3k1o.com | klarson@planet-ai.net

Visit the original blog post for the latest updates and to leave a comment.

Saturday, February 21, 2026

Beyond Vector Databases: How AI Actually Needs Persistent Memory

Every AI agent tutorial shows the same thing: a stateless chatbot that forgets everything when the session ends. Ask it the same question twice, you get two different answers. Reference something from last week, it has no idea.

This isn't a limitation of LLMs. It's a limitation of architecture.

I've been working on persistent memory systems for AI agents. Not just "store the chat history" - actual structured memory that persists across sessions, learns patterns, and improves over time. Here's how to build it.

The Memory Problem Nobody Talks About

Current AI agent architectures have three memory tiers:

  • Context window: What the model sees right now (limited, expensive)
  • Vector store: Semantic search over documents (great for RAG, terrible for state)
  • Application database: Structured data about users, sessions, history

The gap is between vector stores and application databases. Vector DBs are for similarity search. They're not for tracking "what did we decide last Tuesday" or "what's the current status of this workflow."

How Persistent Memory Actually Works

Real persistent memory for AI needs four capabilities:

  1. Structured storage - What happened, when, why
  2. Pattern recognition - What connects to what
  3. Temporal awareness - What changed over time
  4. Relationship tracking - How decisions relate to outcomes

Here's the architecture that handles this:


┌───────────────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│         AI Agent          │────▶│   Structured    │────▶│   Pattern       │
│ (Claude Code | Kimi CLI)  │◀────│   Memory        │◀────│   Graph         │
│                           │     │   (MySQL)       │     │   (Neo4j)       │
└───────────────────────────┘     └─────────────────┘     └─────────────────┘

MySQL: The Structured Memory Layer

MySQL (or PostgreSQL) handles the structured data:

-- What decisions were made
CREATE TABLE architecture_decisions (
    id INT AUTO_INCREMENT PRIMARY KEY,
    project_id INT NOT NULL,
    title VARCHAR(255) NOT NULL,
    decision TEXT NOT NULL,
    rationale TEXT,
    decided_at DATETIME,
    INDEX idx_project_date (project_id, decided_at)
);

-- Reusable patterns discovered
CREATE TABLE code_patterns (
    id INT AUTO_INCREMENT PRIMARY KEY,
    category VARCHAR(50),
    name VARCHAR(255),
    description TEXT,
    code_example TEXT,
    confidence_score FLOAT,
    usage_count INT DEFAULT 0
);

This is boring, reliable, ACID-compliant storage. It's what you need when an AI agent says "I decided to use FastAPI" and you need to remember why six months later.
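To see the schema in action without standing up a MySQL server, here is the same idea against SQLite, an illustrative stand-in only (swap in mysql-connector or SQLAlchemy against the real table for production):

```python
import sqlite3

# SQLite stand-in for the MySQL schema above (AUTO_INCREMENT -> AUTOINCREMENT)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE architecture_decisions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        project_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        decision TEXT NOT NULL,
        rationale TEXT,
        decided_at TEXT
    )
""")

# Record a decision the agent made, with its rationale
conn.execute(
    "INSERT INTO architecture_decisions "
    "(project_id, title, decision, rationale, decided_at) "
    "VALUES (?, ?, ?, ?, datetime('now'))",
    (1, "API framework", "Use FastAPI", "async support, typed request models"),
)

# Six months later: why did we pick this?
row = conn.execute(
    "SELECT title, decision FROM architecture_decisions "
    "WHERE project_id = 1 ORDER BY decided_at DESC LIMIT 1"
).fetchone()
```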

Neo4j: The Pattern Recognition Layer

Neo4j handles what MySQL can't: relationships and similarity.

When an AI agent makes a decision, you want to know:

  • Is this similar to a previous decision?
  • What patterns keep recurring?
  • Which decisions led to good outcomes?

// Graph model for AI agent memory
(:Decision {title: 'Use Redis for caching'})
  -[:SIMILAR_TO]->(:Decision {title: 'Used Redis in project X'})
  -[:LEADS_TO]->(:Outcome {type: 'performance_improvement'})

// Query: Find similar past decisions
MATCH (d:Decision)-[:SIMILAR_TO]-(similar:Decision)
WHERE d.id = $current_decision
RETURN similar
ORDER BY similar.confidence_score DESC
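The traversal logic is easy to prototype before you have Neo4j running. This pure-Python stand-in mirrors the two relationships above using the example node titles; it illustrates the query pattern and is not a substitute for a real graph database:

```python
from collections import defaultdict

# (node, relationship_type) -> list of neighbor nodes
edges = defaultdict(list)

def relate(src, rel, dst):
    edges[(src, rel)].append(dst)

relate("Use Redis for caching", "SIMILAR_TO", "Used Redis in project X")
relate("Used Redis in project X", "LEADS_TO", "performance_improvement")

def similar_outcomes(decision):
    """Outcomes reached by decisions similar to the given one —
    the in-memory analogue of the SIMILAR_TO/LEADS_TO Cypher query."""
    results = []
    for similar in edges[(decision, "SIMILAR_TO")]:
        results.extend(edges[(similar, "LEADS_TO")])
    return results
```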

Why Not Just Vector Databases?

Vector DBs (Pinecone, Weaviate, etc.) are for similarity search. They're optimized for:

  • Finding documents similar to a query
  • Semantic search
  • RAG retrieval

They're not optimized for:

  • ACID transactions
  • Complex relationships
  • Temporal queries
  • Structured metadata filtering

Real-World Example: MCP Protocol

The Model Context Protocol (MCP) is gaining traction for exactly this reason. It defines how AI systems should store and retrieve context - not just embeddings, but structured session state, decisions, and patterns.

What MCP implementations are discovering: you need both structured storage (MySQL/PostgreSQL) and graph relationships (Neo4j). Vector DBs alone don't cut it for agent memory.

Stuck? Let AI Help You Build It

If you're thinking "this sounds complicated" - you're right, it kind of is. But you don't have to build it alone.

Here's the best part: ask your AI tool (Claude Code, Kimi CLI, or whatever you're using) to implement this architecture for you. Paste the schema above, describe what you want to build, and let it generate the code.

Need a starting point? This tutorial by Bala Priya C walks through building an MCP server from scratch:

Building a Simple MCP Server in Python

The AI can handle the boilerplate. You handle the logic. That's the whole point of persistent memory - the system learns so you don't have to start from zero every time.

Summary

AI agents need memory. Not just vector similarity - structured, relational, temporal memory.

MySQL gives you structured state. Neo4j gives you pattern recognition. Together they provide what vector databases alone cannot: true persistent memory for AI agents.

For the database-focused perspective on this architecture, see the companion post on AnotherMySQLDBA.

Thursday, February 19, 2026

Building a Production-Ready Inference Cache with Redis for LLM KV Management


What You'll Build

By the end of this tutorial, you'll have a working KV (key-value) cache system using Redis to store and retrieve LLM inference results. This dramatically reduces latency for repeated inference requests—think chatbot conversations where context gets reused, or RAG systems hitting the same documents.

You'll build a Python service that intercepts LLM inference calls, checks Redis for cached results, and only hits your expensive GPU inference when there's a cache miss. This pattern can reduce repeated inference latency by orders of magnitude for conversational workloads, turning multi-second responses into millisecond lookups.

Why this matters: as inference workloads scale, you can't just throw more GPUs at the problem. Caching is how engineering teams manage inference costs without sacrificing response times.

Prerequisites

  • Python 3.10+ installed (3.10, 3.11, or 3.12 recommended)
  • Docker 20.x or later for running Redis
  • pip package manager
  • At least 8GB RAM (16GB recommended if running models locally)
  • Basic familiarity with Python and command line
  • Estimated time: 45-60 minutes

Install Python dependencies:

pip install torch transformers redis numpy

Verify installations:

python -c "import torch, transformers, redis; print('All packages installed')"

Step-by-Step Instructions

Step 1: Start Redis with Persistence

Run Redis in Docker with volume mounting so your cache survives restarts:

docker run -d \
  --name inference-cache \
  -p 6379:6379 \
  -v redis-data:/data \
  redis:7.2-alpine redis-server --appendonly yes

What this does:

  • -d: Runs container in detached mode (background)
  • --name inference-cache: Names the container for easy reference
  • -p 6379:6379: Maps Redis default port to your host
  • -v redis-data:/data: Creates persistent volume for cache data
  • --appendonly yes: Enables AOF persistence (writes survive restarts)

Verify Redis is running:

docker logs inference-cache | grep -i "ready to accept"

You should see output indicating Redis is ready to accept connections.

Step 2: Create the KV Cache Manager

Create a file called kv_cache_manager.py. This handles serialization of inference results into Redis-friendly byte strings and manages cache keys with TTL (time-to-live).

import redis
import numpy as np
import hashlib
import pickle
from typing import Optional, Tuple

class KVCacheManager:
    def __init__(self, host='localhost', port=6379, ttl=3600):
        """
        Initialize Redis connection with TTL for cache entries.
        
        Args:
            host: Redis server hostname
            port: Redis server port
            ttl: Time-to-live in seconds (default: 3600 = 1 hour)
        """
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            decode_responses=False  # Store binary data
        )
        self.ttl = ttl
        
    def _generate_key(self, prompt: str, layer_idx: int) -> str:
        """
        Generate cache key from prompt + layer index.
        Uses SHA256 hash to keep keys manageable length.
        
        Args:
            prompt: Input text prompt
            layer_idx: Layer index (-1 for final output)
            
        Returns:
            Redis key string like "kv:layer-1:a3f7c8b4e9d2c1f5"
        """
        prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()[:16]
        return f"kv:layer{layer_idx}:{prompt_hash}"
    
    def store_kv(self, prompt: str, layer_idx: int, 
                 key_cache: np.ndarray, value_cache: np.ndarray):
        """
        Store key and value tensors for a specific layer.
        
        Args:
            prompt: Input prompt used to generate cache key
            layer_idx: Layer index for this KV pair
            key_cache: Numpy array representing key tensor
            value_cache: Numpy array representing value tensor
        """
        cache_key = self._generate_key(prompt, layer_idx)
        
        # Serialize numpy arrays using pickle
        data = pickle.dumps({
            'key': key_cache,
            'value': value_cache
        })
        
        # Store with TTL to prevent unbounded memory growth
        self.redis_client.setex(cache_key, self.ttl, data)
        
    def retrieve_kv(self, prompt: str, layer_idx: int) -> Optional[Tuple[np.ndarray, np.ndarray]]:
        """
        Retrieve cached KV pairs.
        
        Args:
            prompt: Input prompt to look up
            layer_idx: Layer index to retrieve
            
        Returns:
            Tuple of (key_cache, value_cache) if found, None otherwise
        """
        cache_key = self._generate_key(prompt, layer_idx)
        data = self.redis_client.get(cache_key)
        
        if data is None:
            return None
            
        kv_pair = pickle.loads(data)
        return kv_pair['key'], kv_pair['value']
    
    def clear_cache(self):
        """Flush all KV cache entries matching our pattern."""
        for key in self.redis_client.scan_iter("kv:*"):
            self.redis_client.delete(key)

What this does: The cache manager creates unique keys by hashing prompts (to keep key lengths manageable), serializes numpy arrays using pickle, and stores them in Redis with automatic expiration via TTL. This prevents your cache from growing unbounded and consuming all available memory.
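You can sanity-check the key scheme without Redis at all. This standalone copy of the hashing logic shows that identical prompts always map to the same key:

```python
import hashlib

def generate_key(prompt: str, layer_idx: int) -> str:
    # Mirrors KVCacheManager._generate_key
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()[:16]
    return f"kv:layer{layer_idx}:{prompt_hash}"

k1 = generate_key("The future of AI infrastructure is", -1)
k2 = generate_key("The future of AI infrastructure is", -1)
```

Deterministic keys are what make the cache work: the same prompt from any process, on any day, resolves to the same Redis entry.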

Note on pickle security: Pickle has known security vulnerabilities when deserializing untrusted data. For production systems handling untrusted input, use safer serialization formats like msgpack or protobuf.
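As one pickle-free direction using only the standard library, you can serialize numeric data explicitly; deserializing this format cannot execute arbitrary code. This is a sketch for flat float32 arrays only, not a drop-in replacement for the pickle calls above:

```python
import base64
import json
import struct

def encode_floats(values):
    """Pickle-free serialization of a flat list of float32 values."""
    raw = struct.pack(f"<{len(values)}f", *values)
    return json.dumps({
        "dtype": "float32",
        "data": base64.b64encode(raw).decode("ascii"),
    })

def decode_floats(payload):
    """Inverse of encode_floats; safe on untrusted input."""
    obj = json.loads(payload)
    raw = base64.b64decode(obj["data"])
    return list(struct.unpack(f"<{len(raw) // 4}f", raw))
```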

Step 3: Create the Cached Inference Wrapper

Create cached_inference.py. This wraps a HuggingFace model and intercepts inference calls to check the cache first.

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from kv_cache_manager import KVCacheManager
import time

class CachedInferenceModel:
    def __init__(self, model_name: str, cache_manager: KVCacheManager):
        """
        Initialize model with cache support.
        
        Args:
            model_name: HuggingFace model identifier (e.g., 'gpt2')
            cache_manager: KVCacheManager instance for caching
        """
        print(f"Loading tokenizer for {model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        
        print(f"Loading model {model_name}...")
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16,  # Use half precision to save memory
            device_map='auto'  # Automatically choose CPU/GPU
        )
        
        self.cache_manager = cache_manager
        self.cache_hits = 0
        self.cache_misses = 0
        
    def generate_with_cache(self, prompt: str, max_new_tokens: int = 50) -> str:
        """
        Generate text with caching support.
        
        This implementation caches final outputs based on exact prompt matching.
        For production, you'd cache intermediate KV tensors from attention layers.
        
        Args:
            prompt: Input text prompt
            max_new_tokens: Maximum tokens to generate
            
        Returns:
            Generated text (with [CACHED] prefix if from cache)
        """
        start_time = time.time()
        
        # Check cache first (using layer_idx=-1 to indicate final output)
        cached_output = self.cache_manager.retrieve_kv(prompt, layer_idx=-1)
        
        if cached_output is not None:
            self.cache_hits += 1
            elapsed = time.time() - start_time
            print(f"✓ Cache HIT! Retrieved in {elapsed:.4f}s")
            # Reconstruct output from cached data
            cached_text = cached_output[0].tobytes().decode('utf-8')
            return f"[CACHED] {cached_text}"
        
        # Cache miss - run full inference
        self.cache_misses += 1
        print(f"✗ Cache MISS. Running full inference...")
        
        # Tokenize input
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        
        # Generate output
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False,  # Deterministic output for caching
                use_cache=True,  # Enable model's internal KV cache
                pad_token_id=self.tokenizer.eos_token_id  # Prevent warnings
            )
        
        # Decode generated tokens
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        # Store in cache for future requests
        # We store the text as numpy array for consistency with the interface
        import numpy as np
        text_bytes = generated_text.encode('utf-8')
        self.cache_manager.store_kv(
            prompt, 
            layer_idx=-1, 
            key_cache=np.frombuffer(text_bytes, dtype=np.uint8), 
            value_cache=np.array([])  # Empty value cache for this simplified version
        )
        
        elapsed = time.time() - start_time
        print(f"Generated in {elapsed:.4f}s")
        
        return generated_text
    
    def print_stats(self):
        """Print cache performance statistics."""
        total = self.cache_hits + self.cache_misses
        hit_rate = (self.cache_hits / total * 100) if total > 0 else 0
        print(f"\n=== Cache Statistics ===")
        print(f"Hits: {self.cache_hits}")
        print(f"Misses: {self.cache_misses}")
        print(f"Hit Rate: {hit_rate:.1f}%")

What this does: This wrapper checks the cache before running inference. On cache miss, it runs the full model inference, stores the result, and returns it. On cache hit, it returns the cached result immediately—typically orders of magnitude faster than full inference.

Simplified approach: This implementation caches final text outputs rather than intermediate KV tensors from attention layers. Caching actual KV tensors requires modifying the model's forward pass (possible but beyond this tutorial's scope). The caching pattern and performance benefits are identical.

Step 4: Test the Cache System

Create test_cache.py to demonstrate cache hits vs misses:

from kv_cache_manager import KVCacheManager
from cached_inference import CachedInferenceModel

def main():
    # Initialize cache manager
    cache_mgr = KVCacheManager(host='localhost', port=6379, ttl=3600)
    
    # Clear any existing cache for clean test
    print("Clearing cache...")
    cache_mgr.clear_cache()
    
    # Load model (using GPT-2 for speed - works with any causal LM)
    print("\nLoading model...")
    model = CachedInferenceModel('gpt2', cache_mgr)
    
    # Test prompt
    prompt = "The future of AI infrastructure is"
    
    # First run - cache miss expected
    print(f"\n{'='*60}")
    print(f"TEST 1: First inference (cache miss expected)")
    print(f"{'='*60}")
    output1 = model.generate_with_cache(prompt, max_new_tokens=30)
    print(f"Output: {output1[:100]}...")
    
    # Second run - cache hit expected
    print(f"\n{'='*60}")
    print(f"TEST 2: Second inference with same prompt (cache hit expected)")
    print(f"{'='*60}")
    output2 = model.generate_with_cache(prompt, max_new_tokens=30)
    print(f"Output: {output2[:100]}...")
    
    # Third run - different prompt, cache miss expected
    print(f"\n{'='*60}")
    print(f"TEST 3: Different prompt (cache miss expected)")
    print(f"{'='*60}")
    prompt2 = "AI models require"
    output3 = model.generate_with_cache(prompt2, max_new_tokens=30)
    print(f"Output: {output3[:100]}...")
    
    # Fourth run - back to first prompt, cache hit expected
    print(f"\n{'='*60}")
    print(f"TEST 4: Back to first prompt (cache hit expected)")
    print(f"{'='*60}")
    output4 = model.generate_with_cache(prompt, max_new_tokens=30)
    print(f"Output: {output4[:100]}...")
    
    # Print final statistics
    model.print_stats()

if __name__ == "__main__":
    main()

Run the test:

python test_cache.py

Performance analysis: You'll observe dramatic performance differences between cache misses (full inference) and cache hits (Redis lookup). Cache hits typically complete in milliseconds while full inference takes seconds—demonstrating how caching reduces latency for repeated requests. In production systems serving thousands of requests, this translates directly to reduced GPU costs and improved user experience.

Step 5: Monitor Redis Memory Usage

Check current memory usage:

docker exec inference-cache redis-cli INFO memory | grep used_memory_human

For continuous monitoring, create monitor_cache.py:

import redis
import time

def monitor_cache(host='localhost', port=6379, interval=5):
    """
    Monitor Redis cache metrics in real-time.
    
    Args:
        host: Redis hostname
        port: Redis port
        interval: Seconds between updates
    """
    client = redis.Redis(host=host, port=port)
    
    print("Monitoring Redis cache (Ctrl+C to stop)...")
    print(f"{'Time':<20} {'Keys':<10} {'Memory':<15} {'Hit Rate':<10}")
    print("-" * 60)
    
    try:
        while True:
            info = client.info()
            
            # Gather metrics
            keys = client.dbsize()
            memory_mb = info['used_memory'] / (1024 * 1024)
            hits = info.get('keyspace_hits', 0)
            misses = info.get('keyspace_misses', 0)
            total = hits + misses
            hit_rate = (hits / total * 100) if total > 0 else 0
            
            # Display row
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"{timestamp:<20} {keys:<10} {memory_mb:>10.2f} MB   {hit_rate:>6.1f}%")
            
            time.sleep(interval)
            
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")

if __name__ == "__main__":
    monitor_cache()

Run in a separate terminal while testing:

python monitor_cache.py

This gives you real-time visibility into cache performance and memory consumption—critical for production deployments.

Verification

Confirm everything works correctly with these checks:

1. Verify Redis Container Status

docker ps | grep inference-cache

Container should show "Up" status.

2. Verify Cache Keys Exist

docker exec inference-cache redis-cli KEYS "kv:*"

After running test_cache.py, you should see keys matching the pattern kv:layer-1:{hash}.

3. Test Cache Hit Rate

Run test_cache.py a second time:

python test_cache.py

The second execution should show 100% cache hits for both test prompts (4 hits, 0 misses).

4. Verify TTL is Working

Check time-to-live on a cache key (replace the hash with an actual key from step 2):

docker exec inference-cache redis-cli TTL "kv:layer-1:a3f7c8b4e9d2c1f5"

Should return a positive integer less than 3600 (seconds remaining until expiry). A return of -1 means the key exists but has no TTL (setex wasn't used); -2 means the key doesn't exist.

5. Test Cache Persistence

Restart Redis and verify cache survives:

# Restart container
docker restart inference-cache

# Wait 5 seconds for startup
sleep 5

# Check if keys still exist
docker exec inference-cache redis-cli KEYS "kv:*"

Keys should still be present, confirming AOF persistence is working.

Troubleshooting

Issue 1: "ConnectionRefusedError: [Errno 111] Connection refused"

Cause: Redis isn't running or isn't accessible on port 6379.

Fix:

# Check if Redis container is running
docker ps -a | grep inference-cache

# If stopped, start it
docker start inference-cache

# If it doesn't exist, recreate it
docker run -d --name inference-cache -p 6379:6379 -v redis-data:/data redis:7.2-alpine redis-server --appendonly yes

# Verify it's accepting connections
docker logs inference-cache | grep -i "ready"

Issue 2: "ModuleNotFoundError: No module named 'transformers'"

Cause: Python dependencies not installed or wrong Python environment active.

Fix:

# Reinstall dependencies
pip install torch transformers redis numpy

# Verify installation
python -c "import transformers; print(transformers.__version__)"

Issue 3: Cache hits not occurring on repeated prompts

Cause: TTL expired, or cache was cleared between runs.

Fix:

# Check if keys exist
docker exec inference-cache redis-cli KEYS "kv:*"

# If no keys, run test_cache.py again
python test_cache.py

# Then immediately run it again to see cache hits
python test_cache.py

Issue 4: "RuntimeError: CUDA out of memory"

Cause: GPU doesn't have enough memory for the model.

Fix:

The code already uses torch.float16 for memory efficiency. If still encountering issues:

# In cached_inference.py, modify model loading:
self.model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    device_map='cpu'  # Force CPU usage
)

Or use a smaller model like distilgpt2 instead of gpt2.

Issue 5: "pickle.UnpicklingError: invalid load key"

Cause: Corrupted cache data or version mismatch between pickle writes and reads.

Fix:

# Clear the cache completely
docker exec inference-cache redis-cli FLUSHDB

# Run test again
python test_cache.py

Next Steps

Now that you have a working inference cache, consider these enhancements:

1. Implement Semantic Caching

Instead of exact prompt matching, use embedding similarity to cache semantically similar prompts. This increases cache hit rates for paraphrased queries.

from sentence_transformers import SentenceTransformer
import numpy as np

# Add to KVCacheManager.__init__
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
self.prompt_embeddings = {}  # cache_key -> prompt embedding

# Add as a method: reuse an existing key when a cached prompt is close enough
def _generate_key_semantic(self, prompt: str, layer_idx: int, threshold=0.9):
    embedding = self.embedding_model.encode(prompt)
    for key, cached in self.prompt_embeddings.items():
        similarity = np.dot(embedding, cached) / (
            np.linalg.norm(embedding) * np.linalg.norm(cached))
        if similarity > threshold:
            return key  # semantically similar prompt already cached
    key = self._generate_key(prompt, layer_idx)
    self.prompt_embeddings[key] = embedding
    return key

2. Add Cache Warming

Pre-populate the cache with common queries during deployment:

def warm_cache(model, common_prompts):
    """Pre-cache frequently used prompts."""
    for prompt in common_prompts:
        model.generate_with_cache(prompt)

3. Implement Cache Eviction Policies

Beyond TTL, implement LRU (Least Recently Used) or LFU (Least Frequently Used) eviction so the cache keeps its most valuable entries under memory pressure. Redis supports both natively through its maxmemory-policy setting, so you rarely need to implement eviction yourself.
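Redis can enforce eviction itself once a memory ceiling is set; for example (the 256mb cap is illustrative — size it for your workload):

```shell
# Cap cache memory and evict least-recently-used keys when the cap is hit
docker exec inference-cache redis-cli CONFIG SET maxmemory 256mb
docker exec inference-cache redis-cli CONFIG SET maxmemory-policy allkeys-lru
```

Use allkeys-lfu instead if you want frequency-based eviction.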

Building a Local LLM Evaluation Pipeline with Ragas and Ollama


What You'll Build

A working evaluation pipeline that tests your RAG (Retrieval-Augmented Generation) system locally using Ragas metrics and Ollama models. No OpenAI API keys required, no cloud dependencies—everything runs on your machine.

You'll create a Python script that takes questions, retrieves context from a document store, generates answers with a local LLM, and scores them across four key metrics: faithfulness, answer relevancy, context precision, and context recall. This approach costs nothing, keeps your data local, and provides repeatable evaluation results. You'll walk away with a complete pipeline you can adapt to evaluate your own RAG systems.

Prerequisites

  • Python 3.10+ (check with python --version)
  • pip package manager (included with Python)
  • Ollama installed and running - Download from https://ollama.ai/download
  • 8GB+ RAM (16GB recommended for smoother operation)
  • 5GB free disk space (for model downloads)
  • Basic understanding of RAG - you've built or used one before
  • Estimated time: 45-60 minutes

Step-by-Step Instructions

Step 1: Set Up Your Environment

Create a directory and virtual environment to isolate dependencies:

mkdir rag-eval-local
cd rag-eval-local
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

Expected output: Your terminal prompt shows (venv) at the beginning, indicating the virtual environment is active.

Step 2: Install Required Packages

Install Ragas and the Langchain Ollama integration:

pip install ragas==0.1.9 langchain-community==0.2.10 langchain-ollama==0.1.0 datasets==2.14.0

What each package does:

  • ragas - Evaluation framework providing RAG metrics
  • langchain-ollama - Connects Langchain to local Ollama models
  • langchain-community - Required dependency for Langchain integrations
  • datasets - Handles data formatting for Ragas

Expected output: Successful installation messages ending with "Successfully installed ragas-0.1.9..." (installation takes 1-2 minutes).

Step 3: Pull the Required Ollama Models

Download the LLM for evaluation and the embedding model (takes 10-15 minutes depending on connection speed):

# Pull the instruction-tuned LLM for evaluation (4.1GB)
ollama pull mistral:7b-instruct

# Pull the embedding model (274MB)
ollama pull nomic-embed-text

Expected output:

pulling manifest
pulling 4a03f83c5f0d... 100% ▕████████████████▏ 4.1 GB
pulling e6836092461f... 100% ▕████████████████▏  7.7 KB
pulling 4a03f83c5f0e... 100% ▕████████████████▏   11 KB
success

Why these models: Mistral 7B Instruct is fast enough for local evaluation while maintaining good quality. Nomic-embed-text is an open embedding model optimized for retrieval tasks and runs efficiently on Ollama.

Verify the models are ready:

ollama list

You should see both models listed with their sizes.

Step 4: Create Sample Data

Create sample_data.py with test data. In production, you'd load this from your actual RAG system, but this lets us focus on evaluation mechanics:

# sample_data.py
from datasets import Dataset

# Sample RAG outputs to evaluate
# Each entry represents: question asked, answer generated, 
# contexts retrieved, and ground truth answer
eval_data = {
    "question": [
        "What is the capital of France?",
        "How does photosynthesis work?",
        "What are the main causes of climate change?"
    ],
    "answer": [
        "The capital of France is Paris, a major European city known for its art, fashion, and culture.",
        "Photosynthesis is the process where plants use sunlight, water, and carbon dioxide to create oxygen and energy in the form of sugar.",
        "Climate change is primarily caused by human activities that release greenhouse gases, especially burning fossil fuels, deforestation, and industrial processes."
    ],
    "contexts": [
        ["Paris is the capital and most populous city of France. It is located in north-central France."],
        ["Photosynthesis is a process used by plants and other organisms to convert light energy into chemical energy. It uses carbon dioxide and water, producing glucose and oxygen."],
        ["The primary cause of climate change is the burning of fossil fuels like coal, oil, and gas, which releases carbon dioxide. Deforestation also contributes significantly."]
    ],
    "ground_truth": [
        "Paris is the capital of France.",
        "Photosynthesis converts light energy into chemical energy using CO2 and water, producing glucose and oxygen.",
        "Climate change is mainly caused by burning fossil fuels and deforestation, which release greenhouse gases."
    ]
}

# Convert to Ragas-compatible dataset format
dataset = Dataset.from_dict(eval_data)

Data structure explained: Ragas expects contexts as a list of lists (each question can have multiple retrieved context chunks), while other fields are simple lists. This mirrors what your RAG system produces.

Step 5: Configure Ragas with Ollama

Create the main evaluation script. This connects Ragas to your local Ollama models instead of cloud APIs.

Create evaluate_rag.py:

# evaluate_rag.py
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from langchain_ollama import ChatOllama, OllamaEmbeddings
from sample_data import dataset

# Configure the Ollama LLM for evaluation
llm = ChatOllama(
    model="mistral:7b-instruct",
    temperature=0,  # Deterministic outputs for consistent evaluation
    num_ctx=4096,   # Context window size
)

# Configure the embedding model
embeddings = OllamaEmbeddings(
    model="nomic-embed-text"
)

# Run evaluation with local models
# The llm and embeddings parameters override Ragas' default OpenAI models
result = evaluate(
    dataset,
    metrics=[
        faithfulness,        # Is answer grounded in context?
        answer_relevancy,    # Does answer address the question?
        context_precision,   # Are retrieved contexts relevant?
        context_recall,      # Do contexts contain needed info?
    ],
    llm=llm,
    embeddings=embeddings,
)

# Display results
print("\n=== Evaluation Results ===")
print(result)

# Save detailed per-question results
result_df = result.to_pandas()
result_df.to_csv("evaluation_results.csv", index=False)
print("\nDetailed results saved to evaluation_results.csv")

Key configuration choices:

  • temperature=0 makes evaluation deterministic—you want consistent scores across runs
  • num_ctx=4096 provides enough context window for Ragas' evaluation prompts
  • We pass our local models directly to evaluate() instead of using default OpenAI models

Step 6: Run the Evaluation

Verify Ollama is running (it should auto-start after installation), then execute the evaluation script:

# Check Ollama is running
ollama list

# Run the evaluation
python evaluate_rag.py

Expected output:

Evaluating: 100%|████████████████████| 12/12 [00:45<00:00,  3.78s/it]

=== Evaluation Results ===
{'faithfulness': 0.8333, 'answer_relevancy': 0.9123, 'context_precision': 0.8889, 'context_recall': 0.9167}

Detailed results saved to evaluation_results.csv

What just happened: Ragas evaluated 12 items (4 metrics × 3 questions). Each metric uses the LLM to judge quality through carefully designed prompts. Execution time varies by hardware—expect 30-60 seconds on modern machines.

Step 7: Understand Your Metrics

Open evaluation_results.csv to see per-question scores. Here's what each metric measures:

  • Faithfulness (0-1): Is the answer grounded in the retrieved context? Scores near 1 mean no hallucinations.
  • Answer Relevancy (0-1): Does the answer actually address the question asked?
  • Context Precision (0-1): Are the retrieved contexts relevant to answering the question?
  • Context Recall (0-1): Do the contexts contain all information needed to answer?

To better understand the results, create a visualization script, visualize.py:

# visualize.py
import pandas as pd
import matplotlib.pyplot as plt

# Load evaluation results
df = pd.read_csv("evaluation_results.csv")

# Calculate average scores across all questions
metrics = ['faithfulness', 'answer_relevancy', 'context_precision', 'context_recall']
scores = df[metrics].mean()

# Create bar chart
plt.figure(figsize=(10, 6))
plt.bar(metrics, scores, color='steelblue')
plt.ylim(0, 1)
plt.ylabel('Score')
plt.title('RAG System Evaluation Metrics')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.savefig('metrics.png', dpi=150)
print("Visualization saved to metrics.png")

Install matplotlib if needed, then run the visualization:

pip install matplotlib==3.7.1
python visualize.py

Open metrics.png to see your results visualized. This makes it easier to spot which aspects of your RAG system need improvement.
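Beyond the chart, you can flag weak questions directly in pandas. A minimal sketch; the inline DataFrame stands in for evaluation_results.csv, the `question` column name is an assumption about your export, and the 0.7 threshold is an arbitrary choice, not a Ragas recommendation:

```python
import pandas as pd

# Stand-in for pd.read_csv("evaluation_results.csv")
df = pd.DataFrame({
    "question": ["Q1", "Q2", "Q3"],
    "faithfulness": [0.95, 0.40, 0.88],
    "answer_relevancy": [0.91, 0.93, 0.90],
    "context_precision": [0.89, 0.85, 0.92],
    "context_recall": [0.92, 0.90, 0.55],
})

metrics = ["faithfulness", "answer_relevancy", "context_precision", "context_recall"]
THRESHOLD = 0.7  # arbitrary cut-off for "needs attention"

# Keep rows where any metric falls below the threshold
weak = df[(df[metrics] < THRESHOLD).any(axis=1)]
print(weak[["question"] + metrics])
```

This gives you a short list of questions to inspect manually instead of eyeballing every row.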

Verification

Confirm everything worked correctly with these checks:

  1. Check the CSV exists and has content:
    ls -lh evaluation_results.csv
    head evaluation_results.csv
    
    You should see a file with multiple columns including your metrics and scores.
  2. Verify scores are reasonable: Open evaluation_results.csv—all metric scores should be between 0 and 1. If you see NaN, null, or values outside this range, something failed.
  3. Test with intentionally bad data: Modify sample_data.py to add a clearly wrong answer:
    # Add this to the end of the "answer" list in sample_data.py
    "The capital of France is London."
    # Add corresponding question, contexts, and ground_truth entries
    
    Run python evaluate_rag.py again—faithfulness should drop significantly for that question.
  4. Test consistency: Run python evaluate_rag.py three times with the original data. Scores should be identical or vary by less than 0.02 due to temperature=0.

Success looks like: You can run the evaluation repeatedly and get consistent scores, the CSV contains per-question breakdowns, and intentionally bad answers score lower than good ones.
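The consistency check in step 4 can be automated. A sketch comparing aggregate scores from two runs (the dicts here stand in for two parsed result files; the values are illustrative):

```python
# Aggregate scores from two hypothetical runs of evaluate_rag.py
run_a = {"faithfulness": 0.8333, "answer_relevancy": 0.9123,
         "context_precision": 0.8889, "context_recall": 0.9167}
run_b = {"faithfulness": 0.8333, "answer_relevancy": 0.9135,
         "context_precision": 0.8889, "context_recall": 0.9160}

# Largest absolute difference across the shared metrics
max_diff = max(abs(run_a[m] - run_b[m]) for m in run_a)
print(f"max metric drift: {max_diff:.4f}")
assert max_diff < 0.02, "scores drifted more than expected at temperature=0"
```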

Common Issues & Fixes

Issue 1: "Connection refused" or "Ollama not found"

Error message:

requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=11434): 
Max retries exceeded with url: /api/generate

Cause: Ollama service isn't running.

Fix: Start Ollama manually:

# macOS/Linux
ollama serve

# Or check if it's already running
ps aux | grep ollama

# Windows - Ollama should run as a system service
# Check system tray for Ollama icon

On macOS, Ollama typically runs as a background service after installation. If ollama list works, the service is running.

Issue 2: Evaluation hangs or takes extremely long

Symptoms: Progress bar stuck at 0% for 5+ minutes, or system becomes unresponsive.

Cause: Model not fully loaded in memory, or system is swapping to disk.

Fix: Cancel with Ctrl+C and verify models load properly:

# Test model responds quickly
ollama run mistral:7b-instruct "What is 2+2?"
# Should respond within 5-10 seconds

If the model loads slowly or your system is swapping, try a smaller quantized model:

# Pull 4-bit quantized version (smaller, faster)
ollama pull mistral:7b-instruct-q4_0

# Update evaluate_rag.py to use it
# Change: model="mistral:7b-instruct-q4_0"

Issue 3: Low or inconsistent scores on clearly good answers

Symptoms: Faithfulness of 0.3 when answer clearly matches context, or scores vary wildly between runs.

Cause: Model may be misinterpreting Ragas' evaluation prompts.

Fix: Try an alternative model that handles instruction-following better:

# Pull Llama 3 (often more reliable for evaluation)
ollama pull llama3:8b-instruct

# Update evaluate_rag.py
# Change: model="llama3:8b-instruct"

Or re-pull the current model to ensure you have the latest version:

ollama pull mistral:7b-instruct

Issue 4: ImportError or ModuleNotFoundError

Error message:

ModuleNotFoundError: No module named 'ragas'

Cause: Virtual environment not activated, or packages not installed.

Fix: Ensure virtual environment is active and reinstall:

# Activate virtual environment
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Verify activation - should show venv path
which python

# Reinstall packages
pip install ragas==0.1.9 langchain-community==0.2.10 langchain-ollama==0.1.0 datasets==2.14.0

Next Steps

Now that you have a working local evaluation pipeline, here's how to extend it:

  • Integrate with your actual RAG system: Replace sample_data.py with outputs from your production system. Export questions, generated answers, retrieved contexts, and ground truth answers in the same format.
  • Add custom metrics: Ragas supports custom evaluators for domain-specific requirements. See the official documentation at https://docs.ragas.io/en/latest/concepts/metrics/custom.html
  • Batch evaluation: Process larger datasets by loading them from CSV or JSON files. Ragas handles batching automatically, but consider chunking very large datasets (1000+ questions) to avoid memory issues.
  • Track metrics over time: Store results in a database (SQLite works well) to monitor how system changes affect metrics. This helps you catch regressions early.
  • Compare different approaches: Evaluate different chunking strategies, embedding models, or retrieval methods by swapping out the contexts while keeping questions constant. This isolates what actually improves performance.
  • Automate with CI/CD: Add evaluation to your testing pipeline to catch quality regressions before deployment.
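For the batch-evaluation point above, a simple chunking helper keeps memory bounded when you move past a handful of questions; `chunk_size` is a tuning knob, not a Ragas requirement:

```python
def chunked(items, chunk_size=100):
    """Yield successive fixed-size slices of a list of questions."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]

# Example: split 250 question IDs into batches of 100
questions = list(range(250))
batches = list(chunked(questions, chunk_size=100))
print([len(b) for b in batches])  # → [100, 100, 50]
```

You would call evaluate() once per batch and concatenate the per-question DataFrames afterwards.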

Local models score differently than GPT-4 would, but they're consistent and free. This makes them ideal for iterative development where you need to run hundreds of evaluations. You can always run a final validation with commercial APIs once you've narrowed down your best approach.

Helpful resources:

If you hit issues not covered here, the Ragas GitHub Issues page is actively maintained, and the Langchain community forums have good coverage of Ollama integration questions.

Build a Multi-Region AI Inference Pipeline with Global Infrastructure


What You'll Build

By the end of this tutorial, you'll have a working multi-region inference pipeline that intelligently routes requests between cloud regions. Your FastAPI service will select the optimal region based on real-time latency measurements and cost optimization, with automatic failover when regions become unavailable.

This architecture matters because compute pricing varies across regions. You'll build a system that balances cost differences with latency requirements. The tutorial uses local mock servers for testing before deploying to real cloud infrastructure, so you can validate the routing logic without incurring cloud costs during development.

Prerequisites

  • Python 3.10+ installed locally (python --version to check)
  • pip package manager
  • curl for testing endpoints
  • jq for parsing JSON responses: brew install jq (macOS) or apt-get install jq (Ubuntu)
  • AWS Account (optional for Step 8 - real deployment only)
  • AWS CLI v2 (optional for Step 8): aws --version
  • Terraform 1.6+ (optional for Step 8): terraform --version
  • Basic understanding of REST APIs and async Python
  • Estimated time: 60-90 minutes (Steps 1-7), additional 30-60 minutes for Step 8 if deploying to cloud

Install core Python dependencies:

pip install fastapi==0.104.1 uvicorn==0.24.0 httpx==0.25.0 pydantic==2.5.0

Expected output:

Successfully installed fastapi-0.104.1 uvicorn-0.24.0 httpx-0.25.0 pydantic-2.5.0

Step-by-Step Instructions

Step 1: Set Up the Project Structure

Create the project directory and file structure:

mkdir multi-region-inference
cd multi-region-inference
mkdir -p src terraform
touch src/__init__.py src/main.py src/region_router.py src/region_config.py

Verify the structure:

tree -L 2

Expected output:

.
├── src
│   ├── __init__.py
│   ├── main.py
│   ├── region_config.py
│   └── region_router.py
└── terraform

What just happened: You've created a standard Python project layout. The src directory contains your application code, and terraform will hold infrastructure-as-code files if you deploy to real cloud regions in Step 8.

Step 2: Create the Region Configuration

Create src/region_config.py:

import os
from typing import Dict, List
from pydantic import BaseModel


class RegionEndpoint(BaseModel):
    """Configuration for a single region endpoint"""
    name: str
    endpoint: str
    cost_per_1k_tokens: float  # Cost in USD
    priority: int  # Lower number = higher priority
    provider: str  # "aws" or "azure"


# Region configurations with representative cost values
REGIONS: Dict[str, RegionEndpoint] = {
    "us-east-1": RegionEndpoint(
        name="us-east-1",
        endpoint=os.getenv("AWS_US_EAST_ENDPOINT", "http://localhost:8001"),
        cost_per_1k_tokens=0.50,
        priority=3,
        provider="aws"
    ),
    "me-south-1": RegionEndpoint(
        name="me-south-1",  # AWS Bahrain - Middle East
        endpoint=os.getenv("AWS_ME_SOUTH_ENDPOINT", "http://localhost:8002"),
        cost_per_1k_tokens=0.35,
        priority=1,
        provider="aws"
    ),
    "ap-south-1": RegionEndpoint(
        name="ap-south-1",  # AWS Mumbai - South Asia
        endpoint=os.getenv("AWS_AP_SOUTH_ENDPOINT", "http://localhost:8003"),
        cost_per_1k_tokens=0.32,
        priority=1,
        provider="aws"
    ),
    "brazilsouth": RegionEndpoint(
        name="brazilsouth",  # Azure Brazil
        endpoint=os.getenv("AZURE_BRAZIL_ENDPOINT", "http://localhost:8004"),
        cost_per_1k_tokens=0.38,
        priority=2,
        provider="azure"
    ),
}


def get_sorted_regions() -> List[RegionEndpoint]:
    """Returns regions sorted by priority, then by cost"""
    return sorted(
        REGIONS.values(),
        key=lambda x: (x.priority, x.cost_per_1k_tokens)
    )

What just happened: You've defined configurations for four regions with different cost profiles. The priority system favors lower-cost regions (me-south-1 and ap-south-1) while maintaining fallback options. Endpoints default to localhost for local testing but can be overridden with environment variables for production deployment.
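The two-level sort key in get_sorted_regions can be seen in isolation with plain tuples (a simplified stand-in for the pydantic models above, using the same priorities and costs):

```python
# (name, priority, cost_per_1k_tokens) mirroring the REGIONS table
regions = [
    ("us-east-1", 3, 0.50),
    ("me-south-1", 1, 0.35),
    ("ap-south-1", 1, 0.32),
    ("brazilsouth", 2, 0.38),
]

# Sort by priority first, then by cost within the same priority tier
ordered = sorted(regions, key=lambda r: (r[1], r[2]))
print([name for name, _, _ in ordered])
# → ['ap-south-1', 'me-south-1', 'brazilsouth', 'us-east-1']
```

Note that ap-south-1 beats me-south-1 only on cost: both sit in priority tier 1, so the second element of the key tuple breaks the tie.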

Step 3: Build the Smart Region Router

Create src/region_router.py:

import asyncio
import time
from typing import Optional, Dict, Any
import httpx
from src.region_config import get_sorted_regions, RegionEndpoint


class RegionRouter:
    """Routes inference requests to optimal regions based on latency and cost"""
    
    def __init__(self, timeout: float = 5.0):
        self.timeout = timeout
        self.latency_cache: Dict[str, float] = {}
        self.failure_count: Dict[str, int] = {}
        
    async def measure_latency(self, region: RegionEndpoint) -> Optional[float]:
        """
        Ping region endpoint to measure actual latency.
        Returns latency in seconds, or None if region is unreachable.
        """
        try:
            start = time.time()
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(f"{region.endpoint}/health")
                if response.status_code == 200:
                    latency = time.time() - start
                    self.latency_cache[region.name] = latency
                    return latency
        except Exception as e:
            print(f"Failed to reach {region.name}: {e}")
            self.failure_count[region.name] = self.failure_count.get(region.name, 0) + 1
            return None
    
    async def select_best_region(self) -> Optional[RegionEndpoint]:
        """
        Select optimal region based on:
        1. Priority (cost tier)
        2. Measured latency
        3. Failure history (exclude regions with 3+ consecutive failures)
        """
        regions = get_sorted_regions()
        
        # Measure latency for all regions concurrently
        latency_tasks = [self.measure_latency(r) for r in regions]
        await asyncio.gather(*latency_tasks)
        
        # Filter out unhealthy regions
        available_regions = [
            r for r in regions 
            if self.failure_count.get(r.name, 0) < 3
            and self.latency_cache.get(r.name) is not None
        ]
        
        if not available_regions:
            print("WARNING: No healthy regions available!")
            return None
        
        # Calculate weighted score: priority + normalized latency
        def score_region(region: RegionEndpoint) -> float:
            latency = self.latency_cache.get(region.name, 999)
            # Lower score is better
            return region.priority + (latency * 10)
        
        best_region = min(available_regions, key=score_region)
        latency_ms = self.latency_cache.get(best_region.name, 0) * 1000
        print(f"Selected region: {best_region.name} "
              f"(latency: {latency_ms:.0f}ms, cost: ${best_region.cost_per_1k_tokens}/1k tokens)")
        return best_region
    
    async def route_inference(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Route inference request to best available region.
        Returns response with metadata about selected region.
        """
        best_region = await self.select_best_region()
        
        if not best_region:
            raise Exception("No healthy regions available for inference")
        
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{best_region.endpoint}/v1/inference",
                    json=payload
                )
                response.raise_for_status()
                result = response.json()
                
                # Add routing metadata to response
                result["_metadata"] = {
                    "region": best_region.name,
                    "provider": best_region.provider,
                    "cost_per_1k": best_region.cost_per_1k_tokens
                }
                return result
        except Exception as e:
            print(f"Inference failed on {best_region.name}: {e}")
            self.failure_count[best_region.name] = self.failure_count.get(best_region.name, 0) + 1
            raise

What just happened: This is the core routing engine. It measures real latency to each region using concurrent health checks, maintains a failure count to avoid repeatedly trying dead regions, and selects the optimal region using a weighted scoring system that balances cost priority with actual latency. The route_inference method handles the actual request forwarding and adds metadata about which region processed the request.
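The weighted score can be traced by hand. Applying the priority + latency × 10 formula from score_region to the sample latencies used later in this tutorial:

```python
# priority and measured latency (seconds) per region, illustrative values
candidates = {
    "us-east-1":   (3, 0.102),
    "me-south-1":  (1, 0.062),
    "ap-south-1":  (1, 0.051),
    "brazilsouth": (2, 0.081),
}

# score = priority + latency * 10; lower is better
scores = {name: p + lat * 10 for name, (p, lat) in candidates.items()}
best = min(scores, key=scores.get)
print(best, round(scores[best], 2))  # → ap-south-1 1.51
```

The ×10 multiplier means a 100 ms latency gap costs one full priority tier, which is why a priority-1 region would lose to a priority-2 region only if it were more than 100 ms slower.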

Step 4: Create the FastAPI Application

Create src/main.py:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any
from src.region_router import RegionRouter

app = FastAPI(
    title="Multi-Region Inference API",
    description="Intelligent routing for AI inference across global regions"
)
router = RegionRouter()


class InferenceRequest(BaseModel):
    """Request schema for inference endpoint"""
    prompt: str
    max_tokens: int = 100
    temperature: float = 0.7


class InferenceResponse(BaseModel):
    """Response schema with result and routing metadata"""
    result: str
    metadata: Dict[str, Any]


@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers"""
    return {"status": "healthy", "service": "multi-region-router"}


@app.post("/v1/inference", response_model=InferenceResponse)
async def inference(request: InferenceRequest):
    """
    Main inference endpoint - automatically routes to optimal region.
    
    The router selects the best region based on cost and latency,
    then forwards the request and returns the result with metadata.
    """
    try:
        payload = request.model_dump()  # Pydantic v2 replacement for the deprecated .dict()
        result = await router.route_inference(payload)
        return InferenceResponse(
            result=result.get("output", ""),
            metadata=result.get("_metadata", {})
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/regions/status")
async def region_status():
    """
    Debug endpoint showing current region health metrics.
    Useful for monitoring and troubleshooting routing decisions.
    """
    return {
        "latency_cache": router.latency_cache,
        "failure_count": router.failure_count
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

What just happened: You've created a FastAPI application that wraps the region router. The /v1/inference endpoint accepts inference requests and automatically routes them to the optimal region. The /regions/status endpoint exposes internal routing metrics for debugging and monitoring.

Step 5: Create Mock Regional Endpoints for Testing

Create mock_regional_server.py in the project root directory:

"""
Mock regional inference server for local testing.
Simulates a real inference endpoint with configurable latency.
"""
import sys
import asyncio
import random
from fastapi import FastAPI
import uvicorn


def create_mock_server(region_name: str, latency_ms: int):
    """Create a mock server that simulates a regional endpoint"""
    app = FastAPI()
    
    @app.get("/health")
    async def health():
        # Simulate network latency without blocking the event loop
        await asyncio.sleep(latency_ms / 1000.0)
        return {"status": "healthy", "region": region_name}
    
    @app.post("/v1/inference")
    async def inference(payload: dict):
        # Simulate inference processing time
        await asyncio.sleep(random.uniform(0.5, 1.5))
        return {
            "output": f"Mock response from {region_name}: {payload.get('prompt', '')[:50]}...",
            "tokens_used": payload.get('max_tokens', 100)
        }
    
    return app


if __name__ == "__main__":
    # Parse command line arguments
    region = sys.argv[1] if len(sys.argv) > 1 else "us-east-1"
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 8001
    latency = int(sys.argv[3]) if len(sys.argv) > 3 else 50
    
    app = create_mock_server(region, latency)
    print(f"Starting mock server for {region} on port {port} (simulated latency: {latency}ms)")
    uvicorn.run(app, host="0.0.0.0", port=port, log_level="warning")

What just happened: This creates mock regional inference endpoints with configurable latency characteristics. Each mock server simulates a real regional deployment, allowing you to test the routing logic locally without deploying to actual cloud infrastructure. The latency parameter lets you simulate geographic distance.

Step 6: Test the Multi-Region Router Locally

You'll need 5 terminal windows for this step. Open them all and navigate to your project directory in each.

Sub-step 6a: Start the mock regional servers

In terminal 1 (US East - higher latency):

python mock_regional_server.py us-east-1 8001 100

In terminal 2 (Middle East - lower latency):

python mock_regional_server.py me-south-1 8002 60

In terminal 3 (South Asia - lowest latency):

python mock_regional_server.py ap-south-1 8003 50

In terminal 4 (Brazil - medium latency):

python mock_regional_server.py brazilsouth 8004 80

Expected output in each terminal:

Starting mock server for [region] on port [port] (simulated latency: [X]ms)
INFO:     Started server process [PID]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:[port]

Sub-step 6b: Start the main router service

In terminal 5:

python -m src.main

Expected output:

INFO:     Started server process [12345]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

What just happened: You now have a complete simulated multi-region environment running locally. Four mock servers represent different geographic regions with realistic latency profiles, and the main router service is ready to intelligently distribute requests among them.

Step 7: Send Test Requests and Verify Routing

Sub-step 7a: Test a single inference request

Open a new terminal (terminal 6) and send a test request:

curl -X POST http://localhost:8000/v1/inference \
  -H "Content-Type: application/json" \
  -d '{
    "prompt": "Explain quantum computing in simple terms",
    "max_tokens": 150,
    "temperature": 0.7
  }'

Expected output:

{
  "result": "Mock response from ap-south-1: Explain quantum computing in simple terms...",
  "metadata": {
    "region": "ap-south-1",
    "provider": "aws",
    "cost_per_1k": 0.32
  }
}

In terminal 5 (where the router is running), you should see:

Selected region: ap-south-1 (latency: 52ms, cost: $0.32/1k tokens)

Sub-step 7b: Check region health status

curl -s http://localhost:8000/regions/status | jq

Expected output:

{
  "latency_cache": {
    "us-east-1": 0.102,
    "me-south-1": 0.062,
    "ap-south-1": 0.051,
    "brazilsouth": 0.081
  },
  "failure_count": {}
}

Sub-step 7c: Test failover behavior

Stop the ap-south-1 server (press Ctrl+C in terminal 3), then send another request:

curl -X POST http://localhost:8000/v1/inference \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Test failover", "max_tokens": 50}'

Expected output: The response should now come from me-south-1 (the next best region):

{
  "result": "Mock response from me-south-1: Test failover...",
  "metadata": {
    "region": "me-south-1",
    "provider": "aws",
    "cost_per_1k": 0.35
  }
}

Restart the ap-south-1 server in terminal 3 for the next steps.

What just happened: You've verified that the router correctly selects the lowest-cost, lowest-latency region (ap-south-1) under normal conditions, and automatically fails over to the next best region when the primary becomes unavailable. This demonstrates the core value proposition: cost optimization with reliability.
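The failover you just observed comes from the health filter in select_best_region. A standalone sketch of that filter, seeded with the state the router would hold after ap-south-1 goes down:

```python
# Simulated router state after ap-south-1 stops responding
latency_cache = {"us-east-1": 0.102, "me-south-1": 0.062, "brazilsouth": 0.081}
failure_count = {"ap-south-1": 3}
regions = ["ap-south-1", "me-south-1", "brazilsouth", "us-east-1"]  # priority order

# Same rule as the router: fewer than 3 failures AND a fresh latency reading
available = [r for r in regions
             if failure_count.get(r, 0) < 3 and latency_cache.get(r) is not None]
print(available[0])  # → me-south-1 (next best after ap-south-1)
```

One caveat of this design: the failure count never resets, so a region that hits 3 failures stays excluded until the router restarts. Production systems typically add a cool-down or half-open probe instead.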

Step 8: Deploy to Real Cloud Regions (Optional)

Note: This step requires an AWS account and will incur cloud infrastructure costs. Skip this step if you want to stay with local testing only.

Sub-step 8a: Install additional prerequisites

pip install boto3==1.34.0

Verify AWS CLI is configured:

aws sts get-caller-identity

Expected output:

{
    "UserId": "AIDAXXXXXXXXXXXXXXXXX",
    "Account": "123456789012",
    "Arn": "arn:aws:iam::123456789012:user/your-username"
}

Sub-step 8b: Enable required AWS regions

Some regions like me-south-1 require manual opt-in. Enable them via AWS Console or CLI:

# Check which regions are enabled
aws account list-regions --region-opt-status-contains ENABLED ENABLED_BY_DEFAULT

# Enable Middle East region (if not already enabled)
aws account enable-region --region-name me-south-1

Wait 5-10 minutes for the region to become fully available.

Sub-step 8c: Create Terraform configuration

Create terraform/main.tf:

terraform {
  required_version = ">= 1.6"
  
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

# Define provider for each region
provider "aws" {
  alias  = "us_east"
  region = "us-east-1"
}

provider "aws" {
  alias  = "me_south"
  region = "me-south-1"
}

provider "aws" {
  alias  = "ap_south"
  region = "ap-south-1"
}

# Example: Create ECS cluster in each region
# This is a simplified example - production requires security groups,
# load balancers, auto-scaling, and monitoring

resource "aws_ecs_cluster" "inference_us_east" {
  provider = aws.us_east
  name     = "inference-cluster-us-east-1"
}

resource "aws_ecs_cluster" "inference_me_south" {
  provider = aws.me_south
  name     = "inference-cluster-me-south-1"
}

resource "