# We provide all the information about MCP servers via our MCP API:
#   curl -X GET 'https://glama.ai/api/mcp/v1/servers/qinshu1109/memos-MCP'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import time
from datetime import datetime, timezone
from typing import Any, Literal

from neo4j import GraphDatabase

from memos.configs.graph_db import Neo4jGraphDBConfig
from memos.graph_dbs.base import BaseGraphDB
from memos.log import get_logger
# Module-level logger, named after this module (project logging helper).
logger = get_logger(__name__)
def _parse_node(node_data: dict[str, Any]) -> dict[str, Any]:
node = node_data.copy()
# Convert Neo4j datetime to string
for time_field in ("created_at", "updated_at"):
if time_field in node and hasattr(node[time_field], "isoformat"):
node[time_field] = node[time_field].isoformat()
return {"id": node.pop("id"), "memory": node.pop("memory", ""), "metadata": node}
def _compose_node(item: dict[str, Any]) -> tuple[str, str, dict[str, Any]]:
node_id = item["id"]
memory = item["memory"]
metadata = item.get("metadata", {})
return node_id, memory, metadata
def _prepare_node_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
"""
Ensure metadata has proper datetime fields and normalized types.
- Fill `created_at` and `updated_at` if missing (in ISO 8601 format).
- Convert embedding to list of float if present.
"""
now = datetime.utcnow().isoformat()
# Fill timestamps if missing
metadata.setdefault("created_at", now)
metadata.setdefault("updated_at", now)
# Normalize embedding type
embedding = metadata.get("embedding")
if embedding and isinstance(embedding, list):
metadata["embedding"] = [float(x) for x in embedding]
return metadata
class Neo4jGraphDB(BaseGraphDB):
    """Neo4j-based implementation of a graph memory store."""

    def __init__(self, config: Neo4jGraphDBConfig):
        # Keep the full config; helpers read individual fields off it.
        self.config = config
        # Bolt driver; a fresh session is opened per operation against db_name.
        self.driver = GraphDatabase.driver(config.uri, auth=(config.user, config.password))
        self.db_name = config.db_name
        if config.auto_create:
            # Create the target database when missing and wait until online
            # (multi-database management — presumably requires Neo4j
            # Enterprise; confirm against deployment).
            self._ensure_database_exists()
        # Create only if not exists
        self.create_index(dimensions=config.embedding_dimension)
def create_index(
    self,
    label: str = "Memory",
    vector_property: str = "embedding",
    dimensions: int = 1536,
    index_name: str = "memory_vector_index",
) -> None:
    """Ensure the vector index and the basic property indexes exist.

    The vector index is created only when not already present; the property
    indexes are idempotent via `IF NOT EXISTS` in their own statements.
    """
    vector_index_missing = not self._vector_index_exists(index_name)
    if vector_index_missing:
        self._create_vector_index(label, vector_property, dimensions, index_name)
    self._create_basic_property_indexes()
def get_memory_count(self, memory_type: str) -> int:
    """Return how many Memory nodes carry the given ``memory_type``."""
    query = """
        MATCH (n:Memory)
        WHERE n.memory_type = $memory_type
        RETURN COUNT(n) AS count
    """
    with self.driver.session(database=self.db_name) as session:
        record = session.run(query, memory_type=memory_type).single()
        return record["count"]
def count_nodes(self, scope: str) -> int:
    """Count Memory nodes whose ``memory_type`` equals ``scope``."""
    query = """
        MATCH (n:Memory)
        WHERE n.memory_type = $scope
        RETURN count(n) AS count
    """
    with self.driver.session(database=self.db_name) as session:
        return session.run(query, {"scope": scope}).single()["count"]
def remove_oldest_memory(self, memory_type: str, keep_latest: int) -> None:
    """
    Remove all nodes of ``memory_type`` except the latest ``keep_latest`` entries.

    Args:
        memory_type (str): Memory type (e.g., 'WorkingMemory', 'LongTermMemory').
        keep_latest (int): Number of latest entries (by updated_at) to keep.
    """
    # Use query parameters instead of f-string interpolation: the previous
    # version spliced `memory_type` and `keep_latest` directly into the
    # Cypher text, which is a Cypher-injection risk and defeats server-side
    # plan caching. SKIP accepts a parameter in modern Neo4j.
    query = """
        MATCH (n:Memory)
        WHERE n.memory_type = $memory_type
        WITH n ORDER BY n.updated_at DESC
        SKIP $keep_latest
        DETACH DELETE n
    """
    with self.driver.session(database=self.db_name) as session:
        session.run(query, memory_type=memory_type, keep_latest=int(keep_latest))
def add_node(self, id: str, memory: str, metadata: dict[str, Any]) -> None:
    """Create (or update via MERGE) a Memory node and attach its metadata.

    Timestamps and embeddings are normalized first; `created_at`/`updated_at`
    are stored as Cypher datetime values, all other metadata keys are merged
    onto the node as-is.
    """
    metadata = _prepare_node_metadata(metadata)
    created_at = metadata.pop("created_at")
    updated_at = metadata.pop("updated_at")
    query = """
        MERGE (n:Memory {id: $id})
        SET n.memory = $memory,
            n.created_at = datetime($created_at),
            n.updated_at = datetime($updated_at),
            n += $metadata
    """
    params = {
        "id": id,
        "memory": memory,
        "created_at": created_at,
        "updated_at": updated_at,
        "metadata": metadata,
    }
    with self.driver.session(database=self.db_name) as session:
        session.run(query, **params)
def update_node(self, id: str, fields: dict[str, Any]) -> None:
    """
    Update node fields in Neo4j, converting `created_at` / `updated_at`
    to Cypher datetime values when they are present in ``fields``.
    """
    remaining = dict(fields)  # never mutate the caller's dict
    params: dict[str, Any] = {"id": id, "fields": remaining}
    assignments = []
    for time_field in ("created_at", "updated_at"):
        if time_field in remaining:
            # Produces e.g.: n.created_at = datetime($created_at)
            assignments.append(f"n.{time_field} = datetime(${time_field})")
            params[time_field] = remaining.pop(time_field)
    assignments.append("n += $fields")  # merge everything else verbatim
    set_clause = ",\n            ".join(assignments)
    query = f"""
        MATCH (n:Memory {{id: $id}})
        SET {set_clause}
    """
    with self.driver.session(database=self.db_name) as session:
        session.run(query, **params)
def delete_node(self, id: str) -> None:
    """
    Remove a node (and all of its relationships) from the graph.

    Args:
        id: Node identifier to delete.
    """
    query = "MATCH (n:Memory {id: $id}) DETACH DELETE n"
    with self.driver.session(database=self.db_name) as session:
        session.run(query, id=id)
# Edge (Relationship) Management
def add_edge(self, source_id: str, target_id: str, type: str) -> None:
    """
    Create an edge of the given type from source node to target node.

    Args:
        source_id: ID of the source node.
        target_id: ID of the target node.
        type: Relationship type (e.g., 'RELATE_TO', 'PARENT').

    NOTE(review): `type` is interpolated into the Cypher text because
    relationship types cannot be parameterized; callers must pass trusted
    values only.
    """
    query = f"""
    MATCH (a:Memory {{id: $source_id}})
    MATCH (b:Memory {{id: $target_id}})
    MERGE (a)-[:{type}]->(b)
    """
    with self.driver.session(database=self.db_name) as session:
        session.run(query, {"source_id": source_id, "target_id": target_id})
def delete_edge(self, source_id: str, target_id: str, type: str) -> None:
    """
    Delete a specific edge between two nodes.

    Args:
        source_id: ID of the source node.
        target_id: ID of the target node.
        type: Relationship type to remove (interpolated into the query text;
            relationship types cannot be parameterized in Cypher).
    """
    query = (
        f"MATCH (a:Memory {{id: $source}})-[r:{type}]->(b:Memory {{id: $target}})\n"
        "DELETE r"
    )
    with self.driver.session(database=self.db_name) as session:
        session.run(query, source=source_id, target=target_id)
