Skip to main content
Glama
decorators.py4.21 kB
""" Copyright 2024, Zep Software, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import functools import inspect from collections.abc import Awaitable, Callable from typing import Any, TypeVar from graphiti_core.driver.driver import GraphProvider from graphiti_core.helpers import semaphore_gather from graphiti_core.search.search_config import SearchResults F = TypeVar('F', bound=Callable[..., Awaitable[Any]]) def handle_multiple_group_ids(func: F) -> F: """ Decorator for FalkorDB methods that need to handle multiple group_ids. Runs the function for each group_id separately and merges results. """ @functools.wraps(func) async def wrapper(self, *args, **kwargs): group_ids_func_pos = get_parameter_position(func, 'group_ids') group_ids_pos = ( group_ids_func_pos - 1 if group_ids_func_pos is not None else None ) # Adjust for zero-based index group_ids = kwargs.get('group_ids') # If not in kwargs and position exists, get from args if group_ids is None and group_ids_pos is not None and len(args) > group_ids_pos: group_ids = args[group_ids_pos] # Only handle FalkorDB with multiple group_ids if ( hasattr(self, 'clients') and hasattr(self.clients, 'driver') and self.clients.driver.provider == GraphProvider.FALKORDB and group_ids and len(group_ids) > 1 ): # Execute for each group_id concurrently driver = self.clients.driver async def execute_for_group(gid: str): # Remove group_ids from args if it was passed positionally filtered_args = list(args) if group_ids_pos is not None and len(args) > group_ids_pos: filtered_args.pop(group_ids_pos) return await func( self, *filtered_args, **{**kwargs, 'group_ids': [gid], 'driver': driver.clone(database=gid)}, ) results = await semaphore_gather( *[execute_for_group(gid) for gid in group_ids], max_coroutines=getattr(self, 'max_coroutines', None), ) # Merge results based on type if isinstance(results[0], SearchResults): return SearchResults.merge(results) elif isinstance(results[0], list): return [item for result in results for item in result] elif isinstance(results[0], tuple): # Handle tuple outputs (like build_communities returning (nodes, edges)) merged_tuple = [] for i in range(len(results[0])): component_results = [result[i] for result in results] if isinstance(component_results[0], list): merged_tuple.append( [item for component in component_results for item in component] ) else: merged_tuple.append(component_results) return tuple(merged_tuple) else: return results # Normal execution return await func(self, *args, **kwargs) return wrapper # type: ignore def get_parameter_position(func: Callable, param_name: str) -> int | None: """ Returns the positional index of a parameter in the function signature. If the parameter is not found, returns None. """ sig = inspect.signature(func) for idx, (name, _param) in enumerate(sig.parameters.items()): if name == param_name: return idx return None

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/getzep/graphiti'

If you have feedback or need assistance with the MCP directory API, please join our Discord server