AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


【干货】自定义聊天模型,原来qwen就是这么集成到langchain的,看完我也会
发布日期:2024-05-10 13:58:30 浏览次数: 2430 来源:河指令


每次看到langchain总会好奇,这么多的模型是怎么集成起来的,如果我也想把自己的私人模型集成进去,要怎么做才行的?我们都知道,调用模型,无非拿到模型的api接口,直接调用不就完事了,这么简单,有手就行。


今天,我就带领大家看看它内部的集成原理,以下内容会分为两部分

1.先举个langchain集成自定义模型的例子

2.源码层面看通义千问模型是怎么集成的


那么,我们就直接上教程吧


langchain集成自定义模型

我们以往的调用模型都是这样的,我们也按这种方式来自定义模型

# 构建大语言模型对象
llm = ChatTongyi()
# 发送请求,调用qwen模型
llm.invoke("嗨,你好")

自定义llm模型可以用langchain中标准的 BaseChatModel 接口进行包装,里面有两个方法是必须要实现的,分别为:

_generate:用于根据提示生成聊天结果。

_llm_type (property):唯一标识模型的类型,也就是模型的型号,用于记录。


下面例子我们来定义一个CustomChatModelAdvanced模型,根据传递的消息提示,返回前几个字符,业务逻辑比较简单,主要用于理解调用模型的整个内部流程。

导入依赖包

from langchain_core.messages import (
   AIMessage,
   BaseMessage,
   FunctionMessage,
   HumanMessage,
   SystemMessage,
   ToolMessage,
)

from typing import Any, AsyncIterator, Dict, Iterator, List, Optional

