#!/usr/bin/env python3
import os
import sys
import re
import json
import subprocess
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import random
import yaml
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity as sklearn_cosine
class ObsidianVault:
def __init__(self, vault_path):
self.vault_path = Path(vault_path)
self._cache = {}
self._graph_cache = None
def _get_all_notes(self):
if "notes" in self._cache:
return self._cache["notes"]
notes = []
for md_file in self.vault_path.rglob("*.md"):
if ".trash" in md_file.parts or ".obsidian" in md_file.parts:
continue
notes.append(md_file)
self._cache["notes"] = notes
return notes
def _parse_note(self, note_path):
try:
with open(note_path, "r", encoding="utf-8") as f:
content = f.read()
except Exception as e:
return None
# frontmatter handeling
frontmatter = {}
content_body = content
if content.startswith("---"):
parts = content.split("---", 2)
if len(parts) >= 3:
try:
frontmatter = yaml.safe_load(parts[1]) or {}
except:
pass
content_body = parts[2]
# wikilinks [[link]] or [[link|alias]]
wikilinks = re.findall(r"\[\[([^\]]+)\]\]", content)
parsed_links = []
for link in wikilinks:
# aliases
if "|" in link:
link = link.split("|")[0]
# headings
if "#" in link:
link = link.split("#")[0]
if link:
parsed_links.append(link.strip())
# tags from content (#tag)
content_tags = re.findall(r"(?:^|\s)#([\w/-]+)", content_body)
frontmatter_tags = frontmatter.get("tags", [])
if frontmatter_tags is None:
frontmatter_tags = []
if isinstance(frontmatter_tags, str):
frontmatter_tags = [frontmatter_tags]
all_tags = list(set(content_tags + frontmatter_tags))
return {
"path": note_path,
"frontmatter": frontmatter,
"content": content_body,
"links": parsed_links,
"tags": all_tags,
"title": frontmatter.get("title", note_path.stem),
"mtime": os.path.getmtime(note_path),
}
    def _build_graph(self):
        """Build (and cache) the vault link graph.

        Returns a dict with:
            nodes: note stem -> parsed note data (see ``_parse_note``).
            forward_links: note stem -> list of note stems it links to.
            backlinks: note stem -> list of note stems linking to it.
            broken_links: unresolved link text -> list of referring stems.

        Links are resolved case-insensitively against both bare file stems
        and vault-relative paths (without the ``.md`` extension).
        """
        if self._graph_cache:
            return self._graph_cache
        notes = self._get_all_notes()
        graph = {
            "nodes": {},  # note_name -> note_data
            "forward_links": defaultdict(list),  # note -> [notes it links to]
            "backlinks": defaultdict(list),  # note -> [notes that link to it]
            "broken_links": defaultdict(list),
        }
        # Lookup tables for link resolution; all keys are lowercased.
        note_name_to_path = {}
        path_to_stem = {}
        stem_to_canonical = {}
        for note_path in notes:
            note_data = self._parse_note(note_path)
            if note_data:
                note_name = note_path.stem
                # NOTE(review): notes in different folders that share a stem
                # overwrite each other in these maps — confirm stems are
                # unique across the vault.
                note_name_lower = note_name.lower()
                note_name_to_path[note_name_lower] = note_path
                stem_to_canonical[note_name_lower] = note_name
                graph["nodes"][note_name] = note_data
                # Also index by vault-relative path so [[folder/note]] resolves.
                rel_path = str(note_path.relative_to(self.vault_path))
                rel_path_no_ext = rel_path.rsplit(".md", 1)[0]
                rel_path_lower = rel_path_no_ext.lower()
                note_name_to_path[rel_path_lower] = note_path
                path_to_stem[rel_path_lower] = note_name
        # Second pass: resolve every parsed wikilink to a canonical stem.
        for note_name, note_data in graph["nodes"].items():
            for link in note_data["links"]:
                link_normalized = link.strip()
                link_lower = link_normalized.lower()
                if link_lower in note_name_to_path:
                    # Map the link text back to the canonical note stem,
                    # preferring the path index over the stem index.
                    target_stem = path_to_stem.get(
                        link_lower, stem_to_canonical.get(link_lower, link_normalized)
                    )
                    graph["forward_links"][note_name].append(target_stem)
                    graph["backlinks"][target_stem].append(note_name)
                else:
                    # Unresolved target: remember who referenced it.
                    graph["broken_links"][link_normalized].append(note_name)
        self._graph_cache = graph
        return graph
