AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


AI应用中基于语音驱动的RAG如何做?
发布日期:2024-08-30 12:21:24 浏览次数: 2121 来源:山行AI


介绍

在当今快节奏的世界中,信息触手可及,我们经常需要寻找复杂问题的答案。但是,如果您只需对设备提问,就能通过语音获得简洁准确的回答呢?

这就是语音驱动的(音频)RAG的出现,它是一种将语音命令的便利性与问答智能相结合的创新系统。

在这里,用户可以通过语音命令与知识库进行互动。通过利用语音识别和自然语言处理的最新进展,Audio RAG 能够无缝地将您的语音问题转换为文本,进而从庞大的知识库中检索相关信息。

Audio RAG 的工作原理

1.录制音频:系统首先使用麦克风录制您的语音。只需提出问题,Audio RAG 就会捕捉您的音频输入。2.语音转换为文本:一旦音频被录制,它将通过语音识别引擎进行处理。这个强大的工具会分析您的语音声学特性,并将其转换为文本格式,同时保留问题的含义和上下文。3.执行问答:在获得转录文本后,Audio RAG 利用先进的语言模型来理解问题背后的含义和意图。然后,它会搜索庞大的知识库,检索最相关的信息,为您提供简洁准确的答案。4.文本转换为语音:LLM 生成的回答将被转换为音频文件。

使用的技术栈

1.GROQ:这是一个快速的AI推理技术,由LPU™ AI推理技术提供支持,能够提供快速、经济且高效的AI。2.gTTS:gTTS(Google Text-to-Speech)是一个Python库和CLI工具,允许您与Google翻译的文本转语音API进行接口操作。它使您能够将文本转换为语音并保存为MP3文件。3.pydub:pydub是一个Python库,它提供了一个简单易用的高级接口,用于操作音频文件。它允许您对音频数据执行各种操作。4.LangChain:这是一个用于大语言模型应用开发的框架。5.HuggingFace嵌入模型:Hugging Face 是自然语言处理(NLP)和人工智能(AI)领域的一个著名平台,以其广泛的预训练模型和用于构建利用这些模型的应用程序的工具而闻名。Hugging Face 提供的一个关键功能是生成嵌入,这些嵌入是文本数据的密集向量表示。它们可用于各种任务,包括语义搜索、文本分类和聚类。6.Streamlit:Streamlit是一个开源Python框架,旨在快速轻松地构建和分享交互式数据应用程序。7.ChromaDB:ChromaDB 是一个开源的向量数据库,旨在促进向量嵌入的存储和检索,使其特别适用于涉及大语言模型(LLMs)和语义搜索的应用程序。

启用音频的RAG

代码实现

安装所需依赖

!pip install langchain langchain_community langchain_groq chromadb sentence_transformers!pip install -U langchain-huggingface!pip install -U langchain-chroma!pip install python-dotenv

创建 .env 文件并设置您的 Groq API 密钥

GROQ_API_KEY=<你的API密钥>

设置 Groq API 密钥

import osfrom dotenv import load_dotenv
# 从 .env 文件加载环境变量load_dotenv()

设置 LLM

llm = ChatGroq(model_name="llama3-70b-8192",temperature=0.1,max_tokens=1000,)

设置嵌入模型

from langchain_huggingface import HuggingFaceEmbeddings
model_name = "BAAI/bge-small-en-v1.5"model_kwargs = {"device": "cpu"}encode_kwargs = {"normalize_embeddings": False}embeddings = HuggingFaceEmbeddings(model_name=model_name,model_kwargs=model_kwargs,encode_kwargs=encode_kwargs)

设置文本分割辅助函数

# 文本分割器def text_splitter():text_splitter = RecursiveCharacterTextSplitter(chunk_size=512,chunk_overlap=20,length_function=len,)return text_splitter

设置 RetrievalQA 辅助函数

# RetrievalQAdef answer_question(question):retriever = vectorstore.as_retriever(search_kwargs={"k": 5})qa = RetrievalQA.from_chain_type(llm=llm,chain_type="stuff",retriever=retriever,return_source_documents=True)result = qa.invoke({"query": question})return result['result']

使用 Groq distil-whisper-large-v3-en 转录音频的辅助函数 

distil-whisper-large-v3-en:Distil-Whisper English 是 OpenAI Whisper 模型的一个压缩版本,旨在提供更快、成本更低的英语语音识别,同时保持相当的准确性。

支持语言:仅限英语

