Today we'll build, from scratch, a RAG system based on locally run Qwen 3 and Gemma 3 models. It combines document processing, vector search, and web search to give users accurate, context-aware answers. The project comes from an Unwind AI tutorial (the open-source repository is linked in the original post); below we walk through how it is built and its key technical points.

Project Overview
· Name: Qwen 3 Local RAG Reasoning Agent
· Goal: build an efficient RAG system on top of locally run lightweight LLMs and a vector database, supporting document Q&A, web-page content extraction, and web search.
· Core features (a minimal flow sketch follows this list):
1. Document processing: upload PDF files or enter web URLs; the content is extracted and intelligently chunked.
2. Vector search: document embeddings are stored in the Qdrant vector database for efficient similarity search.
3. Web search: when the documents don't provide enough knowledge, the Exa API can search the web to supplement the answer.
4. Flexible modes: supports a RAG mode (documents plus search) and a direct LLM chat mode.
5. Privacy protection: all processing happens locally, making it suitable for sensitive data.
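To make the flow concrete, here is a minimal sketch of the query path, condensed from the full source code at the end of this post. The retriever settings and the get_rag_agent / get_web_search_agent helpers come from that code; error handling and Streamlit UI calls are omitted:

def answer(prompt, vector_store, use_web_search):
    # 1) Try document retrieval first; the similarity threshold filters weak matches
    context = ""
    if vector_store:
        retriever = vector_store.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": 5, "score_threshold": 0.7},
        )
        docs = retriever.invoke(prompt)
        if docs:
            context = "\n\n".join(d.page_content for d in docs)
    # 2) Fall back to the Exa-backed web search agent if nothing relevant was found
    if not context and use_web_search:
        context = f"Web Search Results:\n{get_web_search_agent().run(prompt).content}"
    # 3) Answer with the local model, with or without retrieved context
    full_prompt = f"Context: {context}\n\nQuestion: {prompt}" if context else prompt
    return get_rag_agent().run(full_prompt).content

The key design choice: retrieval is always attempted first, and web search only fires when retrieval comes back empty and the fallback is enabled.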
Technical Architecture
1. Language models:
· Several local models are supported: Qwen 3 (1.7B, 8B), Gemma 3 (1B, 4B), DeepSeek-R1 (1.5B).
· Models run locally through the Ollama framework, reducing dependence on cloud services.
2. Document processing:
· PyPDFLoader handles PDF files; WebBaseLoader extracts web-page content.
· RecursiveCharacterTextSplitter splits documents into small chunks for embedding and search.
3. Vector database:
· Qdrant stores the documents' embedding vectors and supports efficient similarity search.
· Embedding model: snowflake-arctic-embed, served by Ollama (see the ingestion sketch after this list).
4. Web search:
· Web search goes through the Exa API, with support for custom domain filtering.
5. User interface:
· Streamlit provides an interactive web UI for uploading files, entering URLs, and asking questions.
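The ingestion side wires these pieces together. Below is a condensed sketch based on the full source at the end of this post; OllamaEmbedderr is the small LangChain Embeddings wrapper around Ollama's snowflake-arctic-embed defined there, and "paper.pdf" is only a placeholder file name:

from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

client = QdrantClient(url="http://localhost:6333")  # the local Docker instance
client.create_collection(  # the full source wraps this in try/except for re-runs
    collection_name="test-qwen-r1",
    vectors_config=VectorParams(size=1024, distance=Distance.COSINE),  # 1024 dims match snowflake-arctic-embed
)

docs = PyPDFLoader("paper.pdf").load()  # placeholder path; WebBaseLoader(web_paths=(url,)) handles URLs
chunks = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200).split_documents(docs)

store = QdrantVectorStore(client=client, collection_name="test-qwen-r1", embedding=OllamaEmbedderr())
store.add_documents(chunks)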
Main Features
1. Document Q&A:
· The user uploads a PDF or enters a URL; the system converts the content into embedding vectors and stores them in Qdrant.
· When the user asks a question, the system retrieves relevant document chunks via similarity search and generates an answer.
2. Web search fallback:
· If the documents lack sufficient information, the system triggers a web search, automatically or manually (via a toggle), to fetch supplementary information.
· Search results are clearly labeled with their sources.
3. Flexible configuration:
· Choose among models (e.g., Qwen 3 or Gemma 3).
· Adjust the similarity threshold to control how strict document retrieval is.
· RAG mode can be disabled entirely for direct conversation with the LLM; the sketch after this list shows how the model's thinking trace is handled in that mode.
4. Privacy and offline support:
· All models and processing run locally; no data is sent to the cloud.
· Suitable for privacy-sensitive scenarios or environments without network access.
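One detail worth highlighting in the direct-LLM mode: reasoning models such as Qwen 3 emit their chain of thought between <think> tags, and the app separates that trace from the final answer before displaying it. Here is that logic, lifted from the full source below into a standalone helper:

import re

def split_thinking(response_content: str):
    """Return (thinking, final_answer); thinking is None if there is no <think> block."""
    think_pattern = r'<think>(.*?)</think>'
    match = re.search(think_pattern, response_content, re.DOTALL)
    if not match:
        return None, response_content
    thinking = match.group(1).strip()
    final = re.sub(think_pattern, '', response_content, flags=re.DOTALL).strip()
    return thinking, final

In the UI, the thinking trace goes into a collapsible expander and only the final answer is saved to chat history.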
How to Use
1. Environment setup:
· Install Ollama and Python 3.8+.
· Run the Qdrant vector database via Docker.
· Get an Exa API key (optional, for web search). A quick sanity check for this setup follows these steps.
2. Install dependencies:
pip install -r requirements.txt
3. Pull the models:
ollama pull qwen3:1.7b
ollama pull snowflake-arctic-embed
4. Run Qdrant (port 6333 serves the REST API, 6334 gRPC):
docker run -p 6333:6333 -p 6334:6334 -v "$(pwd)/qdrant_storage:/qdrant/storage:z" qdrant/qdrant
5. Start the app:
streamlit run qwen_local_rag_agent.py
6. Use it:
· Upload PDFs or enter URLs in the Streamlit UI.
· Adjust the model, RAG mode, or search settings.
· Type a question and get an answer with cited sources.
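Before launching the UI, it can help to confirm that both local services are reachable. The following quick check is not part of the project, just a hypothetical sanity script that reuses the same client calls the app makes:

from agno.embedder.ollama import OllamaEmbedder
from qdrant_client import QdrantClient

# Ollama check: embed a test string with the same model the app uses
vec = OllamaEmbedder(id="snowflake-arctic-embed", dimensions=1024).get_embedding("hello world")
print(f"Ollama embedding OK: {len(vec)} dimensions")  # expect 1024

# Qdrant check: list the collections on the local Docker instance
print(QdrantClient(url="http://localhost:6333").get_collections())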
Use Cases
· Academic research: quickly query uploaded papers or web pages, supplementing them with up-to-date information from web search.
· Enterprise document management: process internal documents (manuals, reports) and provide intelligent Q&A over them.
· Privacy-sensitive scenarios: process legal, medical, and other sensitive documents locally, avoiding data leakage.
· Offline environments: query knowledge from local models and documents without a network connection.
Project Strengths
· Open source and free: the code is public and can be freely modified and deployed.
· Local-first: no reliance on cloud services; data privacy is protected.
· Modular: multiple models and configurations are supported, making it easy to extend.
· User-friendly: the Streamlit UI is simple and intuitive, suitable for non-technical users.
Summary
This project is a capable and flexible local RAG system that combines local language models, a vector database, and web search. It fits scenarios that demand privacy, offline operation, or customized knowledge retrieval. With minimal configuration, users can quickly stand up an intelligent Q&A assistant that handles documents and web content while keeping data secure.
The project's requirements.txt:

agno
pypdf
exa
qdrant-client
langchain-qdrant
langchain-community
streamlit
ollama
Full source of qwen_local_rag_agent.py:

import os
import tempfile
from datetime import datetime
from typing import List

import streamlit as st
import bs4
from agno.agent import Agent
from agno.models.ollama import Ollama
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
from langchain_core.embeddings import Embeddings
from agno.tools.exa import ExaTools
from agno.embedder.ollama import OllamaEmbedder


class OllamaEmbedderr(Embeddings):
    def __init__(self, model_name="snowflake-arctic-embed"):
        """Initialize the OllamaEmbedderr with a specific model.

        Args:
            model_name (str): The name of the model to use for embedding.
        """
        self.embedder = OllamaEmbedder(id=model_name, dimensions=1024)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return [self.embed_query(text) for text in texts]

    def embed_query(self, text: str) -> List[float]:
        return self.embedder.get_embedding(text)


# Constants
COLLECTION_NAME = "test-qwen-r1"

# Streamlit App Initialization
st.title("? Qwen 3 Local RAG Reasoning Agent")

# --- Add Model Info Boxes ---
st.info("**Qwen3:** The latest generation of large language models in Qwen series, offering a comprehensive suite of dense and mixture-of-experts (MoE) models.")
st.info("**Gemma 3:** These models are multimodal—processing text and images—and feature a 128K context window with support for over 140 languages.")
# -------------------------

# Session State Initialization
if 'model_version' not in st.session_state:
    st.session_state.model_version = "qwen3:1.7b"  # Default to lighter model
if 'vector_store' not in st.session_state:
    st.session_state.vector_store = None
if 'processed_documents' not in st.session_state:
    st.session_state.processed_documents = []
if 'history' not in st.session_state:
    st.session_state.history = []
if 'exa_api_key' not in st.session_state:
    st.session_state.exa_api_key = ""
if 'use_web_search' not in st.session_state:
    st.session_state.use_web_search = False
if 'force_web_search' not in st.session_state:
    st.session_state.force_web_search = False
if 'similarity_threshold' not in st.session_state:
    st.session_state.similarity_threshold = 0.7
if 'rag_enabled' not in st.session_state:
    st.session_state.rag_enabled = True  # RAG is enabled by default

# Sidebar Configuration
st.sidebar.header("⚙️ Settings")

# Model Selection
st.sidebar.header("? Model Choice")
model_help = """
- qwen3:1.7b: Lighter model (MoE)
- gemma3:1b: More capable but requires better GPU/RAM (32k context window)
- gemma3:4b: More capable and MultiModal (Vision) (128k context window)
- deepseek-r1:1.5b
- qwen3:8b: More capable but requires better GPU/RAM

Choose based on your hardware capabilities.
"""
st.session_state.model_version = st.sidebar.radio(
    "Select Model Version",
    options=["qwen3:1.7b", "gemma3:1b", "gemma3:4b", "deepseek-r1:1.5b", "qwen3:8b"],
    help=model_help
)
st.sidebar.info("Run ollama pull qwen3:1.7b")

# RAG Mode Toggle
st.sidebar.header("? RAG Mode")
st.session_state.rag_enabled = st.sidebar.toggle("Enable RAG", value=st.session_state.rag_enabled)

# Clear Chat Button
if st.sidebar.button("✨ Clear Chat"):
    st.session_state.history = []
    st.rerun()

# Show API Configuration only if RAG is enabled
if st.session_state.rag_enabled:
    st.sidebar.header("? Search Tuning")
    st.session_state.similarity_threshold = st.sidebar.slider(
        "Similarity Threshold",
        min_value=0.0,
        max_value=1.0,
        value=0.7,
        help="Lower values will return more documents but might be less relevant. Higher values are more strict."
    )

# Add in the sidebar configuration section, after the existing API inputs
st.sidebar.header("? Web Search")
st.session_state.use_web_search = st.sidebar.checkbox("Enable Web Search Fallback", value=st.session_state.use_web_search)

if st.session_state.use_web_search:
    exa_api_key = st.sidebar.text_input(
        "Exa AI API Key",
        type="password",
        value=st.session_state.exa_api_key,
        help="Required for web search fallback when no relevant documents are found"
    )
    st.session_state.exa_api_key = exa_api_key

    # Optional domain filtering
    default_domains = ["arxiv.org", "wikipedia.org", "github.com", "medium.com"]
    custom_domains = st.sidebar.text_input(
        "Custom domains (comma-separated)",
        value=",".join(default_domains),
        help="Enter domains to search from, e.g.: arxiv.org,wikipedia.org"
    )
    search_domains = [d.strip() for d in custom_domains.split(",") if d.strip()]


# Utility Functions
def init_qdrant() -> QdrantClient | None:
    """Initialize Qdrant client with local Docker setup.

    Returns:
        QdrantClient: The initialized Qdrant client if successful.
        None: If the initialization fails.
    """
    try:
        return QdrantClient(url="http://localhost:6333")
    except Exception as e:
        st.error(f"? Qdrant connection failed: {str(e)}")
        return None


# Document Processing Functions
def process_pdf(file) -> List:
    """Process PDF file and add source metadata."""
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            tmp_file.write(file.getvalue())
            loader = PyPDFLoader(tmp_file.name)
            documents = loader.load()

            # Add source metadata
            for doc in documents:
                doc.metadata.update({
                    "source_type": "pdf",
                    "file_name": file.name,
                    "timestamp": datetime.now().isoformat()
                })

            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200
            )
            return text_splitter.split_documents(documents)
    except Exception as e:
        st.error(f"? PDF processing error: {str(e)}")
        return []


def process_web(url: str) -> List:
    """Process web URL and add source metadata."""
    try:
        loader = WebBaseLoader(
            web_paths=(url,),
            bs_kwargs=dict(
                parse_only=bs4.SoupStrainer(
                    class_=("post-content", "post-title", "post-header", "content", "main")
                )
            )
        )
        documents = loader.load()

        # Add source metadata
        for doc in documents:
            doc.metadata.update({
                "source_type": "url",
                "url": url,
                "timestamp": datetime.now().isoformat()
            })

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        return text_splitter.split_documents(documents)
    except Exception as e:
        st.error(f"? Web processing error: {str(e)}")
        return []


# Vector Store Management
def create_vector_store(client, texts):
    """Create and initialize vector store with documents."""
    try:
        # Create collection if needed
        try:
            client.create_collection(
                collection_name=COLLECTION_NAME,
                vectors_config=VectorParams(
                    size=1024,
                    distance=Distance.COSINE
                )
            )
            st.success(f"? Created new collection: {COLLECTION_NAME}")
        except Exception as e:
            if "already exists" not in str(e).lower():
                raise e

        # Initialize vector store
        vector_store = QdrantVectorStore(
            client=client,
            collection_name=COLLECTION_NAME,
            embedding=OllamaEmbedderr()
        )

        # Add documents
        with st.spinner('? Uploading documents to Qdrant...'):
            vector_store.add_documents(texts)
            st.success("✅ Documents stored successfully!")
            return vector_store
    except Exception as e:
        st.error(f"? Vector store error: {str(e)}")
        return None


def get_web_search_agent() -> Agent:
    """Initialize a web search agent."""
    return Agent(
        name="Web Search Agent",
        model=Ollama(id="llama3.2"),
        tools=[ExaTools(
            api_key=st.session_state.exa_api_key,
            include_domains=search_domains,
            num_results=5
        )],
        instructions="""You are a web search expert. Your task is to:
        1. Search the web for relevant information about the query
        2. Compile and summarize the most relevant information
        3. Include sources in your response""",
        show_tool_calls=True,
        markdown=True,
    )


def get_rag_agent() -> Agent:
    """Initialize the main RAG agent."""
    return Agent(
        name="Qwen 3 RAG Agent",
        model=Ollama(id=st.session_state.model_version),
        instructions="""You are an Intelligent Agent specializing in providing accurate answers.

        When asked a question:
        - Analyze the question and answer the question with what you know.

        When given context from documents:
        - Focus on information from the provided documents
        - Be precise and cite specific details

        When given web search results:
        - Clearly indicate that the information comes from web search
        - Synthesize the information clearly

        Always maintain high accuracy and clarity in your responses.""",
        show_tool_calls=True,
        markdown=True,
    )


def check_document_relevance(query: str, vector_store, threshold: float = 0.7) -> tuple[bool, List]:
    if not vector_store:
        return False, []
    retriever = vector_store.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={"k": 5, "score_threshold": threshold}
    )
    docs = retriever.invoke(query)
    return bool(docs), docs


chat_col, toggle_col = st.columns([0.9, 0.1])

with chat_col:
    prompt = st.chat_input("Ask about your documents..." if st.session_state.rag_enabled else "Ask me anything...")

with toggle_col:
    st.session_state.force_web_search = st.toggle('?', help="Force web search")

# Check if RAG is enabled
if st.session_state.rag_enabled:
    qdrant_client = init_qdrant()

    # --- Document Upload Section (Moved to Main Area) ---
    with st.expander("? Upload Documents or URLs for RAG", expanded=False):
        if not qdrant_client:
            st.warning("⚠️ Please configure Qdrant API Key and URL in the sidebar to enable document processing.")
        else:
            uploaded_files = st.file_uploader(
                "Upload PDF files",
                accept_multiple_files=True,
                type='pdf'
            )
            url_input = st.text_input("Enter URL to scrape")

            if uploaded_files:
                st.write(f"Processing {len(uploaded_files)} PDF file(s)...")
                all_texts = []
                for file in uploaded_files:
                    if file.name not in st.session_state.processed_documents:
                        with st.spinner(f"Processing {file.name}..."):
                            texts = process_pdf(file)
                            if texts:
                                all_texts.extend(texts)
                                st.session_state.processed_documents.append(file.name)
                    else:
                        st.write(f"? {file.name} already processed.")
                if all_texts:
                    with st.spinner("Creating vector store..."):
                        st.session_state.vector_store = create_vector_store(qdrant_client, all_texts)

            if url_input:
                if url_input not in st.session_state.processed_documents:
                    with st.spinner(f"Scraping and processing {url_input}..."):
                        texts = process_web(url_input)
                        if texts:
                            st.session_state.vector_store = create_vector_store(qdrant_client, texts)
                            st.session_state.processed_documents.append(url_input)
                else:
                    st.write(f"? {url_input} already processed.")

            if st.session_state.vector_store:
                st.success("Vector store is ready.")
            elif not uploaded_files and not url_input:
                st.info("Upload PDFs or enter a URL to populate the vector store.")

    # Display sources in sidebar
    if st.session_state.processed_documents:
        st.sidebar.header("? Processed Sources")
        for source in st.session_state.processed_documents:
            if source.endswith('.pdf'):
                st.sidebar.text(f"? {source}")
            else:
                st.sidebar.text(f"? {source}")

if prompt:
    # Add user message to history
    st.session_state.history.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.write(prompt)

    if st.session_state.rag_enabled:
        # Existing RAG flow remains unchanged
        with st.spinner("? Evaluating the Query..."):
            try:
                rewritten_query = prompt
                with st.expander("Evaluating the query"):
                    st.write(f"User's Prompt: {prompt}")
            except Exception as e:
                st.error(f"❌ Error rewriting query: {str(e)}")
                rewritten_query = prompt

        # Step 2: Choose search strategy based on force_web_search toggle
        context = ""
        docs = []
        if not st.session_state.force_web_search and st.session_state.vector_store:
            # Try document search first
            retriever = st.session_state.vector_store.as_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={
                    "k": 5,
                    "score_threshold": st.session_state.similarity_threshold
                }
            )
            docs = retriever.invoke(rewritten_query)
            if docs:
                context = "\n\n".join([d.page_content for d in docs])
                st.info(f"? Found {len(docs)} relevant documents (similarity > {st.session_state.similarity_threshold})")
            elif st.session_state.use_web_search:
                st.info("? No relevant documents found in database, falling back to web search...")

        # Step 3: Use web search if:
        # 1. Web search is forced ON via toggle, or
        # 2. No relevant documents found AND web search is enabled in settings
        if (st.session_state.force_web_search or not context) and st.session_state.use_web_search and st.session_state.exa_api_key:
            with st.spinner("? Searching the web..."):
                try:
                    web_search_agent = get_web_search_agent()
                    web_results = web_search_agent.run(rewritten_query).content
                    if web_results:
                        context = f"Web Search Results:\n{web_results}"
                        if st.session_state.force_web_search:
                            st.info("ℹ️ Using web search as requested via toggle.")
                        else:
                            st.info("ℹ️ Using web search as fallback since no relevant documents were found.")
                except Exception as e:
                    st.error(f"❌ Web search error: {str(e)}")

        # Step 4: Generate response using the RAG agent
        with st.spinner("? Thinking..."):
            try:
                rag_agent = get_rag_agent()
                if context:
                    full_prompt = f"""Context: {context}

Original Question: {prompt}

Please provide a comprehensive answer based on the available information."""
                else:
                    full_prompt = f"Original Question: {prompt}\n"
                    st.info("ℹ️ No relevant information found in documents or web search.")

                response = rag_agent.run(full_prompt)

                # Add assistant response to history
                st.session_state.history.append({
                    "role": "assistant",
                    "content": response.content
                })

                # Display assistant response
                with st.chat_message("assistant"):
                    st.write(response.content)

                    # Show sources if available
                    if not st.session_state.force_web_search and 'docs' in locals() and docs:
                        with st.expander("? See document sources"):
                            for i, doc in enumerate(docs, 1):
                                source_type = doc.metadata.get("source_type", "unknown")
                                source_icon = "?" if source_type == "pdf" else "?"
                                source_name = doc.metadata.get("file_name" if source_type == "pdf" else "url", "unknown")
                                st.write(f"{source_icon} Source {i} from {source_name}:")
                                st.write(f"{doc.page_content[:200]}...")
            except Exception as e:
                st.error(f"❌ Error generating response: {str(e)}")
    else:
        # Simple mode without RAG
        with st.spinner("? Thinking..."):
            try:
                rag_agent = get_rag_agent()
                web_search_agent = get_web_search_agent() if st.session_state.use_web_search else None

                # Handle web search if forced or enabled
                context = ""
                if st.session_state.force_web_search and web_search_agent:
                    with st.spinner("? Searching the web..."):
                        try:
                            web_results = web_search_agent.run(prompt).content
                            if web_results:
                                context = f"Web Search Results:\n{web_results}"
                                st.info("ℹ️ Using web search as requested.")
                        except Exception as e:
                            st.error(f"❌ Web search error: {str(e)}")

                # Generate response
                if context:
                    full_prompt = f"""Context: {context}

Question: {prompt}

Please provide a comprehensive answer based on the available information."""
                else:
                    full_prompt = prompt

                response = rag_agent.run(full_prompt)
                response_content = response.content

                # Extract thinking process and final response
                import re
                think_pattern = r'<think>(.*?)</think>'
                think_match = re.search(think_pattern, response_content, re.DOTALL)
                if think_match:
                    thinking_process = think_match.group(1).strip()
                    final_response = re.sub(think_pattern, '', response_content, flags=re.DOTALL).strip()
                else:
                    thinking_process = None
                    final_response = response_content

                # Add assistant response to history (only the final response)
                st.session_state.history.append({
                    "role": "assistant",
                    "content": final_response
                })

                # Display assistant response
                with st.chat_message("assistant"):
                    if thinking_process:
                        with st.expander("? See thinking process"):
                            st.markdown(thinking_process)
                    st.markdown(final_response)
            except Exception as e:
                st.error(f"❌ Error generating response: {str(e)}")
else:
    st.warning("You can directly talk to qwen and gemma models locally! Toggle the RAG mode to upload documents!")


