Cam10001110101 / mcp-server-ollama-deep-researcher

research

Perform in-depth topic research by combining web search results with LLM-based synthesis. Given a topic, the server iteratively searches the web and refines a summary using a locally running Ollama model, returning the final write-up along with its sources.

Instructions

Research a topic using web search and LLM synthesis

Input Schema

Name    Required    Description              Default
topic   Yes         The topic to research    —
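
For reference, the tool can be called from any MCP client once the server is running. The following is a minimal sketch using the MCP Python SDK over stdio; the launch command (node build/index.js) is an assumption and should be adjusted to however this server is started locally.

    # Minimal sketch: calling the 'research' tool from an MCP client.
    # Assumption: the server is launched with "node build/index.js"; adjust as needed.
    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main():
        server = StdioServerParameters(command="node", args=["build/index.js"])
        async with stdio_client(server) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # 'topic' is the only required argument, per the input schema above
                result = await session.call_tool("research", arguments={"topic": "quantum error correction"})
                for block in result.content:
                    if block.type == "text":
                        print(block.text)

    asyncio.run(main())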

Implementation Reference

  • Input schema and description for the 'research' MCP tool.
    { name: "research", description: "Research a topic using web search and LLM synthesis", inputSchema: { type: "object", properties: { topic: { type: "string", description: "The topic to research" } }, required: ["topic"], }, },
  • TypeScript MCP CallTool handler for 'research': spawns uv run python on run_research.py with the topic and configuration, then awaits a single JSON summary object on stdout.
    case "research": { const topic = request.params.arguments?.topic as string | undefined; if (!topic) { return { content: [ { type: "text", text: "Research topic is required", }, ], isError: true, }; } try { // Validate API keys before starting research validateApiKeys(config.searchApi); // Validate max loops if (config.maxLoops < 1 || config.maxLoops > 10) { throw new Error("maxLoops must be between 1 and 10"); } // Use UV to run Python with the correct virtual environment const uvPath = "uv"; // Get absolute path to Python script from src directory const scriptPath = join(__dirname, "..", "src", "assistant", "run_research.py").replace(/\\/g, "/"); // Run the research script with arguments using uv run const pythonProcess: ChildProcess = spawn(uvPath, [ "run", "python", scriptPath, topic, config.maxLoops.toString(), config.llmModel, config.searchApi ], { env: { ...process.env, // Pass through existing environment variables PYTHONUNBUFFERED: "1", // Ensure Python output is not buffered PYTHONPATH: join(__dirname, "..", "src").replace(/\\/g, "/"), // Add src directory to Python path TAVILY_API_KEY: process.env.TAVILY_API_KEY || "", // Ensure API key is passed to Python process PERPLEXITY_API_KEY: process.env.PERPLEXITY_API_KEY || "", // Ensure API key is passed to Python process EXA_API_KEY: process.env.EXA_API_KEY || "" // Ensure API key is passed to Python process }, cwd: join(__dirname, "..").replace(/\\/g, "/") // Set working directory to project root }); // Collect output using Promise with 5 minute timeout const output = await new Promise<string>((resolve, reject) => { const timeout = setTimeout(() => { pythonProcess.kill(); reject(new Error('Research process timed out after 5 minutes')); }, 300000); // 5 minutes let stdout = ''; let stderr = ''; if (pythonProcess.stdout) { pythonProcess.stdout.on("data", (data: Buffer) => { const output = data.toString().trim(); if (output) { stdout += output; console.error(`[research] ${output}`); } }); } if (pythonProcess.stderr) { pythonProcess.stderr.on("data", (data: Buffer) => { const error = data.toString().trim(); if (error) { stderr += error; console.error(`[research error] ${error}`); } }); } pythonProcess.on("error", (error: Error) => { reject(new Error(`Failed to start Python process: ${error.message}`)); }); pythonProcess.on("close", (code: number) => { clearTimeout(timeout); if (code !== 0) { reject(new Error(`Python process exited with code ${code}. Error: ${stderr}`)); return; } try { const result = JSON.parse(stdout.trim()) as PythonResponse; if (result.error) { reject(new Error(result.error)); } else if (result.summary) { resolve(result.summary); } else { resolve('No summary available'); } } catch (e) { reject(new Error(`Failed to parse Python output: ${e}\nStdout: ${stdout}\nStderr: ${stderr}`)); } }); }); // Store completed research result const result: ResearchResult = { topic, summary: output, sources: [], timestamp: new Date().toISOString() }; researchResults.set(topicToUri(topic), result); // Update research state currentResearch = { topic, currentStep: "completed", loopCount: config.maxLoops, summary: output, sources: [] }; return { content: [ { type: "text", text: `Research completed. Summary:\n\n${output}`, }, ], }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); return { content: [ { type: "text", text: `Research failed: ${errorMessage}`, }, ], isError: true, }; } }
  • Python entrypoint invoked by the TypeScript handler: parses the CLI arguments, calls graph.invoke with the research topic and configuration, and prints a JSON summary or error to stdout.
    import json
    import sys
    import warnings

    from assistant.graph import graph

    # Filter out warnings so they don't interfere with JSON output
    warnings.filterwarnings('ignore')

    def main():
        try:
            topic = sys.argv[1]
            max_loops = int(sys.argv[2])
            llm_model = sys.argv[3]
            search_api = sys.argv[4]

            result = graph.invoke(
                {'research_topic': topic},
                {'configurable': {
                    'max_web_research_loops': max_loops,
                    'local_llm': llm_model,
                    'search_api': search_api
                }}
            )

            # Ensure we're writing to stderr for logs and stdout for JSON only
            print(json.dumps({'summary': result.get('running_summary', 'No summary available')}), flush=True)
        except Exception as e:
            # Try to extract any partial results from the graph state
            try:
                partial_result = graph.get_state()
                summary = partial_result.get('running_summary', '')
                if summary:
                    print(json.dumps({
                        'summary': f"{summary}\n\nNote: Research process ended early due to error: {str(e)}",
                        'error': str(e)
                    }), flush=True)
                    return
            except:
                pass
            # If we couldn't get partial results, just return the error
            print(json.dumps({'error': str(e)}), flush=True)

    if __name__ == '__main__':
        main()
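  • Usage sketch: the entrypoint above is a thin wrapper, so the compiled graph can also be invoked directly from Python with the same configurable keys. Assumptions: src is on PYTHONPATH, Ollama is serving the named model, and "tavily" is a valid search_api value with its API key set.
    # Minimal sketch: call the LangGraph workflow without the CLI wrapper.
    from assistant.graph import graph

    result = graph.invoke(
        {"research_topic": "post-quantum cryptography"},
        {"configurable": {
            "max_web_research_loops": 2,
            "local_llm": "llama3.2",   # assumed Ollama model name
            "search_api": "tavily",    # assumed search API value
        }},
    )
    print(result.get("running_summary", "No summary available"))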
  • Core LangGraph implementation: defines the workflow nodes (generate_query, web_research, summarize_sources, reflect_on_summary, finalize_summary) and the edges that form an iterative research loop combining the local LLM with web search.
    def generate_query(state: SummaryState, config: RunnableConfig):
        """ Generate a query for web search """

        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout

        configurable = Configuration.from_runnable_config(config)

        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

        try:
            # Format the prompt
            query_writer_instructions_formatted = query_writer_instructions.format(research_topic=state.research_topic)

            # Generate a query
            configurable = Configuration.from_runnable_config(config)
            llm_json_mode = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                format="json",
                base_url=configurable.ollama_base_url
            )
            result = llm_json_mode.invoke(
                [SystemMessage(content=query_writer_instructions_formatted),
                 HumanMessage(content=f"Generate a query for web search:")]
            )
            query = json.loads(result.content)

            return {"search_query": query['query']}
        except Exception as e:
            # If LLM fails, use the research topic as the query
            return {"search_query": state.research_topic}

    def web_research(state: SummaryState, config: RunnableConfig):
        """ Gather information from the web """

        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout

        # Configure
        configurable = Configuration.from_runnable_config(config)

        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

        try:
            # Search the web
            if configurable.search_api == SearchAPI.TAVILY:
                search_results = tavily_search(state.search_query, include_raw_content=True, max_results=1)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=True)
            elif configurable.search_api == SearchAPI.PERPLEXITY:
                search_results = perplexity_search(state.search_query, state.research_loop_count)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=False)
            elif configurable.search_api == SearchAPI.EXA:
                search_results = exa_search(state.search_query, max_results=3)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=True)
            else:
                raise ValueError(f"Unsupported search API: {configurable.search_api}")
        except Exception as e:
            # If we have a running summary, continue with a note about the error
            if state.running_summary:
                error_note = f"\n\nNote: Search failed during research loop {state.research_loop_count + 1} using {configurable.search_api.value} API. Error: {str(e)}"
                return {
                    "sources_gathered": state.sources_gathered + [f"[Search failed in loop {state.research_loop_count + 1}]"],
                    "research_loop_count": state.research_loop_count + 1,
                    "web_research_results": state.web_research_results + [error_note],
                    "running_summary": state.running_summary + error_note
                }
            # If this is the first search and it failed, raise the error
            raise

        return {
            "sources_gathered": [format_sources(search_results)],
            "research_loop_count": state.research_loop_count + 1,
            "web_research_results": [search_str]
        }

    def summarize_sources(state: SummaryState, config: RunnableConfig):
        """ Summarize the gathered sources """

        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout

        configurable = Configuration.from_runnable_config(config)

        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

        try:
            # Existing summary
            existing_summary = state.running_summary

            # Most recent web research
            most_recent_web_research = state.web_research_results[-1]

            # Build the human message
            if existing_summary:
                human_message_content = (
                    f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                    f"<Existing Summary> \n {existing_summary} \n <Existing Summary>\n\n"
                    f"<New Search Results> \n {most_recent_web_research} \n <New Search Results>"
                )
            else:
                human_message_content = (
                    f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                    f"<Search Results> \n {most_recent_web_research} \n <Search Results>"
                )

            # Run the LLM
            configurable = Configuration.from_runnable_config(config)
            llm = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                base_url=configurable.ollama_base_url
            )
            result = llm.invoke(
                [SystemMessage(content=summarizer_instructions),
                 HumanMessage(content=human_message_content)]
            )

            running_summary = result.content

            # TODO: This is a hack to remove the <think> tags w/ Deepseek models
            # It appears very challenging to prompt them out of the responses
            while "<think>" in running_summary and "</think>" in running_summary:
                start = running_summary.find("<think>")
                end = running_summary.find("</think>") + len("</think>")
                running_summary = running_summary[:start] + running_summary[end:]

            return {"running_summary": running_summary}
        except Exception as e:
            # If LLM fails but we have existing summary, preserve it with error note
            if existing_summary:
                error_note = f"\n\nNote: Failed to summarize new sources due to LLM error: {str(e)}"
                return {"running_summary": existing_summary + error_note}
            # If this is the first summary and LLM failed, return raw search results
            return {"running_summary": f"Research on: {state.research_topic}\n\nRaw search results:\n{most_recent_web_research}"}

    def reflect_on_summary(state: SummaryState, config: RunnableConfig):
        """ Reflect on the summary and generate a follow-up query """

        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout

        configurable = Configuration.from_runnable_config(config)

        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project

        try:
            # Generate a query
            configurable = Configuration.from_runnable_config(config)
            llm_json_mode = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                format="json",
                base_url=configurable.ollama_base_url
            )
            result = llm_json_mode.invoke(
                [SystemMessage(content=reflection_instructions.format(research_topic=state.research_topic)),
                 HumanMessage(content=f"Identify a knowledge gap and generate a follow-up web search query based on our existing knowledge: {state.running_summary}")]
            )

            try:
                follow_up_query = json.loads(result.content)
                query = follow_up_query.get('follow_up_query')
                if query:
                    return {"search_query": query}
            except (json.JSONDecodeError, AttributeError):
                pass  # Fall through to fallback
        except Exception as e:
            # Add error note to summary before falling through to fallback
            error_note = f"\n\nNote: Failed to generate follow-up query due to LLM error: {str(e)}"
            state.running_summary += error_note

        # Fallback: Generate a simple follow-up query based on research topic
        fallback_queries = [
            f"latest developments in {state.research_topic}",
            f"important aspects of {state.research_topic}",
            f"key information about {state.research_topic}",
            f"Tell me more about {state.research_topic}"
        ]
        import random
        return {"search_query": random.choice(fallback_queries)}

    def finalize_summary(state: SummaryState):
        """ Finalize the summary """

        # Format all accumulated sources into a single bulleted list
        all_sources = "\n".join(source for source in state.sources_gathered)
        state.running_summary = f"## Summary\n\n{state.running_summary}\n\n ### Sources:\n{all_sources}"
        return {"running_summary": state.running_summary}

    def route_research(state: SummaryState, config: RunnableConfig) -> Literal["finalize_summary", "web_research"]:
        """ Route the research based on the follow-up query """

        configurable = Configuration.from_runnable_config(config)
        if state.research_loop_count <= configurable.max_web_research_loops:
            return "web_research"
        else:
            return "finalize_summary"

    # Add nodes and edges
    builder = StateGraph(SummaryState, input=SummaryStateInput, output=SummaryStateOutput, config_schema=Configuration)

    # Add nodes with tracing
    builder.add_node("generate_query", generate_query)
    builder.add_node("web_research", web_research)
    builder.add_node("summarize_sources", summarize_sources)
    builder.add_node("reflect_on_summary", reflect_on_summary)
    builder.add_node("finalize_summary", finalize_summary)

    # Add edges
    builder.add_edge(START, "generate_query")
    builder.add_edge("generate_query", "web_research")
    builder.add_edge("web_research", "summarize_sources")
    builder.add_edge("summarize_sources", "reflect_on_summary")
    builder.add_conditional_edges("reflect_on_summary", route_research)
    builder.add_edge("finalize_summary", END)

    # Compile the graph
    graph = builder.compile()
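  • Usage sketch: the node and edge wiring above can be inspected without running a research loop. Assumption: a recent LangGraph release in which compiled graphs expose get_graph() and Mermaid rendering.
    # Minimal sketch: print the compiled workflow's topology.
    from assistant.graph import graph

    g = graph.get_graph()
    print(sorted(g.nodes))   # generate_query, web_research, summarize_sources, ...
    print(g.draw_mermaid())  # diagram of the edges, including the conditional
                             # reflect_on_summary -> web_research loop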

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cam10001110101/mcp-server-ollama-deep-researcher'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.