TokenMix Team · 2026-03-21

How to Build a Multi-Model AI App: 4 Production Patterns (2026)

Building a Multi-Model AI Application with One API

Most production AI systems today rely on a single model. That works until it does not: the model has an outage, a new version regresses on your specific use case, or you discover that a different model handles certain queries better. Multi-model architectures solve these problems and unlock optimization strategies that are impossible with a single model.

This guide covers four production patterns for multi-model systems, all using the TokenMix API as the unified access layer.

Pattern 1: Intelligent Fallback Chain

The simplest multi-model pattern, and the one every production system should have: if your primary model fails, fall back to an alternative instead of returning an error.

import openai
import time
from typing import Optional

client = openai.OpenAI(
    base_url="https://api.tokenmix.ai/v1",
    api_key="your-tokenmix-api-key",
    timeout=30.0
)

class FallbackChain:
    def __init__(self, models: list[str]):
        self.models = models

    def complete(self, messages: list, **kwargs) -> tuple[str, str]:
        """Try each model in order. Returns (response, model_used)."""
        last_error = None

        for model in self.models:
            try:
                response = client.chat.completions.create(
                    model=model,
                    messages=messages,
                    **kwargs
                )
                return response.choices[0].message.content, model

            except openai.RateLimitError:
                last_error = f"{model}: rate limited"
                continue
            except openai.APITimeoutError:
                last_error = f"{model}: timeout"
                continue
            except openai.APIError as e:
                last_error = f"{model}: {e.message}"
                continue

        raise RuntimeError(f"All models failed. Last error: {last_error}")

# Usage
chain = FallbackChain([
    "claude-sonnet-4",     # Primary: best quality
    "gpt-4o",              # Fallback 1: different provider
    "gemini-2.0-flash"     # Fallback 2: fast and reliable
])

result, model_used = chain.complete(
    messages=[{"role": "user", "content": "Analyze this error log..."}],
    max_tokens=500
)
print(f"Response from {model_used}: {result}")

Key design decisions:

- Order the chain by preference and diversify providers: the primary is your best-quality model, and the fallbacks come from different providers so a single provider's outage cannot take down the whole chain.
- Catch specific transient errors (rate limits, timeouts) and move on immediately rather than retrying the same failing model.
- Keep the last error and surface it when every model fails, so a total outage is debuggable instead of producing a generic exception.
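
One refinement worth considering: retry transient failures on the same model with exponential backoff before moving down the chain. The sketch below is illustrative; `with_retries` and its defaults are names chosen here, and the helper is generic over any callable so it can wrap the `client.chat.completions.create` call inside `FallbackChain.complete`.

```python
import time

def with_retries(fn, attempts=3, base_delay=0.5, retryable=(TimeoutError,)):
    """Call fn(), retrying transient errors with exponential backoff.

    Tries up to `attempts` times, sleeping base_delay * 2**i between tries.
    Errors not listed in `retryable` propagate immediately so the fallback
    chain can move to the next model without wasting time.
    """
    last = None
    for i in range(attempts):
        try:
            return fn()
        except retryable as e:
            last = e
            if i < attempts - 1:
                time.sleep(base_delay * (2 ** i))
    raise last

# Inside FallbackChain.complete you could then wrap the API call, e.g.:
#   response = with_retries(
#       lambda: client.chat.completions.create(
#           model=model, messages=messages, **kwargs),
#       retryable=(openai.APITimeoutError, openai.RateLimitError),
#   )
```

The trade-off is latency: retries delay the fallback, so keep attempts low for user-facing requests.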

Pattern 2: A/B Testing Between Models

Before committing to a model change, run both models in parallel and compare results with real traffic. This pattern routes a percentage of traffic to a challenger model and collects quality metrics.

import random
import time
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ABTestResult:
    model: str
    response: str
    latency_ms: float
    input_tokens: int
    output_tokens: int

@dataclass
class ABTest:
    control_model: str
    challenger_model: str
    challenger_pct: float = 0.1  # 10% traffic to challenger
    results: dict = field(default_factory=lambda: {"control": [], "challenger": []})

    def route(self) -> str:
        if random.random() < self.challenger_pct:
            return self.challenger_model
        return self.control_model

    def complete_and_record(self, messages: list, **kwargs) -> ABTestResult:
        model = self.route()
        variant = "challenger" if model == self.challenger_model else "control"

        start = time.time()
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )
        latency = (time.time() - start) * 1000

        result = ABTestResult(
            model=model,
            response=response.choices[0].message.content,
            latency_ms=latency,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens
        )
        self.results[variant].append(result)
        return result

    def summary(self) -> dict:
        summary = {}
        for variant in ["control", "challenger"]:
            results = self.results[variant]
            if not results:
                continue
            summary[variant] = {
                "model": results[0].model,
                "count": len(results),
                "avg_latency_ms": sum(r.latency_ms for r in results) / len(results),
                "avg_output_tokens": sum(r.output_tokens for r in results) / len(results)
            }
        return summary

# Usage
test = ABTest(
    control_model="gpt-4o",
    challenger_model="claude-sonnet-4",
    challenger_pct=0.2  # Send 20% to challenger
)

result = test.complete_and_record(
    messages=[{"role": "user", "content": "Summarize this support ticket..."}],
    max_tokens=300
)
print(test.summary())

To make A/B testing actionable, you need quality metrics beyond just latency. Consider logging responses and running periodic evaluation with a judge model or human review.

Pattern 3: Quality Scoring and Automatic Selection

For tasks where output quality varies significantly between models, use a lightweight judge to score responses and automatically select the best one.

import time
from concurrent.futures import ThreadPoolExecutor

def generate_candidates(messages: list, models: list[str], **kwargs) -> list[dict]:
    """Generate responses from multiple models in parallel."""

    def call_model(model):
        start = time.time()
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            return {
                "model": model,
                "response": response.choices[0].message.content,
                "latency_ms": (time.time() - start) * 1000,
                "tokens": response.usage.completion_tokens
            }
        except Exception as e:
            return {"model": model, "error": str(e)}

    with ThreadPoolExecutor(max_workers=len(models)) as executor:
        results = list(executor.map(call_model, models))

    return [r for r in results if "error" not in r]

def score_response(original_prompt: str, response: str) -> float:
    """Score a response using a fast judge model."""
    judge_response = client.chat.completions.create(
        model="gemini-2.0-flash",
        messages=[
            {
                "role": "system",
                "content": (
                    "Rate this AI response on a scale of 1-10. Consider:\n"
                    "- Accuracy and correctness\n"
                    "- Completeness (addresses the full question)\n"
                    "- Clarity and conciseness\n"
                    "Respond with only a number."
                )
            },
            {
                "role": "user",
                "content": f"Prompt: {original_prompt}\n\nResponse: {response}"
            }
        ],
        max_tokens=5,
        temperature=0
    )
    try:
        return float(judge_response.choices[0].message.content.strip())
    except ValueError:
        return 5.0  # Default middle score

def best_of_n(messages: list, models: list[str], **kwargs) -> dict:
    """Generate from multiple models, return the highest-scored response."""
    candidates = generate_candidates(messages, models, **kwargs)
    if not candidates:
        raise RuntimeError("All models failed")

    prompt = messages[-1]["content"] if messages else ""
    for candidate in candidates:
        candidate["score"] = score_response(prompt, candidate["response"])

    return max(candidates, key=lambda c: c["score"])

# Usage: get the best response from three models
best = best_of_n(
    messages=[{"role": "user", "content": "Write a technical summary of WebSockets."}],
    models=["claude-sonnet-4", "gpt-4o", "gemini-2.0-flash"]
)
print(f"Best response from {best['model']} (score: {best['score']}):")
print(best["response"])

This pattern costs more per request (you are calling multiple models plus a judge), so use it selectively for high-value outputs where quality matters more than cost.
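
One way to decide when the extra spend is justified is a simple cost gate. The function name and all figures below are placeholders to calibrate against your own usage data, not measured numbers.

```python
def best_of_n_worth_it(n_models: int,
                       avg_cost_per_call_usd: float,
                       judge_cost_usd: float,
                       value_of_quality_usd: float) -> bool:
    """Rough gate for the best-of-n pattern.

    Compared to a single request, best-of-n adds (n_models - 1) extra
    generations plus one judge call. Enable it only when the expected
    value of a better answer exceeds that marginal cost.
    """
    marginal_cost = (n_models - 1) * avg_cost_per_call_usd + judge_cost_usd
    return value_of_quality_usd > marginal_cost
```

For a three-model run where each call costs about a cent, best-of-n only pays off when a better answer is worth a few cents or more, which is why it belongs on high-value paths rather than every request.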

Pattern 4: Graceful Degradation with Circuit Breakers

In production, you need more than simple fallbacks. A circuit breaker pattern prevents cascading failures by detecting when a model is consistently failing and temporarily removing it from the rotation.

import time
from dataclasses import dataclass
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Model disabled
    HALF_OPEN = "half_open" # Testing recovery

@dataclass
class CircuitBreaker:
    model: str
    failure_threshold: int = 5      # Failures before opening
    recovery_timeout: float = 60.0  # Seconds before testing recovery
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        return True  # HALF_OPEN: allow one test request

    def record_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

class ResilientMultiModel:
    def __init__(self, models: list[str]):
        self.breakers = {m: CircuitBreaker(model=m) for m in models}

    def complete(self, messages: list, **kwargs) -> tuple[str, str]:
        for model, breaker in self.breakers.items():
            if not breaker.can_execute():
                continue

            try:
                response = client.chat.completions.create(
                    model=model,
                    messages=messages,
                    **kwargs
                )
                breaker.record_success()
                return response.choices[0].message.content, model

            except (openai.RateLimitError, openai.APITimeoutError, openai.APIError):
                breaker.record_failure()
                continue

        raise RuntimeError("All models are in circuit-open state")

    def status(self) -> dict:
        return {
            model: {
                "state": breaker.state.value,
                "failures": breaker.failure_count
            }
            for model, breaker in self.breakers.items()
        }

# Usage
system = ResilientMultiModel([
    "claude-sonnet-4",
    "gpt-4o",
    "gemini-2.0-flash"
])

# In your request handler
result, model = system.complete(
    messages=[{"role": "user", "content": "Hello!"}]
)

# Monitor circuit breaker states
print(system.status())

Choosing the Right Pattern

Pattern           When to Use                  Cost Impact                            Complexity
Fallback Chain    Every production system      None (only pays for successful call)   Low
A/B Testing       Before model migrations      Low (small traffic percentage)         Medium
Quality Scoring   High-value outputs           High (3x+ per request)                 Medium
Circuit Breaker   High-availability systems    None                                   Medium

Start with the Fallback Chain. It is the highest-value, lowest-effort pattern. Add Circuit Breakers when you need high availability. Use A/B Testing before any model change. Reserve Quality Scoring for your most important outputs.

Why a Unified API Matters

All four patterns above depend on one thing: being able to call different models with the same code. Without a unified API, each pattern would require managing multiple SDKs, API keys, authentication methods, and response formats. TokenMix eliminates this complexity by providing a single OpenAI-compatible endpoint for every model.

This is not just a convenience; it is an architectural enabler. Multi-model patterns become practical when switching models is a one-line change instead of a week-long integration project.

Production Checklist

Before deploying a multi-model system: