AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


自定义客服助手实战:使用LangGraph打造AI航空客服助理
发布日期:2024-08-18 08:13:23 浏览次数: 1676


客服助手机器人能够帮助团队更高效地处理日常咨询,但要建立一个能可靠处理各种任务且不会让用户抓狂的机器人是很难的。

在这篇文章中中,我们将为一家航空公司构建一个客户支持机器人,帮助用户查询和安排旅行。我们将学习使用LangGraph的中断和检查点以及更复杂的状态,来组织助手的工具,并管理用户的航班预订、酒店预订、汽车租赁和短途旅行。假定您已经熟悉LangGraph入门教程中介绍的概念。

最终,您将构建一个可用的机器人,并了解LangGraph的关键概念和架构。您将能够将这些设计模式应用到其他AI项目中。由于内容较多,本文将由浅入深,分四个阶段进行讲解,每个阶段都将打造出一个具备以上描述所有能力的机器人。但受限于LLM的能力,初期阶段的机器人的运行可能存在各类问题,但都将在后续阶段得到解决。

您的最终聊天机器人将与下图相似:

让我们开始吧!

准备工作

首先,我们需要搭建好环境,下载测试数据库,并定义我们在每个部分都会重复使用的工具。我们会使用OpenAI作为语言模型(LLM),并创建一些定制化的工具。这些工具大多数会连接到本地的 SQLite 数据库,无需额外依赖。此外,我们还会通过 Tavily 为代理提供网络搜索功能。

pip install -U langgraph langchain-community tavily-python pandas

数据下载和初始化

运行下面的代码来获取我们为这个教程准备的sqlite数据库,并更新它使其看起来像是当前数据:

import os
import shutil
import sqlite3

import pandas as pd
import requests

db_url = "https://storage.googleapis.com/benchmarks-artifacts/travel-db/travel2.sqlite"
local_file = "travel2.sqlite"
# The backup lets us restart for each tutorial section
backup_file = "travel2.backup.sqlite"
overwrite = False
if overwrite or not os.path.exists(local_file):
response = requests.get(db_url)
response.raise_for_status()# Ensure the request was successful
with open(local_file, "wb") as f:
f.write(response.content)
# Backup - we will use this to "reset" our DB in each section
shutil.copy(local_file, backup_file)
# Convert the flights to present time for our tutorial
conn = sqlite3.connect(local_file)
cursor = conn.cursor()

tables = pd.read_sql(
"SELECT name FROM sqlite_master WHERE type='table';", conn
).name.tolist()
tdf = {}
for t in tables:
tdf[t] = pd.read_sql(f"SELECT * from {t}", conn)

example_time = pd.to_datetime(
tdf["flights"]["actual_departure"].replace("\N", pd.NaT)
).max()
current_time = pd.to_datetime("now").tz_localize(example_time.tz)
time_diff = current_time - example_time

tdf["bookings"]["book_date"] = (
pd.to_datetime(tdf["bookings"]["book_date"].replace("\N", pd.NaT), utc=True)
+ time_diff
)

datetime_columns = [
"scheduled_departure",
"scheduled_arrival",
"actual_departure",
"actual_arrival",
]


for column in datetime_columns:
tdf["flights"][column] = (
pd.to_datetime(tdf["flights"][column].replace("\N", pd.NaT)) + time_diff
)

for table_name, df in tdf.items():
df.to_sql(table_name, conn, if_exists="replace", index=False)
del df
del tdf
conn.commit()
conn.close()

db = local_file

定义一系列工具

现在,我们来定义一些工具,以便这个客服助手可以搜索航空公司的政策手册,以及查询和管理航班、酒店、租车和远足活动的预订情况。

查询公司政策

助手需要检索政策信息来回答用户的问题

import re
from langchain_core.tools import tool

response = requests.get(
"https://storage.googleapis.com/benchmarks-artifacts/travel-db/swiss_faq.md"
)
response.raise_for_status()
faq_text = response.text
docs = [txt for txt in re.split(r"(?=\n##)", faq_text)]

embedding = OpenAIEmbeddings()
vectordb = Chroma.from_texts(texts=docs, embedding=embedding)
retriever = vectordb.as_retriever()

@tool
def lookup_policy(query: str) -> str:
"""请查阅公司政策以检查是否允许某些选项。在进行任何航班更改或执行其他“写入”事件之前,请先使用这个功能"""
docs = retriever.invoke(query, k=2)
return "\n\n".join([doc.page_content for doc in docs])

航班管理

定义 fetch_user_flight_information 工具,使客服代理能查看当前用户的航班信息。然后,我们再定义一些工具来搜索航班,并管理存储在SQL数据库中的乘客预订信息。

我们运用ensure_config功能通过可配置参数传入乘客ID。LLM无需明确提供这些信息,它们会在图形被调用时自动提供,以此保证每个用户都无法访问其他乘客的预订信息。

import sqlite3
from datetime import date, datetime
from typing import Optional

import pytz
from langchain_core.runnables import ensure_config


@tool
def fetch_user_flight_information() -> list[dict]:
"""获取用户的所有机票以及相应的航班信息和座位分配。

Returns:
一个字典列表,每个字典包含了用户所拥有的每一张机票的详细信息、相关的航班详情,以及座位分配情况。
"""
config = ensure_config()# Fetch from the context
configuration = config.get("configurable", {})
passenger_id = configuration.get("passenger_id", None)
if not passenger_id:
raise ValueError("No passenger ID configured.")

conn = sqlite3.connect(db)
cursor = conn.cursor()
query = """
SELECT
t.ticket_no, t.book_ref,
f.flight_id, f.flight_no, f.departure_airport, f.arrival_airport, f.scheduled_departure, f.scheduled_arrival,
bp.seat_no, tf.fare_conditions
FROM
tickets t
JOIN ticket_flights tf ON t.ticket_no = tf.ticket_no
JOIN flights f ON tf.flight_id = f.flight_id
JOIN boarding_passes bp ON bp.ticket_no = t.ticket_no AND bp.flight_id = f.flight_id
WHERE
t.passenger_id = ?
"""
cursor.execute(query, (passenger_id,))
rows = cursor.fetchall()
column_names = [column[0] for column in cursor.description]
results = [dict(zip(column_names, row)) for row in rows]

cursor.close()
conn.close()

return results

@tool
def search_flights(
departure_airport: Optional[str] = None,
arrival_airport: Optional[str] = None,
start_time: Optional[date | datetime] = None,
end_time: Optional[date | datetime] = None,
limit: int = 20,
) -> list[dict]:
"""根据出发机场、到达机场和出发时间范围搜索航班。"""
conn = sqlite3.connect(db)
cursor = conn.cursor()

query = "SELECT * FROM flights WHERE 1 = 1"
params = []

if departure_airport:
query += " AND departure_airport = ?"
params.append(departure_airport)

if arrival_airport:
query += " AND arrival_airport = ?"
params.append(arrival_airport)

if start_time:
query += " AND scheduled_departure >= ?"
params.append(start_time)

if end_time:
query += " AND scheduled_departure <= ?"
params.append(end_time)
query += " LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
column_names = [column[0] for column in cursor.description]
results = [dict(zip(column_names, row)) for row in rows]

cursor.close()
conn.close()

return results


@tool
def update_ticket_to_new_flight(ticket_no: str, new_flight_id: int) -> str:
"""将用户的机票更新为新的有效航班"""
config = ensure_config()
configuration = config.get("configurable", {})
passenger_id = configuration.get("passenger_id", None)
if not passenger_id:
raise ValueError("No passenger ID configured.")

conn = sqlite3.connect(db)
cursor = conn.cursor()

cursor.execute(
"SELECT departure_airport, arrival_airport, scheduled_departure FROM flights WHERE flight_id = ?",
(new_flight_id,),
)
new_flight = cursor.fetchone()
if not new_flight:
cursor.close()
conn.close()
return "Invalid new flight ID provided."
column_names = [column[0] for column in cursor.description]
new_flight_dict = dict(zip(column_names, new_flight))
timezone = pytz.timezone("Etc/GMT-3")
current_time = datetime.now(tz=timezone)
departure_time = datetime.strptime(
new_flight_dict["scheduled_departure"], "%Y-%m-%d %H:%M:%S.%f%z"
)
time_until = (departure_time - current_time).total_seconds()
if time_until < (3 * 3600):
return f"Not permitted to reschedule to a flight that is less than 3 hours from the current time. Selected flight is at {departure_time}."