def search(self, query, search_in=["title", "content", "tags"], limit=50):
notes = self._get_all_notes()
results = []
query_lower = query.lower()
for note_path in notes:
note_data = self._parse_note(note_path)
if not note_data:
continue
match = False
match_reason = []
title = note_data.get("title") or ""
if "title" in search_in and title and query_lower in title.lower():
match = True
match_reason.append("title")
if "content" in search_in and query_lower in note_data["content"].lower():
match = True
match_reason.append("content")
if "tags" in search_in:
for tag in note_data["tags"]:
if tag and query_lower in tag.lower():
match = True
match_reason.append("tag")
break
if match:
results.append(
{
"path": str(note_data["path"].relative_to(self.vault_path)),
"title": title,
"match_reason": match_reason,
"tags": note_data["tags"][:5],
}
)
if len(results) >= limit:
break
return results
def get_broken_links(self, limit=100, filter_tag=None):
graph = self._build_graph()
broken = []
for link_name, referrers in graph["broken_links"].items():
tag_counts = Counter()
for ref in referrers:
ref_data = graph["nodes"].get(ref, {})
for tag in ref_data.get("tags", []):
tag_counts[tag] += 1
inferred_tags = tag_counts.most_common(10)
if filter_tag:
matched = any(
filter_tag.lower() in tag.lower() for tag, _ in inferred_tags
)
if not matched:
continue
broken.append(
{
"link": link_name,
"count": len(referrers),
"referrers": referrers[:10],
"inferred_tags": inferred_tags,
}
)
broken.sort(key=lambda x: x["count"], reverse=True)
return broken[:limit]
def get_backlinks(self, note_name):
graph = self._build_graph()
matching_notes = [
n for n in graph["nodes"].keys() if n.lower() == note_name.lower()
]
if not matching_notes:
return {"error": f'Note "{note_name}" not found'}
note_name = matching_notes[0]
backlinks = graph["backlinks"].get(note_name, [])
return {
"note": note_name,
"backlink_count": len(backlinks),
"backlinks": backlinks,
}
def get_forward_links(self, note_name):
graph = self._build_graph()
matching_notes = [
n for n in graph["nodes"].keys() if n.lower() == note_name.lower()
]
if not matching_notes:
return {"error": f'Note "{note_name}" not found'}
note_name = matching_notes[0]
forward_links = graph["forward_links"].get(note_name, [])
return {
"note": note_name,
"forward_link_count": len(forward_links),
"forward_links": forward_links,
}
def get_orphans(self, limit=None):
graph = self._build_graph()
orphans = []
for note_name, note_data in graph["nodes"].items():
has_backlinks = len(graph["backlinks"].get(note_name, [])) > 0
has_forward_links = len(graph["forward_links"].get(note_name, [])) > 0
if not has_backlinks and not has_forward_links:
orphans.append(
{
"note": note_name,
"path": str(note_data["path"].relative_to(self.vault_path)),
"title": note_data["title"],
}
)
if limit:
return orphans[:limit]
return orphans
def get_hubs(self, limit=50, min_links=5):
graph = self._build_graph()
hubs = []
for note_name, note_data in graph["nodes"].items():
backlink_count = len(graph["backlinks"].get(note_name, []))
forward_link_count = len(graph["forward_links"].get(note_name, []))
total_links = backlink_count + forward_link_count
if total_links >= min_links:
hubs.append(
{
"note": note_name,
"backlinks": backlink_count,
"forward_links": forward_link_count,
"total": total_links,
"path": str(note_data["path"].relative_to(self.vault_path)),
}
)
hubs.sort(key=lambda x: x["total"], reverse=True)
return hubs[:limit]
def get_stats(self):
graph = self._build_graph()
notes = graph["nodes"]
total_forward_links = sum(
len(links) for links in graph["forward_links"].values()
)
total_broken_links = sum(len(refs) for refs in graph["broken_links"].values())
all_tags = []
for note_data in notes.values():
all_tags.extend(note_data["tags"])
tag_counter = Counter(all_tags)
no_backlinks = sum(
1 for name in notes if len(graph["backlinks"].get(name, [])) == 0
)
return {
"total_notes": len(notes),
"total_links": total_forward_links,
"unique_broken_links": len(graph["broken_links"]),
"total_broken_link_references": total_broken_links,
"total_tags": len(tag_counter),
"most_common_tags": tag_counter.most_common(10),
"notes_without_backlinks": no_backlinks,
"orphaned_notes": len(
[
n
for n in notes
if len(graph["backlinks"].get(n, [])) == 0
and len(graph["forward_links"].get(n, [])) == 0
]
),
}
def get_recent(self, limit=20, days=None):
notes = self._get_all_notes()
recent = []
now = datetime.now().timestamp()
for note_path in notes:
note_data = self._parse_note(note_path)
if not note_data:
continue
mtime = note_data["mtime"]
age_days = (now - mtime) / 86400
if days is None or age_days <= days:
recent.append(
{
"path": str(note_data["path"].relative_to(self.vault_path)),
"title": note_data["title"],
"modified": datetime.fromtimestamp(mtime).isoformat(),
"days_ago": round(age_days, 1),
}
)
recent.sort(key=lambda x: x["days_ago"])
return recent[:limit]
def get_tags(self, min_count=1):
graph = self._build_graph()
tag_counter = Counter()
tag_notes = defaultdict(list)
for note_name, note_data in graph["nodes"].items():
for tag in note_data["tags"]:
tag_counter[tag] += 1
tag_notes[tag].append(note_name)
tags = []
for tag, count in tag_counter.most_common():
if count >= min_count:
tags.append(
{
"tag": tag,
"count": count,
"notes": tag_notes[tag][:10], # First 10 notes
}
)
return tags
def get_related(self, note_name, limit=10):
graph = self._build_graph()
matching_notes = [
n for n in graph["nodes"].keys() if n.lower() == note_name.lower()
]
if not matching_notes:
return {"error": f'Note "{note_name}" not found'}
note_name = matching_notes[0]
forward_links = set(graph["forward_links"].get(note_name, []))
backlinks = set(graph["backlinks"].get(note_name, []))
scores = {}
for other_note in graph["nodes"].keys():
if other_note == note_name:
continue
other_forward = set(graph["forward_links"].get(other_note, []))
other_backlinks = set(graph["backlinks"].get(other_note, []))
shared_forward = len(forward_links & other_forward)
shared_backlinks = len(backlinks & other_backlinks)
direct_connection = 0
if other_note in forward_links or other_note in backlinks:
direct_connection = 10
score = shared_forward * 2 + shared_backlinks * 2 + direct_connection
if score > 0:
scores[other_note] = score
related = [
{"note": note, "score": score}
for note, score in sorted(scores.items(), key=lambda x: x[1], reverse=True)
]
return {
"note": note_name,
"related_count": len(related),
"related": related[:limit],
}
    def _load_embeddings(self):
        """Load Smart Connections note embeddings from ``.smart-env/multi/*.ajson``.

        Returns a dict mapping note stem -> numpy vector, cached after the
        first load; empty when the plugin data directory is absent.
        """
        if "embeddings" in self._cache:
            return self._cache["embeddings"]
        embeddings = {}
        smart_env = self.vault_path / ".smart-env" / "multi"
        if not smart_env.exists():
            return embeddings
        for ajson_file in smart_env.glob("*.ajson"):
            try:
                with open(ajson_file, "r") as f:
                    content = f.read()
                # .ajson files are JSON fragments ('"key": {...},' lines) —
                # wrap in braces, dropping any trailing comma, to get a
                # parseable JSON object.
                if content.strip().endswith(","):
                    content = "{" + content.rstrip().rstrip(",") + "}"
                else:
                    content = "{" + content.strip() + "}"
                data = json.loads(content)
                for key, note_data in data.items():
                    if note_data is None:
                        continue
                    # Only vectors from this specific model are loaded.
                    if (
                        "embeddings" in note_data
                        and "TaylorAI/bge-micro-v2" in note_data["embeddings"]
                    ):
                        vec = note_data["embeddings"]["TaylorAI/bge-micro-v2"]["vec"]
                        path = note_data.get("path", "")
                        # Fall back to the record key (minus its
                        # "smart_sources:" prefix) when no path is stored.
                        note_name = (
                            Path(path).stem
                            if path
                            else Path(key.replace("smart_sources:", "")).stem
                        )
                        embeddings[note_name] = np.array(vec)
            except Exception as e:
                # Best-effort: skip unreadable or corrupt .ajson files.
                continue
        self._cache["embeddings"] = embeddings
        return embeddings