def edge_exists(
    self, source_id: str, target_id: str, type: str = "ANY", direction: str = "OUTGOING"
) -> bool:
    """
    Check whether an edge exists between two nodes.

    Args:
        source_id: ID of the source node.
        target_id: ID of the target node.
        type: Relationship type; "ANY" matches any type.
        direction: "OUTGOING" (default), "INCOMING", or "ANY".

    Returns:
        True if a matching edge exists, otherwise False.

    Raises:
        ValueError: If ``direction`` is not one of the accepted values.
    """
    rel = "r" if type == "ANY" else f"r:{type}"
    # Dispatch table instead of an if/elif chain.
    patterns = {
        "OUTGOING": f"(a:Memory {{id: $source}})-[{rel}]->(b:Memory {{id: $target}})",
        "INCOMING": f"(a:Memory {{id: $source}})<-[{rel}]-(b:Memory {{id: $target}})",
        "ANY": f"(a:Memory {{id: $source}})-[{rel}]-(b:Memory {{id: $target}})",
    }
    try:
        pattern = patterns[direction]
    except KeyError:
        raise ValueError(
            f"Invalid direction: {direction}. Must be 'OUTGOING', 'INCOMING', or 'ANY'."
        ) from None
    with self.driver.session(database=self.db_name) as session:
        result = session.run(
            f"MATCH {pattern} RETURN r",
            source=source_id,
            target=target_id,
        )
        return result.single() is not None
# Graph Query & Reasoning
def get_node(self, id: str) -> dict[str, Any] | None:
    """
    Retrieve the memory and metadata of a single node.

    Args:
        id: Node identifier.

    Returns:
        Parsed node dict ({'id', 'memory', 'metadata'}), or None if absent.
    """
    with self.driver.session(database=self.db_name) as session:
        record = session.run("MATCH (n:Memory {id: $id}) RETURN n", id=id).single()
        if record is None:
            return None
        return _parse_node(dict(record["n"]))
def get_nodes(self, ids: list[str]) -> list[dict[str, Any]]:
    """
    Retrieve memory and metadata for a batch of node ids.

    Args:
        ids: List of node identifiers.

    Returns:
        list[dict]: Parsed records with 'id', 'memory', and 'metadata'.

    Notes:
        - Assumes the provided IDs are valid; unmatched ids simply yield
          no rows.
        - An empty input returns [] without touching the database.
    """
    if not ids:
        return []
    query = "MATCH (n:Memory) WHERE n.id IN $ids RETURN n"
    with self.driver.session(database=self.db_name) as session:
        rows = session.run(query, {"ids": ids})
        return [_parse_node(dict(row["n"])) for row in rows]
def get_edges(self, id: str, type: str = "ANY", direction: str = "ANY") -> list[dict[str, str]]:
    """
    List edges touching a node, optionally filtered by type and direction.

    Args:
        id: Node ID to retrieve edges for.
        type: Relationship type to match, or 'ANY' for all types.
        direction: 'OUTGOING', 'INCOMING', or 'ANY'.

    Returns:
        Edges shaped like {"from": source_id, "to": target_id, "type": name}.

    Raises:
        ValueError: If ``direction`` is invalid.
    """
    rel_type = "" if type == "ANY" else f":{type}"
    if direction == "OUTGOING":
        pattern = f"(a:Memory)-[r{rel_type}]->(b:Memory)"
        where_clause = "a.id = $id"
    elif direction == "INCOMING":
        pattern = f"(a:Memory)<-[r{rel_type}]-(b:Memory)"
        where_clause = "a.id = $id"
    elif direction == "ANY":
        pattern = f"(a:Memory)-[r{rel_type}]-(b:Memory)"
        # Undirected match: the node may appear at either end.
        where_clause = "a.id = $id OR b.id = $id"
    else:
        raise ValueError("Invalid direction. Must be 'OUTGOING', 'INCOMING', or 'ANY'.")
    query = f"""
    MATCH {pattern}
    WHERE {where_clause}
    RETURN a.id AS from_id, b.id AS to_id, type(r) AS type
    """
    with self.driver.session(database=self.db_name) as session:
        result = session.run(query, id=id)
        return [
            {"from": rec["from_id"], "to": rec["to_id"], "type": rec["type"]}
            for rec in result
        ]
