Load-balance LLM services with Nginx to optimize performance in high-concurrency scenarios. Key topics: 1. An introduction to Nginx as a high-performance web server and its characteristics 2. Installing Nginx and editing its configuration file 3. An overview of load-balancing algorithms and a chat-service example for testing load balancing
When a single node cannot handle all traffic, requests must be distributed across multiple nodes so that each node's load stays at a reasonable level. This is load balancing.
Nginx is an open-source, high-performance web server that is also widely used as a reverse proxy, load balancer, and HTTP cache. It was designed to overcome the performance bottlenecks that traditional servers (such as Apache) hit under high concurrency, and it is now one of the most popular web servers in the world.
Features:
High performance: an event-driven, asynchronous architecture that handles tens of thousands of concurrent connections on a single machine.
Lightweight: low memory footprint and simple configuration.
Flexible algorithms: Round Robin, Weighted Round Robin, IP Hash, Least Connections, and more.
Taking Ubuntu as an example:
sudo apt install nginx
Edit the configuration file /etc/nginx/nginx.conf and add the following inside the http block:
http {
    upstream backend {
        least_conn;                # balancing algorithm
        server 127.0.0.1:8001;     # backend server 1
        server 127.0.0.1:8002;     # backend server 2
    }
    server {
        listen 80;
        location / {
            proxy_pass http://backend;
        }
    }
    log_format upstream_log '$remote_addr - $remote_user [$time_local] "$request" '
                            '$status $body_bytes_sent "$http_referer" '
                            '"$http_user_agent" "$http_x_forwarded_for" '
                            'to: $upstream_addr';
    access_log /var/log/nginx/upstream.log upstream_log;
}
The complete configuration:
# cat /etc/nginx/nginx.conf
user www-data;
worker_processes auto;
pid /run/nginx.pid;
include /etc/nginx/modules-enabled/*.conf;

events {
    worker_connections 768;
    # multi_accept on;
}

http {
    ##
    # Basic Settings
    ##
    sendfile on;
    tcp_nopush on;
    types_hash_max_size 2048;
    # server_tokens off;
    # server_names_hash_bucket_size 64;
    # server_name_in_redirect off;
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    ##
    # Logging Settings
    ##
    log_format upstream_log '$remote_addr:$remote_port $request_uri - $remote_user [$time_local] "$request" '
                            '$status $body_bytes_sent "$http_referer" '
                            '"$http_user_agent" "$http_x_forwarded_for" '
                            'to: $upstream_addr';
    access_log /var/log/nginx/access.log upstream_log;
    error_log /var/log/nginx/error.log;

    ##
    # SSL Settings
    ##
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3; # Dropping SSLv3, ref: POODLE
    ssl_prefer_server_ciphers on;

    ##
    # Gzip Settings
    ##
    gzip on;
    # gzip_vary on;
    # gzip_proxied any;
    # gzip_comp_level 6;
    # gzip_buffers 16 8k;
    # gzip_http_version 1.1;
    # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;

    ##
    # Upstream Servers
    ##
    upstream backend {
        least_conn;
        server 127.0.0.1:8001;     # backend server 1
        server 127.0.0.1:8002;     # backend server 2
    }

    ##
    # Server Blocks
    ##
    server {
        listen 80;
        server_name localhost;
        location / {
            proxy_pass http://backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            #proxy_http_version 1.1; # ensure HTTP/1.1 is used
            #proxy_set_header Connection '';
        }
    }
    add_header X-Upstream $upstream_addr always; # expose the backend address in the response headers

    ##
    # Virtual Host Configs
    ##
    include /etc/nginx/conf.d/*.conf;
    include /etc/nginx/sites-enabled/*;
}
#mail {
# # See sample authentication script at:
# # http://wiki.nginx.org/ImapAuthenticateWithApachePhpScript
#
# # auth_http localhost/auth.php;
# # pop3_capabilities "TOP" "USER";
# # imap_capabilities "IMAP4rev1" "UIDPLUS";
#
# server {
# listen localhost:110;
# protocol pop3;
# proxy on;
# }
#
# server {
# listen localhost:143;
# protocol imap;
# proxy on;
# }
#}
Once the configuration is written, verify it with nginx -t, then apply it with systemctl reload nginx.
In the configuration above, least_conn selects the balancing algorithm; several algorithms are available.
Weighted round robin (see the sketch after this block):
upstream backend {
    server 127.0.0.1:8001 weight=3;   # 60% of requests
    server 127.0.0.1:8002 weight=2;   # 40% of requests
}
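Nginx spreads weighted traffic with a "smooth" weighted round-robin, which interleaves servers instead of sending bursts to the heavier one. Here is a minimal Python sketch of that selection logic, for intuition only; it is an illustration of the idea, not nginx's actual code:

# Smooth weighted round-robin, sketched in Python.
# With weights 3 and 2, every window of 5 picks contains 8001 three
# times and 8002 twice, interleaved rather than back to back.
class Peer:
    def __init__(self, name, weight):
        self.name = name
        self.weight = weight   # configured weight
        self.current = 0       # dynamic selection score

def pick(peers):
    total = sum(p.weight for p in peers)
    for p in peers:
        p.current += p.weight          # every peer gains its weight
    best = max(peers, key=lambda p: p.current)
    best.current -= total              # the chosen peer "pays back" the total
    return best

peers = [Peer("127.0.0.1:8001", 3), Peer("127.0.0.1:8002", 2)]
print([pick(peers).name for _ in range(5)])
# ['127.0.0.1:8001', '127.0.0.1:8002', '127.0.0.1:8001', '127.0.0.1:8002', '127.0.0.1:8001']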
Least connections (each new request goes to the server with the fewest active connections):
upstream backend {
    least_conn;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}
IP hash (requests from the same client IP always reach the same server, useful for sticky sessions):
upstream backend {
    ip_hash;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}
❝ Consistent hashing uses a hash ring plus virtual nodes to solve the data-migration and load-imbalance problems that traditional hashing suffers in dynamic environments. Its core idea is to limit the impact of node changes to a small range and to use virtual nodes to keep the data distribution balanced; it is a key technique for high availability and scalability in distributed systems.
Consistent hash:
upstream backend {
    hash $request_uri consistent;   # distribute by request URI
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
}
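To make the quoted idea concrete, here is a minimal hash-ring sketch in Python with virtual nodes. The replica count and MD5 hashing are illustrative assumptions; nginx's ketama-style implementation differs in detail:

import bisect
import hashlib

# Minimal hash ring with virtual nodes, illustrating the idea behind
# "hash $request_uri consistent".
def h(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class HashRing:
    def __init__(self, nodes, replicas=100):
        self.replicas = replicas
        self.ring = []          # sorted list of (hash, node)
        for node in nodes:
            self.add(node)

    def add(self, node):
        # each node appears `replicas` times on the ring as virtual nodes
        for i in range(self.replicas):
            bisect.insort(self.ring, (h(f"{node}#{i}"), node))

    def get(self, key):
        # walk clockwise to the first virtual node at or after hash(key)
        hv = h(key)
        idx = bisect.bisect(self.ring, (hv, "")) % len(self.ring)
        return self.ring[idx][1]

ring = HashRing(["127.0.0.1:8001", "127.0.0.1:8002"])
print(ring.get("/chat"))      # the same URI always maps to the same backend
ring.add("127.0.0.1:8003")    # adding a node remaps only ~1/3 of the keys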
All algorithms can be combined with per-server weights and health parameters:
server 127.0.0.1:8001 weight=5 max_fails=3 fail_timeout=30s;
Here max_fails=3 fail_timeout=30s marks the server as unavailable for 30 s after 3 failed attempts.
Start two HTTP services (on ports 8001 and 8002) with the Python script server.py:
# server.py
from http.server import BaseHTTPRequestHandler, HTTPServer

class DebugRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # print client information
        client_ip, client_port = self.client_address
        print(f"\n--- New request @{self.server.server_port} ---")
        print(f"[client] {client_ip}:{client_port}")
        print(f"[method] {self.command}")
        print(f"[path] {self.path}")
        print(f"[headers] {self.headers}")
        # send the response
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(f"Response from: {self.server.server_port}".encode())

    def do_POST(self):
        # handle POST requests
        content_length = int(self.headers["Content-Length"])
        post_data = self.rfile.read(content_length).decode("utf-8")
        print(f"\n--- POST request @{self.server.server_port} ---")
        print(f"[data] {post_data}")
        self.do_GET()  # reuse the GET response logic

def run(server_class=HTTPServer, handler_class=DebugRequestHandler, port=8001):
    server_address = ("0.0.0.0", port)  # bind to all interfaces
    httpd = server_class(server_address, handler_class)
    print(f"Server started on 0.0.0.0:{port}...")
    httpd.serve_forever()

if __name__ == "__main__":
    # start two server instances (on ports 8001 and 8002)
    import threading
    threading.Thread(target=run, kwargs={"port": 8001}).start()
    threading.Thread(target=run, kwargs={"port": 8002}).start()
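One caveat about this script: HTTPServer handles one request at a time, so each backend instance serializes its requests. If you want each instance to handle requests concurrently, the standard library's ThreadingHTTPServer (Python 3.7+) is a drop-in replacement:

from http.server import ThreadingHTTPServer

# same run() as above, just a different server_class
threading.Thread(target=run, kwargs={"server_class": ThreadingHTTPServer, "port": 8001}).start()
threading.Thread(target=run, kwargs={"server_class": ThreadingHTTPServer, "port": 8002}).start()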
python3 server.py
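Before any load test, a quick sanity check (with the two backends and nginx running) should show responses alternating between the two ports:

import requests

# each response body names the backend port that served it
for _ in range(6):
    print(requests.get("http://localhost/").text)
# expected: "Response from: 8001" and "Response from: 8002", roughly alternating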
Stress testing
apt install apache2-utils
ab -n 1000 -c 10 http://localhost/
ab is short for Apache Bench, a performance-testing tool from the Apache HTTP server project used to put load on web servers.
Command breakdown:
ab: the tool name (Apache Bench).
-n 1000: 1000 requests in total (-n stands for number of requests).
-c 10: 10 concurrent users (-c stands for concurrency, the number of requests sent at the same time).
http://localhost/: the target address (a local service or a remote URL).
The command simulates 10 concurrent users accessing http://localhost/, each sending requests back to back until 1000 requests have completed, which lets you measure the server's throughput and response times.
Interpreting the output
After running the command, the output looks like this:
This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests
Server Software: nginx/1.18.0
Server Hostname: localhost
Server Port: 80
Document Path: /
Document Length: 18 bytes
Concurrency Level: 10
Time taken for tests: 1.201 seconds
Complete requests: 1000
Failed requests: 0
Total transferred: 178000 bytes
HTML transferred: 18000 bytes
Requests per second: 832.30 [#/sec] (mean)
Time per request: 12.015 [ms] (mean)
Time per request: 1.201 [ms] (mean, across all concurrent requests)
Transfer rate: 144.68 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.0 0 1
Processing: 2 12 1.9 12 26
Waiting: 1 12 1.9 12 25
Total: 2 12 1.9 12 26
Percentage of the requests served within a certain time (ms)
50% 12
66% 12
75% 13
80% 13
90% 14
95% 15
98% 16
99% 18
100% 26 (longest request)
Key metrics
Requests per second: 832.30 [#/sec]. The server handles roughly 832 requests per second (the higher the throughput, the better the performance).
Time per request:
12.015 [ms] (mean): the average latency per request as seen by a user.
1.201 [ms] (mean, across all concurrent requests): the average time the server spends per request.
Connection Times: the distribution of connect, processing, and waiting times (in milliseconds).
Percentage of the requests served within a certain time: the response-time distribution (e.g. 90% of requests finished within 14 ms).
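The three headline numbers are tied together by simple arithmetic; plugging in the values from the run above:

# How ab's summary numbers relate, using the run above
total_time_s = 1.201   # Time taken for tests
n_requests = 1000      # Complete requests
concurrency = 10       # -c

rps = n_requests / total_time_s                              # ~832 requests/second
latency_ms = concurrency * 1000 * total_time_s / n_requests  # ~12.0 ms per request (user view)
server_ms = 1000 * total_time_s / n_requests                 # ~1.2 ms per request (across all concurrent requests)
print(rps, latency_ms, server_ms)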
Server-side logs: let's count how many requests each backend received:
tail -f /var/log/nginx/access.log
127.0.0.1:48878 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
127.0.0.1:48882 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
127.0.0.1:48892 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
127.0.0.1:48904 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 | grep ":8001" | wc -l
506
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 | grep ":8002" | wc -l
494
Or:
root@MichaelMing:~# awk '{print $NF}' /var/log/nginx/access.log | sort | uniq -c
300 "ApacheBench/2.3"
3 "curl/7.81.0"
93576 -
984 127.0.0.1:8001
1018 127.0.0.1:8002
Checking the two ports, the two backends received an essentially even share of requests (506 vs. 494), so Nginx is balancing the load as intended.
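If you prefer Python to awk, the same tally can be done with a Counter (assuming the upstream_log format above, where the upstream address is the last field of each line):

from collections import Counter

# count the upstream address at the end of each "to: ..." log line
with open("/var/log/nginx/access.log") as f:
    counts = Counter(line.rsplit(None, 1)[-1] for line in f if "to: " in line)
print(counts)  # e.g. Counter({'127.0.0.1:8002': 1018, '127.0.0.1:8001': 984})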
Suppose MichaelAI exposes two API endpoints, chat and completions:
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib.parse import urlparse
from datetime import datetime

class MultiAPIHandler(BaseHTTPRequestHandler):
    def send_json_response(self, data, status=200):
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(data).encode("utf-8"))

    def do_POST(self):
        # single entry point, route by path
        parsed_path = urlparse(self.path)
        path = parsed_path.path
        try:
            content_length = int(self.headers["Content-Length"])
            post_data = self.rfile.read(content_length).decode("utf-8")
            print(f"\n--- {path} request @{self.server.server_port} ---")
            print(f"[client] {self.client_address[0]}:{self.client_address[1]}")
            print(f"[data] {post_data}")
            if path == "/chat":
                self.handle_chat(post_data)
            elif path == "/completions":
                self.handle_completions(post_data)
            else:
                self.send_error(404, "API not found")
        except Exception as e:
            self.send_json_response({
                "error": str(e),
                "server": self.server.server_port,
                "timestamp": datetime.now().isoformat()
            }, 400)

    def handle_chat(self, data):
        # simulate a chat endpoint
        try:
            input_data = json.loads(data)
            message = input_data.get("message", "")
            response = {
                "original": message,
                "response": f"Processed by chat API: {message.upper()}",
                "server": self.server.server_port,
                "timestamp": datetime.now().isoformat()
            }
            self.send_json_response(response)
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON format")

    def handle_completions(self, data):
        # text-completion endpoint
        response = {
            "completions": [
                {"text": f"Completion 1: {data[:5]}...", "index": 0},
                {"text": f"Completion 2: {data[-5:]}...", "index": 1}
            ],
            "server": self.server.server_port,
            "timestamp": datetime.now().isoformat()
        }
        self.send_json_response(response)

def run(server_class=HTTPServer, handler_class=MultiAPIHandler, port=8001):
    server_address = ("0.0.0.0", port)
    httpd = server_class(server_address, handler_class)
    print(f"Multi-API server started on 0.0.0.0:{port}...")
    httpd.serve_forever()

if __name__ == "__main__":
    import threading
    # start two server instances
    threading.Thread(target=run, kwargs={"port": 8001}).start()
    threading.Thread(target=run, kwargs={"port": 8002}).start()
❝ Example content of json_data (the POST body file passed with -p): {"message": "Hello, this is a test message"}
ab -n 100 -c 10 -T "application/json" -p json_data \
http://localhost:8001/chat
Server log:
--- /chat request @8001 ---
[client] 127.0.0.1:54724
[data] {"message": "Hello, this is a test message"}
127.0.0.1 - - [02/Mar/2025 22:41:22] "POST /chat HTTP/1.0" 200 -
Example content of text.txt:
你好,我是Michael阿明开发的智能助手!
ab -n 10000 -c 100 -T "text/plain" -p text.txt \
http://localhost:8002/completions
Server log:
--- /completions request @8002 ---
[client] 127.0.0.1:47246
[data] 你好,我是Michael阿明开发的智能助手!
127.0.0.1 - - [02/Mar/2025 22:49:26] "POST /completions HTTP/1.0" 200 -
# stress-test both endpoints at the same time
echo "chat endpoint results:" && \
ab -n 500 -c 50 -T "application/json" -p json_data http://localhost:80/chat && \
echo "completions endpoint results:" && \
ab -n 500 -c 50 -T "text/plain" -p text.txt http://localhost:80/completions
Distribution of completions calls: the two backends are nearly even:
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 | grep "completions" | grep 8001 | wc -l
747
root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 | grep "completions" | grep 8002 | wc -l
755
import requests
import asyncio
import aiohttp

# synchronous client (based on requests)
class SyncAPIClient:
    def __init__(self, base_url):
        self.base_url = base_url

    def call_chat(self, message):
        url = f"{self.base_url}/chat"
        data = {"message": message}
        response = requests.post(url, json=data)
        return response.json()

    def call_completions(self, text):
        url = f"{self.base_url}/completions"
        response = requests.post(url, data=text)
        return response.json()

# asynchronous client (based on aiohttp)
class AsyncAPIClient:
    def __init__(self, base_url):
        self.base_url = base_url

    async def call_chat(self, message):
        url = f"{self.base_url}/chat"
        data = {"message": message}
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data) as response:
                return await response.json()

    async def call_completions(self, text):
        url = f"{self.base_url}/completions"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=text) as response:
                return await response.json()

# synchronous usage
sync_client = SyncAPIClient("http://localhost")
chat_response = sync_client.call_chat("Hello, sync chat!")
print("Chat response:", chat_response)
completion_response = sync_client.call_completions("test text")
print("Completions response:", completion_response)

# asynchronous usage
async def main():
    async_client = AsyncAPIClient("http://localhost")
    # call both endpoints concurrently
    chat_task = asyncio.create_task(async_client.call_chat("Hello, async chat!"))
    completion_task = asyncio.create_task(async_client.call_completions("async test text"))
    results = await asyncio.gather(chat_task, completion_task)
    print("Async chat response:", results[0])
    print("Async completions response:", results[1])

asyncio.run(main())
Output:
root@MichaelMing:~# python3 /mnt/d/opt/client.py
Chat response: {'original': 'Hello, sync chat!', 'response': 'Processed by chat API: HELLO, SYNC CHAT!', 'server': 8001, 'timestamp': '2025-03-02T23:11:54.426626'}
Completions response: {'completions': [{'text': 'Completion 1: test ...', 'index': 0}, {'text': 'Completion 2: text...', 'index': 1}], 'server': 8002, 'timestamp': '2025-03-02T23:11:54.430562'}
Async chat response: {'original': 'Hello, async chat!', 'response': 'Processed by chat API: HELLO, ASYNC CHAT!', 'server': 8001, 'timestamp': '2025-03-02T23:11:54.444508'}
Async completions response: {'completions': [{'text': 'Completion 1: async...', 'index': 0}, {'text': 'Completion 2: text...', 'index': 1}], 'server': 8002, 'timestamp': '2025-03-02T23:11:54.444973'}
Stress testing from code
# concurrent stress-test example (using the async client)
async def stress_test():
    client = AsyncAPIClient("http://localhost")
    tasks = []
    for _ in range(100):
        tasks.append(client.call_chat("test message"))
        tasks.append(client.call_completions("test text"))
    responses = await asyncio.gather(*tasks)
    print(f"Successfully handled {len(responses)} requests")

asyncio.run(stress_test())
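Note that AsyncAPIClient opens a new ClientSession per call, which adds connection overhead under load. A variant that shares one session and caps in-flight requests with a semaphore is gentler on both client and server; this is a sketch, not part of the original client:

import asyncio
import aiohttp

async def stress_test_shared(n=100, max_in_flight=20):
    sem = asyncio.Semaphore(max_in_flight)          # cap concurrent requests
    async with aiohttp.ClientSession() as session:  # one shared session
        async def one_call(i):
            async with sem:
                async with session.post("http://localhost/chat",
                                        json={"message": f"msg {i}"}) as resp:
                    return await resp.json()
        results = await asyncio.gather(*(one_call(i) for i in range(n)))
    print(f"Successfully handled {len(results)} requests")

asyncio.run(stress_test_shared())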