Router - Load Balancing, Fallbacks
LiteLLM manages:
- Load-balance across multiple deployments (e.g. Azure/OpenAI)
- Prioritizing important requests to ensure they don't fail (i.e. Queueing)
- Basic reliability logic - cooldowns, fallbacks, timeouts and retries (fixed + exponential backoff) across multiple deployments/providers.
In production, litellm supports using Redis as a way to track cooldown server and usage (managing tpm/rpm limits).
If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server
Load Balancing​
(s/o @paulpierre and sweep proxy for their contributions to this implementation) See Code
Quick Start​
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}]
router = Router(model_list=model_list)
# openai.ChatCompletion.create replacement
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
Available Endpoints​
router.completion()
- chat completions endpoint to call 100+ LLMsrouter.acompletion()
- async chat completion callsrouter.embeddings()
- embedding endpoint for Azure, OpenAI, Huggingface endpointsrouter.aembeddings()
- async embeddings endpointrouter.text_completion()
- completion calls in the old OpenAI/v1/completions
endpoint format
Advanced​
Routing Strategies - Weighted Pick, Rate Limit Aware​
Router provides 2 strategies for routing your calls across multiple deployments:
- Weighted Pick
- Rate-Limit Aware
Default Picks a deployment based on the provided Requests per minute (rpm) or Tokens per minute (tpm)
If rpm
or tpm
is not provided, it randomly picks a deployment
from litellm import Router
import asyncio
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 900, # requests per minute for this API
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 10,
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
"rpm": 10,
}
}]
# init router
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
This will route to the deployment with the lowest TPM usage for that minute.
In production, we use Redis to track usage (TPM/RPM) across multiple deployments.
If you pass in the deployment's tpm/rpm limits, this will also check against that, and filter out any who's limits would be exceeded.
For Azure, your RPM = TPM/6.
from litellm import Router
model_list = [{ # list of model deployments
"model_name": "gpt-3.5-turbo", # model alias
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2", # actual model name
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 10000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 1000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 1000,
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="simple-shuffle")
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
print(response)
Basic Reliability​
Timeouts​
The timeout set in router is for the entire length of the call, and is passed down to the completion() call level as well.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
timeout=30) # raise timeout error if call takes > 30s
print(response)
Cooldowns​
Set the limit for how many calls a model is allowed to fail in a minute, before being cooled down for a minute.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
allowed_fails=1) # cooldown model if it fails > 1 call in a minute.
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Retries​
For both async + sync functions, we support retrying failed requests.
For RateLimitError we implement exponential backoffs
For generic errors, we retry immediately
Here's a quick look at how we can set num_retries = 3
:
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Fallbacks​
If a call fails after num_retries, fall back to another model group.
If the error is a context window exceeded error, fall back to a larger model group (if given).
from litellm import Router
model_list = [
{ # list of model deployments
"model_name": "azure/gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 240000,
"rpm": 1800
},
{ # list of model deployments
"model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 240000,
"rpm": 1800
},
{
"model_name": "azure/gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-functioncalling",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 240000,
"rpm": 1800
},
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 1000000,
"rpm": 9000
},
{
"model_name": "gpt-3.5-turbo-16k", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "gpt-3.5-turbo-16k",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 1000000,
"rpm": 9000
}
]
router = Router(model_list=model_list,
fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
set_verbose=True)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal fallback call
response = router.completion(model="azure/gpt-3.5-turbo", messages=messages)
# context window fallback call
response = router.completion(model="azure/gpt-3.5-turbo-context-fallback", messages=messages)
print(f"response: {response}")
Caching​
In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.
In-memory Cache
router = Router(model_list=model_list,
cache_responses=True)
print(response)
Redis Cache
router = Router(model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=os.getenv("REDIS_PORT"),
cache_responses=True)
print(response)
Pass in Redis URL, additional kwargs
router = Router(model_list: Optional[list] = None,
## CACHING ##
redis_url=os.getenv("REDIS_URL")",
cache_kwargs= {}, # additional kwargs to pass to RedisCache (see caching.py)
cache_responses=True)
Default litellm.completion/embedding params​
You can also set default params for litellm completion/embedding calls. Here's how to do that:
from litellm import Router
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
router = Router(model_list=model_list,
default_litellm_params={"context_window_fallback_dict": fallback_dict})
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Deploy Router​
If you want a server to load balance across different LLM APIs, use our OpenAI Proxy Server
Queuing (Beta)​
Never fail a request due to rate limits
The LiteLLM Queuing endpoints can handle 100+ req/s. We use Celery workers to process requests.
This is pretty new, and might have bugs. Any contributions to improving our implementation are welcome
Quick Start​
- Add Redis credentials in a .env file
REDIS_HOST="my-redis-endpoint"
REDIS_PORT="my-redis-port"
REDIS_PASSWORD="my-redis-password" # [OPTIONAL] if self-hosted
REDIS_USERNAME="default" # [OPTIONAL] if self-hosted
- Start litellm server with your model config
$ litellm --config /path/to/config.yaml --use_queue
Here's an example config for gpt-3.5-turbo
config.yaml (This will load balance between OpenAI + Azure endpoints)
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: gpt-3.5-turbo
api_key:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2 # actual model name
api_key:
api_version: 2023-07-01-preview
api_base: https://openai-gpt-4-test-v-1.openai.azure.com/
- Test (in another window) → sends 100 simultaneous requests to the queue
$ litellm --test_async --num_requests 100
Available Endpoints​
/queue/request
- Queues a /chat/completions request. Returns a job id./queue/response/{id}
- Returns the status of a job. If completed, returns the response as well. Potential status's are:queued
andfinished
.
Hosted Request Queing api.litellm.ai​
Queue your LLM API requests to ensure you're under your rate limits
- Step 1: Step 1 Add a config to the proxy, generate a temp key
- Step 2: Queue a request to the proxy, using your generated_key
- Step 3: Poll the request
Step 1 Add a config to the proxy, generate a temp key​
import requests
import time
import os
# Set the base URL as needed
base_url = "https://api.litellm.ai"
# Step 1 Add a config to the proxy, generate a temp key
# use the same model_name to load balance
config = {
"model_list": [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.environ['OPENAI_API_KEY'],
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": "",
"api_base": "https://openai-gpt-4-test-v-1.openai.azure.com/",
"api_version": "2023-07-01-preview"
}
}
]
}
response = requests.post(
url=f"{base_url}/key/generate",
json={
"config": config,
"duration": "30d" # default to 30d, set it to 30m if you want a temp 30 minute key
},
headers={
"Authorization": "Bearer sk-hosted-litellm" # this is the key to use api.litellm.ai
}
)
print("\nresponse from generating key", response.text)
print("\n json response from gen key", response.json())
generated_key = response.json()["key"]
print("\ngenerated key for proxy", generated_key)
Output​
response from generating key {"key":"sk-...,"expires":"2023-12-22T03:43:57.615000+00:00"}
Step 2: Queue a request to the proxy, using your generated_key​
print("Creating a job on the proxy")
job_response = requests.post(
url=f"{base_url}/queue/request",
json={
'model': 'gpt-3.5-turbo',
'messages': [
{'role': 'system', 'content': f'You are a helpful assistant. What is your name'},
],
},
headers={
"Authorization": f"Bearer {generated_key}"
}
)
print(job_response.status_code)
print(job_response.text)
print("\nResponse from creating job", job_response.text)
job_response = job_response.json()
job_id = job_response["id"]
polling_url = job_response["url"]
polling_url = f"{base_url}{polling_url}"
print("\nCreated Job, Polling Url", polling_url)
Output​
Response from creating job
{"id":"0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7","url":"/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7","eta":5,"status":"queued"}
Step 3: Poll the request​
while True:
try:
print("\nPolling URL", polling_url)
polling_response = requests.get(
url=polling_url,
headers={
"Authorization": f"Bearer {generated_key}"
}
)
print("\nResponse from polling url", polling_response.text)
polling_response = polling_response.json()
status = polling_response.get("status", None)
if status == "finished":
llm_response = polling_response["result"]
print("LLM Response")
print(llm_response)
break
time.sleep(0.5)
except Exception as e:
print("got exception in polling", e)
break
Output​
Polling URL https://api.litellm.ai/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7
Response from polling url {"status":"queued"}
Polling URL https://api.litellm.ai/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7
Response from polling url {"status":"queued"}
Polling URL https://api.litellm.ai/queue/response/0e3d9e98-5d56-4d07-9cc8-c34b7e6658d7
Response from polling url
{"status":"finished","result":{"id":"chatcmpl-8NYRce4IeI4NzYyodT3NNp8fk5cSW","choices":[{"finish_reason":"stop","index":0,"message":{"content":"I am an AI assistant and do not have a physical presence or personal identity. You can simply refer to me as \"Assistant.\" How may I assist you today?","role":"assistant"}}],"created":1700624639,"model":"gpt-3.5-turbo-0613","object":"chat.completion","system_fingerprint":null,"usage":{"completion_tokens":33,"prompt_tokens":17,"total_tokens":50}}}