def get_neighbors(
    self, id: str, type: str, direction: Literal["in", "out", "both"] = "out"
) -> list[str]:
    """
    Return IDs of nodes connected to ``id`` via ``type`` edges.

    Args:
        id: Source node ID.
        type: Relationship type.
        direction: Edge direction to follow ('out', 'in', or 'both').

    Returns:
        List of neighboring node IDs.

    Note: not implemented yet.
    """
    raise NotImplementedError
def get_neighbors_by_tag(
    self,
    tags: list[str],
    exclude_ids: list[str],
    top_k: int = 5,
    min_overlap: int = 1,
) -> list[dict[str, Any]]:
    """
    Return the top-K activated nodes sharing the most tags with ``tags``.

    Args:
        tags: The list of tags to match.
        exclude_ids: Node IDs to exclude (e.g., the local cluster).
        top_k: Max number of neighbors to return.
        min_overlap: Minimum number of overlapping tags required.

    Returns:
        Parsed node dicts, ordered by descending tag-overlap count.
    """
    # Reasoning nodes and WorkingMemory entries are filtered out up front.
    query = """
    MATCH (n:Memory)
    WHERE NOT n.id IN $exclude_ids
      AND n.status = 'activated'
      AND n.type <> 'reasoning'
      AND n.memory_type <> 'WorkingMemory'
    WITH n, [tag IN n.tags WHERE tag IN $tags] AS overlap_tags
    WHERE size(overlap_tags) >= $min_overlap
    RETURN n, size(overlap_tags) AS overlap_count
    ORDER BY overlap_count DESC
    LIMIT $top_k
    """
    parameters = {
        "tags": tags,
        "exclude_ids": exclude_ids,
        "min_overlap": min_overlap,
        "top_k": top_k,
    }
    with self.driver.session(database=self.db_name) as session:
        rows = session.run(query, parameters)
        return [_parse_node(dict(row["n"])) for row in rows]
def get_children_with_embeddings(self, id: str) -> list[dict[str, Any]]:
    """
    Fetch the direct PARENT-children of a node together with their embeddings.

    Args:
        id: Parent node ID.

    Returns:
        list[dict]: One dict per child with keys 'id', 'embedding', 'memory'.
    """
    query = """
        MATCH (p:Memory)-[:PARENT]->(c:Memory)
        WHERE p.id = $id
        RETURN c.id AS id, c.embedding AS embedding, c.memory AS memory
    """
    with self.driver.session(database=self.db_name) as session:
        # Materialize rows as plain dicts: the previous annotation claimed
        # list[str] while raw neo4j Record objects were actually returned.
        # dicts preserve the same record["key"] access style for callers.
        return [dict(record) for record in session.run(query, id=id)]
def get_path(self, source_id: str, target_id: str, max_depth: int = 3) -> list[str]:
    """
    Get the path of nodes from source to target within a limited depth.

    Args:
        source_id: Starting node ID.
        target_id: Target node ID.
        max_depth: Maximum path length to traverse.

    Returns:
        Ordered list of node IDs along the path.

    Note: not implemented yet.
    """
    raise NotImplementedError