from langchain_core.callbacks import (
   AsyncCallbackManagerForLLMRun,
   CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel, SimpleChatModel
from langchain_core.messages import AIMessageChunk, BaseMessage, HumanMessage
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.runnables import run_in_executor

模型的实现

重点看_generate方法的参数和返回的参数,该方法主要是根据传进来的消息列表,对最后一条记录的字符进行切割,从0开始到参数n的值结束,从方法定义可以看出,最终会返回ChatResult形式的内容。

class CustomChatModelAdvanced(BaseChatModel):
   model_name: str
   n: int

   def _generate(
       self,
       messages: List[BaseMessage],
       stop: Optional[List[str]] = None,
       run_manager: Optional[CallbackManagerForLLMRun] = None,
       **kwargs: Any,
 ) -> ChatResult:
       """
     重写BaseChatModel中的_generate方法以实现聊天模型逻辑

     这里可以自定义本地模型的调用

     参数:
     messages:可接受多条消息组成的提示
     stop: 字符串列表,模型应根据该字符串停止生成(非必须)
     run_manager:为LLM提供回调的运行管理器
     """
       # 如n=5,只返回消息前面的5个字符
       last_message = messages[-1]
       tokens = last_message.content[: self.n]
       message = AIMessage(
           content=tokens,
           additional_kwargs={},  # Used to add additional payload (e.g., function calling request)
           response_metadata={  # Use for response metadata
               "time_in_seconds": 3,
         },
     )
       
       # 初始化generation对象, 对里面的content指定值进行处理
       generation = ChatGeneration(message=message)
       # 最近返回ChatResult形式的消息结果
       return ChatResult(generations=[generation])

   # 如果模型可以生成流媒体方式的输出,则应该实现此方法,否则也可以不用实现
   def _stream(
       self,
       messages: List[BaseMessage],
       stop: Optional[List[str]] = None,
       run_manager: Optional[CallbackManagerForLLMRun] = None,
       **kwargs: Any,
 ) -> Iterator[ChatGenerationChunk]:
       
       last_message = messages[-1]
       tokens = last_message.content[: self.n]

       for token in tokens:
           chunk = ChatGenerationChunk(message=AIMessageChunk(content=token))

           if run_manager:
               run_manager.on_llm_new_token(token, chunk=chunk)

           yield chunk

       chunk = ChatGenerationChunk(
           message=AIMessageChunk(content="", response_metadata={"time_in_sec": 3})
     )
       if run_manager:
           run_manager.on_llm_new_token(token, chunk=chunk)
       yield chunk

   # 该方法必须是要实现的, 返回该大语言聊天模型使用的模型型号
   @property
   def _llm_type(self) -> str:
       return "echoing-chat-model-advanced"

   @property
   def _identifying_params(self) -> Dict[str, Any]:
     
       return {
           
           "model_name": self.model_name,
     }

我们来测试一下吧

模型已经集成了,我们看看效果吧

# 构建模型对象
model = CustomChatModelAdvanced(n=3, model_name="my_custom_model")

# 调用,传递的是Message格式的消息列表
model.invoke(
 [
       HumanMessage(content="hello!"),
       AIMessage(content="Hi there human!"),
       HumanMessage(content="Meow!"),
 ]
)

输出结果,可以看到content='Meo'已经得到结果了

AIMessage(content='Meo', response_metadata={'time_in_seconds': 3}, id='run-3297d98c-856f-4d33-a42f-73f93aba862e-0')
# 字符串 单条记录的时候
model.invoke("hello")

模型输出:

AIMessage(content='hel', response_metadata={'time_in_seconds': 3}, id='run-c6bc7882-60a2-40cf-87ba-7794ba9e1ddb-0')

从以上例子我们可以看出,只要实现了_generate,_llm_type这两个方法,langchian自定义集成模型就可以使用了。


通义千问的集成

下面我们看看通义千问中怎么实现的,我们根据上面的例子, 先找到langchain中的ChatTongyi对象源代码,代码路径为

# 通义千问的源代码地址 ChatTongyi对象
https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/chat_models/tongyi.py
   
# Tongyi()方式的可以看这里
https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/llms/tongyi.py

直接找到核心的_generate方法。

代码如下,可以看到,参数和我们上面例子中的都一致, 返回的结果类型都是ChatResult,有变化的是方法内部的业务逻辑。

这里我们重点看resp = self.completion_with_retry(**params)代码就好,我们看他是怎么处理这个返回结果的,怎么进行发送请求调用的。

def _generate(
       self,
       messages: List[BaseMessage],
       stop: Optional[List[str]] = None,
       run_manager: Optional[CallbackManagerForLLMRun] = None,
       **kwargs: Any,
 ) -> ChatResult:
       generations = []
       # 是否进行流式处理
       if self.streaming:
           generation: Optional[ChatGenerationChunk] = None
           for chunk in self._stream(
               messages, stop=stop, run_manager=run_manager, **kwargs
         ):
               if generation is None:
                   generation = chunk
               else:
                   generation += chunk
           assert generation is not None
           generations.append(self._chunk_to_generation(generation))
       else:
           # 不进行流式处理,直接返回结果
           params: Dict[str, Any] = self._invocation_params(
               messages=messages, stop=stop, **kwargs
         )
           # 发送请求调用,如出现错误就使用重试机制
           resp = self.completion_with_retry(**params)
           generations.append(
               ChatGeneration(**self._chat_generation_from_qwen_resp(resp))
         )
       # 返回结果
       return ChatResult(
           generations=generations,
           llm_output={
               "model_name": self.model_name,
         },
     )

我们看下completion_with_retry方法的处理逻辑,先看参数,**_kwargs是一个字典参数,表示将字典中的键值对展开成一系列的关键字参数,就是把键值对当成参数传递进来。

该方法主要干的事情就是重试机制,如果resp = self.client.call(**_kwargs)出现错误,就自动进行重试,这句代码是该方法的核心,它是怎么发送请求的, 我们看下self.client怎么来的。

直接在当前文件搜索,可以找到该属性被赋值的地方。

def validate_environment(cls, values: Dict) -> Dict:
       """Validate that api key and python package exists in environment."""
       values["dashscope_api_key"] = convert_to_secret_str(
           get_from_dict_or_env(values, "dashscope_api_key", "DASHSCOPE_API_KEY")
     )
       try:
           # 引用依赖包
           import dashscope
       except ImportError:
           raise ImportError(
               "Could not import dashscope python package. "
               "Please install it with `pip install dashscope --upgrade`."
         )
       try:
           # dashscope依赖包赋值给client
           values["client"] = dashscope.Generation
       except AttributeError:
           raise ValueError(
               "`dashscope` has no `Generation` attribute, this is likely "
               "due to an old version of the dashscope package. Try upgrading it "
               "with `pip install --upgrade dashscope`."
         )

       return values

该方法是校验传递的参数和配置信息的,最重要的两句代码为

import dashscope

values["client"] = dashscope.Generation


看到这里应该恍然大悟了,dashscope是阿里的模型服务灵积依赖包,里面提供了发送http请求,调用各种大模型的工具,下面我们看下这个工具的一个简单例子。


from http import HTTPStatus
from dashscope import Generation
import os

qwen_api_key = "你的qwen-api-key"
os.environ["DASHSCOPE_API_KEY"] = qwen_api_key


# 消息提示
messages = [{'role': 'system', 'content': '你是一个专业的人工助手.'},
         {'role': 'user', 'content': '如何做西红柿炒鸡蛋?'}]

# 根据用户消息提示,调用qwen模型
response = Generation.call(model="qwen-turbo",
                          api_key=qwen_api_key,
                          messages=messages,
                          result_format="message")
print(response)

远程调用qwen模型,这里的call方法是不是很熟悉,和我们上面的resp = self.client.call(**_kwargs)是不是一样的,看到这里,你应该知道调用的内部原理了吧。

输出结果

{"status_code": 200, "request_id": "85af3c37-6076-99aa-8511-93f4c3b40051", "code": "", "message": "", "output": {"text": null, "finish_reason": null, "choices": [{"finish_reason": "stop", "message": {"role": "assistant", "content": "做西红柿炒鸡蛋是一道非常经典的家常菜,下面是简单的步骤:\n\n所需材料:\n- 鸡蛋 3-4个\n- 西红柿 2个\n- 葱花适量\n- 盐适量\n- 白糖少许\n- 食用油适量\n\n步骤:\n1. 准备食材:鸡蛋打入碗中,加入少许盐,搅拌均匀;西红柿洗净切块,葱切葱花备用。\n\n2. 热锅凉油:在锅中加入适量的食用油,热油后转中小火,等油温稍微升高时,将打好的鸡蛋液倒入锅中,轻轻搅拌,让鸡蛋快速凝固成块状,炒至八成熟后盛出备用。\n\n3. 炒西红柿:锅中留少许底油,放入切好的西红柿块,用中小火慢慢翻炒,炒至西红柿出汁,变得稍微软烂。\n\n4. 加入调料:西红柿炒出汁后,加入适量的白糖(可帮助中和西红柿的酸味),再加入少许盐调味。如果喜欢更香一些,可以加入一点点生抽。\n\n5. 拌入鸡蛋:将之前炒好的鸡蛋倒入锅中,和西红柿一起翻炒均匀,让鸡蛋充分吸收西红柿的汤汁。\n\n6. 出锅装盘:最后撒上葱花,翻炒均匀后即可出锅,一道美味的西红柿炒鸡蛋就做好了。\n\n记得根据个人口味调整盐和糖的量,希望你能成功做出美味的西红柿炒鸡蛋!"}}]}, "usage": {"input_tokens": 26, "output_tokens": 327, "total_tokens": 353}}

我们来梳理一下

new ChatTongyi() 中的

_generate方法主要是根据传递的参数判断是不是流媒体方式的输出的,如果是就走它的处理逻辑,返回的是流式输出对象,如果不是就走一次性返回结果的方式。

获取返回结果的是通过该代码

resp = self.client.call(**_kwargs),构建一个dashscope对象,调用请返回结果的就是它了。


总结:本文主要是熟悉langchain集成自定义模型,集成也很简单,需要实现BaseChatModel中的_generate_llm_type方法,定义属于自己的业务逻辑,并返回处理的结果。最后我们还看了qwen集成的例子,知道了它的_generate处理了很多事情,参数的验证,发送请求如果出现错误还有重试机制等,最后通过阿里的模型服务灵积dashscope依赖包,发送请求拿到结果。




53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询