mcp-server-ollama-deep-researcher

research

Perform in-depth topic research by combining web search results with LLM synthesis to generate comprehensive insights locally via Ollama.

Instructions

Research a topic using web search and LLM synthesis

Input Schema

Name     Required   Description             Default
topic    Yes        The topic to research   (none)
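
For illustration, here is a minimal client-side sketch using the MCP Python SDK. The launch command (node build/index.js), the example topic, and the configure values are assumptions rather than part of this server's documentation; the optional configure call uses the companion tool whose schema appears under Implementation Reference below.

    import asyncio
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main():
        # Assumed launch command for the built server; adjust to your install.
        params = StdioServerParameters(command="node", args=["build/index.js"])
        async with stdio_client(params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                # Optional: tune research parameters via the 'configure' tool.
                await session.call_tool("configure", {
                    "maxLoops": 3,
                    "llmModel": "llama3.2",
                    "searchApi": "tavily",
                })

                # Run the 'research' tool; 'topic' is the only required field.
                result = await session.call_tool(
                    "research", {"topic": "post-quantum cryptography standards"}
                )
                # The server returns a text block: "Research completed. Summary: ..."
                print(result.content[0].text)

    asyncio.run(main())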

Implementation Reference

  • src/index.ts:96-154 (registration)
    Registers the available MCP tools including the 'research' tool with its input schema
    server.setRequestHandler(ListToolsRequestSchema, async () => {
      const tools = [
        {
          name: "research",
          description: "Research a topic using web search and LLM synthesis",
          inputSchema: {
            type: "object",
            properties: {
              topic: {
                type: "string",
                description: "The topic to research"
              }
            },
            required: ["topic"],
          },
        },
        {
          name: "get_status",
          description: "Get the current status of any ongoing research",
          inputSchema: {
            type: "object",
            properties: {
              _dummy: {
                type: "string",
                description: "No parameters needed",
                const: "dummy"
              }
            },
            required: ["_dummy"],
            additionalProperties: false
          } as const,
        },
        {
          name: "configure",
          description: "Configure the research parameters (max loops, LLM model, search API)",
          inputSchema: {
            type: "object",
            properties: {
              maxLoops: {
                type: "number",
                description: "Maximum number of research loops (1-10)"
              },
              llmModel: {
                type: "string",
                description: "Ollama model to use (e.g. llama3.2)"
              },
              searchApi: {
                type: "string",
                enum: ["perplexity", "tavily", "exa"],
                description: "Search API to use for web research"
              }
            },
            required: [],
          },
        },
      ];
    
      return { tools };
    });
  • MCP CallTool handler for 'research': validates the arguments, spawns the run_research.py subprocess with the current config, and captures its JSON summary output
        case "research": {
          const topic = request.params.arguments?.topic as string | undefined;
          if (!topic) {
            return {
              content: [
                {
                  type: "text",
                  text: "Research topic is required",
                },
              ],
              isError: true,
            };
          }
    
          try {
            // Validate API keys before starting research
            validateApiKeys(config.searchApi);
    
            // Validate max loops
            if (config.maxLoops < 1 || config.maxLoops > 10) {
              throw new Error("maxLoops must be between 1 and 10");
            }
    
            // Use UV to run Python with the correct virtual environment
            const uvPath = "uv";
    
            // Get absolute path to Python script from src directory
            const scriptPath = join(__dirname, "..", "src", "assistant", "run_research.py").replace(/\\/g, "/");
    
            // Run the research script with arguments using uv run
            const pythonProcess: ChildProcess = spawn(uvPath, [
              "run",
              "python",
              scriptPath,
              topic,
              config.maxLoops.toString(),
              config.llmModel,
              config.searchApi
            ], {
              env: {
                ...process.env,  // Pass through existing environment variables
                PYTHONUNBUFFERED: "1",  // Ensure Python output is not buffered
                PYTHONPATH: join(__dirname, "..", "src").replace(/\\/g, "/"),  // Add src directory to Python path
                TAVILY_API_KEY: process.env.TAVILY_API_KEY || "",  // Ensure API key is passed to Python process
                PERPLEXITY_API_KEY: process.env.PERPLEXITY_API_KEY || "",  // Ensure API key is passed to Python process
                EXA_API_KEY: process.env.EXA_API_KEY || ""  // Ensure API key is passed to Python process
              },
              cwd: join(__dirname, "..").replace(/\\/g, "/")  // Set working directory to project root
            });
    
            // Collect output using Promise with 5 minute timeout
            const output = await new Promise<string>((resolve, reject) => {
              const timeout = setTimeout(() => {
                pythonProcess.kill();
                reject(new Error('Research process timed out after 5 minutes'));
              }, 300000); // 5 minutes
              let stdout = '';
              let stderr = '';
    
              if (pythonProcess.stdout) {
                pythonProcess.stdout.on("data", (data: Buffer) => {
                  const output = data.toString().trim();
                  if (output) {
                    stdout += output;
                    console.error(`[research] ${output}`);
                  }
                });
              }
              if (pythonProcess.stderr) {
                pythonProcess.stderr.on("data", (data: Buffer) => {
                  const error = data.toString().trim();
                  if (error) {
                    stderr += error;
                    console.error(`[research error] ${error}`);
                  }
                });
              }
    
              pythonProcess.on("error", (error: Error) => {
                reject(new Error(`Failed to start Python process: ${error.message}`));
              });
    
              pythonProcess.on("close", (code: number) => {
                clearTimeout(timeout);
                if (code !== 0) {
                  reject(new Error(`Python process exited with code ${code}. Error: ${stderr}`));
                  return;
                }
    
                try {
                  const result = JSON.parse(stdout.trim()) as PythonResponse;
                  if (result.error) {
                    reject(new Error(result.error));
                  } else if (result.summary) {
                    resolve(result.summary);
                  } else {
                    resolve('No summary available');
                  }
                } catch (e) {
                  reject(new Error(`Failed to parse Python output: ${e}\nStdout: ${stdout}\nStderr: ${stderr}`));
                }
              });
            });
    
            // Store completed research result
            const result: ResearchResult = {
              topic,
              summary: output,
              sources: [],
              timestamp: new Date().toISOString()
            };
            researchResults.set(topicToUri(topic), result);
    
            // Update research state
            currentResearch = {
              topic,
              currentStep: "completed",
              loopCount: config.maxLoops,
              summary: output,
              sources: []
            };
    
            return {
              content: [
                {
                  type: "text",
                  text: `Research completed. Summary:\n\n${output}`,
                },
              ],
            };
          } catch (error) {
            const errorMessage = error instanceof Error ? error.message : String(error);
            return {
              content: [
                {
                  type: "text",
                  text: `Research failed: ${errorMessage}`,
                },
              ],
              isError: true,
            };
          }
        }
  • Executes the research by invoking the LangGraph with the topic and configurable parameters, and writes a JSON summary to stdout
    def main():
        try:
            topic = sys.argv[1]
            max_loops = int(sys.argv[2])
            llm_model = sys.argv[3]
            search_api = sys.argv[4]
    
            result = graph.invoke(
                {'research_topic': topic}, 
                {'configurable': {
                    'max_web_research_loops': max_loops, 
                    'local_llm': llm_model, 
                    'search_api': search_api
                }}
            )
            # Ensure we're writing to stderr for logs and stdout for JSON only
            print(json.dumps({'summary': result.get('running_summary', 'No summary available')}), flush=True)
        except Exception as e:
            # Try to extract any partial results from the graph state
            try:
                partial_result = graph.get_state()
                summary = partial_result.get('running_summary', '')
                if summary:
                    print(json.dumps({
                        'summary': f"{summary}\n\nNote: Research process ended early due to error: {str(e)}",
                        'error': str(e)
                    }), flush=True)
                    return
            except:
                pass
            # If we couldn't get partial results, just return the error
            print(json.dumps({'error': str(e)}), flush=True)
    
    if __name__ == '__main__':
        main()
  • Compiles the LangGraph state machine orchestrating query generation, web research, summarization, reflection, and finalization for the research tool
    builder = StateGraph(SummaryState, input=SummaryStateInput, output=SummaryStateOutput, config_schema=Configuration)
    
    # Add nodes with tracing
    builder.add_node("generate_query", generate_query)
    builder.add_node("web_research", web_research)
    builder.add_node("summarize_sources", summarize_sources)
    builder.add_node("reflect_on_summary", reflect_on_summary)
    builder.add_node("finalize_summary", finalize_summary)
    
    # Add edges
    builder.add_edge(START, "generate_query")
    builder.add_edge("generate_query", "web_research")
    builder.add_edge("web_research", "summarize_sources")
    builder.add_edge("summarize_sources", "reflect_on_summary")
    builder.add_conditional_edges("reflect_on_summary", route_research)
    builder.add_edge("finalize_summary", END)
    
    # Compile the graph
    graph = builder.compile()
  • Graph node functions implementing the iterative research loop: query generation, web search, summarization, reflection, and routing (a sketch of the state and configuration fields these nodes rely on follows the listing)
    # Nodes   
    def generate_query(state: SummaryState, config: RunnableConfig):
        """ Generate a query for web search """
        
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
        
        try:
            # Format the prompt
            query_writer_instructions_formatted = query_writer_instructions.format(research_topic=state.research_topic)
    
            # Generate a query
            configurable = Configuration.from_runnable_config(config)
            llm_json_mode = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                format="json",
                base_url=configurable.ollama_base_url
            )
            result = llm_json_mode.invoke(
                [SystemMessage(content=query_writer_instructions_formatted),
                HumanMessage(content=f"Generate a query for web search:")]
            )   
            query = json.loads(result.content)
            
            return {"search_query": query['query']}
        except Exception as e:
            # If LLM fails, use the research topic as the query
            return {"search_query": state.research_topic}
    
    def web_research(state: SummaryState, config: RunnableConfig):
        """ Gather information from the web """
        
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        # Configure 
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
    
        try:
            # Search the web
            if configurable.search_api == SearchAPI.TAVILY:
                search_results = tavily_search(state.search_query, include_raw_content=True, max_results=1)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=True)
            elif configurable.search_api == SearchAPI.PERPLEXITY:
                search_results = perplexity_search(state.search_query, state.research_loop_count)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=False)
            elif configurable.search_api == SearchAPI.EXA:
                search_results = exa_search(state.search_query, max_results=3)
                search_str = deduplicate_and_format_sources(search_results, max_tokens_per_source=1000, include_raw_content=True)
            else:
                raise ValueError(f"Unsupported search API: {configurable.search_api}")
        except Exception as e:
            # If we have a running summary, continue with a note about the error
            if state.running_summary:
                error_note = f"\n\nNote: Search failed during research loop {state.research_loop_count + 1} using {configurable.search_api.value} API. Error: {str(e)}"
                return {
                    "sources_gathered": state.sources_gathered + [f"[Search failed in loop {state.research_loop_count + 1}]"],
                    "research_loop_count": state.research_loop_count + 1,
                    "web_research_results": state.web_research_results + [error_note],
                    "running_summary": state.running_summary + error_note
                }
            # If this is the first search and it failed, raise the error
            raise
            
        return {
            "sources_gathered": [format_sources(search_results)], 
            "research_loop_count": state.research_loop_count + 1, 
            "web_research_results": [search_str]
        }
    
    def summarize_sources(state: SummaryState, config: RunnableConfig):
        """ Summarize the gathered sources """
        
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
        
        try:
            # Existing summary
            existing_summary = state.running_summary
    
            # Most recent web research
            most_recent_web_research = state.web_research_results[-1]
    
            # Build the human message
            if existing_summary:
                human_message_content = (
                    f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                    f"<Existing Summary> \n {existing_summary} \n <Existing Summary>\n\n"
                    f"<New Search Results> \n {most_recent_web_research} \n <New Search Results>"
                )
            else:
                human_message_content = (
                    f"<User Input> \n {state.research_topic} \n <User Input>\n\n"
                    f"<Search Results> \n {most_recent_web_research} \n <Search Results>"
                )
    
            # Run the LLM
            configurable = Configuration.from_runnable_config(config)
            llm = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                base_url=configurable.ollama_base_url
            )
            result = llm.invoke(
                [SystemMessage(content=summarizer_instructions),
                HumanMessage(content=human_message_content)]
            )
    
            running_summary = result.content
    
            # TODO: This is a hack to remove the <think> tags w/ Deepseek models 
            # It appears very challenging to prompt them out of the responses 
            while "<think>" in running_summary and "</think>" in running_summary:
                start = running_summary.find("<think>")
                end = running_summary.find("</think>") + len("</think>")
                running_summary = running_summary[:start] + running_summary[end:]
    
            return {"running_summary": running_summary}
        except Exception as e:
            # If LLM fails but we have existing summary, preserve it with error note
            if existing_summary:
                error_note = f"\n\nNote: Failed to summarize new sources due to LLM error: {str(e)}"
                return {"running_summary": existing_summary + error_note}
            # If this is the first summary and LLM failed, return raw search results
            return {"running_summary": f"Research on: {state.research_topic}\n\nRaw search results:\n{most_recent_web_research}"}
    
    def reflect_on_summary(state: SummaryState, config: RunnableConfig):
        """ Reflect on the summary and generate a follow-up query """
    
        # Add timeout to config
        if not config:
            config = {}
        if "configurable" not in config:
            config["configurable"] = {}
        config["configurable"]["timeout"] = 300  # 5 minutes timeout
        
        configurable = Configuration.from_runnable_config(config)
        
        # Enable tracing if configured
        if configurable.langsmith_tracing and configurable.langsmith_api_key:
            os.environ["LANGCHAIN_TRACING_V2"] = "true"
            os.environ["LANGCHAIN_ENDPOINT"] = configurable.langsmith_endpoint
            os.environ["LANGCHAIN_API_KEY"] = configurable.langsmith_api_key
            os.environ["LANGCHAIN_PROJECT"] = configurable.langsmith_project
        
        try:
            # Generate a query
            configurable = Configuration.from_runnable_config(config)
            llm_json_mode = ChatOllama(
                model=configurable.local_llm,
                temperature=0,
                format="json",
                base_url=configurable.ollama_base_url
            )
            result = llm_json_mode.invoke(
                [SystemMessage(content=reflection_instructions.format(research_topic=state.research_topic)),
                HumanMessage(content=f"Identify a knowledge gap and generate a follow-up web search query based on our existing knowledge: {state.running_summary}")]
            )   
            
            try:
                follow_up_query = json.loads(result.content)
                query = follow_up_query.get('follow_up_query')
                if query:
                    return {"search_query": query}
            except (json.JSONDecodeError, AttributeError):
                pass  # Fall through to fallback
                
        except Exception as e:
            # Add error note to summary before falling through to fallback
            error_note = f"\n\nNote: Failed to generate follow-up query due to LLM error: {str(e)}"
            state.running_summary += error_note
    
        # Fallback: Generate a simple follow-up query based on research topic
        fallback_queries = [
            f"latest developments in {state.research_topic}",
            f"important aspects of {state.research_topic}",
            f"key information about {state.research_topic}",
            f"Tell me more about {state.research_topic}"
        ]
        import random
        return {"search_query": random.choice(fallback_queries)}
    
    def finalize_summary(state: SummaryState):
        """ Finalize the summary """
        
        # Format all accumulated sources into a single bulleted list
        all_sources = "\n".join(source for source in state.sources_gathered)
        state.running_summary = f"## Summary\n\n{state.running_summary}\n\n ### Sources:\n{all_sources}"
        return {"running_summary": state.running_summary}
    
    def route_research(state: SummaryState, config: RunnableConfig) -> Literal["finalize_summary", "web_research"]:
        """ Route the research based on the follow-up query """
    
        configurable = Configuration.from_runnable_config(config)
        if state.research_loop_count <= configurable.max_web_research_loops:
            return "web_research"
        else:
            return "finalize_summary" 
        
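
The node functions above read and write a SummaryState object and pull settings from a Configuration object, neither of which is included in this excerpt. The following is a minimal sketch inferred solely from the usage shown here: the field names appear in the code, but the types, defaults, and the from_runnable_config body are assumptions, so consult the repository's assistant package for the actual definitions.

    from dataclasses import dataclass, field
    from enum import Enum
    from typing import Optional

    class SearchAPI(Enum):
        PERPLEXITY = "perplexity"
        TAVILY = "tavily"
        EXA = "exa"

    @dataclass
    class SummaryState:
        # Fields read and written by the graph nodes above.
        research_topic: str = ""
        search_query: str = ""
        running_summary: str = ""
        research_loop_count: int = 0
        sources_gathered: list = field(default_factory=list)
        web_research_results: list = field(default_factory=list)

    @dataclass
    class Configuration:
        # Values passed in via the 'configurable' dict in run_research.py.
        max_web_research_loops: int = 3
        local_llm: str = "llama3.2"
        search_api: SearchAPI = SearchAPI.TAVILY
        ollama_base_url: str = "http://localhost:11434"
        # Optional LangSmith tracing settings checked by each node (defaults assumed).
        langsmith_tracing: bool = False
        langsmith_api_key: Optional[str] = None
        langsmith_endpoint: str = "https://api.smith.langchain.com"
        langsmith_project: str = "ollama-deep-researcher"

        @classmethod
        def from_runnable_config(cls, config: Optional[dict] = None) -> "Configuration":
            # Sketch only: read known fields from config["configurable"], fall back to defaults.
            values = dict((config or {}).get("configurable", {}))
            if isinstance(values.get("search_api"), str):
                values["search_api"] = SearchAPI(values["search_api"])
            known = set(cls.__dataclass_fields__)
            return cls(**{k: v for k, v in values.items() if k in known})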