微信扫码
与创始人交个朋友
我要投稿
为高质量支撑 2024 年客服大模型商用,中国移动构建了客服大模型“混合云”生产环境,确保大模型应用安全稳定运行、智算资源高效利用。面对当前跨云调用拓扑的复杂性,以及运维保障与业务运营中服务质量观测指标的缺失问题,多团队共同合作基于 eBPF 与 Wasm 技术构建客服大模型生产运行态可观测能力。
在基于基础大模型构建的客服大模型中,虽然这些基础模型在自然语言处理方面表现出强大的计算能力,但其训练、微调和推理的细节仍难以完全理解和控制。尤其是在行业应用中,这些模型在响应性能、准确性和数据安全性方面常常表现为“黑盒”。因此,亟需开发针对大型语言模型的可观测性解决方案,以实现模型的可感知、可维护、可评估和可信任,从而提升平台能力和服务质量。
当前,大模型的应用面临一些挑战。在业务稳定性方面,为避免对流量的影响,业务团队严格禁止安装 APM 探针。此外,由于不同团队负责多个大模型,统一拓扑和指标的协调难度较大。因此,在构建可观测性解决方案时,必须做到对业务的零侵扰。近年中国移动磐基 PaaS 平台已经携手 DeepFlow 借助 eBPF 技术实现了全栈且无侵扰的应用可观测性。磐基 PaaS 平台将 eBPF 数据与现有的可观测数据整合,提供了开箱即用的应用可观测性,全栈无盲点的调用链追踪等能力。
在客服大模型应用上线后,集成了eBPF 可观测性能力,快速开启了应用的“黑盒”视图,实现了开箱即用的全景调用拓扑、丰富的应用和网络指标,以及调用详情和链路。尽管通过 eBPF 提升了应用的可观测性,但对于大模型服务的关键性能指标,如 TTFT(Time To First Token)和 TPOT(Time Per Output Token),仍缺乏有效的采集和展示。需要不断丰富大模型系统的指标数据,利用 eBPF 技术的优势,实现更直观的监控和展示,以全面了解大模型系统的性能表现。本文将分享如何利用 DeepFlow 提供的 WebAssembly(Wasm)插件来解析业务流量获取大模型所需指标。
客服大模型上线磐基 eBPF 可观测性能力后,客服大模型立即具备了开箱即用的可观测性功能。这使得其能够快速获取全面的系统监控视图,实时追踪和分析复杂的调用链路,有效提升问题诊断和解决效率。同时,无需额外的代码插入,便可自动生成应用和网络性能指标,轻松实现资源优化和性能提升。
客服大模型支撑系统
与基础大模型
部署 agent,后续将逐步推广到客服大模型的其他服务,将使得全景拓扑越来越完整。因为客服大模型跨多个云、多个基础大模型,需统一性能指标输出标准,大模型应用中涉及多语言、架构复杂,传统 APM 方式费时费力,故最终考虑使用 eBPF+Wasm 来实现对于业务指标的采集。经过前期多次调研沟通,利用 DeepFlow 提供的 Wasm 插件,能够解析大模型流式请求来采集业务指标量,确认方案可行并生产落地,本分详细分享下落地的整个过程。
首先,我们对当前 AI 应用关注的指标进行了分类,主要分为基础设施、网络、应用和业务四类。前三类指标可以通过传统监控和 eBPF 可观测能力轻松实现,而业务指标是目前的难点,也是这次讨论的重点。以下是这四类指标的详细说明:
通用指标 | 说明 | 备注 |
---|---|---|
CPU 使用率 | (总 CPU 时间 - 空闲 CPU 时间) / 总 CPU 时间 * 100 | 现有指标 |
内存使用 | (总内存 - 空闲内存) / 总内存 * 100 | 现有指标 |
磁盘 I/O | 监控读写次数和速度 | 现有指标 |
系统负载 | 一段时间内的平均负载 | 现有指标 |
通用指标 | 说明 | 备注 |
---|---|---|
吞吐量 | 发送与接收字节的总和(Byte/s) | 现有指标 |
TCP重传比例 | (TCP 客户端重传+TCP服务端重传)/包数 | 现有指标 |
TCP建连-失败比例 | 建连-失败次数 /关闭连接数 | 现有指标 |
TCP建连时延 | 采集周期内所有TCP建连时延的平均值 | 现有指标 |
TCP活跃连接数 | 采集周期内活跃的连接数 | 现有指标 |
通用指标 | 说明 | 备注 |
---|---|---|
请求速率 | 平均每秒请求总数(req/s) | 现有指标 |
累计服务端异常比例 | 服务端异常/响应 | 现有指标 |
响应时延 | 采集周期内所有应用时延的平均值 | 现有指标 |
大模型指标 | 说明 | 备注 |
---|---|---|
TTFT (Time To First Token) | 首 Token 生成时间 | 通过 Wasm 插件采集(本文分享) |
TPOT (Time Per Output Token) | 每个输出 Token 生成时间 | 通过 Wasm 插件采集(本文分享) |
Token 产出率 | 请求中生成的平均 Token 数量 | 通过 Wasm 插件采集(请关注 PR) |
请求速率 | 大模型服务每秒的请求总数 | 复用应用指标-请求速率 |
请求耗时 | 大模型服务请求的平均耗时 | 复用应用指标-响应时延 |
服务并发量 | 大模型服务在处理的长连接请求总数 | 复用网络指标-TCP 活跃连接数 |
DeepFlow 的 Wasm 插件机制[1]提供了一个可编程的、安全的、资源消耗可控的运行沙箱,能够以热插拔的形式增强协议解析、支持私有协议、实现零侵扰分布式追踪和自定义数据脱敏。完全可以做到以对大模型业务零侵扰的形式来获取流量解析业务指标。开始编写插件之前需要先将指标量的计算方式分析清楚。客服大模型与基础大模型之间通过 HTTP 请求来进行交付,请求主要分为两种类型:流式和非流式。流式调用通常采用 HTTP 分块传输编码(Chunked Transfer Encoding),以便逐步发送数据。非流式调用则遵循传统的 HTTP 请求-响应模式。本文章分析的客服大模型与通用大模型之间为了提升服务体验,采用的流式方式进行数据传输。
HTTP 分块传输编码允许服务器将响应拆分为多个部分发送给客户端,而无需预先知道内容的总大小。
客户大模型与基础大模型之间的交互过程由请求和响应两部分组成。首先,客户大模型发起请求,基础大模型返回初始响应,该响应仅包含头部信息,无任何业务数据。随后,基础大模型逐步返回包含 Token 的响应。当所有 Token 传输完成时,发送一个标识完成的响应,称为 Finish 响应。基于交互过程则可确认业务指标的计算方式,如下:
以 vllm/**/**/cm/as**st
接口发起请求,报文详情如下:
这段 Go 程序实现了一个 Wasm 插件,专注于统计大模型 HTTP 流式请求的性能指标。它能够解析 HTTP 流请求和响应,以收集首 Token 延迟(TTFT)和每个输出 Token 的延迟(TPOT)。这种设计使得程序员可以高效地监控和优化大模型的性能表现。
type llmParser struct {
httpStream map[uint64] *StreamInfo
}
type StreamInfo struct {
reqTime uint64 // 请求时间
respFirstChunkedTime uint64 //首次分块响应的时间
totalToken uint64 // 总的输出Token数量
flag int //标记是否已经记录了首次分块响应的时间
}
func checker(payload []byte) (protoNum uint8, protoStr string) {
// 读取HTTP请求。
req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(payload)))
if err != nil {
return 0, ""
}
query := req.URL.Path
// 检查请求路径是否包含/generate_stream,
if strings.Contains(query, "/generate_stream") {
sdk.Warn(fmt.Sprintf("check: %s", query))
// 如果包含,返回协议编号1和字符串http_stream,
return 1, "http_stream"
}
return 0, "" // 否则返回0和空字符串。
}
func (p *llmParser) HookIn() []sdk.HookBitmap {
return []sdk.HookBitmap{
sdk.HOOK_POINT_PAYLOAD_PARSE,
}
}
func (p *llmParser) OnCheckPayload(baseCtx *sdk.ParseCtx) (protoNum uint8, protoStr string) {
if baseCtx.EbpfType != sdk.EbpfTypeNone {
return 0, ""
}
// 获取负载数据
payload, err := baseCtx.GetPayload()
if err != nil {
return 0, ""
}
// 如果是请求方向,调用 checker 函数进行检查
if baseCtx.Direction == sdk.DirectionRequest {
return checker(payload)
}
return 0, ""
}
func (p *llmParser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.Action {
// 检查 baseCtx.L7 是否为 1。
if baseCtx.L7 != 1 {
return sdk.ActionNext()
}
// 尝试从 baseCtx 中获取 payload
payload, err := baseCtx.GetPayload()
if err != nil {
return sdk.ActionAbortWithErr(err)
}
var attr = []sdk.KeyVal{}
// 获取当前流的 ID flowId。
var flowId = baseCtx.FlowID
// 检查 p.httpStream 中是否存在该流的信息,如果不存在则初始化一个新的 StreamInfo 对象。
if p.httpStream[flowId] == nil {
p.httpStream[flowId] = &StreamInfo{}
}
switch baseCtx.Direction {
case sdk.DirectionRequest: // 如果 baseCtx.Direction 是 sdk.DirectionRequest,则处理
req, err := http.ReadRequest(bufio.NewReader(bytes.NewReader(payload)))
if err != nil {
return sdk.ActionNext()
}
p.httpStream[flowId].reqTime = baseCtx.Time // 记录请求时间 reqTime
info := &sdk.L7ProtocolInfo{
Req: &sdk.Request{
Resource: req.URL.Path,
},
Resp: &sdk.Response{},
}
// 返回 sdk.ParseActionAbortWithL7Info,表示终止处理并返回 L7 协议信息
return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{info})
case sdk.DirectionResponse:
// 使用 bufio.Reader 读取响应数据
r := bufio.NewReader(bytes.NewReader(payload))
bs, _, err := r.ReadLine()
if err == io.EOF {
return sdk.ActionNext()
}
regex := regexp.MustCompile(`^HTTP/[1-2]\.[01] \d{3} .*$`)
if regex.MatchString(string(bs)) {
return sdk.ActionNext()
}
// 如果是结束标志(0),计算并记录时间差和每个 token 的时间
if string(bs) == "0" {
attr = []sdk.KeyVal{
{
// 首 Token 响应时间 - 请求发出时间
Key: "ttft",
Val: fmt.Sprintf("%d", p.httpStream[flowId].respFirstChunkedTime-p.httpStream[flowId].reqTime),
},
{
// (结束时间 - 首 Token 响应时间)/ 总 Token 数
Key: "tpot",
Val: fmt.Sprintf("%d", (baseCtx.Time-p.httpStream[flowId].respFirstChunkedTime)/p.httpStream[flowId].totalToken),
},
}
info := &sdk.L7ProtocolInfo{
Req: &sdk.Request{},
Resp: &sdk.Response{},
Kv: attr,
}
if _, exists := p.httpStream[flowId]; exists {
delete(p.httpStream, flowId)
}
return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{info})
}
bs, _, err = r.ReadLine()
if err == io.EOF {
return sdk.ActionNext()
}
// 如果是响应首包,记录首次分块时间和 token 数
if p.httpStream[flowId].flag == 0 {
p.httpStream[flowId].flag = 1
p.httpStream[flowId].respFirstChunkedTime = baseCtx.Time
p.httpStream[flowId].totalToken = uint64(len(bs))
return sdk.ActionNext()
}
// 继续读取后续分块,累加 token 数
p.httpStream[flowId].totalToken += uint64(len(bs))
bs, _, err = r.ReadLine()
// 如果读取到 EOF,删除流信息并返回
if err == io.EOF {
if _, exists := p.httpStream[flowId]; exists {
delete(p.httpStream, flowId)
}
return sdk.ActionNext()
}
return sdk.ActionNext()
default:
return sdk.ActionNext()
}
}
func main() {
sdk.Warn("llm wasm plugin loaded") // 输出日志信息,表示插件已加载。
llm := &llmParser{ // 创建 llmParser 实例
httpStream: map[uint64]*StreamInfo{},
}
sdk.SetParser(llm) // 设置解析器为 llmParser 实例
}
编译:tinygo build -o llm.wasm -target wasi -gc=precise -panic=trap -scheduler=none -no-debug
上传:deepflow-ctl plugin create --type wasm --image llm.wasm --name llm ./llm/llm.go
查看:deepflow-ctl plugin list
完整代码将会提交至 DeepFlow 社区。我们已与社区沟通,后续将提供 PR。欢迎大家积极参与和交流。
我们目前通过 Grafana 构建了仪表板来展示和验证采集的指标数据。这些数据正在集成到磐基可观测性用户界面,并与全景调用拓扑进行融合。采集 Token 的响应时间(如 TTFT 和 TPOT)能够帮助 AI 应用快速识别性能瓶颈,从而制定有效的优化方案。Token 产出率不仅可以用于外部模型的计费监控,还能帮助评估和优化本地大模型的性能。通过这些指标,可以在效率、性能、准确性和成本之间实现平衡,确保模型的最佳运行状态。
除了 TTFT、TPOT 指标之外,我们还将逐步在大模型推理场景中通过 DeepFlow Agent 实现更多观测能力,包括 Prompt Input/Output 回溯,Prompt 调用链追踪,Prompt Token 消耗用量监测等功能,为大模型工程人员和应用开发人员提供推理服务的性能评估、Token 消耗监测、推理质量回溯、Token 信息安全回溯等观测能力,持续提升大模型推理服务使用质量,持续提升大模型应用的监控效率、故障诊断效率。
在大模型训练及微调场景中,我们还将逐步通过 DeepFlow 的 eBPF 零侵扰观测能力,实现(1)函数级 GPU 性能持续剖析,(2)函数级 HBM 性能持续剖析,(3)毫秒级 RDMA 通信对性能持续剖析,(4)异构 GPU 性能深度指标实时监控等功能,实现训练过程中计算性能抖动、计算性能降速、计算任务挂死等异常的分钟级监控、诊断,提升大模型开发效能,加速大模型训练过程。
DeepFlow 的 Wasm 插件机制: https://deepflow.io/docs/zh/integration/process/wasm-plugin/
eBPF 零侵扰可观测性 Meetup · 上海站开始报名啦!本次活动主题为《大模型全生命周期管理与 AI 应用的全栈可观测性》,精彩议程大咖云集,欢迎扫描二维码锁定席位~
https://deepflow.io
https://github.com/deepflowio/deepflow
400 9696 121
关于 DeepFlow
53AI,企业落地应用大模型首选服务商
产品:大模型应用平台+智能体定制开发+落地咨询服务
承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-05-28
2024-08-13
2024-04-26
2024-08-21
2024-07-09
2024-06-13
2024-08-04
2024-04-11
2024-07-18
2024-07-01