def get_subgraph(
    self, center_id: str, depth: int = 2, center_status: str = "activated"
) -> dict[str, Any]:
    """
    Retrieve a local subgraph centered at a given node.

    Args:
        center_id: The ID of the center node.
        depth: The hop distance for neighbors.
        center_status: Required status for center node (skipped if falsy).

    Returns:
        {
            "core_node": {...} or None,
            "neighbors": [...],
            "edges": [...]
        }
    """
    with self.driver.session(database=self.db_name) as session:
        # NOTE(review): `center_status` and `depth` are interpolated into the
        # Cypher text (the variable-length bound cannot be a parameter);
        # presumably both come from trusted code, not user input — confirm.
        status_clause = f", status: '{center_status}'" if center_status else ""
        query = f"""
        MATCH (center:Memory {{id: $center_id{status_clause}}})
        OPTIONAL MATCH (center)-[r*1..{depth}]-(neighbor:Memory)
        WITH collect(DISTINCT center) AS centers,
        collect(DISTINCT neighbor) AS neighbors,
        collect(DISTINCT r) AS rels
        RETURN centers, neighbors, rels
        """
        record = session.run(query, {"center_id": center_id}).single()
        # No row at all: the center node did not match.
        if not record:
            return {"core_node": None, "neighbors": [], "edges": []}
        centers = record["centers"]
        # Guard against an empty/None center list from the aggregation.
        if not centers or centers[0] is None:
            return {"core_node": None, "neighbors": [], "edges": []}
        core_node = _parse_node(dict(centers[0]))
        neighbors = [_parse_node(dict(n)) for n in record["neighbors"] if n]
        edges = []
        # `rels` holds variable-length matches: each entry is a list of
        # relationship objects, flattened here into (type, source, target).
        for rel_chain in record["rels"]:
            for rel in rel_chain:
                edges.append(
                    {
                        "type": rel.type,
                        "source": rel.start_node["id"],
                        "target": rel.end_node["id"],
                    }
                )
        return {"core_node": core_node, "neighbors": neighbors, "edges": edges}
def get_context_chain(self, id: str, type: str = "FOLLOWS") -> list[str]:
    """
    Get the ordered context chain starting from a node, following a
    relationship type.

    Args:
        id: Starting node ID.
        type: Relationship type to follow (e.g., 'FOLLOWS').

    Returns:
        List of ordered node IDs in the chain.

    Note: not implemented yet.
    """
    raise NotImplementedError
# Search / recall operations
def search_by_embedding(
    self,
    vector: list[float],
    top_k: int = 5,
    scope: str | None = None,
    status: str | None = None,
    threshold: float | None = None,
) -> list[dict]:
    """
    Retrieve node IDs based on vector similarity.

    Args:
        vector (list[float]): The embedding vector representing query semantics.
        top_k (int): Number of top similar nodes to retrieve.
        scope (str, optional): Memory type filter (e.g., 'WorkingMemory', 'LongTermMemory').
        status (str, optional): Node status filter (e.g., 'active', 'archived').
            If provided, restricts results to nodes with matching status.
        threshold (float, optional): Minimum similarity score threshold (0 ~ 1).

    Returns:
        list[dict]: A list of dicts with 'id' and 'score', ordered by similarity.

    Notes:
        - Uses Neo4j's native vector index ('memory_vector_index').
        - scope / status, when given, restrict results via a post-YIELD WHERE.
        - Threshold filtering is applied client-side after retrieval.
        - Typical use case: restrict to 'status = activated' to avoid
          matching archived or merged nodes.
    """
    # Build WHERE clause and parameters together: the previous version put
    # "scope" (possibly None) into the parameter dict unconditionally and
    # then redundantly re-added it.
    where_clauses = []
    parameters: dict[str, Any] = {"embedding": vector, "k": top_k}
    if scope:
        where_clauses.append("node.memory_type = $scope")
        parameters["scope"] = scope
    if status:
        where_clauses.append("node.status = $status")
        parameters["status"] = status
    where_clause = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
    query = f"""
    CALL db.index.vector.queryNodes('memory_vector_index', $k, $embedding)
    YIELD node, score
    {where_clause}
    RETURN node.id AS id, score
    """
    with self.driver.session(database=self.db_name) as session:
        records = [
            {"id": record["id"], "score": record["score"]}
            for record in session.run(query, parameters)
        ]
    # The index call has no score cutoff, so threshold is applied here.
    if threshold is not None:
        records = [r for r in records if r["score"] >= threshold]
    return records
