mcp-server-ollama-deep-researcher
by Cam10001110101
src/assistant
import json
import os
import random
from typing_extensions import Literal
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langchain_ollama import ChatOllama
from langgraph.graph import START, END, StateGraph
from langsmith import trace
from assistant.configuration import Configuration, SearchAPI
from assistant.utils import deduplicate_and_format_sources, tavily_search, format_sources, perplexity_search
from assistant.state import SummaryState, SummaryStateInput, SummaryStateOutput
from assistant.prompts import query_writer_instructions, summarizer_instructions, reflection_instructions

# Nodes
def generate_query(state: SummaryState, config: RunnableConfig):
    """ Generate a query for web search """
    # Add timeout to config
    if not config:
        config = {}
    if "configurable" not in config:
        config["configurable"] = {}
    config["configurable"]["timeout"] = 300  # 5 minutes timeout
    configurable = Configuration.from_runnable_config(config)

    # Enable tracing if configured
    if configurable.langsmith_tracing and configurable.langsmith_api_key:
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
        os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
        os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

    try:
        # Format the prompt
        query_writer_instructions_formatted = query_writer_instructions.format(research_topic=state.research_topic)

        # Generate a query
        llm_json_mode = ChatOllama(
            model=configurable.local_llm,
            temperature=0,
            format="json",
            base_url=configurable.ollama_base_url
        )
        result = llm_json_mode.invoke(
            [SystemMessage(content=query_writer_instructions_formatted),
             HumanMessage(content="Generate a query for web search:")]
        )
        query = json.loads(result.content)
        return {"search_query": query['query']}
    except Exception:
        # If the LLM fails, use the research topic as the query
        return {"search_query": state.research_topic}

def web_research(state: SummaryState, config: RunnableConfig):
    """ Gather information from the web """
    # Add timeout to config
    if not config:
        config = {}
    if "configurable" not in config:
        config["configurable"] = {}
    config["configurable"]["timeout"] = 300  # 5 minutes timeout

    # Configure
    configurable = Configuration.from_runnable_config(config)

    # Enable tracing if configured
    if configurable.langsmith_tracing and configurable.langsmith_api_key:
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
        os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
        os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

    try:
        # Search the web
        if configurable.search_api == SearchAPI.TAVILY:
            search_results = tavily_search(state.search_query, include_raw_content=True, max_results=1)
            search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=True)
        elif configurable.search_api == SearchAPI.PERPLEXITY:
            search_results = perplexity_search(state.search_query, state.research_loop_count)
            search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=False)
        else:
            raise ValueError(f"Unsupported search API: {configurable.search_api}")
    except Exception as e:
        # If we have a running summary, continue with a note about the error
        if state.running_summary:
            error_note = f"\n\nNote: Search failed during research loop {state.research_loop_count + 1} using {configurable.search_api.value} API. Error: {str(e)}"
            return {
                "sources_gathered": state.sources_gathered + [f"[Search failed in loop {state.research_loop_count + 1}]"],
                "research_loop_count": state.research_loop_count + 1,
                "web_research_results": state.web_research_results + [error_note],
                "running_summary": state.running_summary + error_note
            }
        # If this is the first search and it failed, re-raise the error
        raise

    return {
        "sources_gathered": [format_sources(search_results)],
        "research_loop_count": state.research_loop_count + 1,
        "web_research_results": [search_str]
    }

def summarize_sources(state: SummaryState, config: RunnableConfig):
    """ Summarize the gathered sources """
    # Add timeout to config
    if not config:
        config = {}
    if "configurable" not in config:
        config["configurable"] = {}
    config["configurable"]["timeout"] = 300  # 5 minutes timeout
    configurable = Configuration.from_runnable_config(config)

    # Enable tracing if configured
    if configurable.langsmith_tracing and configurable.langsmith_api_key:
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
        os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
        os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

    # Existing summary and most recent web research, retrieved before the try
    # block so the error-handling paths below can reference them safely
    existing_summary = state.running_summary
    most_recent_web_research = state.web_research_results[-1] if state.web_research_results else ""

    try:
        # Build the human message
        if existing_summary:
            human_message_content = (
                f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                f"<Existing Summary> \n {existing_summary} \n <Existing Summary>\n\n"
                f"<New Search Results> \n {most_recent_web_research} \n <New Search Results>"
            )
        else:
            human_message_content = (
                f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                f"<Search Results> \n {most_recent_web_research} \n <Search Results>"
            )

        # Run the LLM
        llm = ChatOllama(
            model=configurable.local_llm,
            temperature=0,
            base_url=configurable.ollama_base_url
        )
        result = llm.invoke(
            [SystemMessage(content=summarizer_instructions),
             HumanMessage(content=human_message_content)]
        )
        running_summary = result.content

        # TODO: This is a hack to remove the <think> tags emitted by Deepseek models;
        # it appears very challenging to prompt them out of the responses
        while "<think>" in running_summary and "</think>" in running_summary:
            start = running_summary.find("<think>")
            end = running_summary.find("</think>") + len("</think>")
            running_summary = running_summary[:start] + running_summary[end:]

        return {"running_summary": running_summary}
    except Exception as e:
        # If the LLM fails but we have an existing summary, preserve it with an error note
        if existing_summary:
            error_note = f"\n\nNote: Failed to summarize new sources due to LLM error: {str(e)}"
            return {"running_summary": existing_summary + error_note}
        # If this is the first summary and the LLM failed, return the raw search results
        return {"running_summary": f"Research on: {state.research_topic}\n\nRaw search results:\n{most_recent_web_research}"}

def reflect_on_summary(state: SummaryState, config: RunnableConfig):
    """ Reflect on the summary and generate a follow-up query """
    # Add timeout to config
    if not config:
        config = {}
    if "configurable" not in config:
        config["configurable"] = {}
    config["configurable"]["timeout"] = 300  # 5 minutes timeout
    configurable = Configuration.from_runnable_config(config)

    # Enable tracing if configured
    if configurable.langsmith_tracing and configurable.langsmith_api_key:
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
        os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
        os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

    try:
        # Generate a follow-up query
        llm_json_mode = ChatOllama(
            model=configurable.local_llm,
            temperature=0,
            format="json",
            base_url=configurable.ollama_base_url
        )
        result = llm_json_mode.invoke(
            [SystemMessage(content=reflection_instructions.format(research_topic=state.research_topic)),
             HumanMessage(content=f"Identify a knowledge gap and generate a follow-up web search query based on our existing knowledge: {state.running_summary}")]
        )
        try:
            follow_up_query = json.loads(result.content)
            query = follow_up_query.get('follow_up_query')
            if query:
                return {"search_query": query}
        except (json.JSONDecodeError, AttributeError):
            pass  # Fall through to the fallback
    except Exception as e:
        # Add an error note to the summary before falling through to the fallback
        error_note = f"\n\nNote: Failed to generate follow-up query due to LLM error: {str(e)}"
        state.running_summary += error_note

    # Fallback: generate a simple follow-up query based on the research topic
    fallback_queries = [
        f"latest developments in {state.research_topic}",
        f"important aspects of {state.research_topic}",
        f"key information about {state.research_topic}",
        f"Tell me more about {state.research_topic}"
    ]
    return {"search_query": random.choice(fallback_queries)}

def finalize_summary(state: SummaryState):
    """ Finalize the summary """
    # Format all accumulated sources into a single bulleted list
    all_sources = "\n".join(source for source in state.sources_gathered)
    state.running_summary = f"## Summary\n\n{state.running_summary}\n\n### Sources:\n{all_sources}"
    return {"running_summary": state.running_summary}

def route_research(state: SummaryState, config: RunnableConfig) -> Literal["finalize_summary", "web_research"]:
    """ Route the research based on the follow-up query """
    configurable = Configuration.from_runnable_config(config)
    if state.research_loop_count <= configurable.max_web_research_loops:
        return "web_research"
    else:
        return "finalize_summary"

# Build the graph
builder = StateGraph(SummaryState, input=SummaryStateInput, output=SummaryStateOutput, config_schema=Configuration)

# Add nodes
builder.add_node("generate_query", generate_query)
builder.add_node("web_research", web_research)
builder.add_node("summarize_sources", summarize_sources)
builder.add_node("reflect_on_summary", reflect_on_summary)
builder.add_node("finalize_summary", finalize_summary)

# Add edges
builder.add_edge(START, "generate_query")
builder.add_edge("generate_query", "web_research")
builder.add_edge("web_research", "summarize_sources")
builder.add_edge("summarize_sources", "reflect_on_summary")
builder.add_conditional_edges("reflect_on_summary", route_research)
builder.add_edge("finalize_summary", END)

# Compile the graph
graph = builder.compile()
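
# Minimal local-run sketch (not part of the original module). It assumes the
# graph's input schema accepts a "research_topic" key and that the output
# exposes "running_summary", as suggested by SummaryStateInput and
# SummaryStateOutput above; adjust the topic and key names to your setup.
if __name__ == "__main__":
    final_state = graph.invoke({"research_topic": "quantum error correction"})
    print(final_state["running_summary"])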