微信扫码
与创始人交个朋友
我要投稿
每次看到langchain
总会好奇,这么多的模型是怎么集成起来的,如果我也想把自己的私人模型集成进去,要怎么做才行的?我们都知道,调用模型,无非拿到模型的api
接口,直接调用不就完事了,这么简单,有手就行。
今天,我就带领大家看看它内部的集成原理,以下内容会分为两部分
1.先举个langchain
集成自定义模型的例子
2.源码层面看通义千问模型是怎么集成的
那么,我们就直接上教程吧
langchain
集成自定义模型我们以往的调用模型都是这样的,我们也按这种方式来自定义模型
# 构建大语言模型对象
llm = ChatTongyi()
# 发送请求,调用qwen模型
llm.invoke("嗨,你好")
自定义llm
模型可以用langchain
中标准的 BaseChatModel
接口进行包装,里面有两个方法是必须要实现的,分别为:
_generate
:用于根据提示生成聊天结果。
_llm_type
(property):唯一标识模型的类型,也就是模型的型号,用于记录。
下面例子我们来定义一个CustomChatModelAdvanced
模型,根据传递的消息提示,返回前几个字符,业务逻辑比较简单,主要用于理解调用模型的整个内部流程。
from langchain_core.messages import (
AIMessage,
BaseMessage,
FunctionMessage,
HumanMessage,
SystemMessage,
ToolMessage,
)
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel, SimpleChatModel
from langchain_core.messages import AIMessageChunk, BaseMessage, HumanMessage
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.runnables import run_in_executor
重点看_generate
方法的参数和返回的参数,该方法主要是根据传进来的消息列表,对最后一条记录的字符进行切割,从0开始到参数n的值结束,从方法定义可以看出,最终会返回ChatResult
形式的内容。
class CustomChatModelAdvanced(BaseChatModel):
model_name: str
n: int
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""
重写BaseChatModel中的_generate方法以实现聊天模型逻辑
这里可以自定义本地模型的调用
参数:
messages:可接受多条消息组成的提示
stop: 字符串列表,模型应根据该字符串停止生成(非必须)
run_manager:为LLM提供回调的运行管理器
"""
# 如n=5,只返回消息前面的5个字符
last_message = messages[-1]
tokens = last_message.content[: self.n]
message = AIMessage(
content=tokens,
additional_kwargs={}, # Used to add additional payload (e.g., function calling request)
response_metadata={ # Use for response metadata
"time_in_seconds": 3,
},
)
# 初始化generation对象, 对里面的content指定值进行处理
generation = ChatGeneration(message=message)
# 最近返回ChatResult形式的消息结果
return ChatResult(generations=[generation])
# 如果模型可以生成流媒体方式的输出,则应该实现此方法,否则也可以不用实现
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
last_message = messages[-1]
tokens = last_message.content[: self.n]
for token in tokens:
chunk = ChatGenerationChunk(message=AIMessageChunk(content=token))
if run_manager:
run_manager.on_llm_new_token(token, chunk=chunk)
yield chunk
chunk = ChatGenerationChunk(
message=AIMessageChunk(content="", response_metadata={"time_in_sec": 3})
)
if run_manager:
run_manager.on_llm_new_token(token, chunk=chunk)
yield chunk
# 该方法必须是要实现的, 返回该大语言聊天模型使用的模型型号
@property
def _llm_type(self) -> str:
return "echoing-chat-model-advanced"
@property
def _identifying_params(self) -> Dict[str, Any]:
return {
"model_name": self.model_name,
}
模型已经集成了,我们看看效果吧
# 构建模型对象
model = CustomChatModelAdvanced(n=3, model_name="my_custom_model")
# 调用,传递的是Message格式的消息列表
model.invoke(
[
HumanMessage(content="hello!"),
AIMessage(content="Hi there human!"),
HumanMessage(content="Meow!"),
]
)
输出结果,可以看到content='Meo'
已经得到结果了
AIMessage(content='Meo', response_metadata={'time_in_seconds': 3}, id='run-3297d98c-856f-4d33-a42f-73f93aba862e-0')
# 字符串 单条记录的时候
model.invoke("hello")
模型输出:
AIMessage(content='hel', response_metadata={'time_in_seconds': 3}, id='run-c6bc7882-60a2-40cf-87ba-7794ba9e1ddb-0')
从以上例子我们可以看出,只要实现了_generate
,_llm_type
这两个方法,langchian
自定义集成模型就可以使用了。
下面我们看看通义千问中怎么实现的,我们根据上面的例子, 先找到langchain
中的ChatTongyi
对象源代码,代码路径为
# 通义千问的源代码地址 ChatTongyi对象
https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/chat_models/tongyi.py
# Tongyi()方式的可以看这里
https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/llms/tongyi.py
直接找到核心的_generate
方法。
代码如下,可以看到,参数和我们上面例子中的都一致, 返回的结果类型都是ChatResult
,有变化的是方法内部的业务逻辑。
这里我们重点看resp = self.completion_with_retry(**params)
代码就好,我们看他是怎么处理这个返回结果的,怎么进行发送请求调用的。
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
generations = []
# 是否进行流式处理
if self.streaming:
generation: Optional[ChatGenerationChunk] = None
for chunk in self._stream(
messages, stop=stop, run_manager=run_manager, **kwargs
):
if generation is None:
generation = chunk
else:
generation += chunk
assert generation is not None
generations.append(self._chunk_to_generation(generation))
else:
# 不进行流式处理,直接返回结果
params: Dict[str, Any] = self._invocation_params(
messages=messages, stop=stop, **kwargs
)
# 发送请求调用,如出现错误就使用重试机制
resp = self.completion_with_retry(**params)
generations.append(
ChatGeneration(**self._chat_generation_from_qwen_resp(resp))
)
# 返回结果
return ChatResult(
generations=generations,
llm_output={
"model_name": self.model_name,
},
)
我们看下completion_with_retry
方法的处理逻辑,先看参数,**_kwargs是一个字典参数,表示将字典中的键值对展开成一系列的关键字参数,就是把键值对当成参数传递进来。
该方法主要干的事情就是重试机制,如果resp = self.client.call(**_kwargs)
出现错误,就自动进行重试,这句代码是该方法的核心,它是怎么发送请求的, 我们看下self.client
怎么来的。
直接在当前文件搜索,可以找到该属性被赋值的地方。
def validate_environment(cls, values: Dict) -> Dict:
"""Validate that api key and python package exists in environment."""
values["dashscope_api_key"] = convert_to_secret_str(
get_from_dict_or_env(values, "dashscope_api_key", "DASHSCOPE_API_KEY")
)
try:
# 引用依赖包
import dashscope
except ImportError:
raise ImportError(
"Could not import dashscope python package. "
"Please install it with `pip install dashscope --upgrade`."
)
try:
# dashscope依赖包赋值给client
values["client"] = dashscope.Generation
except AttributeError:
raise ValueError(
"`dashscope` has no `Generation` attribute, this is likely "
"due to an old version of the dashscope package. Try upgrading it "
"with `pip install --upgrade dashscope`."
)
return values
该方法是校验传递的参数和配置信息的,最重要的两句代码为
import dashscope
values["client"] = dashscope.Generation
看到这里应该恍然大悟了,dashscope
是阿里的模型服务灵积依赖包,里面提供了发送http请求,调用各种大模型的工具,下面我们看下这个工具的一个简单例子。
from http import HTTPStatus
from dashscope import Generation
import os
qwen_api_key = "你的qwen-api-key"
os.environ["DASHSCOPE_API_KEY"] = qwen_api_key
# 消息提示
messages = [{'role': 'system', 'content': '你是一个专业的人工助手.'},
{'role': 'user', 'content': '如何做西红柿炒鸡蛋?'}]
# 根据用户消息提示,调用qwen模型
response = Generation.call(model="qwen-turbo",
api_key=qwen_api_key,
messages=messages,
result_format="message")
print(response)
远程调用qwen模型,这里的call方法是不是很熟悉,和我们上面的resp = self.client.call(**_kwargs)
是不是一样的,看到这里,你应该知道调用的内部原理了吧。
输出结果
{"status_code": 200, "request_id": "85af3c37-6076-99aa-8511-93f4c3b40051", "code": "", "message": "", "output": {"text": null, "finish_reason": null, "choices": [{"finish_reason": "stop", "message": {"role": "assistant", "content": "做西红柿炒鸡蛋是一道非常经典的家常菜,下面是简单的步骤:\n\n所需材料:\n- 鸡蛋 3-4个\n- 西红柿 2个\n- 葱花适量\n- 盐适量\n- 白糖少许\n- 食用油适量\n\n步骤:\n1. 准备食材:鸡蛋打入碗中,加入少许盐,搅拌均匀;西红柿洗净切块,葱切葱花备用。\n\n2. 热锅凉油:在锅中加入适量的食用油,热油后转中小火,等油温稍微升高时,将打好的鸡蛋液倒入锅中,轻轻搅拌,让鸡蛋快速凝固成块状,炒至八成熟后盛出备用。\n\n3. 炒西红柿:锅中留少许底油,放入切好的西红柿块,用中小火慢慢翻炒,炒至西红柿出汁,变得稍微软烂。\n\n4. 加入调料:西红柿炒出汁后,加入适量的白糖(可帮助中和西红柿的酸味),再加入少许盐调味。如果喜欢更香一些,可以加入一点点生抽。\n\n5. 拌入鸡蛋:将之前炒好的鸡蛋倒入锅中,和西红柿一起翻炒均匀,让鸡蛋充分吸收西红柿的汤汁。\n\n6. 出锅装盘:最后撒上葱花,翻炒均匀后即可出锅,一道美味的西红柿炒鸡蛋就做好了。\n\n记得根据个人口味调整盐和糖的量,希望你能成功做出美味的西红柿炒鸡蛋!"}}]}, "usage": {"input_tokens": 26, "output_tokens": 327, "total_tokens": 353}}
我们来梳理一下
new ChatTongyi()
中的
_generate
方法主要是根据传递的参数判断是不是流媒体方式的输出的,如果是就走它的处理逻辑,返回的是流式输出对象,如果不是就走一次性返回结果的方式。
获取返回结果的是通过该代码
resp = self.client.call(**_kwargs)
,构建一个dashscope
对象,调用请返回结果的就是它了。
总结:本文主要是熟悉langchain
集成自定义模型,集成也很简单,需要实现BaseChatModel
中的_generate
、_llm_type
方法,定义属于自己的业务逻辑,并返回处理的结果。最后我们还看了qwen
集成的例子,知道了它的_generate
处理了很多事情,参数的验证,发送请求如果出现错误还有重试机制等,最后通过阿里的模型服务灵积dashscope
依赖包,发送请求拿到结果。
53AI,企业落地应用大模型首选服务商
产品:大模型应用平台+智能体定制开发+落地咨询服务
承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-05-28
2024-04-26
2024-08-21
2024-04-11
2024-07-09
2024-08-13
2024-07-18
2024-10-25
2024-07-01
2024-06-17