def get_by_metadata(self, filters: list[dict[str, Any]]) -> list[str]:
    """
    TODO:
        1. ADD logic: "AND" vs "OR" (support logic combination);
        2. Support nested conditional expressions;

    Retrieve node IDs that match given metadata filters (AND logic).

    Args:
        filters: List of filter dicts like:
            [
                {"field": "key", "op": "in", "value": ["A", "B"]},
                {"field": "confidence", "op": ">=", "value": 80},
                {"field": "tags", "op": "contains", "value": "AI"},
                ...
            ]

    Returns:
        list[str]: Node IDs whose metadata match all filter conditions.

    Raises:
        ValueError: For an unsupported `op`.

    Notes:
        - Supports structured querying such as tag/category/importance/time filtering.
        - Can be used for faceted recall or prefiltering before embedding rerank.
        - Field names are interpolated into the query text; they must come
          from trusted code, not user input.
    """
    where_clauses: list[str] = []
    params: dict[str, Any] = {}
    for i, condition in enumerate(filters):
        field = condition["field"]
        op = condition.get("op", "=")
        value = condition["value"]
        param_key = f"val{i}"
        if op == "=":
            clause = f"n.{field} = ${param_key}"
        elif op == "in":
            clause = f"n.{field} IN ${param_key}"
        elif op == "contains":
            clause = f"ANY(x IN ${param_key} WHERE x IN n.{field})"
        elif op == "starts_with":
            clause = f"n.{field} STARTS WITH ${param_key}"
        elif op == "ends_with":
            clause = f"n.{field} ENDS WITH ${param_key}"
        elif op in (">", ">=", "<", "<="):
            clause = f"n.{field} {op} ${param_key}"
        else:
            raise ValueError(f"Unsupported operator: {op}")
        where_clauses.append(clause)
        params[param_key] = value
    where_str = " AND ".join(where_clauses)
    query = f"MATCH (n:Memory) WHERE {where_str} RETURN n.id AS id"
    with self.driver.session(database=self.db_name) as session:
        return [record["id"] for record in session.run(query, params)]