import osfrom groq import Groq
# 初始化 Groq 客户端client = Groq()
# 指定音频文件的路径filename = "/content/recorded_audio.wav"
def transcribe_audio(filename):# 打开音频文件with open(filename, "rb") as file:# 创建音频文件的转录transcription = client.audio.transcriptions.create(file=(filename, file.read()),# 必需的音频文件model="distil-whisper-large-v3-en",# 必需的用于转录的模型prompt="Specify context or spelling",# 可选的上下文或拼写提示response_format="json",# 可选的响应格式language="en",# 可选的语言temperature=0.0# 可选的温度参数)# 打印转录文本print(transcription.text)return transcription.text

将文本转换为语音的辅助函数

from gtts import gTTS
def text_to_audio(text):# 将文本转换为语音tts = gTTS(text=text, lang='en', slow=False)# 将音频保存为MP3文件mp3_file = "temp_audio.mp3"tts.save(mp3_file)return mp3_file

为 Streamlit 应用程序准备的完整代码实现.

请编写一个名为 audio_rag.py 的 Python 脚本。

import streamlit as stfrom time import sleep#from st_audiorec import st_audiorecfrom streamlit_mic_recorder import mic_recorderfrom streamlit_chat import messageimport osfrom groq import Groqfrom langchain_groq import ChatGroqfrom langchain_community.embeddings import HuggingFaceBgeEmbeddingsfrom langchain.text_splitter import RecursiveCharacterTextSplitterfrom langchain.document_loaders import PyPDFLoaderfrom langchain_chroma import Chromafrom langchain.chains import RetrievalQAfrom chromadb.config import Settingsimport chromadbfrom gtts import gTTSfrom pydub import AudioSegmentimport osfrom dotenv import load_dotenv
# 从 .env 文件加载环境变量load_dotenv()设置 Chroma 配置
chroma_setting = Settings(anonymized_telemetry=False)获取最新的文件路径
def newest(path):files = os.listdir(path)paths = [os.path.join(path, basename) for basename in files]newest_file_path = max(paths, key=os.path.getctime)return os.path.basename(newest_file_path)文本转换为音频的辅助函数
def text_to_audio(text):# 将文本转换为语音tts = gTTS(text=text, lang='en', slow=False)# 将音频保存为MP3文件mp3_file = "temp_audio.mp3"tts.save(mp3_file)return mp3_file保存上传的文件
def save_uploaded_file(uploaded_file, directory):try:with open(os.path.join(directory, uploaded_file.name), "wb") as f:f.write(uploaded_file.getbuffer())return st.success(f"已保存文件:{uploaded_file.name} 到 {directory}")except Exception as e:return st.error(f"保存文件出错:{e}")创建用于保存上传文件的目录
upload_dir = "uploaded_files"os.makedirs(upload_dir, exist_ok=True)设置 LLM
llm = ChatGroq(model_name="llama3-70b-8192",temperature=0.1,max_tokens=1000,)设置嵌入模型
model_name ="BAAI/bge-small-en-v1.5"model_kwargs ={"device":"cpu"}encode_kwargs ={"normalize_embeddings":False}embeddings = HuggingFaceBgeEmbeddings(model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs)设置文本分割器
def text_splitter():text_splitter = RecursiveCharacterTextSplitter(chunk_size=512,chunk_overlap=20,length_function=len,)return text_splitter设置 RetrievalQA
def answer_question(question,vectorstore):retriever = vectorstore.as_retriever(search_kwargs={"k": 5})qa = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever, return_source_documents=True)result = qa.invoke({"query": question})return result['result']初始化 Groq 客户端
groq_client = Groq()指定音频文件的路径
filename = "recorded_audio.wav"转录音频的辅助函数
def transcribe_audio(filename):# 打开音频文件with open(filename, "rb") as file:# 创建音频文件的转录transcription = groq_client.audio.transcriptions.create(file=(filename, file.read()), # 必需的音频文件model="distil-whisper-large-v3-en", # 必需的用于转录的模型prompt="指定上下文或拼写",# 可选response_format="json",# 可选language="en",# 可选temperature=0.0# 可选)# 打印转录文本print(transcription.text)return transcription.text初始化 session state 变量
if 'stop' not in st.session_state:st.session_state.stop = False设置页面配置
st.set_page_config(page_title="音频和书籍应用程序",page_icon="?",# 您可以使用表情符号或图片URLlayout="wide")创建两列布局
col1, col2 = st.columns([1, 2])# 调整比例控制列的宽度在左列显示内容
with col1:st.markdown("""<h1 style='text-align: center;'>? 启用音频的 ? 知识应用程序</h1><h5 style='text-align: center;'>您的音频问答系统一站式解决方案!</h5>""",unsafe_allow_html=True)st.write("欢迎使用启用音频的 RAG 应用程序!")st.image("audio.jpeg", caption="音频驱动的 RAG", output_format="auto")if st.button("停止进程"):st.session_state.stop = True# 设置停止标志为 True
if st.session_state.stop:st.write("进程已停止。您可以刷新页面重新开始。")在右列显示 PDF 上传和阅读器
with col2:st.title("PDF 上传和阅读器")uploaded_file = st.file_uploader("选择一个 PDF 文件", type="pdf")persist_directory_path = "chromanew"if uploaded_file is not None:save_uploaded_file(uploaded_file, upload_dir)file_name = uploaded_file.nameloader = PyPDFLoader(f"uploaded_files/{file_name}")pages = loader.load_and_split(text_splitter())persist_directory = persist_directory_path + "_" + file_name.split(".")[0]if os.path.exists(persist_directory):client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)vectorstore = Chroma(embedding_function=embeddings, client=client, persist_directory=persist_directory, collection_name=file_name.split(".")[0], client_settings=chroma_setting,)print(f"向量存储中加载的文档数量:{len(vectorstore.get()['documents'])}")else:client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)vectorstore = Chroma(embedding_function=embeddings, client=client, persist_directory=persist_directory, collection_name=file_name.split(".")[0], client_settings=chroma_setting)MAX_BATCH_SIZE = 100for i in range(0, len(pages), MAX_BATCH_SIZE):i_end = min(len(pages), i+MAX_BATCH_SIZE)batch = pages[i:i_end]vectorstore.add_documents(batch)print(f"向量存储中加载的文档数量:{len(vectorstore.get()['documents'])}")创建一个按钮来启动进程
if 'start_process' not in st.session_state:st.session_state.start_process = False
if st.button("启动进程"):st.session_state.start_process = True
if st.session_state.start_process:options = os.listdir("uploaded_files")none_list = ["none"]options += none_listselected_option = st.selectbox("选择一个选项:", options)file_name = newest("uploaded_files") if selected_option == "none" else selected_optionst.write(f"您选择了:{selected_option}")st.title("音频录制 - 根据所选选项提问")
with st.spinner("正在进行音频录制..."):audio = mic_recorder(start_prompt="开始录制",stop_prompt="停止录制",just_once=False,key='recorder')if audio:st.audio(audio['bytes'], format='audio/wav')with open("recorded_audio.wav", "wb") as f:f.write(audio['bytes'])st.success("音频录制完成!")with st.spinner("正在进行音频转录..."):text = transcribe_audio(filename)transcription = textst.markdown(text)if "chat_history" not in st.session_state:st.session_state.chat_history = []for i, chat in enumerate(st.session_state.chat_history):message(chat["question"], is_user=True, key=f"question_{i}")message(chat["response"], is_user=False, key=f"response_{i}")if transcription:with st.spinner("正在生成回应..."):persist_directory = persist_directory_path + "_" + file_name.split(".")[0]client = chromadb.PersistentClient(path=persist_directory, settings=chroma_setting)vectorstore = Chroma(embedding_function=embeddings,client=client,persist_directory=persist_directory,collection_name=file_name.split(".")[0],client_settings=chroma_setting)response = answer_question(transcription, vectorstore)st.success("回应已生成")aud_file = text_to_audio(response)st.session_state.chat_history.append({"question": transcription, "response": response})message(transcription, is_user=True)message(response, is_user=False)st.title("音频播放")st.audio(aud_file, format='audio/wav', start_time=0)

运行 Streamlit 应用程序

streamlit run audio_rag.py

应用程序处理快照

1.浏览并上传 PDF 文件

2.现在向量存储已经加载,点击“启动进程”按钮。

3.开始录制您的问题。一旦问题录制完成,RAG 过程将被触发。应用程序屏幕上会显示相应的回答以及对应的音频。

4.如果您想在已上传的文档上进行问答,请选择知识来源选择框中的选项。

5.上传另一个文档。如果您想在当前上传的文档中进行问答,请选择“无”选项。


6.按下停止按钮以停止进程。


7.刷新屏幕以重新开始。


结论

在本文中,我们探讨了使用Python开发音频驱动的问答系统(Audio RAG)。我介绍了其中的关键步骤,包括录制音频、将语音转换为文本以及使用RAG模型进行问答。通过结合这些技术,我们可以创建一个强大的系统,使用户能够使用语音命令与知识库进行交互。



53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

与创始人交个朋友

回到顶部

 
扫码咨询