Router - Load Balancing, Fallbacks
LiteLLM manages:
- Load balancing across multiple deployments (e.g. Azure/OpenAI)
- Prioritizing important requests to ensure they don't fail (i.e. Queueing)
- Basic reliability logic - cooldowns, fallbacks, timeouts and retries (fixed + exponential backoff) across multiple deployments/providers.
In production, LiteLLM supports using Redis to track cooled-down deployments and usage (managing tpm/rpm limits).
If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server.
Load Balancing​
(s/o @paulpierre and sweep proxy for their contributions to this implementation) See Code
Quick Start​
import os

from litellm import Router
model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]
router = Router(model_list=model_list)
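# openai.ChatCompletion.create replacement
# (top-level await works in a notebook; in a script, call this from an async function run with asyncio.run)
response = await router.acompletion(model="gpt-3.5-turbo", 
                messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)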
Available Endpoints​
- router.completion() - chat completions endpoint to call 100+ LLMs
- router.acompletion() - async chat completion calls
- router.embedding() - embedding endpoint for Azure, OpenAI, Huggingface endpoints
- router.aembedding() - async embeddings endpoint
- router.text_completion() - completion calls in the old OpenAI /v1/completions endpoint format
Advanced​
Routing Strategies - Weighted Pick, Rate Limit Aware​
Router provides 2 strategies for routing your calls across multiple deployments:
- Weighted Pick
- Rate-Limit Aware
Weighted Pick (default): picks a deployment based on the provided requests per minute (rpm) or tokens per minute (tpm).
If neither rpm nor tpm is provided, it picks a deployment at random.
import asyncio
import os

from litellm import Router
model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 900,         # requests per minute for this API
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 10,
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
        "rpm": 10,
    }
}]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo", 
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response
asyncio.run(router_acompletion())
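To make the weighted pick concrete, here is a minimal standalone sketch of the idea. It is not LiteLLM's actual implementation: the `weighted_pick` helper and the trimmed-down deployment dicts are assumptions made for illustration. Deployments are drawn with probability proportional to their rpm (or tpm), falling back to a uniform random choice when no weights are set.

```python
import random


def weighted_pick(deployments, rng=random):
    """Pick one deployment, weighted by rpm (or tpm) when every deployment sets it."""
    for key in ("rpm", "tpm"):
        weights = [d["litellm_params"].get(key, 0) for d in deployments]
        if all(w > 0 for w in weights):
            return rng.choices(deployments, weights=weights, k=1)[0]
    # No usable weights: uniform random pick.
    return rng.choice(deployments)


# Hypothetical deployments mirroring the rpm values used above.
deployments = [
    {"litellm_params": {"model": "azure/chatgpt-v-2", "rpm": 900}},
    {"litellm_params": {"model": "azure/chatgpt-functioncalling", "rpm": 10}},
    {"litellm_params": {"model": "gpt-3.5-turbo", "rpm": 10}},
]

rng = random.Random(0)  # seeded so the run is repeatable
counts = {}
for _ in range(10_000):
    model = weighted_pick(deployments, rng)["litellm_params"]["model"]
    counts[model] = counts.get(model, 0) + 1
print(counts)  # the rpm=900 deployment should receive the large majority of picks
```

With these weights, roughly 900 out of every 920 requests would be expected to land on the first deployment.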
Rate-Limit Aware: this will route to the deployment with the lowest TPM usage for that minute.
In production, we use Redis to track usage (TPM/RPM) across multiple deployments.
If you pass in the deployment's tpm/rpm limits, the router also checks against them and filters out any deployment whose limits would be exceeded.
For Azure, RPM is typically allocated at 6 RPM per 1,000 TPM, so RPM = TPM × 6 / 1000.
import os

from litellm import Router
model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }, 
    "tpm": 100000,
    "rpm": 10000,
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    },
    "tpm": 100000,
    "rpm": 1000,
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
    },
    "tpm": 100000,
    "rpm": 1000,
}]
router = Router(model_list=model_list, 
                redis_host=os.environ["REDIS_HOST"], 
                redis_password=os.environ["REDIS_PASSWORD"], 
                redis_port=os.environ["REDIS_PORT"], 
                routing_strategy="usage-based-routing")  # rate-limit aware routing
response = await router.acompletion(model="gpt-3.5-turbo", 
                messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
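The selection rule described for rate-limit aware routing can be sketched without Redis or any network calls. This is an illustrative sketch only, not the router's real code: the `pick_lowest_usage` helper, the `id` field and the `usage` dict are hypothetical stand-ins for whatever the router tracks internally.

```python
def pick_lowest_usage(deployments, usage, request_tokens=0):
    """Return the deployment with the lowest TPM usage this minute that still has headroom.

    `usage` maps a deployment id to {"tpm": tokens used, "rpm": requests made}
    in the current minute. Returns None if every deployment would exceed a limit.
    """
    candidates = []
    for d in deployments:
        used = usage.get(d["id"], {"tpm": 0, "rpm": 0})
        tpm_limit = d.get("tpm")
        rpm_limit = d.get("rpm")
        # Filter out deployments whose limits this request would exceed.
        if tpm_limit is not None and used["tpm"] + request_tokens > tpm_limit:
            continue
        if rpm_limit is not None and used["rpm"] + 1 > rpm_limit:
            continue
        candidates.append((used["tpm"], d))
    if not candidates:
        return None
    return min(candidates, key=lambda pair: pair[0])[1]


# Hypothetical deployments and per-minute usage counters.
deployments = [
    {"id": "azure-a", "tpm": 100000, "rpm": 10000},
    {"id": "azure-b", "tpm": 100000, "rpm": 1000},
    {"id": "openai", "tpm": 100000, "rpm": 1000},
]
usage = {
    "azure-a": {"tpm": 99990, "rpm": 40},   # almost out of tokens
    "azure-b": {"tpm": 5000, "rpm": 1000},  # request limit already reached
    "openai": {"tpm": 20000, "rpm": 12},
}

chosen = pick_lowest_usage(deployments, usage, request_tokens=50)
print(chosen["id"])  # openai
```

In a multi-server setup the `usage` counters would need to live in a shared store, which is the role Redis plays in production.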
Basic Reliability​
Timeouts​
The timeout set in router is for the entire length of the call, and is passed down to the completion() call level as well.
from litellm import Router 
model_list = [{...}]
router = Router(model_list=model_list, 
                timeout=30) # raise timeout error if call takes > 30s 
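response = router.completion(model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)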
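As a rough illustration of a timeout that covers the entire length of a call, the sketch below bounds an async call with `asyncio.wait_for`. It only demonstrates the general technique; `call_with_timeout` and `slow_call` are hypothetical names, and this is not a claim about how the router enforces its timeout internally.

```python
import asyncio


async def call_with_timeout(coro_fn, timeout):
    """Await coro_fn() but give up once `timeout` seconds have passed in total."""
    return await asyncio.wait_for(coro_fn(), timeout=timeout)


async def slow_call():
    # Stand-in for an LLM call that takes longer than we are willing to wait.
    await asyncio.sleep(0.2)
    return "done"


async def main():
    try:
        await call_with_timeout(slow_call, timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "completed"


outcome = asyncio.run(main())
print(outcome)  # timed out
```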
Cooldowns​
Set the limit for how many calls a model is allowed to fail in a minute, before being cooled down for a minute.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list, 
                allowed_fails=1) # cooldown model if it fails > 1 call in a minute. 
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call 
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
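The cooldown rule above (more than `allowed_fails` failures within a minute puts a deployment on hold for a minute) can be sketched as a small tracker. This is a simplified illustration under assumed names (`CooldownTracker`, `record_failure`, `is_available`), not the router's internal bookkeeping. The clock is injectable so the behaviour can be checked without waiting.

```python
import time


class CooldownTracker:
    """Cool a deployment down once it fails more than `allowed_fails` times per window."""

    def __init__(self, allowed_fails=1, window=60.0, cooldown=60.0, clock=time.monotonic):
        self.allowed_fails = allowed_fails
        self.window = window
        self.cooldown = cooldown
        self.clock = clock
        self._failures = {}      # deployment -> recent failure timestamps
        self._cooled_until = {}  # deployment -> time at which it is usable again

    def record_failure(self, deployment):
        now = self.clock()
        recent = [t for t in self._failures.get(deployment, []) if now - t < self.window]
        recent.append(now)
        self._failures[deployment] = recent
        if len(recent) > self.allowed_fails:
            self._cooled_until[deployment] = now + self.cooldown
            self._failures[deployment] = []

    def is_available(self, deployment):
        return self.clock() >= self._cooled_until.get(deployment, float("-inf"))


# Drive the tracker with a fake clock.
now = [0.0]
tracker = CooldownTracker(allowed_fails=1, clock=lambda: now[0])

tracker.record_failure("azure/chatgpt-v-2")
after_first = tracker.is_available("azure/chatgpt-v-2")   # one failure is still allowed

now[0] = 10.0
tracker.record_failure("azure/chatgpt-v-2")
after_second = tracker.is_available("azure/chatgpt-v-2")  # second failure triggers cooldown

now[0] = 70.0
after_cooldown = tracker.is_available("azure/chatgpt-v-2")
print(after_first, after_second, after_cooldown)  # True False True
```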
Retries​
For both async + sync functions, we support retrying failed requests.
For RateLimitError we implement exponential backoffs
For generic errors, we retry immediately
Here's a quick look at how we can set num_retries = 3: 
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,  
                num_retries=3)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call 
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
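The two retry behaviours described above can be sketched as a single helper: back off exponentially on rate-limit errors, retry immediately on anything else. The `RateLimitError` class here is a local stand-in and `call_with_retries` is a hypothetical helper; the delays (1s, 2s, 4s, ...) are an assumed schedule, not the router's documented one.

```python
import time


class RateLimitError(Exception):
    """Local stand-in for a provider rate-limit error (illustration only)."""


def call_with_retries(fn, num_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(), retrying up to num_retries times after the first failure."""
    attempt = 0
    while True:
        try:
            return fn()
        except RateLimitError:
            if attempt >= num_retries:
                raise
            sleep(base_delay * 2 ** attempt)  # exponential backoff
        except Exception:
            if attempt >= num_retries:
                raise
            # generic error: retry immediately, no sleep
        attempt += 1


# A flaky call that is rate limited twice before succeeding.
calls = []
delays = []


def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RateLimitError("slow down")
    return "ok"


result = call_with_retries(flaky, num_retries=3, sleep=delays.append)
print(result, delays)  # ok [1.0, 2.0]
```

Passing `sleep=delays.append` records the waits instead of actually sleeping, which keeps the example fast.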
Fallbacks​
If a call fails after num_retries, fall back to another model group.
If the error is a context window exceeded error, fall back to a larger model group (if given).
import os

from litellm import Router
model_list = [
    { # list of model deployments 
        "model_name": "azure/gpt-3.5-turbo", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "azure/chatgpt-v-2", 
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    }, 
    { # list of model deployments 
        "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "azure/chatgpt-v-2", 
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    }, 
    {
        "model_name": "azure/gpt-3.5-turbo", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "azure/chatgpt-functioncalling", 
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "tpm": 240000,
        "rpm": 1800
    }, 
    {
        "model_name": "gpt-3.5-turbo", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "gpt-3.5-turbo", 
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000
    },
    {
        "model_name": "gpt-3.5-turbo-16k", # openai model name 
        "litellm_params": { # params for litellm completion/embedding call 
            "model": "gpt-3.5-turbo-16k", 
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "tpm": 1000000,
        "rpm": 9000
    }
]
router = Router(model_list=model_list, 
                fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], 
                context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
                set_verbose=True)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal fallback call 
response = router.completion(model="azure/gpt-3.5-turbo", messages=messages)
# context window fallback call
response = router.completion(model="azure/gpt-3.5-turbo-context-fallback", messages=messages)
print(f"response: {response}")
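To show how the two fallback tables are consulted, here is a minimal sketch that uses the same `fallbacks` and `context_window_fallbacks` shapes as the example above. Everything else is assumed for illustration: `complete_with_fallbacks`, the local `ContextWindowExceededError` class and the `fake_call` function are hypothetical and do not reflect the router's actual control flow (retries are omitted).

```python
class ContextWindowExceededError(Exception):
    """Local stand-in for a context-window error (illustration only)."""


def lookup(table, model):
    """Find the fallback list for `model` in a list of {model: [fallbacks]} dicts."""
    for entry in table:
        if model in entry:
            return entry[model]
    return []


def complete_with_fallbacks(model, call, fallbacks, context_window_fallbacks):
    try:
        return call(model)
    except ContextWindowExceededError as err:
        candidates = lookup(context_window_fallbacks, model)
        last_error = err
    except Exception as err:
        candidates = lookup(fallbacks, model)
        last_error = err
    # Try each fallback model group in order; re-raise the last error if all fail.
    for fallback_model in candidates:
        try:
            return call(fallback_model)
        except Exception as err:
            last_error = err
    raise last_error


def fake_call(model):
    # Simulates the deployments above: bad key on Azure, small context on the fallback alias.
    if model == "azure/gpt-3.5-turbo":
        raise Exception("authentication failed")
    if model == "azure/gpt-3.5-turbo-context-fallback":
        raise ContextWindowExceededError("prompt too long")
    return f"response from {model}"


fallbacks = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}]
context_window_fallbacks = [
    {"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]},
    {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]},
]

normal = complete_with_fallbacks(
    "azure/gpt-3.5-turbo", fake_call, fallbacks, context_window_fallbacks)
context = complete_with_fallbacks(
    "azure/gpt-3.5-turbo-context-fallback", fake_call, fallbacks, context_window_fallbacks)
print(normal)   # response from gpt-3.5-turbo
print(context)  # response from gpt-3.5-turbo-16k
```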
Caching​
In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.
In-memory Cache
router = Router(model_list=model_list, 
                cache_responses=True)
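response = router.completion(model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)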
Redis Cache
router = Router(model_list=model_list, 
                redis_host=os.getenv("REDIS_HOST"), 
                redis_password=os.getenv("REDIS_PASSWORD"), 
                redis_port=os.getenv("REDIS_PORT"),
                cache_responses=True)
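response = router.completion(model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)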
Pass in Redis URL, additional kwargs
router = Router(model_list=model_list,
                ## CACHING ##
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={}, # additional kwargs to pass to RedisCache (see caching.py)
                cache_responses=True)
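For intuition about what response caching does, here is a small in-memory sketch. It assumes the cache is keyed on the model, the messages and any extra parameters; that keying scheme, the `InMemoryResponseCache` class and the `fake_completion` function are assumptions made for illustration, not a description of LiteLLM's cache.

```python
import json


class InMemoryResponseCache:
    """Cache responses keyed on the full request (model, messages, extra params)."""

    def __init__(self):
        self._store = {}

    @staticmethod
    def make_key(model, messages, **params):
        # sort_keys makes the key independent of dict ordering.
        return json.dumps(
            {"model": model, "messages": messages, "params": params}, sort_keys=True)

    def get_or_call(self, call, model, messages, **params):
        key = self.make_key(model, messages, **params)
        if key not in self._store:
            self._store[key] = call(model=model, messages=messages, **params)
        return self._store[key]


# Count how often the (fake) backend is actually hit.
backend_calls = []


def fake_completion(model, messages, **params):
    backend_calls.append(model)
    return {"model": model, "content": "hello"}


cache = InMemoryResponseCache()
messages = [{"role": "user", "content": "Hey, how's it going?"}]
first = cache.get_or_call(fake_completion, "gpt-3.5-turbo", messages)
second = cache.get_or_call(fake_completion, "gpt-3.5-turbo", messages)
print(len(backend_calls))  # 1
```

An in-memory dict like this is only visible to one process, which is why a shared store such as Redis is the better fit when several servers should reuse the same cached responses.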
Default litellm.completion/embedding params​
You can also set default params for litellm completion/embedding calls. Here's how to do that:
from litellm import Router
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
router = Router(model_list=model_list, 
                default_litellm_params={"context_window_fallback_dict": fallback_dict})
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call 
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Deploy Router​
If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server
Queuing (Beta)​
Never fail a request due to rate limits
The LiteLLM Queuing endpoints can handle 100+ req/s. We use Celery workers to process requests.
This is pretty new, and might have bugs. Any contributions to improving our implementation are welcome
Quick Start​
- Add Redis credentials in a .env file
REDIS_HOST="my-redis-endpoint"
REDIS_PORT="my-redis-port"
REDIS_PASSWORD="my-redis-password" # [OPTIONAL] if self-hosted
REDIS_USERNAME="default" # [OPTIONAL] if self-hosted
- Start litellm server with your model config
$ litellm --config /path/to/config.yaml --use_queue
Here's an example config for gpt-3.5-turbo
config.yaml (This will load balance between OpenAI + Azure endpoints)
model_list: 
  - model_name: gpt-3.5-turbo
    litellm_params: 
      model: gpt-3.5-turbo
      api_key: 
  - model_name: gpt-3.5-turbo
    litellm_params: 
      model: azure/chatgpt-v-2 # actual model name
      api_key: 
      api_version: 2023-07-01-preview
      api_base: https://openai-gpt-4-test-v-1.openai.azure.com/
- Test (in another window) → sends 100 simultaneous requests to the queue
$ litellm --test_async --num_requests 100
Available Endpoints​
- /queue/request - Queues a /chat/completions request. Returns a job id.
- /queue/response/{id} - Returns the status of a job. If completed, returns the response as well. Possible statuses are: queued and finished.
Hosted Request Queuing api.litellm.ai
Queue your LLM API requests to ensure you're under your rate limits
- Step 1: Add a config to the proxy, generate a temp key
- Step 2: Queue a request to the proxy, using your generated_key
- Step 3: Poll the request
Step 1: Add a config to the proxy, generate a temp key
import requests
import time
import os
# Set the base URL as needed
base_url = "https://api.litellm.ai"
# Step 1 Add a config to the proxy, generate a temp key
# use the same model_name to load balance
config = {
  "model_list": [
    {
      "model_name": "gpt-3.5-turbo",
      "litellm_params": {
        "model": "gpt-3.5-turbo",
        "api_key": os.environ['OPENAI_API_KEY'],
      }
    },
    {
      "model_name": "gpt-3.5-turbo",
      "litellm_params": {
        "model": "azure/chatgpt-v-2",
        "api_key": "",
        "api_base": "https://openai-gpt-4-test-v-1.openai.azure.com/",
        "api_version": "2023-07-01-preview"
      }
    }
  ]
}
response = requests.post(
    url=f"{base_url}/key/generate",
    json={
        "config": config,
        "duration": "30d"  # default to 30d, set it to 30m if you want a temp 30 minute key
    },
    headers={
        "Authorization": "Bearer sk-hosted-litellm" # this is the key to use api.litellm.ai
    }
)
print("\nresponse from generating key", response.text)
print("\n json response from gen key", response.json())
generated_key = response.json()["key"]
print("\ngenerated key for proxy", generated_key)
Output​
response from generating key {"key":"sk-...,"expires":"2023-12-22T03:43:57.615000+00:00"}
Step 2: Queue a request to the proxy, using your generated_key​
print("Creating a job on the proxy")
job_response = requests.post(
    url=f"{base_url}/queue/request",
    json={
        'model': 'gpt-3.5-turbo',
        'messages': [
            {'role': 'system', 'content': 'You are a helpful assistant. What is your name'},
        ],
    },
    headers={
        "Authorization": f"Bearer {generated_key}"
    }
)
print(job_response.status_code)
print(job_response.text)
print("\nResponse from creating job", job_response.text)
job_response = job_response.json()
job_id = job_response["id"]
polling_url = job_response["url"]
polling_url = f"{base_url}{polling_url}"
print("\nCreated Job, Polling Url", polling_url)
Output​
Response from creating job 
{"id":"0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7","url":"/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7","eta":5,"status":"queued"}
Step 3: Poll the request​
while True:
    try:
        print("\nPolling URL", polling_url)
        polling_response = requests.get(
            url=polling_url,
            headers={
                "Authorization": f"Bearer {generated_key}"
            }
        )
        print("\nResponse from polling url", polling_response.text)
        polling_response = polling_response.json()
        status = polling_response.get("status", None)
        if status == "finished":
            llm_response = polling_response["result"]
            print("LLM Response")
            print(llm_response)
            break
        time.sleep(0.5)
    except Exception as e:
        print("got exception in polling", e)
        break
Output​
Polling URL https://api.litellm.ai/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7
Response from polling url {"status":"queued"}
Polling URL https://api.litellm.ai/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7
Response from polling url {"status":"queued"}
Polling URL https://api.litellm.ai/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7
Response from polling url 
{"status":"finished","result":{"id":"chatcmpl-8NYRce4IeI4NzYyodT3NNp8fk5cSW","choices":[{"finish_reason":"stop","index":0,"message":{"content":"I am an AI assistant and do not have a physical presence or personal identity. You can simply refer to me as \"Assistant.\" How may I assist you today?","role":"assistant"}}],"created":1700624639,"model":"gpt-3.5-turbo-0613","object":"chat.completion","system_fingerprint":null,"usage":{"completion_tokens":33,"prompt_tokens":17,"total_tokens":50}}}
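The polling loop in Step 3 runs until the job finishes. A bounded variant might look like the sketch below, which stops after a deadline instead of polling forever. `poll_until_finished` is a hypothetical helper, and `fetch_status` stands in for the `requests.get(...).json()` call shown above so the example runs without network access.

```python
import time


def poll_until_finished(fetch_status, timeout=30.0, interval=0.5,
                        sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it reports "finished" or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        body = fetch_status()
        if body.get("status") == "finished":
            return body["result"]
        if clock() >= deadline:
            raise TimeoutError("job did not finish before the deadline")
        sleep(interval)


# Simulated responses from /queue/response/{id}: queued twice, then finished.
responses = iter([
    {"status": "queued"},
    {"status": "queued"},
    {"status": "finished", "result": {"id": "chatcmpl-123"}},
])

result = poll_until_finished(lambda: next(responses), sleep=lambda seconds: None)
print(result)  # {'id': 'chatcmpl-123'}
```

In real use, `fetch_status` would wrap the authenticated GET request to `polling_url` from Step 2.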