def _cosine_similarity(self, vec1, vec2):
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
    def get_context(self, query, depth=2, limit=10):
        """Rank notes by embedding similarity to *query* (an exact note name),
        boosted by graph proximity.

        Directly linked notes get +0.3; with ``depth >= 2`` notes two hops
        away get +0.1. Requires Smart Connections embeddings on disk.
        """
        graph = self._build_graph()
        embeddings = self._load_embeddings()
        if not embeddings:
            return {
                "error": "No embeddings found. Smart Connections may not be configured."
            }
        # Resolve the query note case-insensitively against note stems.
        matching_notes = [
            n for n in graph["nodes"].keys() if n.lower() == query.lower()
        ]
        if not matching_notes:
            return {"error": f'Note "{query}" not found'}
        note_name = matching_notes[0]
        if note_name not in embeddings:
            return {"error": f'No embedding found for "{note_name}"'}
        query_vec = embeddings[note_name]
        # Base score: cosine similarity to every other embedded note.
        scores = {}
        for other_note, other_vec in embeddings.items():
            if other_note == note_name:
                continue
            similarity = self._cosine_similarity(query_vec, other_vec)
            scores[other_note] = {
                "similarity": similarity,
                "link_bonus": 0,
                "total": similarity,
            }
        # First-degree neighbors (either link direction) get a flat bonus.
        forward_links = set(graph["forward_links"].get(note_name, []))
        backlinks = set(graph["backlinks"].get(note_name, []))
        for linked_note in forward_links | backlinks:
            if linked_note in scores:
                scores[linked_note]["link_bonus"] += 0.3
                scores[linked_note]["total"] += 0.3
        if depth >= 2:
            # Second-degree neighbors get a smaller bonus, skipping notes
            # already rewarded as first-degree.
            for first_degree in forward_links | backlinks:
                second_forward = set(graph["forward_links"].get(first_degree, []))
                second_back = set(graph["backlinks"].get(first_degree, []))
                for second_degree in second_forward | second_back:
                    if second_degree in scores and second_degree not in (
                        forward_links | backlinks
                    ):
                        # NOTE(review): a note reachable through several
                        # first-degree neighbors accrues +0.1 per path —
                        # confirm this stacking is intended.
                        scores[second_degree]["link_bonus"] += 0.1
                        scores[second_degree]["total"] += 0.1
        sorted_notes = sorted(scores.items(), key=lambda x: x[1]["total"], reverse=True)
        results = []
        for other_note, score_data in sorted_notes[:limit]:
            node_data = graph["nodes"].get(other_note, {})
            results.append(
                {
                    "note": other_note,
                    # Embedded notes may no longer exist in the vault; fall
                    # back to the bare note name as the path.
                    "path": (
                        str(node_data["path"].relative_to(self.vault_path))
                        if "path" in node_data
                        else other_note
                    ),
                    "similarity": round(score_data["similarity"], 3),
                    "link_bonus": round(score_data["link_bonus"], 3),
                    "total_score": round(score_data["total"], 3),
                }
            )
        return {"query": note_name, "context_count": len(results), "context": results}
    def suggest_missing_links(self, threshold=0.85, limit=20):
        """Suggest note pairs that are semantically similar but not linked.

        Compares every unordered pair of embedded notes (O(n^2) cosine
        similarities) and keeps pairs at or above *threshold* that have no
        existing link in either direction. Sorted by similarity, then by
        shared-neighbor count.
        """
        graph = self._build_graph()
        embeddings = self._load_embeddings()
        if not embeddings:
            return {
                "error": "No embeddings found. Smart Connections may not be configured."
            }
        suggestions = []
        note_list = list(embeddings.keys())
        for i, note_a in enumerate(note_list):
            # Skip embeddings for notes that no longer exist in the vault.
            if note_a not in graph["nodes"]:
                continue
            vec_a = embeddings[note_a]
            forward_a = set(graph["forward_links"].get(note_a, []))
            back_a = set(graph["backlinks"].get(note_a, []))
            connected_a = forward_a | back_a
            for note_b in note_list[i + 1 :]:
                if note_b not in graph["nodes"]:
                    continue
                if note_b in connected_a:
                    continue  # already linked — nothing to suggest
                vec_b = embeddings[note_b]
                similarity = self._cosine_similarity(vec_a, vec_b)
                if similarity >= threshold:
                    forward_b = set(graph["forward_links"].get(note_b, []))
                    back_b = set(graph["backlinks"].get(note_b, []))
                    connected_b = forward_b | back_b
                    # Shared neighbors hint the pair belongs to one topic.
                    shared_neighbors = len(connected_a & connected_b)
                    suggestions.append(
                        {
                            "note_a": note_a,
                            "note_b": note_b,
                            "similarity": round(similarity, 3),
                            "shared_neighbors": shared_neighbors,
                            "reason": (
                                "high_semantic_similarity"
                                if shared_neighbors == 0
                                else "semantic_and_structural"
                            ),
                        }
                    )
        suggestions.sort(
            key=lambda x: (x["similarity"], x["shared_neighbors"]), reverse=True
        )
        return {"suggestions": suggestions[:limit]}