def get_grouped_counts(
    self,
    group_fields: list[str],
    where_clause: str = "",
    params: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """
    Count nodes grouped by arbitrary fields.

    Args:
        group_fields (list[str]): Fields to group by, e.g., ["memory_type", "status"].
        where_clause (str, optional): Extra WHERE condition, e.g.
            "WHERE n.status = 'activated'". Inserted verbatim — trusted input only.
        params (dict, optional): Parameters for the WHERE clause.

    Returns:
        list[dict]: e.g., [{'memory_type': 'WorkingMemory', 'status': 'active', 'count': 10}, ...]

    Raises:
        ValueError: If ``group_fields`` is empty.
    """
    if not group_fields:
        raise ValueError("group_fields cannot be empty")
    # Alias every grouped field (`n.f AS f`) so record keys match the names.
    projection = ", ".join(f"n.{field} AS {field}" for field in group_fields)
    query = f"""
    MATCH (n:Memory)
    {where_clause}
    RETURN {projection}, COUNT(n) AS count
    """
    with self.driver.session(database=self.db_name) as session:
        rows = session.run(query, params or {})
        grouped = []
        for record in rows:
            entry = {field: record[field] for field in group_fields}
            entry["count"] = record["count"]
            grouped.append(entry)
        return grouped
# Structure Maintenance
def deduplicate_nodes(self) -> None:
    """
    Deduplicate redundant or semantically similar nodes (e.g., nodes with
    identical or near-identical memory text).

    Note: not implemented yet.
    """
    raise NotImplementedError
def detect_conflicts(self) -> list[tuple[str, str]]:
    """
    Detect conflicting nodes based on logical or semantic inconsistency.

    Returns:
        A list of (node_id1, node_id2) tuples that conflict.

    Note: not implemented yet.
    """
    raise NotImplementedError
def merge_nodes(self, id1: str, id2: str) -> str:
    """
    Merge two similar or duplicate nodes into one.

    Args:
        id1: First node ID.
        id2: Second node ID.

    Returns:
        ID of the resulting merged node.

    Note: not implemented yet.
    """
    raise NotImplementedError
# Utilities
def clear(self) -> None:
    """Delete every node in the target database, if that database exists."""
    try:
        # Skip silently when the database has never been created.
        with self.driver.session(database="system") as session:
            rows = session.run("SHOW DATABASES YIELD name RETURN name")
            existing = {record["name"] for record in rows}
        if self.db_name not in existing:
            logger.info(f"[Skip] Database '{self.db_name}' does not exist.")
            return
        # Wipe all nodes (and their relationships) in one statement.
        with self.driver.session(database=self.db_name) as session:
            session.run("MATCH (n) DETACH DELETE n")
        logger.info(f"Cleared all nodes from database '{self.db_name}'.")
    except Exception as e:
        logger.error(f"[ERROR] Failed to clear database '{self.db_name}': {e}")
        raise
def export_graph(self) -> dict[str, Any]:
    """
    Export all graph nodes and edges in a structured form.

    Returns:
        {
            "nodes": [ { "id": ..., "memory": ..., "metadata": {...} }, ... ],
            "edges": [ { "source": ..., "target": ..., "type": ... }, ... ]
        }
    """
    edge_query = """
        MATCH (a:Memory)-[r]->(b:Memory)
        RETURN a.id AS source, b.id AS target, type(r) AS type
    """
    with self.driver.session(database=self.db_name) as session:
        nodes = [
            _parse_node(dict(record["n"]))
            for record in session.run("MATCH (n:Memory) RETURN n")
        ]
        edges = [
            {"source": rec["source"], "target": rec["target"], "type": rec["type"]}
            for rec in session.run(edge_query)
        ]
    return {"nodes": nodes, "edges": edges}
def import_graph(self, data: dict[str, Any]) -> None:
    """
    Import the entire graph from a serialized dictionary.

    Args:
        data: A dictionary with "nodes" and "edges" lists, shaped as
            produced by ``export_graph``.
    """
    # Delegate to the single-item writers so node/edge creation logic lives
    # in exactly one place (previously this method duplicated the Cypher of
    # add_node / add_edge verbatim, inviting drift).
    for node in data.get("nodes", []):
        node_id, memory, metadata = _compose_node(node)
        self.add_node(node_id, memory, metadata)
    for edge in data.get("edges", []):
        self.add_edge(edge["source"], edge["target"], edge["type"])
def get_all_memory_items(self, scope: str) -> list[dict]:
    """
    Retrieve all memory items of a specific memory_type.

    Args:
        scope (str): One of 'WorkingMemory', 'LongTermMemory', or 'UserMemory'.

    Returns:
        list[dict]: Every parsed memory item under this scope.

    Raises:
        ValueError: If ``scope`` is not a supported memory type.
    """
    valid_scopes = {"WorkingMemory", "LongTermMemory", "UserMemory"}
    if scope not in valid_scopes:
        raise ValueError(f"Unsupported memory type scope: {scope}")
    query = """
    MATCH (n:Memory)
    WHERE n.memory_type = $scope
    RETURN n
    """
    with self.driver.session(database=self.db_name) as session:
        rows = session.run(query, {"scope": scope})
        return [_parse_node(dict(row["n"])) for row in rows]
def get_structure_optimization_candidates(self, scope: str) -> list[dict]:
    """
    Find nodes that are likely candidates for structure optimization.

    The current query returns activated nodes in ``scope`` that have no
    PARENT relationship in either direction (i.e. fully isolated in the
    hierarchy).
    """
    query = """
    MATCH (n:Memory)
    WHERE n.memory_type = $scope
      AND n.status = 'activated'
      AND NOT ( (n)-[:PARENT]->() OR ()-[:PARENT]->(n) )
    RETURN n.id AS id, n AS node
    """
    with self.driver.session(database=self.db_name) as session:
        rows = session.run(query, {"scope": scope})
        return [
            _parse_node({"id": row["id"], **dict(row["node"])})
            for row in rows
        ]
def drop_database(self) -> None:
    """
    Permanently delete the entire database this instance is using.

    WARNING: This operation is destructive and cannot be undone.

    Raises:
        ValueError: When targeting the protected 'system' or 'neo4j' databases.
    """
    if self.db_name in ("system", "neo4j"):
        raise ValueError(f"Refusing to drop protected database: {self.db_name}")
    with self.driver.session(database="system") as session:
        # Database names cannot be query parameters; db_name comes from
        # trusted configuration and is guarded above.
        session.run(f"DROP DATABASE {self.db_name} IF EXISTS")
    # Use the module logger (not print) for consistency with the rest of
    # this class.
    logger.info(f"Database '{self.db_name}' has been dropped.")
def _ensure_database_exists(self):
    """
    Create the configured database if missing, then wait until it is online.

    Raises:
        RuntimeError: If the database does not report 'online' within ~10s.
    """
    with self.driver.session(database="system") as session:
        # BUG FIX: database names cannot be supplied as Cypher query
        # parameters — the previous `$db_name` placeholder was sent literally
        # and the statement failed. Interpolate the (trusted, config-sourced)
        # name directly, backtick-quoted so unusual names still parse.
        session.run(f"CREATE DATABASE `{self.db_name}` IF NOT EXISTS")
    # Poll until the new database reports an 'online' status.
    for _ in range(10):
        with self.driver.session(database="system") as session:
            result = session.run(
                "SHOW DATABASES YIELD name, currentStatus RETURN name, currentStatus"
            )
            status_map = {r["name"]: r["currentStatus"] for r in result}
        if status_map.get(self.db_name) == "online":
            return
        time.sleep(1)
    raise RuntimeError(f"Database {self.db_name} not ready after waiting.")
def _vector_index_exists(self, index_name: str = "memory_vector_index") -> bool:
    """Return True when an index named ``index_name`` is already defined."""
    query = "SHOW INDEXES YIELD name WHERE name = $name RETURN name"
    with self.driver.session(database=self.db_name) as session:
        return session.run(query, name=index_name).single() is not None
def _create_vector_index(
    self, label: str, vector_property: str, dimensions: int, index_name: str
) -> None:
    """
    Create a cosine-similarity vector index over ``label``.``vector_property``.

    Failures are logged as warnings rather than raised, so construction can
    proceed even when index creation is not possible.
    """
    query = f"""
    CREATE VECTOR INDEX {index_name} IF NOT EXISTS
    FOR (n:{label}) ON (n.{vector_property})
    OPTIONS {{
      indexConfig: {{
        `vector.dimensions`: {dimensions},
        `vector.similarity_function`: 'cosine'
      }}
    }}
    """
    try:
        with self.driver.session(database=self.db_name) as session:
            session.run(query)
        logger.debug(f"Vector index '{index_name}' ensured.")
    except Exception as e:
        logger.warning(f"Failed to create vector index '{index_name}': {e}")
def _create_basic_property_indexes(self) -> None:
    """
    Ensure range indexes exist on memory_type, created_at, and updated_at.

    All statements use `IF NOT EXISTS`, so repeated calls are harmless;
    any failure is logged as a warning rather than raised.
    """
    index_specs = [
        ("memory_type_index", "memory_type"),
        ("memory_created_at_index", "created_at"),
        ("memory_updated_at_index", "updated_at"),
    ]
    try:
        with self.driver.session(database=self.db_name) as session:
            for index_name, prop in index_specs:
                session.run(
                    f"""
                    CREATE INDEX {index_name} IF NOT EXISTS
                    FOR (n:Memory) ON (n.{prop})
                    """
                )
                logger.debug(f"Index '{index_name}' ensured.")
    except Exception as e:
        logger.warning(f"Failed to create basic property indexes: {e}")
def _index_exists(self, index_name: str) -> bool:
    """
    Check if an index with the given name exists.
    """
    with self.driver.session(database=self.db_name) as session:
        result = session.run("SHOW INDEXES")
        return any(record["name"] == index_name for record in result)