cursor.execute(
"SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
)
current_flight = cursor.fetchone()
if not current_flight:
cursor.close()
conn.close()
return "No existing ticket found for the given ticket number."

# Check the signed-in user actually has this ticket
cursor.execute(
"SELECT * FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
(ticket_no, passenger_id),
)
current_ticket = cursor.fetchone()
if not current_ticket:
cursor.close()
conn.close()
return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"

cursor.execute(
"UPDATE ticket_flights SET flight_id = ? WHERE ticket_no = ?",
(new_flight_id, ticket_no),
)
conn.commit()

cursor.close()
conn.close()
return "Ticket successfully updated to new flight."


@tool
def cancel_ticket(ticket_no: str) -> str:
"""取消用户的机票并从数据库中删除它。"""
config = ensure_config()
configuration = config.get("configurable", {})
passenger_id = configuration.get("passenger_id", None)
if not passenger_id:
raise ValueError("No passenger ID configured.")
conn = sqlite3.connect(db)
cursor = conn.cursor()

cursor.execute(
"SELECT flight_id FROM ticket_flights WHERE ticket_no = ?", (ticket_no,)
)
existing_ticket = cursor.fetchone()
if not existing_ticket:
cursor.close()
conn.close()
return "No existing ticket found for the given ticket number."

cursor.execute(
"SELECT flight_id FROM tickets WHERE ticket_no = ? AND passenger_id = ?",
(ticket_no, passenger_id),
)
current_ticket = cursor.fetchone()
if not current_ticket:
cursor.close()
conn.close()
return f"Current signed-in passenger with ID {passenger_id} not the owner of ticket {ticket_no}"

cursor.execute("DELETE FROM ticket_flights WHERE ticket_no = ?", (ticket_no,))
conn.commit()

cursor.close()
conn.close()
return "Ticket successfully cancelled."

租车服务

一旦用户预订了航班,他们可能会想要安排交通工具。定义一些“租车”工具,让用户在目的地搜索和预订汽车。

from datetime import date, datetime
from typing import Optional, Union


@tool
def search_car_rentals(
location: Optional[str] = None,
name: Optional[str] = None,
price_tier: Optional[str] = None,
start_date: Optional[Union[datetime, date]] = None,
end_date: Optional[Union[datetime, date]] = None,
) -> list[dict]:
"""
根据位置、公司名称、价格等级、开始日期和结束日期来搜索租车服务。

    参数:
        location (Optional[str]): 租车服务的位置。
        name (Optional[str]): 租车公司的名称。
        price_tier (Optional[str]): 租车的价格等级。
        start_date (Optional[Union[datetime, date]]): 租车的开始日期。
        end_date (Optional[Union[datetime, date]]): 租车的结束日期。

    返回:
        list[dict]: 匹配搜索条件的租车服务列表。
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()

query = "SELECT * FROM car_rentals WHERE 1=1"
params = []

if location:
query += " AND location LIKE ?"
params.append(f"%{location}%")
if name:
query += " AND name LIKE ?"
params.append(f"%{name}%")

cursor.execute(query, params)
results = cursor.fetchall()

conn.close()

return [
dict(zip([column[0] for column in cursor.description], row)) for row in results
]

@tool
def book_car_rental(rental_id: int) -> str:
"""
通过租车ID来预订租车服务。

    参数:
        rental_id (int): 要预订的租车服务的ID。

    返回:
        str: 预订成功与否的消息。
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()

cursor.execute("UPDATE car_rentals SET booked = 1 WHERE id = ?", (rental_id,))
conn.commit()

if cursor.rowcount > 0:
conn.close()
return f"Car rental {rental_id} successfully booked."
else:
conn.close()
return f"No car rental found with ID {rental_id}."

