AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


使用 Nginx 对 LLM 服务进行负载均衡实践

发布日期:2025-03-04 18:58:41 浏览次数: 1562 来源:Michael阿明
推荐语

使用Nginx对LLM服务进行负载均衡,实现高并发场景下的性能优化。

核心内容:
1. Nginx作为高性能Web服务器的简介及其特点
2. Nginx的安装步骤及配置文件编辑
3. 负载均衡算法介绍及测试负载均衡的chat服务实例

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家
    想象一下有很多的算法服务实例,负载不均衡会造成:


    • 有的节点计算压力很重,用户延迟变长
    • 有的节点在闲置,资源浪费

    需要将请求分发到不同的节点进行处理,让每个节点的负载在合适的水平,这就是负载均衡。

    1. 简介

    nginx 是一款开源的、高性能的 Web 服务器,同时也广泛用作 反向代理服务器、负载均衡器 和 HTTP 缓存。 它的设计目标是解决传统服务器(如 Apache)在高并发场景下的性能瓶颈,现已成为全球最流行的 Web 服务器之一。

    特点:

    • 高性能:基于事件驱动的异步架构,单机支持数万并发连接。

    • 轻量级:内存占用低,配置简单。

    • 算法灵活:轮询(Round Robin)、加权轮询(Weighted)、IP Hash、最少连接(Least Connections)等。

    2. Nginx安装

    以 ubuntu 为例

    sudo apt install nginx

    编辑配置文件 /etc/nginx/nginx.conf,向 http 添加以下内容

    http {
        upstream backend {
            least_conn;  # 均衡算法
            server 127.0.0.1:8001;  # 后端服务1
            server 127.0.0.1:8002;  # 后端服务2
        }
        server {
            listen 80;
            location / {
                proxy_pass http://backend;
            }
        }
        
        log_format upstream_log '$remote_addr - $remote_user [$time_local] "$request" '
                               '$status $body_bytes_sent "$http_referer" '
                               '"$http_user_agent" "$http_x_forwarded_for" '
                               'to: $upstream_addr';

        access_log /var/log/nginx/upstream.log upstream_log;
    }

    完整配置如下:

    # cat /etc/nginx/nginx.conf
    user www-data;
    worker_processes auto;
    pid /run/nginx.pid;
    include /etc/nginx/modules-enabled/*.conf;

    events {
            worker_connections 768;
            # multi_accept on;
    }

    http {
        ##
        # Basic Settings
        ##

        sendfile on;
        tcp_nopush on;
        types_hash_max_size 2048;
        # server_tokens off;

        # server_names_hash_bucket_size 64;
        # server_name_in_redirect off;

        include /etc/nginx/mime.types;
        default_type application/octet-stream;

        ##
        # Logging Settings
        ##

        log_format upstream_log '$remote_addr:$remote_port $request_uri - $remote_user [$time_local] "$request" '
                               '$status $body_bytes_sent "$http_referer" '
                               '"$http_user_agent" "$http_x_forwarded_for" '
                               'to: $upstream_addr';

        access_log /var/log/nginx/access.log upstream_log;
        error_log /var/log/nginx/error.log;

        ##
        # SSL Settings
        ##

        ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3; # Dropping SSLv3, ref: POODLE
        ssl_prefer_server_ciphers on;

        ##
        # Gzip Settings
        ##

        gzip on;
        # gzip_vary on;
        # gzip_proxied any;
        # gzip_comp_level 6;
        # gzip_buffers 16 8k;
        # gzip_http_version 1.1;
        # gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;

        ##
        # Upstream Servers
        ##

        upstream backend {
            least_conn;
            server 127.0.0.1:8001;  # 后端服务1
            server 127.0.0.1:8002;  # 后端服务2
        }

        ##
        # Server Blocks
        ##

        server {
            listen 80;
            server_name localhost;
            location / {
                proxy_pass http://backend;
                proxy_set_header Host $host;
                proxy_set_header X-Real-IP $remote_addr;
                proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
                proxy_set_header X-Forwarded-Proto $scheme;
                #proxy_http_version 1.1;          # 确保使用 HTTP/1.1 <button class="citation-flag" data-index="7">
                #proxy_set_header Connection '';
            }
        }
        add_header X-Upstream $upstream_addr always;  # 在响应头中暴露后端地址
        ##
        # Virtual Host Configs
        ##

        include /etc/nginx/conf.d/*.conf;
        include /etc/nginx/sites-enabled/*;
    }

    #mail {
    #       # See sample authentication script at:
    #       # http://wiki.nginx.org/ImapAuthenticateWithApachePhpScript
    #
    #       # auth_http localhost/auth.php;
    #       # pop3_capabilities "TOP" "USER";
    #       # imap_capabilities "IMAP4rev1" "UIDPLUS";
    #
    #       server {
    #               listen     localhost:110;
    #               protocol   pop3;
    #               proxy      on;
    #       }
    #
    #       server {
    #               listen     localhost:143;
    #               protocol   imap;
    #               proxy      on;
    #       }
    #}

    编写完配置,systemctl reload nginx 重启,nginx -t 确认配置ok

    3. 均衡算法

    以上配置中 least_conn 是均衡算法,可以有多种算法可选

    • 轮询(Round Robin) 默认算法,无需显式声明 工作原理:按顺序逐一分发请求到后端服务器 加权版本:通过 weight 参数调整分配比例
    upstream backend {
        server 127.0.0.1:8001 weight=3;  # 60%的请求
        server 127.0.0.1:8002 weight=2;  # 40%的请求
    }
    • 最少连接(Least Connections) 语法:least_conn 工作原理:优先将请求发给当前连接数最少的后端 适用场景:后端服务器处理数量差异较大时
    upstream backend {
        least_conn;
        server 127.0.0.1:8001;
        server 127.0.0.1:8002;
    }
    • IP 哈希(IP Hash) 语法:ip_hash 工作原理:根据客户端IP的哈希值固定分配后端服务器 适用场景:需要会话保持(Session Persistence)
    upstream backend {
        ip_hash;
        server 127.0.0.1:8001;
        server 127.0.0.1:8002;
    }
    • 通用哈希(Generic Hash) 语法:hash key [consistent] 工作原理:根据自定义的键(如URI、请求参数)计算哈希值 consistent:启用一致性哈希,减少后端增减时的影响

    一致性哈希通过哈希环和虚拟节点机制,有效解决了传统哈希算法在动态环境下的数据迁移和负载不均问题。 其核心在于减少节点变动的影响范围,并利用虚拟节点实现数据分布的平衡性,是分布式系统中实现高可用和可扩展性的关键技术

    upstream backend {
        hash $request_uri consistent;  # 按请求URI分配
        server 127.0.0.1:8001;
        server 127.0.0.1:8002;
    }

    所有算法均可配合权重使用:

    server 127.0.0.1:8001 weight=5 max_fails=3 fail_timeout=30s;

    4. 测试负载均衡

    开启两个http服务(端口 8001,8002),使用python脚本 server.py 启动

    # server.py
    from http.server import BaseHTTPRequestHandler, HTTPServer
    import socket

    class DebugRequestHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            # 打印客户端信息
            client_ip, client_port = self.client_address
            print(f"\n--- 新请求 @{self.server.server_port} ---")
            print(f"[客户端] {client_ip}:{client_port}")
            print(f"[方法] {self.command}")
            print(f"[路径] {self.path}")
            print(f"[请求头] {self.headers}")

            # 返回响应 
            self.send_response(200)
            self.send_header("Content-type""text/plain")
            self.end_headers()
            self.wfile.write(f"响应来自: {self.server.server_port}".encode())

        def do_POST(self):
            # 处理 POST 请求 
            content_length = int(self.headers["Content-Length"])
            post_data = self.rfile.read(content_length).decode("utf-8")
            
            print(f"\n--- POST 请求 @{self.server.server_port} ---")
            print(f"[数据] {post_data}")
            self.do_GET()  # 复用 GET 响应逻辑

    def run(server_class=HTTPServer, handler_class=DebugRequestHandler, port=8001):
        server_address = ("0.0.0.0", port)  # 绑定到所有接口
        httpd = server_class(server_address, handler_class)
        print(f"服务启动于 0.0.0.0:{port}...")
        httpd.serve_forever()

    if __name__ == "__main__":
        # 启动两个服务实例(分别在 8001 和 8002 端口)
        import threading
        threading.Thread(target=run, kwargs={"port"8001}).start()
        threading.Thread(target=run, kwargs={"port"8002}).start()

    python3 server.py

    压力测试

    apt install apache2-utils
    ab -n 1000 -c 10 http://localhost/

    ab 是 Apache Bench 的缩写,它是 Apache HTTP 服务器项目中的一个性能测试工具,用于对 Web 服务器发起压力测试。

    命令分解

    1. **ab**:工具名称(Apache Bench)。
    2. **-n 1000**:总请求数为 1000(-n 表示 number of requests)。
    3. **-c 10**:并发用户数为 10(-c 表示 concurrency,即同时发送的请求数)。
    4. **http://localhost/**:目标测试地址(可以是本地服务或远程 URL)。

    模拟 10 个并发用户 同时访问 http://localhost/,每个用户连续发送请求,直到总请求数达到 1000。通过该命令可以测试:

    • 服务器的 吞吐量(每秒处理多少请求)。
    • 请求的 平均响应时间
    • 服务器的 并发处理能力
    • 是否出现错误(如超时、5xx 错误等)。

    示例输出解读

    运行命令后,输出结果示例如下:

    This is ApacheBench, Version 2.3 <$Revision: 1879490 $>
    Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
    Licensed to The Apache Software Foundation, http://www.apache.org/

    Benchmarking localhost (be patient)
    Completed 100 requests
    Completed 200 requests
    Completed 300 requests
    Completed 400 requests
    Completed 500 requests
    Completed 600 requests
    Completed 700 requests
    Completed 800 requests
    Completed 900 requests
    Completed 1000 requests
    Finished 1000 requests


    Server Software:        nginx/1.18.0
    Server Hostname:        localhost
    Server Port:            80

    Document Path:          /
    Document Length:        18 bytes

    Concurrency Level:      10
    Time taken for tests:   1.201 seconds
    Complete requests:      1000
    Failed requests:        0
    Total transferred:      178000 bytes
    HTML transferred:       18000 bytes
    Requests per second:    832.30 [#/sec] (mean)
    Time per request:       12.015 [ms] (mean)
    Time per request:       1.201 [ms] (mean, across all concurrent requests)
    Transfer rate:          144.68 [Kbytes/sec] received

    Connection Times (ms)
                  min  mean[+/-sd] median   max
    Connect:        0    0   0.0      0       1
    Processing:     2   12   1.9     12      26
    Waiting:        1   12   1.9     12      25
    Total:          2   12   1.9     12      26

    Percentage of the requests served within a certain time (ms)
      50%     12
      66%     12
      75%     13
      80%     13
      90%     14
      95%     15
      98%     16
      99%     18
     100%     26 (longest request)

    关键指标

    1. Requests per second813.01 [#/sec]
      表示服务器每秒能处理 832 个请求(吞吐量越高,性能越好)。

    2. Time per request

    • 12.015 [ms] (mean):每个请求的平均耗时(从用户角度看)。
    • 1.201 [ms] (mean, across all concurrent requests):服务器平均处理每个请求的时间。
    • Connection Times
      显示连接、处理、等待时间的分布(单位:毫秒)。

    • Percentage served within a certain time
      响应时间的分布情况(如 90% 的请求在 14ms 内完成)。

    • 服务端的日志输出:在这里插入图片描述我们去统计下两个服务端的被请求次数:

      tail -f /var/log/nginx/access.log
      127.0.0.1:48878 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
      127.0.0.1:48882 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
      127.0.0.1:48892 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8001
      127.0.0.1:48904 / - - [02/Mar/2025:21:10:11 +0800] "GET / HTTP/1.0" 200 18 "-" "ApacheBench/2.3" "-" to: 127.0.0.1:8002
      root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 |  grep ":8001" | wc -l
      506
      root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 1000 |  grep ":8002" | wc -l
      494

      或者

      root@MichaelMing:~# awk '{print $NF}' /var/log/nginx/access.log | sort | uniq -c
          300 "ApacheBench/2.3"
            3 "curl/7.81.0"
        93576 -
          984 127.0.0.1:8001
         1018 127.0.0.1:8002

      我们去查看两个端口号,发现两个端口的服务被请求的次数是基本均衡的(506/494),Nginx起到了负载均衡的作用

      5. chat服务例子

      假设 MichaelAI有两个 api 接口 chatcompletions

      from http.server import BaseHTTPRequestHandler, HTTPServer
      import json
      from urllib.parse import urlparse, parse_qs
      from datetime import datetime

      class MultiAPIHandler(BaseHTTPRequestHandler):
          def send_json_response(self, data, status=200):
              self.send_response(status)
              self.send_header("Content-type""application/json")
              self.end_headers()
              self.wfile.write(json.dumps(data).encode("utf-8"))
          
          def do_POST(self):
              # 统一入口路由分发
              parsed_path = urlparse(self.path)
              path = parsed_path.path
              
              try:
                  content_length = int(self.headers["Content-Length"])
                  post_data = self.rfile.read(content_length).decode("utf-8")
                  
                  print(f"\n--- {path} 请求 @{self.server.server_port} ---")
                  print(f"[客户端] {self.client_address[0]}:{self.client_address[1]}")
                  print(f"[数据] {post_data}")
                  
                  if path == "/chat":
                      self.handle_chat(post_data)
                  elif path == "/completions":
                      self.handle_completions(post_data)
                  else:
                      self.send_error(404, "API not found")
                      
              except Exception as e:
                  self.send_json_response({
                      "error": str(e),
                      "server": self.server.server_port,
                      "timestamp": datetime.now().isoformat()
                  }, 400)

          def handle_chat(self, data):
              # 模拟对话接口处理
              try:
                  input_data = json.loads(data)
                  message = input_data.get("message""")
                  
                  response = {
                      "original": message,
                      "response": f"Processed by chat API: {message.upper()}",
                      "server": self.server.server_port,
                      "timestamp": datetime.now().isoformat()
                  }
                  self.send_json_response(response)
                  
              except json.JSONDecodeError:
                  raise ValueError("Invalid JSON format")

          def handle_completions(self, data):
              # 文本补全接口
              response = {
                  "completions": [
                      {"text": f"Completion 1: {data[:5]}...""index": 0},
                      {"text": f"Completion 2: {data[-5:]}...""index": 1}
                  ],
                  "server": self.server.server_port,
                  "timestamp": datetime.now().isoformat()
              }
              self.send_json_response(response)

      def run(server_class=HTTPServer, handler_class=MultiAPIHandler, port=8001):
          server_address = ("0.0.0.0", port)
          httpd = server_class(server_address, handler_class)
          print(f"多API服务启动于 0.0.0.0:{port}...")
          httpd.serve_forever()

      if __name__ == "__main__":
          import threading
          # 启动两个服务实例
          threading.Thread(target=run, kwargs={"port": 8001}).start()
          threading.Thread(target=run, kwargs={"port": 8002}).start()
      • 测试 chat api

      chat_data.json 示例: {"message": "Hello, this is a test message"}

      ab -n 100 -c 10 -T "application/json" -p json_data \
      http://localhost:8001/chat

      服务端日志

      --- /chat 请求 @8001 ---
      [客户端] 127.0.0.1:54724
      [数据] {"message""Hello, this is a test message"}

      127.0.0.1 - - [02/Mar/2025 22:41:22] "POST /chat HTTP/1.0" 200 -
      • 测试 completions api
      text.txt 示例:
      你好,我是Michael阿明开发的智能助手!
      ab -n 10000 -c 100 -T "text/plain" -p text.txt \
      http://localhost:8002/completions

      服务端日志

      --- /completions 请求 @8002 ---
      [客户端] 127.0.0.1:47246
      [数据] 你好,我是Michael阿明开发的智能助手!

      127.0.0.1 - - [02/Mar/2025 22:49:26] "POST /completions HTTP/1.0" 200 
      • 同时测试两个接口
      # 同时压测两个接口
      echo "chat接口压测结果:" && \
      ab -n 500 -c 50 -T "application/json" -p json_data http://localhost:80/chat && \
      echo "completions接口压测结果:" && \
      ab -n 500 -c 50 -T "text/plain" -p text.txt http://localhost:80/completions

      completions 接口调用分布:两个机器基本接近

      root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 |  grep "completions" | grep 8001 | wc -l
      747
      root@MichaelMing:~# cat /var/log/nginx/access.log | tail -n 20000 |  grep "completions" | grep 8002 | wc -l
      755
      • 客户端调用
      import requests
      import asyncio
      import aiohttp

      # 同步客户端(基于requests)
      class SyncAPIClient:
          def __init__(self, base_url):
              self.base_url = base_url

          def call_chat(self, message):
              url = f"{self.base_url}/chat"
              data = {"message": message}
              response = requests.post(url, json=data)
              return response.json()

          def call_completions(self, text):
              url = f"{self.base_url}/completions"
              response = requests.post(url, data=text)
              return response.json()

      # 异步客户端(基于aiohttp)
      class AsyncAPIClient:
          def __init__(self, base_url):
              self.base_url = base_url

          async def call_chat(self, message):
              url = f"{self.base_url}/chat"
              data = {"message": message}
              async with aiohttp.ClientSession() as session:
                  async with session.post(url, json=data) as response:
                      return await response.json()

          async def call_completions(self, text):
              url = f"{self.base_url}/completions"
              async with aiohttp.ClientSession() as session:
                  async with session.post(url, data=text) as response:
                      return await response.json()

      # 同步调用示例
      sync_client = SyncAPIClient("http://localhost")
      chat_response = sync_client.call_chat("Hello, sync chat!")
      print("Chat响应:", chat_response)

      completion_response = sync_client.call_completions("test text")
      print("Completions响应:", completion_response)

      # 异步调用示例
      async def main():
          async_client = AsyncAPIClient("http://localhost")

          # 并发调用两个接口 
          chat_task = asyncio.create_task(async_client.call_chat("Hello, async chat!"))
          completion_task = asyncio.create_task(async_client.call_completions("async test text"))

          results = await asyncio.gather(chat_task, completion_task)
          print("异步Chat响应:", results[0])
          print("异步Completions响应:", results[1])

      asyncio.run(main())

      输出:

      root@MichaelMing:~# python3 /mnt/d/opt/client.py
      Chat响应: {'original''Hello, sync chat!''response''Processed by chat API: HELLO, SYNC CHAT!''server': 8001, 'timestamp''2025-03-02T23:11:54.426626'}
      Completions响应: {'completions': [{'text''Completion 1: test ...''index': 0}, {'text''Completion 2:  text...''index': 1}], 'server': 8002, 'timestamp''2025-03-02T23:11:54.430562'}
      异步Chat响应: {'original''Hello, async chat!''response''Processed by chat API: HELLO, ASYNC CHAT!''server': 8001, 'timestamp''2025-03-02T23:11:54.444508'}
      异步Completions响应: {'completions': [{'text''Completion 1: async...''index': 0}, {'text''Completion 2:  text...''index': 1}], 'server': 8002, 'timestamp''2025-03-02T23:11:54.444973'}

      用代码进行压力测试

      # 并发压测示例(使用异步客户端)
      async def stress_test():
          client = AsyncAPIClient("http://localhost")
          tasks = []

          for _ in range(100):
              tasks.append(client.call_chat("test message"))
              tasks.append(client.call_completions("test text"))

          responses = await asyncio.gather(*tasks)
          print(f"成功处理 {len(responses)} 个请求")

      asyncio.run(stress_test())
      在这里插入图片描述

    53AI,企业落地大模型首选服务商

    产品:场景落地咨询+大模型应用平台+行业解决方案

    承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

    联系我们

    售前咨询
    186 6662 7370
    预约演示
    185 8882 0121

    微信扫码

    添加专属顾问

    回到顶部

    加载中...

    扫码咨询