def analyze_knowledge_velocity(self, days=90, metric="links"):
graph = self._build_graph()
since_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
try:
result = subprocess.run(
[
"git",
"log",
"--since",
since_date,
"--name-only",
"--format=%H|%at|%s",
],
cwd=self.vault_path,
capture_output=True,
text=True,
check=True,
)
except subprocess.CalledProcessError:
return {"error": "Git repository not found or git command failed"}
commits = []
lines = result.stdout.strip().split("\n")
i = 0
while i < len(lines):
if "|" in lines[i]:
commit_hash, timestamp, message = lines[i].split("|", 2)
files = []
i += 1
while i < len(lines) and lines[i] and "|" not in lines[i]:
if lines[i].endswith(".md"):
files.append(lines[i])
i += 1
commits.append(
{
"hash": commit_hash,
"timestamp": int(timestamp),
"message": message,
"files": files,
}
)
else:
i += 1
if metric == "links":
note_link_timeline = defaultdict(list)
for commit in sorted(commits, key=lambda x: x["timestamp"]):
date = datetime.fromtimestamp(commit["timestamp"]).strftime("%Y-%m-%d")
for filepath in commit["files"]:
note_name = Path(filepath).stem
if note_name in graph["nodes"]:
link_count = len(graph["forward_links"].get(note_name, []))
note_link_timeline[note_name].append(
{"date": date, "link_count": link_count}
)
velocity = []
for note_name, timeline in note_link_timeline.items():
if len(timeline) >= 2:
first = timeline[0]
last = timeline[-1]
link_growth = last["link_count"] - first["link_count"]
if link_growth > 0:
velocity.append(
{
"note": note_name,
"initial_links": first["link_count"],
"final_links": last["link_count"],
"growth": link_growth,
"first_seen": first["date"],
"last_seen": last["date"],
}
)
velocity.sort(key=lambda x: x["growth"], reverse=True)
return {
"metric": "link_growth",
"period_days": days,
"notes": velocity[:50],
}
elif metric == "edits":
edit_frequency = Counter()
for commit in commits:
for filepath in commit["files"]:
note_name = Path(filepath).stem
edit_frequency[note_name] += 1
results = [
{"note": note, "edit_count": count}
for note, count in edit_frequency.most_common(50)
]
return {"metric": "edit_frequency", "period_days": days, "notes": results}
def score_note_maturity(self):
graph = self._build_graph()
scores = []
for note_name, note_data in graph["nodes"].items():
word_count = len(note_data["content"].split())
backlink_count = len(graph["backlinks"].get(note_name, []))
forward_count = len(graph["forward_links"].get(note_name, []))
total_links = backlink_count + forward_count
age_days = (datetime.now().timestamp() - note_data["mtime"]) / 86400
word_score = min(word_count / 500, 1.0)
link_score = min(total_links / 10, 1.0)
recency_score = 1.0 if age_days < 30 else max(0.5, 1.0 - (age_days / 365))
maturity = word_score * 0.4 + link_score * 0.4 + recency_score * 0.2
issues = []
if word_count < 100:
issues.append("stub")
if total_links == 0:
issues.append("orphaned")
if age_days > 180 and backlink_count == 0:
issues.append("write-only")
if age_days > 90 and recency_score < 0.7:
issues.append("stale")
scores.append(
{
"note": note_name,
"maturity_score": round(maturity, 2),
"word_count": word_count,
"backlinks": backlink_count,
"forward_links": forward_count,
"age_days": round(age_days, 1),
"issues": issues,
}
)
scores.sort(key=lambda x: x["maturity_score"])
return scores
    def batch_tag(
        self,
        similar_to=None,
        add_tag=None,
        remove_tag=None,
        threshold=0.85,
        dry_run=True,
    ):
        """Add and/or remove a tag across many notes, optionally limited to
        notes semantically similar to *similar_to*.

        With ``dry_run=True`` (the default) only the planned operations are
        returned; otherwise each affected note's YAML frontmatter is
        rewritten on disk.
        """
        graph = self._build_graph()
        embeddings = self._load_embeddings()
        if similar_to and not embeddings:
            return {
                "error": "No embeddings found. Smart Connections may not be configured."
            }
        # Select target notes: similarity-filtered, or the whole vault.
        targets = []
        if similar_to:
            matching_notes = [
                n for n in graph["nodes"].keys() if n.lower() == similar_to.lower()
            ]
            if not matching_notes:
                return {"error": f'Note "{similar_to}" not found'}
            ref_note = matching_notes[0]
            if ref_note not in embeddings:
                return {"error": f'No embedding found for "{ref_note}"'}
            ref_vec = embeddings[ref_note]
            for note_name, note_vec in embeddings.items():
                if note_name == ref_note:
                    continue
                similarity = self._cosine_similarity(ref_vec, note_vec)
                if similarity >= threshold:
                    targets.append((note_name, similarity))
            targets.sort(key=lambda x: x[1], reverse=True)
        else:
            targets = [(n, 1.0) for n in graph["nodes"].keys()]
        # Plan the tag changes per note (nothing is written yet).
        operations = []
        for note_name, similarity in targets:
            note_data = graph["nodes"][note_name]
            current_tags = note_data["tags"].copy()
            modified = False
            if add_tag and add_tag not in current_tags:
                current_tags.append(add_tag)
                modified = True
            if remove_tag and remove_tag in current_tags:
                current_tags.remove(remove_tag)
                modified = True
            if modified:
                operations.append(
                    {
                        "note": note_name,
                        "path": str(note_data["path"].relative_to(self.vault_path)),
                        "similarity": round(similarity, 3) if similar_to else None,
                        "old_tags": note_data["tags"],
                        "new_tags": current_tags,
                    }
                )
        if dry_run:
            return {"dry_run": True, "operations": operations}
        # Apply: rewrite the frontmatter "tags" list of each affected note.
        for op in operations:
            note_path = self.vault_path / op["path"]
            try:
                with open(note_path, "r", encoding="utf-8") as f:
                    content = f.read()
                if content.startswith("---"):
                    parts = content.split("---", 2)
                    if len(parts) >= 3:
                        fm = yaml.safe_load(parts[1]) or {}
                        fm["tags"] = op["new_tags"]
                        new_frontmatter = yaml.dump(
                            fm, default_flow_style=False, allow_unicode=True
                        )
                        content = f"---\n{new_frontmatter}---{parts[2]}"
                # NOTE(review): notes without a frontmatter block are
                # rewritten unchanged (the tag is NOT persisted) yet still
                # reported as successful operations — confirm intended.
                with open(note_path, "w", encoding="utf-8") as f:
                    f.write(content)
            except Exception as e:
                # Per-note failures are recorded on the operation instead
                # of aborting the whole batch.
                op["error"] = str(e)
        return {"dry_run": False, "operations": operations}
    def analyze_link_weights(self, min_weight=1):
        """Weight existing wikilinks by git co-edit frequency.

        Two notes edited in the same commit count as one co-edit. Each
        existing forward link whose pair reaches *min_weight* co-edits is
        returned (up to 100), labeled strong (>=5), moderate (>=3) or weak.
        """
        graph = self._build_graph()
        try:
            result = subprocess.run(
                ["git", "log", "--name-only", "--format=%H|%at"],
                cwd=self.vault_path,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            return {"error": "Git repository not found or git command failed"}
        # Count commit co-occurrences for every unordered pair of notes.
        co_edit_counts = defaultdict(int)
        lines = result.stdout.strip().split("\n")
        i = 0
        while i < len(lines):
            if "|" in lines[i]:
                # Header line "hash|timestamp": collect the .md files
                # listed below it until the next header or blank line.
                files = []
                i += 1
                while i < len(lines) and lines[i] and "|" not in lines[i]:
                    if lines[i].endswith(".md"):
                        files.append(Path(lines[i]).stem)
                    i += 1
                for j, file_a in enumerate(files):
                    for file_b in files[j + 1 :]:
                        # Sorted tuple so (a, b) and (b, a) share a key.
                        pair = tuple(sorted([file_a, file_b]))
                        co_edit_counts[pair] += 1
            else:
                i += 1
        # Attach the co-edit weight to each existing forward link.
        weighted_links = []
        for note_name in graph["nodes"].keys():
            forward_links = graph["forward_links"].get(note_name, [])
            for linked_note in forward_links:
                pair = tuple(sorted([note_name, linked_note]))
                weight = co_edit_counts.get(pair, 0)
                if weight >= min_weight:
                    weighted_links.append(
                        {
                            "from": note_name,
                            "to": linked_note,
                            "weight": weight,
                            "type": (
                                "strong"
                                if weight >= 5
                                else "moderate" if weight >= 3 else "weak"
                            ),
                        }
                    )
        weighted_links.sort(key=lambda x: x["weight"], reverse=True)
        return {"links": weighted_links[:100]}
def build_prereq_tree(self, root_note, depth=3):
graph = self._build_graph()
matching_notes = [
n for n in graph["nodes"].keys() if n.lower() == root_note.lower()
]
if not matching_notes:
return {"error": f'Note "{root_note}" not found'}
root = matching_notes[0]
visited = set()
tree = {"note": root, "children": []}
queue = [(root, tree, 0)]
while queue:
current_note, current_node, current_depth = queue.pop(0)
if current_note in visited or current_depth >= depth:
continue
visited.add(current_note)
backlinks = graph["backlinks"].get(current_note, [])
backlink_data = []
for bl in backlinks:
if bl not in visited:
node_data = graph["nodes"].get(bl)
if node_data:
backlink_data.append((bl, node_data["mtime"]))
backlink_data.sort(key=lambda x: x[1])
for bl_note, mtime in backlink_data:
child_node = {"note": bl_note, "children": []}
current_node["children"].append(child_node)
queue.append((bl_note, child_node, current_depth + 1))
return {"root": root, "tree": tree, "depth": depth}
    # --- RAG-style retrieval: TF-IDF index, excerpt extraction, hybrid search ---
    def _build_tfidf_index(self):
        """Build (and cache) a TF-IDF index over all notes.

        Each document is "title title title tags content" — the title is
        repeated three times so title words outweigh body words.

        NOTE(review): TfidfVectorizer(min_df=2) raises ValueError when the
        vault has too few documents/terms — confirm callers tolerate this.
        """
        if "tfidf" in self._cache:
            return self._cache["tfidf"]
        graph = self._build_graph()
        note_names = []
        documents = []
        for note_name, note_data in graph["nodes"].items():
            title = note_data.get("title") or note_name
            content = note_data.get("content", "")
            tags = " ".join(note_data.get("tags", []))
            # Title boosting: repetition raises the title's term frequency.
            doc = f"{title} {title} {title} {tags} {content}"
            note_names.append(note_name)
            documents.append(doc)
        vectorizer = TfidfVectorizer(
            max_features=20000,
            stop_words="english",
            ngram_range=(1, 2),
            min_df=2,
            max_df=0.9,
            sublinear_tf=True,
        )
        tfidf_matrix = vectorizer.fit_transform(documents)
        result = {
            "vectorizer": vectorizer,
            "matrix": tfidf_matrix,
            "note_names": note_names,
        }
        self._cache["tfidf"] = result
        return result
def _extract_excerpt(self, content, query_terms, max_len=500):
paragraphs = re.split(r"\n\s*\n", content.strip())
if not paragraphs:
return ""
scored = []
for para in paragraphs:
para_lower = para.lower()
score = sum(1 for term in query_terms if term in para_lower)
word_count = len(para.split())
if word_count < 3:
score *= 0.1
scored.append((score, para))
scored.sort(key=lambda x: x[0], reverse=True)
result = ""
for score, para in scored:
if score == 0:
break
if len(result) + len(para) > max_len:
if not result:
result = para[:max_len]
break
result += para + "\n\n"
return result.strip() if result else paragraphs[0][:max_len]
    def rag_search(self, query, limit=10, excerpt_len=500, expand_links=True):
        """Hybrid retrieval: TF-IDF ranking, optional embedding re-ranking,
        plus one hop of graph expansion.

        Candidates are fetched at 3x *limit*, optionally re-scored with a
        60/40 blend of TF-IDF score and similarity to the mean embedding
        of the top three hits, then padded with linked neighbors of the
        best results up to *limit*.
        """
        tfidf = self._build_tfidf_index()
        graph = self._build_graph()
        query_vec = tfidf["vectorizer"].transform([query])
        similarities = sklearn_cosine(query_vec, tfidf["matrix"]).flatten()
        # Over-fetch so embedding re-ranking has candidates to reorder.
        fetch_count = min(limit * 3, len(tfidf["note_names"]))
        top_indices = similarities.argsort()[::-1][:fetch_count]
        candidates = []
        for idx in top_indices:
            score = float(similarities[idx])
            if score < 0.01:
                break  # indices are sorted; everything after is noise too
            candidates.append(
                {
                    "note": tfidf["note_names"][idx],
                    "tfidf_score": round(score, 4),
                }
            )
        if not candidates:
            return {"query": query, "results": [], "result_count": 0}
        embeddings = self._load_embeddings()
        if embeddings and candidates:
            # Pseudo-relevance feedback: average the top-3 hit embeddings
            # and score every candidate against that anchor vector.
            anchor_vecs = []
            for c in candidates[:3]:
                if c["note"] in embeddings:
                    anchor_vecs.append(embeddings[c["note"]])
            if anchor_vecs:
                anchor_mean = np.mean(anchor_vecs, axis=0)
                for c in candidates:
                    if c["note"] in embeddings:
                        emb_sim = float(
                            self._cosine_similarity(anchor_mean, embeddings[c["note"]])
                        )
                        c["embedding_score"] = round(emb_sim, 4)
                        c["combined_score"] = round(
                            c["tfidf_score"] * 0.6 + emb_sim * 0.4, 4
                        )
                    else:
                        c["embedding_score"] = 0.0
                        c["combined_score"] = c["tfidf_score"]
                candidates.sort(key=lambda x: x["combined_score"], reverse=True)
            else:
                # No anchors with embeddings: fall back to TF-IDF order.
                for c in candidates:
                    c["combined_score"] = c["tfidf_score"]
                    c["embedding_score"] = 0.0
        else:
            # No embeddings at all: TF-IDF scores stand alone.
            for c in candidates:
                c["combined_score"] = c["tfidf_score"]
                c["embedding_score"] = 0.0
        results = candidates[:limit]
        # Terms used for excerpt selection (3+ characters only).
        query_terms = [t.lower() for t in query.split() if len(t) > 2]
        for r in results:
            note_data = graph["nodes"].get(r["note"], {})
            r["path"] = (
                str(note_data["path"].relative_to(self.vault_path))
                if "path" in note_data
                else ""
            )
            r["tags"] = note_data.get("tags", [])[:5]
            r["excerpt"] = self._extract_excerpt(
                note_data.get("content", ""), query_terms, max_len=excerpt_len
            )
        if expand_links and len(results) < limit:
            # Pad the short result list with unseen graph neighbors of the
            # top three hits, marked as "graph_expansion".
            seen = {r["note"] for r in results}
            expanded = []
            for r in results[:3]:
                neighbors = set(graph["forward_links"].get(r["note"], []))
                neighbors |= set(graph["backlinks"].get(r["note"], []))
                for neighbor in neighbors:
                    if neighbor not in seen and neighbor in graph["nodes"]:
                        seen.add(neighbor)
                        nd = graph["nodes"][neighbor]
                        expanded.append(
                            {
                                "note": neighbor,
                                "path": str(nd["path"].relative_to(self.vault_path)),
                                "tags": nd.get("tags", [])[:5],
                                "tfidf_score": 0.0,
                                "embedding_score": 0.0,
                                "combined_score": 0.0,
                                "excerpt": self._extract_excerpt(
                                    nd.get("content", ""), query_terms, max_len=300
                                ),
                                "source": "graph_expansion",
                            }
                        )
            results.extend(expanded[: limit - len(results)])
        return {
            "query": query,
            "result_count": len(results),
            "results": results,
        }
    def detect_clusters(self, min_cluster_size=3, max_iterations=50):
        """Find topic clusters via label propagation on the undirected link graph.

        NOTE(review): uses ``random.shuffle`` without a seed, so cluster
        assignments can differ between runs — confirm nondeterminism is
        acceptable to callers.
        """
        graph = self._build_graph()
        nodes = list(graph["nodes"].keys())
        if not nodes:
            return {"clusters": []}
        # Undirected adjacency over resolved links only.
        adjacency = defaultdict(set)
        for note in nodes:
            for target in graph["forward_links"].get(note, []):
                if target in graph["nodes"]:
                    adjacency[note].add(target)
                    adjacency[target].add(note)
        # Label propagation: every node starts with a unique label and
        # repeatedly adopts its neighbors' most common label until no
        # label changes (or max_iterations is reached).
        labels = {node: i for i, node in enumerate(nodes)}
        for _ in range(max_iterations):
            changed = False
            order = nodes.copy()
            random.shuffle(order)
            for node in order:
                neighbors = adjacency.get(node, set())
                if not neighbors:
                    continue
                neighbor_labels = Counter(labels[n] for n in neighbors)
                most_common_label = neighbor_labels.most_common(1)[0][0]
                if labels[node] != most_common_label:
                    labels[node] = most_common_label
                    changed = True
            if not changed:
                break
        # Group nodes by their final label.
        cluster_map = defaultdict(list)
        for node, label in labels.items():
            cluster_map[label].append(node)
        clusters = []
        for label, members in cluster_map.items():
            if len(members) < min_cluster_size:
                continue
            member_set = set(members)
            # Central notes: members with the most intra-cluster edges.
            centrality = {}
            for m in members:
                internal_links = len(adjacency.get(m, set()) & member_set)
                centrality[m] = internal_links
            central_nodes = sorted(
                centrality.items(), key=lambda x: x[1], reverse=True
            )[:3]
            cluster_tags = Counter()
            for m in members:
                nd = graph["nodes"].get(m, {})
                for tag in nd.get("tags", []):
                    cluster_tags[tag] += 1
            # Cohesion: share of members' forward links that stay inside.
            internal_links = 0
            external_links = 0
            for m in members:
                for target in graph["forward_links"].get(m, []):
                    if target in member_set:
                        internal_links += 1
                    else:
                        external_links += 1
            clusters.append(
                {
                    "size": len(members),
                    "central_notes": [n for n, _ in central_nodes],
                    "top_tags": cluster_tags.most_common(5),
                    "members": sorted(members),
                    "internal_links": internal_links,
                    "external_links": external_links,
                    "cohesion": round(
                        internal_links / max(internal_links + external_links, 1), 3
                    ),
                }
            )
        clusters.sort(key=lambda x: x["size"], reverse=True)
        return {"cluster_count": len(clusters), "clusters": clusters}
    def graph_diff(self, since_days=7):
        """Summarize vault changes over the last *since_days* using git.

        Reports new/deleted/modified notes and, for a sample of modified
        notes, which wikilinks were added or removed relative to an older
        revision.
        """
        since_date = (datetime.now() - timedelta(days=since_days)).strftime("%Y-%m-%d")
        try:
            result = subprocess.run(
                [
                    "git",
                    "log",
                    "--since",
                    since_date,
                    "--name-status",
                    "--format=%H|%at|%s",
                    "--diff-filter=ADMRTUC",
                ],
                cwd=self.vault_path,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError:
            return {"error": "Git repository not found or git command failed"}
        new_notes = set()
        deleted_notes = set()
        modified_notes = set()
        lines = result.stdout.strip().split("\n")
        for line in lines:
            # Commit header lines contain "|"; name-status lines do not.
            if not line or "|" in line:
                continue
            parts = line.split("\t")
            if len(parts) < 2:
                continue
            # For renames the last tab-separated column is the new path.
            status, filepath = parts[0], parts[-1]
            if not filepath.endswith(".md"):
                continue
            note_name = Path(filepath).stem
            if status.startswith("A"):
                new_notes.add(note_name)
            elif status.startswith("D"):
                deleted_notes.add(note_name)
            elif status.startswith("M") or status.startswith("R"):
                modified_notes.add(note_name)
        graph = self._build_graph()
        # Diff wikilinks for up to 50 modified notes against an older rev.
        link_changes = []
        for note_name in list(modified_notes)[:50]:
            if note_name not in graph["nodes"]:
                continue
            current_links = set(graph["forward_links"].get(note_name, []))
            note_data = graph["nodes"][note_name]
            rel_path = str(note_data["path"].relative_to(self.vault_path))
            try:
                # NOTE(review): HEAD~{since_days * 3} is a rough "old
                # enough" revision guess, not an exact date anchor —
                # confirm this heuristic is acceptable.
                old_content = subprocess.run(
                    ["git", "show", f"HEAD~{since_days * 3}:{rel_path}"],
                    cwd=self.vault_path,
                    capture_output=True,
                    text=True,
                )
                if old_content.returncode == 0:
                    # Re-extract wikilinks from the old revision using the
                    # same alias/heading stripping as _parse_note.
                    old_wikilinks = re.findall(r"\[\[([^\]]+)\]\]", old_content.stdout)
                    old_links = set()
                    for link in old_wikilinks:
                        if "|" in link:
                            link = link.split("|")[0]
                        if "#" in link:
                            link = link.split("#")[0]
                        if link.strip():
                            old_links.add(link.strip())
                    added_links = current_links - old_links
                    removed_links = old_links - current_links
                    if added_links or removed_links:
                        link_changes.append(
                            {
                                "note": note_name,
                                "links_added": list(added_links),
                                "links_removed": list(removed_links),
                            }
                        )
            except Exception:
                # Best-effort per note: skip revisions git cannot show.
                continue
        # Enrich new notes with size/link/tag info when still present.
        new_note_details = []
        for note_name in new_notes:
            if note_name in graph["nodes"]:
                nd = graph["nodes"][note_name]
                new_note_details.append(
                    {
                        "note": note_name,
                        "word_count": len(nd.get("content", "").split()),
                        "links_out": len(graph["forward_links"].get(note_name, [])),
                        "tags": nd.get("tags", [])[:5],
                    }
                )
        return {
            "period_days": since_days,
            "new_notes": new_note_details,
            "new_note_count": len(new_notes),
            "deleted_notes": list(deleted_notes),
            "deleted_count": len(deleted_notes),
            "modified_count": len(modified_notes),
            "link_changes": link_changes,
            "summary": {
                "total_links_added": sum(len(lc["links_added"]) for lc in link_changes),
                "total_links_removed": sum(
                    len(lc["links_removed"]) for lc in link_changes
                ),
            },
        }
    def extract_open_questions(self, limit=50):
        """Collect open questions, TODO-style markers and unchecked checkboxes
        from every note, newest first.

        Fenced code blocks and a leading frontmatter block are stripped
        before matching. Each item carries its type, text, note, path and
        age in days.
        """
        graph = self._build_graph()
        questions = []
        # A line of 10-200 chars ending in "?", possibly quoted/bulleted.
        question_re = re.compile(r"^[>\s-]*(.{10,200}\?)\s*$", re.MULTILINE)
        # TODO/FIXME/OPEN/QUESTION/IDEA markers followed by ":" or space.
        todo_re = re.compile(
            r"^[\s>-]*(?:TODO|FIXME|OPEN|QUESTION|IDEA)[:\s]+(.+)$",
            re.MULTILINE | re.IGNORECASE,
        )
        # Unchecked markdown task: "- [ ] text".
        checkbox_re = re.compile(
            r"^[\s>]*-\s*\[\s\]\s+(.+)$",
            re.MULTILINE,
        )
        for note_name, note_data in graph["nodes"].items():
            content = note_data.get("content", "")
            # Strip fenced code blocks and any leading frontmatter block.
            content_no_code = re.sub(r"```[\s\S]*?```", "", content)
            content_no_code = re.sub(r"^---\n[\s\S]*?\n---\n", "", content_no_code)
            for match in question_re.finditer(content_no_code):
                text = match.group(1).strip()
                # Filter URLs and very short fragments.
                if text.startswith("http") or len(text) < 15:
                    continue
                # Skip markdown headings unless "?" appears right away.
                if text.startswith("#") and "?" not in text[:5]:
                    continue
                questions.append(
                    {
                        "type": "question",
                        "text": text,
                        "note": note_name,
                        "path": str(note_data["path"].relative_to(self.vault_path)),
                        "mtime": note_data["mtime"],
                    }
                )
            for match in todo_re.finditer(content_no_code):
                text = match.group(1).strip()
                if len(text) < 5:
                    continue
                questions.append(
                    {
                        "type": "todo",
                        "text": text,
                        "note": note_name,
                        "path": str(note_data["path"].relative_to(self.vault_path)),
                        "mtime": note_data["mtime"],
                    }
                )
            for match in checkbox_re.finditer(content_no_code):
                text = match.group(1).strip()
                if len(text) < 5:
                    continue
                questions.append(
                    {
                        "type": "checkbox",
                        "text": text,
                        "note": note_name,
                        "path": str(note_data["path"].relative_to(self.vault_path)),
                        "mtime": note_data["mtime"],
                    }
                )
        # Newest first; expose age in days instead of the raw mtime.
        questions.sort(key=lambda x: x["mtime"], reverse=True)
        now = datetime.now().timestamp()
        for q in questions:
            q["days_ago"] = round((now - q["mtime"]) / 86400, 1)
            del q["mtime"]
        return questions[:limit]
def concept_gaps(self, limit=30):
graph = self._build_graph()
gaps = []
for link_name, referrers in graph["broken_links"].items():
if len(referrers) >= 3:
gaps.append(
{
"type": "missing_note",
"name": link_name,
"demand": len(referrers),
"referenced_by": referrers[:10],
"priority": len(referrers) * 3,
"suggestion": f"Create [[{link_name}]] — referenced by {len(referrers)} notes",
}
)
for note_name, note_data in graph["nodes"].items():
backlink_count = len(graph["backlinks"].get(note_name, []))
word_count = len(note_data.get("content", "").split())
if backlink_count >= 3 and word_count < 50:
gaps.append(
{
"type": "stub_hub",
"name": note_name,
"backlinks": backlink_count,
"word_count": word_count,
"priority": backlink_count * 2,
"suggestion": f"Expand [[{note_name}]] — {backlink_count} notes link here but only {word_count} words",
}
)
adjacency = defaultdict(set)
for note in graph["nodes"]:
for target in graph["forward_links"].get(note, []):
if target in graph["nodes"]:
adjacency[note].add(target)
adjacency[target].add(note)
node_list = [n for n in graph["nodes"] if len(adjacency.get(n, set())) >= 3]
bridge_candidates = []
for i, note_a in enumerate(node_list):
neighbors_a = adjacency[note_a]
for note_b in node_list[i + 1 :]:
if note_b in neighbors_a:
continue
neighbors_b = adjacency[note_b]
shared = len(neighbors_a & neighbors_b)
union = len(neighbors_a | neighbors_b)
if shared >= 3 and union > 0:
jaccard = shared / union
if jaccard >= 0.2:
bridge_candidates.append(
{
"type": "bridge_gap",
"name": f"{note_a} <-> {note_b}",
"note_a": note_a,
"note_b": note_b,
"shared_neighbors": shared,
"jaccard": round(jaccard, 3),
"priority": shared * 2,
"suggestion": f"Link [[{note_a}]] and [[{note_b}]] — {shared} shared neighbors",
}
)
bridge_candidates.sort(key=lambda x: x["priority"], reverse=True)
gaps.extend(bridge_candidates[:20])
gaps.sort(key=lambda x: x["priority"], reverse=True)
return gaps[:limit]
def journal_append(self, entry, source="agent", tags=None):
journal_path = self.vault_path / ".claude" / "journal.jsonl"
journal_path.parent.mkdir(parents=True, exist_ok=True)
record = {
"timestamp": datetime.now().isoformat(),
"source": source,
"entry": entry,
"tags": tags or [],
}
with open(journal_path, "a", encoding="utf-8") as f:
f.write(json.dumps(record) + "\n")
return record
def journal_read(self, limit=50, source=None, tag=None, since_days=None):
journal_path = self.vault_path / ".claude" / "journal.jsonl"
if not journal_path.exists():
return []
entries = []
cutoff = None
if since_days:
cutoff = (datetime.now() - timedelta(days=since_days)).isoformat()
with open(journal_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
except json.JSONDecodeError:
continue
if source and record.get("source") != source:
continue
if tag and tag not in record.get("tags", []):
continue
if cutoff and record.get("timestamp", "") < cutoff:
continue
entries.append(record)
entries.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
return entries[:limit]
def _parse_git_edits(self, since_days, until_days=None):
since_date = (datetime.now() - timedelta(days=since_days)).strftime("%Y-%m-%d")
cmd = ["git", "log", "--since", since_date, "--name-only", "--format=%at"]
if until_days is not None:
until_date = (datetime.now() - timedelta(days=until_days)).strftime(
"%Y-%m-%d"
)
cmd.extend(["--until", until_date])
try:
result = subprocess.run(
cmd,
cwd=self.vault_path,
capture_output=True,
text=True,
check=True,
)
except subprocess.CalledProcessError:
return None
edits = []
current_ts = None
for line in result.stdout.strip().split("\n"):
line = line.strip()
if not line:
continue
if line.isdigit():
current_ts = int(line)
elif line.endswith(".md") and current_ts:
if ".trash" not in line and ".obsidian" not in line:
note_name = Path(line).stem
edits.append((current_ts, note_name, line))
return edits
def topic_timeline(self, tag=None, folder=None, period="week", days=180):
graph = self._build_graph()
edits = self._parse_git_edits(since_days=days)
if edits is None:
return {"error": "Git command failed"}
if tag:
matching_notes = set()
for note_name, note_data in graph["nodes"].items():
if any(tag.lower() in t.lower() for t in note_data.get("tags", [])):
matching_notes.add(note_name)
edits = [(ts, n, p) for ts, n, p in edits if n in matching_notes]
if folder:
folder_lower = folder.lower().rstrip("/")
edits = [
(ts, n, p)
for ts, n, p in edits
if p.lower().startswith(folder_lower + "/")
or p.lower().startswith(folder_lower)
]
if not edits:
filter_desc = (
f"tag={tag}" if tag else f"folder={folder}" if folder else "all"
)
return {"error": f"No edits found for filter: {filter_desc}"}
buckets = defaultdict(lambda: {"edits": 0, "unique_notes": set()})
for ts, note_name, _ in edits:
dt = datetime.fromtimestamp(ts)
if period == "week":
key = dt.strftime("%Y-W%V")
elif period == "month":
key = dt.strftime("%Y-%m")
else:
key = dt.strftime("%Y-%m-%d")
buckets[key]["edits"] += 1
buckets[key]["unique_notes"].add(note_name)
timeline = []
for key in sorted(buckets.keys()):
b = buckets[key]
timeline.append(
{
"period": key,
"edits": b["edits"],
"unique_notes": len(b["unique_notes"]),
"notes": sorted(b["unique_notes"]),
}
)
return {
"filter": {"tag": tag, "folder": folder},
"period_type": period,
"days": days,
"total_edits": sum(t["edits"] for t in timeline),
"total_unique_notes": len(set(n for t in timeline for n in t["notes"])),
"timeline": timeline,
}
def note_history(self, note_name, limit=30):
graph = self._build_graph()
matching = [n for n in graph["nodes"].keys() if n.lower() == note_name.lower()]
if not matching:
return {"error": f'Note "{note_name}" not found'}
note_name = matching[0]
note_data = graph["nodes"][note_name]
rel_path = str(note_data["path"].relative_to(self.vault_path))
try:
result = subprocess.run(
["git", "log", "--follow", "--format=%H|%at|%s", "--", rel_path],
cwd=self.vault_path,
capture_output=True,
text=True,
check=True,
)
except subprocess.CalledProcessError:
return {"error": "Git command failed"}
commits = []
for line in result.stdout.strip().split("\n"):
if not line or "|" not in line:
continue
parts = line.split("|", 2)
if len(parts) >= 3:
commits.append(
{
"hash": parts[0],
"timestamp": int(parts[1]),
"message": parts[2],
"date": datetime.fromtimestamp(int(parts[1])).strftime(
"%Y-%m-%d %H:%M"
),
}
)
for commit in commits[:limit]:
try:
content_result = subprocess.run(
["git", "show", f"{commit['hash']}:{rel_path}"],
cwd=self.vault_path,
capture_output=True,
text=True,
)
if content_result.returncode == 0:
content = content_result.stdout
commit["word_count"] = len(content.split())
links = re.findall(r"\[\[([^\]]+)\]\]", content)
commit["link_count"] = len(links)
except Exception:
pass
commits = commits[:limit]
for i in range(len(commits) - 1):
curr = commits[i]
prev = commits[i + 1]
if "word_count" in curr and "word_count" in prev:
curr["word_delta"] = curr["word_count"] - prev["word_count"]
curr["link_delta"] = curr.get("link_count", 0) - prev.get(
"link_count", 0
)
return {
"note": note_name,
"path": rel_path,
"total_commits": len(commits),
"first_seen": commits[-1]["date"] if commits else None,
"last_modified": commits[0]["date"] if commits else None,
"history": commits,
}
def attention_flow(self, days=14, group_by="tag", limit=20):
graph = self._build_graph()
edits = self._parse_git_edits(since_days=days)
if edits is None:
return {"error": "Git command failed"}
topic_data = defaultdict(lambda: {"edits": 0, "notes": set(), "latest": 0})
for ts, note_name, filepath in edits:
note_info = graph["nodes"].get(note_name)
if not note_info:
continue
if group_by == "folder":
rel_path = str(note_info["path"].relative_to(self.vault_path))
topics = [rel_path.split("/")[0] if "/" in rel_path else "root"]
else:
topics = note_info.get("tags", []) or ["untagged"]
for topic in topics:
topic_data[topic]["edits"] += 1
topic_data[topic]["notes"].add(note_name)
topic_data[topic]["latest"] = max(topic_data[topic]["latest"], ts)
prev_edits = self._parse_git_edits(since_days=days * 2, until_days=days)
prev_counts = Counter()
if prev_edits:
for ts, note_name, filepath in prev_edits:
note_info = graph["nodes"].get(note_name)
if not note_info:
continue
if group_by == "folder":
rel_path = str(note_info["path"].relative_to(self.vault_path))
topics = [rel_path.split("/")[0] if "/" in rel_path else "root"]
else:
topics = note_info.get("tags", []) or ["untagged"]
for topic in topics:
prev_counts[topic] += 1
results = []
for topic, data in topic_data.items():
prev_count = prev_counts.get(topic, 0)
current_count = data["edits"]
if prev_count > 0:
change_ratio = (current_count - prev_count) / prev_count
trend = (
"rising"
if change_ratio > 0.2
else ("falling" if change_ratio < -0.2 else "stable")
)
else:
trend = "new" if current_count > 0 else "stable"
results.append(
{
"topic": topic,
"edits": current_count,
"unique_notes": len(data["notes"]),
"notes": sorted(data["notes"]),
"prev_period_edits": prev_count,
"trend": trend,
}
)
results.sort(key=lambda x: x["edits"], reverse=True)
return {
"period_days": days,
"group_by": group_by,
"total_edits": sum(r["edits"] for r in results),
"topics": results[:limit],
}