Skip to main content
Glama
Cam10001110101

mcp-server-ollama-deep-researcher

research

Perform in-depth topic research by combining web search results with LLM synthesis to generate comprehensive insights locally via Ollama.

Instructions

Research a topic using web search and LLM synthesis

Input Schema

View as: Table | JSON Schema

| Name  | Required | Description           | Default |
| ----- | -------- | --------------------- | ------- |
| topic | Yes      | The topic to research | —       |

Implementation Reference

  • src/index.ts:96-154 (registration)
    Registers the available MCP tools including the 'research' tool with its input schema
    // Advertise the MCP tools exposed by this server, each with its JSON input schema.
    server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: [
        {
          name: "research",
          description: "Research a topic using web search and LLM synthesis",
          inputSchema: {
            type: "object",
            properties: {
              topic: {
                type: "string",
                description: "The topic to research",
              },
            },
            required: ["topic"],
          },
        },
        {
          name: "get_status",
          description: "Get the current status of any ongoing research",
          // NOTE(review): a constant placeholder argument is declared even though the
          // tool takes no real input — presumably to satisfy clients that require at
          // least one property; confirm before removing.
          inputSchema: {
            type: "object",
            properties: {
              _dummy: {
                type: "string",
                description: "No parameters needed",
                const: "dummy",
              },
            },
            required: ["_dummy"],
            additionalProperties: false,
          } as const,
        },
        {
          name: "configure",
          description: "Configure the research parameters (max loops, LLM model, search API)",
          inputSchema: {
            type: "object",
            properties: {
              maxLoops: {
                type: "number",
                description: "Maximum number of research loops (1-10)",
              },
              llmModel: {
                type: "string",
                description: "Ollama model to use (e.g. llama3.2)",
              },
              searchApi: {
                type: "string",
                enum: ["perplexity", "tavily", "exa"],
                description: "Search API to use for web research",
              },
            },
            required: [],
          },
        },
      ],
    }));
  • MCP CallTool handler for 'research': validates params, spawns run_research.py subprocess with config, captures JSON summary output
        case "research": {
          // Handle the 'research' tool: validate input, spawn run_research.py via
          // `uv run`, await its single-line JSON result, and record it.
          const topic = request.params.arguments?.topic as string | undefined;
          if (!topic) {
            return {
              content: [
                {
                  type: "text",
                  text: "Research topic is required",
                },
              ],
              isError: true,
            };
          }
    
          try {
            // Validate API keys before starting research
            validateApiKeys(config.searchApi);
    
            // Validate max loops
            if (config.maxLoops < 1 || config.maxLoops > 10) {
              throw new Error("maxLoops must be between 1 and 10");
            }
    
            // Use UV to run Python with the correct virtual environment
            const uvPath = "uv";
    
            // Get absolute path to Python script from src directory
            // NOTE(review): the next line is mis-indented relative to its block;
            // harmless in TypeScript but worth reformatting.
    const scriptPath = join(__dirname, "..", "src", "assistant", "run_research.py").replace(/\\/g, "/");
    
            // Run the research script with arguments using uv run
            const pythonProcess: ChildProcess = spawn(uvPath, [
              "run",
              "python",
              scriptPath,
              topic,
              config.maxLoops.toString(),
              config.llmModel,
              config.searchApi
            ], {
              env: {
                ...process.env,  // Pass through existing environment variables
                PYTHONUNBUFFERED: "1",  // Ensure Python output is not buffered
                PYTHONPATH: join(__dirname, "..", "src").replace(/\\/g, "/"),  // Add src directory to Python path
                TAVILY_API_KEY: process.env.TAVILY_API_KEY || "",  // Ensure API key is passed to Python process
                PERPLEXITY_API_KEY: process.env.PERPLEXITY_API_KEY || "",  // Ensure API key is passed to Python process
                EXA_API_KEY: process.env.EXA_API_KEY || ""  // Ensure API key is passed to Python process
              },
              cwd: join(__dirname, "..").replace(/\\/g, "/")  // Set working directory to project root
            });
    
            // Collect output using Promise with 5 minute timeout
            const output = await new Promise<string>((resolve, reject) => {
              // Kill the child (default SIGTERM) and reject if it runs too long.
              // The 'close' handler clears this timer on normal completion.
              const timeout = setTimeout(() => {
                pythonProcess.kill();
                reject(new Error('Research process timed out after 5 minutes'));
              }, 300000); // 5 minutes
              let stdout = '';
              let stderr = '';
    
              // NOTE(review): each data chunk is trim()-ed before concatenation, so
              // newlines between chunks are lost. This is safe only because the
              // Python side emits exactly one JSON line on stdout — verify if the
              // script's output format ever changes.
              if (pythonProcess.stdout) {
                pythonProcess.stdout.on("data", (data: Buffer) => {
                  const output = data.toString().trim();
                  if (output) {
                    stdout += output;
                    console.error(`[research] ${output}`);
                  }
                });
              }
              if (pythonProcess.stderr) {
                pythonProcess.stderr.on("data", (data: Buffer) => {
                  const error = data.toString().trim();
                  if (error) {
                    stderr += error;
                    console.error(`[research error] ${error}`);
                  }
                });
              }
    
              // Spawn failure (e.g. `uv` not on PATH). The pending timeout is not
              // cleared here, but its later reject on an already-settled promise
              // is a no-op.
              pythonProcess.on("error", (error: Error) => {
                reject(new Error(`Failed to start Python process: ${error.message}`));
              });
    
              pythonProcess.on("close", (code: number) => {
                clearTimeout(timeout);
                if (code !== 0) {
                  reject(new Error(`Python process exited with code ${code}. Error: ${stderr}`));
                  return;
                }
    
                // The script prints {'summary': ...} or {'error': ...} as JSON.
                try {
                  const result = JSON.parse(stdout.trim()) as PythonResponse;
                  if (result.error) {
                    reject(new Error(result.error));
                  } else if (result.summary) {
                    resolve(result.summary);
                  } else {
                    resolve('No summary available');
                  }
                } catch (e) {
                  reject(new Error(`Failed to parse Python output: ${e}\nStdout: ${stdout}\nStderr: ${stderr}`));
                }
              });
            });
    
            // Store completed research result
            const result: ResearchResult = {
              topic,
              summary: output,
              sources: [],
              timestamp: new Date().toISOString()
            };
            researchResults.set(topicToUri(topic), result);
    
            // Update research state
            currentResearch = {
              topic,
              currentStep: "completed",
              loopCount: config.maxLoops,
              summary: output,
              sources: []
            };
    
            return {
              content: [
                {
                  type: "text",
                  text: `Research completed. Summary:\n\n${output}`,
                },
              ],
            };
          } catch (error) {
            // Any failure (validation, spawn, timeout, non-zero exit, bad JSON)
            // is reported to the client as an error content block.
            const errorMessage = error instanceof Error ? error.message : String(error);
            return {
              content: [
                {
                  type: "text",
                  text: `Research failed: ${errorMessage}`,
                },
              ],
              isError: true,
            };
          }
        }
  • Executes the research by invoking the LangGraph with topic and configurable parameters, outputs JSON summary
    def main():
        """CLI entry point: run the research graph and emit one JSON line on stdout.

        argv: <topic> <max_loops> <llm_model> <search_api>

        On success prints {'summary': ...}; on failure prints {'error': ...}
        (with any recoverable partial summary). stdout carries exactly one JSON
        document; all other logging must go to stderr.
        """
        try:
            # Fail fast with a usable message instead of a bare IndexError.
            if len(sys.argv) < 5:
                raise ValueError(
                    "usage: run_research.py <topic> <max_loops> <llm_model> <search_api>"
                )
            topic = sys.argv[1]
            max_loops = int(sys.argv[2])
            llm_model = sys.argv[3]
            search_api = sys.argv[4]

            result = graph.invoke(
                {'research_topic': topic},
                {'configurable': {
                    'max_web_research_loops': max_loops,
                    'local_llm': llm_model,
                    'search_api': search_api
                }}
            )
            # Ensure we're writing to stderr for logs and stdout for JSON only
            print(json.dumps({'summary': result.get('running_summary', 'No summary available')}), flush=True)
        except Exception as e:
            # Try to extract any partial results from the graph state.
            # NOTE(review): graph.get_state() normally requires a config and a
            # checkpointer and returns a StateSnapshot (no .get); this recovery
            # path may never succeed — verify against the LangGraph version used.
            try:
                partial_result = graph.get_state()
                summary = partial_result.get('running_summary', '')
                if summary:
                    print(json.dumps({
                        'summary': f"{summary}\n\nNote: Research process ended early due to error: {str(e)}",
                        'error': str(e)
                    }), flush=True)
                    return
            except Exception:
                # Narrowed from a bare 'except:' so KeyboardInterrupt/SystemExit
                # still propagate; any other failure falls through to plain error.
                pass
            # If we couldn't get partial results, just return the error
            print(json.dumps({'error': str(e)}), flush=True)

    if __name__ == '__main__':
        main()
  • Compiles the LangGraph state machine orchestrating query generation, web research, summarization, reflection, and finalization for the research tool
    # Build the LangGraph state machine that drives the research workflow.
    # NOTE(review): SummaryState*, Configuration, and the node functions are
    # defined elsewhere in this package.
    builder = StateGraph(SummaryState, input=SummaryStateInput, output=SummaryStateOutput, config_schema=Configuration)
    
    # Add nodes with tracing
    builder.add_node("generate_query", generate_query)
    builder.add_node("web_research", web_research)
    builder.add_node("summarize_sources", summarize_sources)
    builder.add_node("reflect_on_summary", reflect_on_summary)
    builder.add_node("finalize_summary", finalize_summary)
    
    # Add edges: linear pipeline query -> search -> summarize -> reflect; then
    # route_research chooses another web_research loop or finalize_summary.
    builder.add_edge(START, "generate_query")
    builder.add_edge("generate_query", "web_research")
    builder.add_edge("web_research", "summarize_sources")
    builder.add_edge("summarize_sources", "reflect_on_summary")
    builder.add_conditional_edges("reflect_on_summary", route_research)
    builder.add_edge("finalize_summary", END)
    
    # Compile the graph (exported as module-level 'graph' for run_research.py)
    graph = builder.compile()
  • Graph node functions implementing the iterative research loop: query gen, search, summarize, reflect, route
    # Nodes   
    def generate_query(state: SummaryState, config: RunnableConfig):
        """Generate an initial web-search query for the research topic.

        Asks the configured Ollama model (JSON mode) for {'query': ...}; on any
        failure the raw research topic is used so the pipeline can continue.

        Returns a partial state update: {"search_query": <query string>}.
        """
        
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
        
        try:
            # Format the prompt
            query_writer_instructions_formatted = query_writer_instructions.format(research_topic=state.research_topic)

            # Generate a query. (A duplicate Configuration.from_runnable_config
            # call was removed here; 'configurable' above is identical.)
            llm_json_mode = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                format="json",
                base_url=configurable.ollama_base_url
            )
            result = llm_json_mode.invoke(
                [SystemMessage(content=query_writer_instructions_formatted),
                HumanMessage(content="Generate a query for web search:")]
            )
            query = json.loads(result.content)
            
            return {"search_query": query['query']}
        except Exception:
            # If the LLM call or JSON parsing fails, fall back to the topic itself
            return {"search_query": state.research_topic}
    
    def web_research(state: SummaryState, config: RunnableConfig):
        """ Gather information from the web """
        
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        # Configure 
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
    
        try:
            # Search the web with whichever provider is configured; each branch
            # yields raw results plus a deduplicated, token-capped string.
            if configurable.search_api == SearchAPI.TAVILY:
                search_results = tavily_search(state.search_query, include_raw_content=True, max_results=1)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=True)
            elif configurable.search_api == SearchAPI.PERPLEXITY:
                search_results = perplexity_search(state.search_query, state.research_loop_count)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=False)
            elif configurable.search_api == SearchAPI.EXA:
                search_results = exa_search(state.search_query, max_results=3)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=True)
            else:
                raise ValueError(f"Unsupported search API: {configurable.search_api}")
        except Exception as e:
            # If we have a running summary, continue with a note about the error
            # (the loop counter still advances so the run terminates).
            if state.running_summary:
                error_note = f"\n\nNote: Search failed during research loop {state.research_loop_count + 1} using {configurable.search_api.value} API. Error: {str(e)}"
                return {
                    "sources_gathered": state.sources_gathered + [f"[Search failed in loop {state.research_loop_count + 1}]"],
                    "research_loop_count": state.research_loop_count + 1,
                    "web_research_results": state.web_research_results + [error_note],
                    "running_summary": state.running_summary + error_note
                }
            # If this is the first search and it failed, raise the error
            raise
            
        # NOTE(review): the success path returns single-element lists while the
        # error path returns fully accumulated lists — whether these replace or
        # extend prior values depends on SummaryState's reducers; verify.
        return {
            "sources_gathered": [format_sources(search_results)], 
            "research_loop_count": state.research_loop_count + 1, 
            "web_research_results": [search_str]
        }
    
    def summarize_sources(state: SummaryState, config: RunnableConfig):
        """Fold the most recent web results into the running summary via the LLM.

        Returns a partial state update: {"running_summary": <text>}. On LLM
        failure the existing summary is preserved with an error note, or the raw
        search results are returned when no summary exists yet.
        """
        
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
        
        # Bind these OUTSIDE the try block. The original assigned them inside
        # 'try', so an IndexError on an empty web_research_results list made the
        # except handler raise NameError and mask the real error.
        existing_summary = state.running_summary
        most_recent_web_research = (
            state.web_research_results[-1] if state.web_research_results else ""
        )
        
        try:
            # Build the human message
            if existing_summary:
                human_message_content = (
                    f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                    f"<Existing Summary> \n {existing_summary} \n <Existing Summary>\n\n"
                    f"<New Search Results> \n {most_recent_web_research} \n <New Search Results>"
                )
            else:
                human_message_content = (
                    f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                    f"<Search Results> \n {most_recent_web_research} \n <Search Results>"
                )

            # Run the LLM. (A duplicate Configuration.from_runnable_config call
            # was removed here; 'configurable' above is identical.)
            llm = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                base_url=configurable.ollama_base_url
            )
            result = llm.invoke(
                [SystemMessage(content=summarizer_instructions),
                HumanMessage(content=human_message_content)]
            )

            running_summary = result.content

            # TODO: This is a hack to remove the <think> tags w/ Deepseek models 
            # It appears very challenging to prompt them out of the responses 
            while "<think>" in running_summary and "</think>" in running_summary:
                start = running_summary.find("<think>")
                end = running_summary.find("</think>") + len("</think>")
                running_summary = running_summary[:start] + running_summary[end:]

            return {"running_summary": running_summary}
        except Exception as e:
            # If LLM fails but we have existing summary, preserve it with error note
            if existing_summary:
                error_note = f"\n\nNote: Failed to summarize new sources due to LLM error: {str(e)}"
                return {"running_summary": existing_summary + error_note}
            # If this is the first summary and LLM failed, return raw search results
            return {"running_summary": f"Research on: {state.research_topic}\n\nRaw search results:\n{most_recent_web_research}"}
    
    def reflect_on_summary(state: SummaryState, config: RunnableConfig):
        """Identify a knowledge gap and produce a follow-up web-search query.

        Returns a partial state update: {"search_query": <query>}. Falls back to
        a randomly chosen topic-based query when the LLM call fails or its JSON
        output lacks a usable 'follow_up_query'.
        """
        import random  # hoisted from the fallback tail so it is in scope up front
        
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
        
        try:
            # Generate a query. (A duplicate Configuration.from_runnable_config
            # call was removed here; 'configurable' above is identical.)
            llm_json_mode = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                format="json",
                base_url=configurable.ollama_base_url
            )
            result = llm_json_mode.invoke(
                [SystemMessage(content=reflection_instructions.format(research_topic=state.research_topic)),
                HumanMessage(content=f"Identify a knowledge gap and generate a follow-up web search query based on our existing knowledge: {state.running_summary}")]
            )
            
            try:
                follow_up_query = json.loads(result.content)
                query = follow_up_query.get('follow_up_query')
                if query:
                    return {"search_query": query}
            except (json.JSONDecodeError, AttributeError):
                pass  # Fall through to fallback
                
        except Exception as e:
            # Add error note to summary before falling through to fallback.
            # NOTE(review): this mutates state in place rather than returning an
            # update; LangGraph nodes normally communicate via returned dicts, so
            # this note may be dropped — verify.
            error_note = f"\n\nNote: Failed to generate follow-up query due to LLM error: {str(e)}"
            state.running_summary += error_note

        # Fallback: Generate a simple follow-up query based on research topic
        fallback_queries = [
            f"latest developments in {state.research_topic}",
            f"important aspects of {state.research_topic}",
            f"key information about {state.research_topic}",
            f"Tell me more about {state.research_topic}"
        ]
        return {"search_query": random.choice(fallback_queries)}
    
    def finalize_summary(state: SummaryState):
        """Render the final report: the running summary plus a sources section."""
        
        # Join every gathered source reference into one newline-separated block.
        source_block = "\n".join(state.sources_gathered)
        state.running_summary = (
            f"## Summary\n\n{state.running_summary}\n\n ### Sources:\n{source_block}"
        )
        return {"running_summary": state.running_summary}
    
    def route_research(state: SummaryState, config: RunnableConfig) -> Literal["finalize_summary", "web_research"]:
        """Pick the next node: keep researching until the loop budget is spent."""
    
        configurable = Configuration.from_runnable_config(config)
        # NOTE(review): '<=' permits one pass beyond max_web_research_loops when
        # counting from zero — confirm this off-by-one is intentional.
        if state.research_loop_count > configurable.max_web_research_loops:
            return "finalize_summary"
        return "web_research"
        
    # Add nodes and edges 

Tool Definition Quality

Score is being calculated. Check back soon.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cam10001110101/mcp-server-ollama-deep-researcher'

If you have feedback or need assistance with the MCP directory API, please join our Discord server