@tool
def update_car_rental(
rental_id: int,
start_date: Optional[Union[datetime, date]] = None,
end_date: Optional[Union[datetime, date]] = None,
) -> str:
"""
通过租车ID来更新租车服务的开始和结束日期。

    参数:
        rental_id (int): 要更新的租车服务的ID。
        start_date (Optional[Union[datetime, date]]): 新的租车开始日期。
        end_date (Optional[Union[datetime, date]]): 新的租车结束日期。

    返回:
        str: 更新成功与否的消息。
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()

if start_date:
cursor.execute(
"UPDATE car_rentals SET start_date = ? WHERE id = ?",
(start_date, rental_id),
)
if end_date:
cursor.execute(
"UPDATE car_rentals SET end_date = ? WHERE id = ?", (end_date, rental_id)
)

conn.commit()
if cursor.rowcount > 0:
conn.close()
return f"Car rental {rental_id} successfully updated."
else:
conn.close()
return f"No car rental found with ID {rental_id}."


@tool
def cancel_car_rental(rental_id: int) -> str:
"""
通过租车ID来取消租车服务。

    参数:
        rental_id (int): 要取消的租车服务的ID。

    返回:
        str: 取消成功与否的消息。
"""
conn = sqlite3.connect(db)
cursor = conn.cursor()

cursor.execute("UPDATE car_rentals SET booked = 0 WHERE id = ?", (rental_id,))
conn.commit()

if cursor.rowcount > 0:
conn.close()
return f"Car rental {rental_id} successfully cancelled."
else:
conn.close()
return f"No car rental found with ID {rental_id}."

酒店预订

用户需要住宿,因此定义一些工具来搜索和管理酒店预订。

@tool
def search_hotels(
    location: Optional[str] = None,
    name: Optional[str] = None,
    price_tier: Optional[str] = None,
    checkin_date: Optional[Union[datetime, date]] = None,
    checkout_date: Optional[Union[datetime, date]] = None,
)
 -> list[dict]:

    """
    根据位置、名称、价格等级、入住日期和退房日期来搜索酒店。  
  
    参数:  
        location (Optional[str]): 酒店的位置。  
        name (Optional[str]): 酒店的名称。  
        price_tier (Optional[str]): 酒店的价格等级。  
        checkin_date  
          
        # 入住日期和退房日期,用于搜索酒店  
        checkin_date (Optional[Union[datetime, date]]): 酒店的入住日期。  
        checkout_date (Optional[Union[datetime, date]]): 酒店的退房日期。  
  
    返回:  
        list[dict]: 符合搜索条件的酒店列表。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM hotels WHERE 1=1"
    params = []

    if location:
            query += " AND location LIKE ?"
            params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    # For the sake of this tutorial, we will let you match on any dates and price tier.
    cursor.execute(query, params)
    results = cursor.fetchall()

    conn.close()

    return [
        dict(zip([column[0for column in cursor.description], row)) for row in results
    ]

@tool
def book_hotel(hotel_id: int) -> str:
    """
    通过酒店ID进行预订。  
  
    参数:  
        hotel_id (int): 要预订的酒店的ID。  
  
    返回:  
        str: 预订成功与否的消息。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute("UPDATE hotels SET booked = 1 WHERE id = ?", (hotel_id,))
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Hotel {hotel_id} successfully booked."
    else:
        conn.close()
        return f"No hotel found with ID {hotel_id}."

@tool
def update_hotel(
    hotel_id: int,
    checkin_date: Optional[Union[datetime, date]] = None,
    checkout_date: Optional[Union[datetime, date]] = None,
)
 -> str:

    """
    通过酒店ID更新酒店预订的入住和退房日期。  
  
    参数:  
        hotel_id (int): 要更新预订的酒店的ID。  
        checkin_date (Optional[Union[datetime, date]]): 新的入住日期。  
        checkout_date (Optional[Union[datetime, date]]): 新的退房日期。  
  
    返回:  
        str: 更新成功与否的消息。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    if checkin_date:
        cursor.execute(
            "UPDATE hotels SET checkin_date = ? WHERE id = ?", (checkin_date, hotel_id)
        )
    if checkout_date:
        cursor.execute(
            "UPDATE hotels SET checkout_date = ? WHERE id = ?",
            (checkout_date, hotel_id),
        )

    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Hotel {hotel_id} successfully updated."
    else:
        conn.close()
        return f"No hotel found with ID {hotel_id}."

@tool
def cancel_hotel(hotel_id: int) -> str:
    """
    通过酒店ID取消酒店预订。  
  
    参数:  
        hotel_id (int): 要取消预订的酒店的ID。  
  
    返回:  
        str: 取消成功与否的消息。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute("UPDATE hotels SET booked = 0 WHERE id = ?", (hotel_id,))
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Hotel {hotel_id} successfully cancelled."
    else:
        conn.close()
        return f"No hotel found with ID {hotel_id}."

游玩活动

定义一些工具让用户在到达目的地后能搜索活动并进行预订。

@tool
def search_trip_recommendations(
    location: Optional[str] = None,
    name: Optional[str] = None,
    keywords: Optional[str] = None,
)
 -> list[dict]:

    """
    根据位置、名称和关键词搜索旅行推荐。  
  
    参数:  
        location (Optional[str]): 旅行推荐的地点。  
        name (Optional[str]): 旅行推荐的名字。  
        keywords (Optional[str]): 与旅行推荐相关的关键词。  
  
    返回:  
        list[dict]: 符合搜索条件的旅行推荐列表。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM trip_recommendations WHERE 1=1"
    params = []
    if location:
        query += " AND location LIKE ?"
        params.append(f"%{location}%")
    if name:
        query += " AND name LIKE ?"
        params.append(f"%{name}%")
    if keywords:
        keyword_list = keywords.split(",")
        keyword_conditions = " OR ".join(["keywords LIKE ?" for _ in keyword_list])
        query += f" AND ({keyword_conditions})"
        params.extend([f"%{keyword.strip()}%" for keyword in keyword_list])

    cursor.execute(query, params)
    results = cursor.fetchall()

    conn.close()

    return [
        dict(zip([column[0for column in cursor.description], row)) for row in results
    ]

@tool
def book_excursion(recommendation_id: int) -> str:
    """
    通过推荐ID预订远足活动。  
  
    参数:  
        recommendation_id (int): 要预订的旅行推荐的ID。  
  
    返回:  
        str: 预订成功与否的消息。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "UPDATE trip_recommendations SET booked = 1 WHERE id = ?", (recommendation_id,)
    )
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Trip recommendation {recommendation_id} successfully booked."
    else:
        conn.close()
        return f"No trip recommendation found with ID {recommendation_id}."

@tool
def update_excursion(recommendation_id: int, details: str) -> str:
    """
    通过推荐ID更新旅行推荐的细节。  
  
    参数:  
        recommendation_id (int): 要更新的旅行推荐的ID。  
        details (str): 旅行推荐的新细节。  
  
    返回:  
        str: 更新成功与否的消息。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "UPDATE trip_recommendations SET details = ? WHERE id = ?",
        (details, recommendation_id),
    )
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Trip recommendation {recommendation_id} successfully updated."
    else:
        conn.close()
        return f"No trip recommendation found with ID {recommendation_id}."


@tool
def cancel_excursion(recommendation_id: int) -> str:
    """
    通过推荐ID取消旅行推荐。  
  
    参数:  
        recommendation_id (int): 要取消的旅行推荐的ID。  
  
    返回:  
        str: 取消成功与否的消息。
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    cursor.execute(
        "UPDATE trip_recommendations SET booked = 0 WHERE id = ?", (recommendation_id,)
    )
    conn.commit()

    if cursor.rowcount > 0:
        conn.close()
        return f"Trip recommendation {recommendation_id} successfully cancelled."
    else:
        conn.close()
        return f"No trip recommendation found with ID {recommendation_id}."

辅助工具

定义辅助函数在我们调试图形时美化打印信息,并给我们的工具节点提供错误处理(通过将错误添加到聊天历史记录中)。

from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda

from langgraph.prebuilt import ToolNode


def handle_tool_error(state) -> dict:
error = state.get("error")
tool_calls = state["messages"][-1].tool_calls
return {
"messages": [
ToolMessage(
content=f"Error: {repr(error)}\n please fix your mistakes.",
tool_call_id=tc["id"],
)
for tc in tool_calls
]
}


def create_tool_node_with_fallback(tools: list) -> dict:
return ToolNode(tools).with_fallbacks(
[RunnableLambda(handle_tool_error)], exception_key="error"
)


def _print_event(event: dict, _printed: set, max_length=500):
current_state = event.get("dialog_state")
if current_state:
print("Currently in: ", current_state[-1])
message = event.get("messages")
if message:
if isinstance(message, list):
message = message[-1]
if message.id not in _printed:
msg_repr = message.pretty_repr(html=True)
if len(msg_repr) > max_length:
msg_repr = msg_repr[:max_length] + " ... (truncated)"
print(msg_repr)
_printed.add(message.id)

Zero-shot Agent

在构建任何系统时,最佳实践是从最简单的可行方案开始,这里我们先定义一个 Zero-shot Agent,我们的目标是引导它明智地使用这些工具来帮助用户。然而,这个agent存在一些限制,比如机器人可能在未经用户确认的情况下执行不希望的操作,处理复杂查询时可能遇到困难,或者在回答时缺乏针对性。这些问题我们会在后续文章进行改进。我们的简单两节点图如下所示:

状态

定义我们的StateGraph的状态为一个包含仅可添加的消息列表的类型化字典。这些消息构成了聊天历史,这就是我们简单助手所需要的所有状态。

from typing import Annotated

from typing_extensions import TypedDict

from langgraph.graph.message import AnyMessage, add_messages


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

这段代码利用了 Python 的类型注解特性来定义一个名为 State 的结构体,通过使用 TypedDict 来指定该结构体包含的键及其对应的值类型。这里涉及到几个重要的Python类型注解概忈,我将它们逐一解释给你。

TypedDict

TypedDict 允许你定义一个特定的字典类型,这个字典类型会预期一组特定的键,每个键都有自己期望的值类型。这对于确保字典在整个代码库中使用时类型的一致性非常有用。

Annotated

Annotated 类型是从 Python 3.9 引入的,它允许你将额外的信息附加到类型注解上。这些信息本身不会影响类型检查的行为,但可以被工具或第三方库用来提供额外的指导或功能。

AnyMessage 和 add_messages

AnyMessage 是在 langgraph.graph.message 模块中定义的一个类型,代表了某种消息的类型。而 add_messages 则是一个函数(用于处理消息列表),它被作为元数据附加到 State 字典的 messages 字段上。这意味着 messages 字段应该是一个 AnyMessage 类型对象的列表,而且还有一些额外的处理。

代理助手

接下来,定义助手函数。这个函数接收图形状态,将其格式化为prompt,然后调用LLM以预测最佳的响应。

from langchain_core.runnables import Runnable, RunnableConfig
from langchain_community.tools.tavily_search import TavilySearchResults

from langchain_core.prompts import ChatPromptTemplate
os.environ["TAVILY_API_KEY"] = "xxx"
class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            passenger_id = config.get("passenger_id"None)
            state = {**state, "user_info": passenger_id}
            result = self.runnable.invoke(state)
            # 如果大型语言模型返回了一个空响应,我们将 re-prompt 它给出一个实际的响应
            if not result.tool_calls and (
                    not result.content
                    or isinstance(result.content, list)
                    and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user""请给出一个真实的输出。")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}

llm = ChatOpenAI(model_name="gpt-4o", temperature=1)

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "你是一个为瑞士航空提供帮助的客户支持助手。"
            "使用提供的工具来搜索航班、公司政策和其他信息以帮助回答用户的查询。"
            "在搜索时,要有毅力。如果第一次搜索没有结果,就扩大你的查询范围。"
            "如果搜索结果为空,不要放弃,先扩大搜索范围。"
            "\n\n当前用户:\n<User>\n{user_info}\n</User>"
            "\n当前时间:{time}。",
        ),
        ("placeholder""{messages}"),
    ]
).partial(time=datetime.now())

part_1_tools = [
    TavilySearchResults(max_results=1),
    fetch_user_flight_information,
    search_flights,
    lookup_policy,
    update_ticket_to_new_flight,
    cancel_ticket,
    search_car_rentals,
    book_car_rental,
    update_car_rental,
    cancel_car_rental,
    search_hotels,
    book_hotel,
    update_hotel,
    cancel_hotel,
    search_trip_recommendations,
    book_excursion,
    update_excursion,
    cancel_excursion,
]
part_1_assistant_runnable = primary_assistant_prompt | llm.bind_tools(part_1_tools)

定义图

现在,我们来创建图。这张图是我们这部分的最终助手。

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition, ToolNode

builder = StateGraph(State)

builder.add_node("assistant", Assistant(part_1_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_1_tools))

builder.set_entry_point("assistant")
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
builder.add_edge("tools""assistant")

# 检查点器允许图保存其状态, 这是整个图的完整记忆
memory = SqliteSaver.from_conn_string(":memory:")
# 编译图
part_1_graph = builder.compile(checkpointer=memory)

测试

现在是时候试试我们强大的聊天机器人了!让我们运行以下的对话轮次。如果它遇到了“递归限制”,那就意味着代理无法在分配的步骤数内得到答案。没关系!在后面的文章,我们还有更多的技巧去解决这类问题。

import uuid  
import shutil  
  
# 假设这是用户与助手之间可能发生的对话示例  
tutorial_questions = [  
    "你好,我的航班是什么时候?",  
    "我可以把我的航班改签到更早的时间吗?我想今天晚些时候离开。",  
    "那就把我的航班改签到下周某个时间吧",  
    "下一个可用的选项很好",  
    "住宿和交通方面有什么建议?",  
    "我想在为期一周的住宿中选择一个经济实惠的酒店(7天),并且我还想租一辆车。",  
    "好的,你能为你推荐的酒店预订吗?听起来不错。",  
    "是的,去预订任何中等价位且有可用性的酒店。",  
    "对于汽车,我有哪些选择?",  
    "太棒了,我们只选择最便宜的选项。预订7天。",  
    "那么,你对我的旅行有什么建议?",  
    "在我在那里的时候,有哪些活动是可用的?",  
    "有趣 - 我喜欢博物馆,有哪些选择?",  
    "好的,那就为我在那里的第二天预订一个。",  
]  
  
# 使用备份文件以便我们可以从每个部分的原始位置重新启动  
shutil.copy(backup_file, db)  
thread_id = str(uuid.uuid4())  
  
config = {  
    "configurable": {  
        # passenger_id 在我们的航班工具中使用  
        # 以获取用户的航班信息  
        "passenger_id""3442 587242",  
        # 检查点通过 thread_id 访问  
        "thread_id": thread_id,  
    }  
}  
  
  
_printed = set()  
for question in tutorial_questions:  
    events = part_1_graph.stream(  
        {"messages": ("user", question)}, config, stream_mode="values"  
    )  
    for event in events:  
        _print_event(event, _printed)


返回结果如下:


总结

我们的简单助手做得不错!它能够对所有问题作出相当好的回应,快速地进行情境回应,并成功完成所有任务。

如果这只是一个简单的问答机器人,我们可能会对上述结果感到满意。但由于我们的客户支持机器人代表用户采取行动,因此上述的一些行为让人有些担忧:

助手在我们关注住宿的时候预定了一辆汽车,然后不得不取消并稍后重新预定:糟糕!用户应该在预定之前有最终决策权,以避免不必要的费用。
助手在寻找推荐时有些困难。我们可以通过添加更详细的指导和使用工具的示例来改进这一点,但是对每个工具都这样做,可能会导致提示过长和代理 overwhelmed。
助手必须进行明确的搜索才能获取用户的相关信息。我们可以通过立即抓取用户的相关旅行详情来节省大量时间,以便助手可以直接回应。


53AI,企业落地应用大模型首选服务商

产品:大模型应用平台+智能体定制开发+落地咨询服务

承诺:先做场景POC验证,看到效果再签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询