tools.py•36.9 kB
import json
import os
import re
import ssl
import subprocess
import uuid
from datetime import datetime, timezone
from typing import Union, List, Dict, Optional, Any
import logging
from dotenv import load_dotenv
from .context import Context
from pymongo import MongoClient
from .database import db_connection
from .utils import create_response, mqtt_publish, _format_duration
from .todo_log_service import log_todo_create, log_todo_update, log_todo_delete, log_todo_complete
# Load environment variables
load_dotenv()
# Get the logger
logger = logging.getLogger(__name__)
# Cache constants
TAGS_CACHE_KEY = "all_lesson_tags"
TAGS_CACHE_EXPIRY = 43200 # Cache expiry in seconds (12 hours)
PROJECTS_CACHE_KEY = "all_valid_projects"
PROJECTS_CACHE_EXPIRY = 43200 # Cache expiry in seconds (12 hours)
# Valid project list - all lowercase for case-insensitive matching
# TODO: This will be migrated to MongoDB and deprecated
VALID_PROJECTS = [
"madness_interactive", "regressiontestkit", "omnispindle",
"todomill_projectorium", "swarmonomicon", "hammerspoon",
"lab_management", "cogwyrm", "docker_implementation",
"documentation", "eventghost-rust", "hammerghost",
"quality_assurance", "spindlewrit", "inventorium"
]
# Cache utility functions
def cache_lesson_tags(tags_list, ctx=None):
"""
Cache the list of all lesson tags in MongoDB.
Args:
tags_list: List of tags to cache
ctx: Optional context for user-scoped collections
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
tags_cache_collection = collections['tags_cache']
# Add timestamp for cache expiry management
cache_entry = {
"key": TAGS_CACHE_KEY,
"tags": list(tags_list),
"updated_at": int(datetime.now(timezone.utc).timestamp())
}
# Use upsert to update if exists or insert if not
tags_cache_collection.update_one(
{"key": TAGS_CACHE_KEY},
{"$set": cache_entry},
upsert=True
)
return True
except Exception as e:
logging.error(f"Failed to cache lesson tags: {str(e)}")
return False
def get_cached_lesson_tags(ctx=None):
"""
Retrieve the cached list of lesson tags from MongoDB.
Args:
ctx: Optional context for user-scoped collections
Returns:
List of tags if cache exists and is valid, None otherwise
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
tags_cache_collection = collections['tags_cache']
# Find the cache entry
cache_entry = tags_cache_collection.find_one({"key": TAGS_CACHE_KEY})
if not cache_entry:
return None
# Check if cache is expired
current_time = int(datetime.now(timezone.utc).timestamp())
if current_time - cache_entry["updated_at"] > TAGS_CACHE_EXPIRY:
# Cache expired, invalidate it
invalidate_lesson_tags_cache(ctx)
return None
return cache_entry["tags"]
except Exception as e:
logging.error(f"Failed to retrieve cached lesson tags: {str(e)}")
return None
def invalidate_lesson_tags_cache(ctx=None):
"""
Invalidate the lesson tags cache in MongoDB.
Args:
ctx: Optional context for user-scoped collections
Returns:
True if successful, False otherwise
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
tags_cache_collection = collections['tags_cache']
tags_cache_collection.delete_one({"key": TAGS_CACHE_KEY})
return True
except Exception as e:
logging.error(f"Failed to invalidate lesson tags cache: {str(e)}")
return False
def get_all_lesson_tags(ctx=None):
"""
Get all unique tags from lessons, with caching.
First tries to fetch from cache, falls back to database if needed.
Also updates the cache if fetching from database.
Args:
ctx: Optional context for user-scoped collections
Returns:
List of all unique tags
"""
cached_tags = get_cached_lesson_tags(ctx)
if cached_tags is not None:
return cached_tags
# If not in cache, query from database
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
# Use MongoDB aggregation to get all unique tags
pipeline = [
{"$project": {"tags": 1}},
{"$unwind": "$tags"},
{"$group": {"_id": None, "unique_tags": {"$addToSet": "$tags"}}},
]
result = list(lessons_collection.aggregate(pipeline))
# Extract tags from result
all_tags = []
if result and 'unique_tags' in result[0]:
all_tags = result[0]['unique_tags']
# Cache the results for future use
cache_lesson_tags(all_tags, ctx)
return all_tags
except Exception as e:
logging.error(f"Failed to aggregate lesson tags: {str(e)}")
return []
# Project management functions
def cache_projects(projects_list, ctx=None):
"""
Cache the list of valid projects in MongoDB.
Args:
projects_list: List of project names to cache
ctx: Optional context for user-scoped collections
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
tags_cache_collection = collections['tags_cache']
cache_entry = {
"key": PROJECTS_CACHE_KEY,
"projects": list(projects_list),
"updated_at": int(datetime.now(timezone.utc).timestamp())
}
tags_cache_collection.update_one(
{"key": PROJECTS_CACHE_KEY},
{"$set": cache_entry},
upsert=True
)
return True
except Exception as e:
logging.error(f"Failed to cache projects: {str(e)}")
return False
def get_cached_projects(ctx=None):
"""
Retrieve the cached list of valid projects from MongoDB.
Args:
ctx: Optional context for user-scoped collections
Returns:
List of project names if cache exists and is valid, None otherwise
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
tags_cache_collection = collections['tags_cache']
cache_entry = tags_cache_collection.find_one({"key": PROJECTS_CACHE_KEY})
if not cache_entry:
return None
# Check if cache is expired
current_time = int(datetime.now(timezone.utc).timestamp())
if current_time - cache_entry["updated_at"] > PROJECTS_CACHE_EXPIRY:
invalidate_projects_cache(ctx)
return None
return cache_entry["projects"]
except Exception as e:
logging.error(f"Failed to retrieve cached projects: {str(e)}")
return None
def invalidate_projects_cache(ctx=None):
"""
Invalidate the projects cache in MongoDB.
Args:
ctx: Optional context for user-scoped collections
Returns:
True if successful, False otherwise
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
tags_cache_collection = collections['tags_cache']
tags_cache_collection.delete_one({"key": PROJECTS_CACHE_KEY})
return True
except Exception as e:
logging.error(f"Failed to invalidate projects cache: {str(e)}")
return False
def initialize_projects_collection(ctx=None):
"""
Initialize the projects collection with the current VALID_PROJECTS list.
This is a one-time migration function that includes git URLs and paths.
Args:
ctx: Optional context for user-scoped collections
Returns:
True if successful, False otherwise
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
projects_collection = collections['projects']
# Check if projects collection is already populated
existing_count = projects_collection.count_documents({})
if existing_count > 0:
logging.info(f"Projects collection already has {existing_count} projects")
return True
# Insert all valid projects with enhanced metadata
current_time = int(datetime.now(timezone.utc).timestamp())
project_definitions = {
"madness_interactive": {
"git_url": "https://github.com/d-edens/madness_interactive.git",
"relative_path": "",
"description": "Main Madness Interactive project hub"
},
"regressiontestkit": {
"git_url": "https://github.com/d-edens/RegressionTestKit.git",
"relative_path": "../RegressionTestKit",
"description": "A toolkit for regression testing"
}
}
projects_to_insert = [
{
"id": name,
"name": name,
"display_name": name.replace("_", " ").title(),
"created_at": current_time,
**project_definitions.get(name, {})
}
for name in VALID_PROJECTS
]
if projects_to_insert:
projects_collection.insert_many(projects_to_insert)
logging.info(f"Successfully inserted {len(projects_to_insert)} projects into the collection")
# Invalidate project cache after initialization
invalidate_projects_cache(ctx)
return True
except Exception as e:
logging.error(f"Failed to initialize projects collection: {str(e)}")
return False
def get_all_projects(ctx=None):
"""
Get all projects from the database, with caching.
Args:
ctx: Optional context for user-scoped collections
"""
cached_projects = get_cached_projects(ctx)
if cached_projects:
return cached_projects
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
projects_collection = collections['projects']
# Get all projects from the database
projects_from_db = list(projects_collection.find({}, {"_id": 0}))
# If the database is empty, initialize it as a fallback
if not projects_from_db:
initialize_projects_collection(ctx)
projects_from_db = list(projects_collection.find({}, {"_id": 0}))
# Cache the results for future use
cache_projects(projects_from_db, ctx)
return projects_from_db
except Exception as e:
logging.error(f"Failed to get projects from database: {str(e)}")
return []
def validate_project_name(project: str) -> str:
# Normalize project name for validation
project_lower = project.lower()
# Check if the normalized project name is in the list of valid projects
if project_lower in [p.lower() for p in VALID_PROJECTS]:
return project_lower # Return the lowercase version for consistency
# Default to "madness_interactive" if not found
return "madness_interactive"
async def add_todo(description: str, project: str, priority: str = "Medium", target_agent: str = "user", metadata: Optional[Dict[str, Any]] = None, ctx: Optional[Context] = None) -> str:
"""
Creates a task in the specified project with the given priority and target agent.
Returns a compact representation of the created todo with an ID for reference.
"""
todo_id = str(uuid.uuid4())
validated_project = validate_project_name(project)
todo = {
"id": todo_id,
"description": description,
"project": validated_project,
"priority": priority,
"status": "pending",
"target_agent": target_agent,
"created_at": int(datetime.now(timezone.utc).timestamp()),
"metadata": metadata or {}
}
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
todos_collection = collections['todos']
todos_collection.insert_one(todo)
user_email = ctx.user.get("email", "anonymous") if ctx and ctx.user else "anonymous"
logger.info(f"Todo created by {user_email} in user database: {todo_id}")
await log_todo_create(todo_id, description, project, user_email)
# Get project todo counts from user's database
pipeline = [
{"$match": {"project": validated_project}},
{"$group": {"_id": "$status", "count": {"$sum": 1}}}
]
counts = list(todos_collection.aggregate(pipeline))
project_counts = {
"pending": 0,
"completed": 0,
}
for status_count in counts:
if status_count["_id"] in project_counts:
project_counts[status_count["_id"]] = status_count["count"]
return create_response(True,
{
"operation": "create",
"status": "success",
"todo_id": todo_id,
"description": description[:40] + ("..." if len(description) > 40 else ""),
"project_counts": project_counts
},
message=f"Todo '{description[:30]}...' created in '{validated_project}'. Pending: {project_counts['pending']}, Completed: {project_counts['completed']}."
)
except Exception as e:
logger.error(f"Failed to create todo: {str(e)}")
return create_response(False, message=str(e))
async def query_todos(filter: Optional[Dict[str, Any]] = None, projection: Optional[Dict[str, Any]] = None, limit: int = 100, ctx: Optional[Context] = None) -> str:
"""
Query todos with flexible filtering options from user's database.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
todos_collection = collections['todos']
cursor = todos_collection.find(filter or {}, projection).limit(limit)
results = list(cursor)
return create_response(True, {"items": results})
except Exception as e:
logger.error(f"Failed to query todos: {str(e)}")
return create_response(False, message=str(e))
async def update_todo(todo_id: str, updates: dict, ctx: Optional[Context] = None) -> str:
"""
Update a todo with the provided changes.
"""
if "updated_at" not in updates:
updates["updated_at"] = int(datetime.now(timezone.utc).timestamp())
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
todos_collection = collections['todos']
existing_todo = todos_collection.find_one({"id": todo_id})
if not existing_todo:
return create_response(False, message=f"Todo {todo_id} not found.")
result = todos_collection.update_one({"id": todo_id}, {"$set": updates})
if result.modified_count == 1:
user_email = ctx.user.get("email", "anonymous") if ctx and ctx.user else "anonymous"
logger.info(f"Todo updated by {user_email}: {todo_id}")
description = updates.get('description', existing_todo.get('description', 'Unknown'))
project = updates.get('project', existing_todo.get('project', 'Unknown'))
changes = [
{"field": field, "old_value": existing_todo.get(field), "new_value": value}
for field, value in updates.items()
if field != 'updated_at' and existing_todo.get(field) != value
]
await log_todo_update(todo_id, description, project, changes, user_email)
return create_response(True, message=f"Todo {todo_id} updated successfully")
else:
return create_response(False, message=f"Todo {todo_id} not found or no changes made.")
except Exception as e:
logger.error(f"Failed to update todo: {str(e)}")
return create_response(False, message=str(e))
async def delete_todo(todo_id: str, ctx: Optional[Context] = None) -> str:
"""
Delete a todo item by its ID.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
todos_collection = collections['todos']
existing_todo = todos_collection.find_one({"id": todo_id})
if existing_todo:
user_email = ctx.user.get("email", "anonymous") if ctx and ctx.user else "anonymous"
logger.info(f"Todo deleted by {user_email}: {todo_id}")
await log_todo_delete(todo_id, existing_todo.get('description', 'Unknown'),
existing_todo.get('project', 'Unknown'), user_email)
result = todos_collection.delete_one({"id": todo_id})
if result.deleted_count == 1:
return create_response(True, message=f"Todo {todo_id} deleted successfully.")
else:
return create_response(False, message=f"Todo {todo_id} not found.")
except Exception as e:
logger.error(f"Failed to delete todo: {str(e)}")
return create_response(False, message=str(e))
async def get_todo(todo_id: str, ctx: Optional[Context] = None) -> str:
"""
Get a specific todo item by its ID.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
todos_collection = collections['todos']
todo = todos_collection.find_one({"id": todo_id})
if todo:
return create_response(True, todo)
else:
return create_response(False, message=f"Todo with ID {todo_id} not found.")
except Exception as e:
logger.error(f"Failed to get todo: {str(e)}")
return create_response(False, message=str(e))
async def mark_todo_complete(todo_id: str, comment: Optional[str] = None, ctx: Optional[Context] = None) -> str:
"""
Mark a todo as completed.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
todos_collection = collections['todos']
existing_todo = todos_collection.find_one({"id": todo_id})
if not existing_todo:
return create_response(False, message=f"Todo {todo_id} not found.")
completed_at = int(datetime.now(timezone.utc).timestamp())
duration_sec = completed_at - existing_todo.get('created_at', completed_at)
updates = {
"status": "completed",
"completed_at": completed_at,
"duration": _format_duration(duration_sec),
"duration_sec": duration_sec,
"updated_at": completed_at
}
if comment:
updates["metadata.completion_comment"] = comment
user_email = ctx.user.get("email", "anonymous") if ctx and ctx.user else "anonymous"
updates["metadata.completed_by"] = user_email
result = todos_collection.update_one({"id": todo_id}, {"$set": updates})
if result.modified_count == 1:
user_email = ctx.user.get("email", "anonymous") if ctx and ctx.user else "anonymous"
logger.info(f"Todo completed by {user_email}: {todo_id}")
await log_todo_complete(todo_id, existing_todo.get('description', 'Unknown'),
existing_todo.get('project', 'Unknown'), user_email)
return create_response(True, message=f"Todo {todo_id} marked as complete.")
else:
return create_response(False, message=f"Failed to update todo {todo_id}.")
except Exception as e:
logger.error(f"Failed to mark todo complete: {str(e)}")
return create_response(False, message=str(e))
async def list_todos_by_status(status: str, limit: int = 100, ctx: Optional[Context] = None) -> str:
"""
List todos filtered by their status.
"""
if status.lower() not in ['pending', 'completed', 'initial']:
return create_response(False, message="Invalid status. Must be one of 'pending', 'completed', 'initial'.")
return await query_todos(filter={"status": status.lower()}, limit=limit, ctx=ctx)
async def add_lesson(language: str, topic: str, lesson_learned: str, tags: Optional[list] = None, ctx: Optional[Context] = None) -> str:
"""
Add a new lesson to the knowledge base.
"""
lesson = {
"id": str(uuid.uuid4()),
"language": language,
"topic": topic,
"lesson_learned": lesson_learned,
"tags": tags or [],
"created_at": int(datetime.now(timezone.utc).timestamp())
}
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
lessons_collection.insert_one(lesson)
if tags:
# Invalidate the tags cache when new tags are added
invalidate_lesson_tags_cache(ctx)
return create_response(True, lesson)
except Exception as e:
logger.error(f"Failed to add lesson: {str(e)}")
return create_response(False, message=str(e))
async def get_lesson(lesson_id: str, ctx: Optional[Context] = None) -> str:
"""
Get a specific lesson by its ID.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
lesson = lessons_collection.find_one({"id": lesson_id})
if lesson:
return create_response(True, lesson)
else:
return create_response(False, message=f"Lesson with ID {lesson_id} not found.")
except Exception as e:
logger.error(f"Failed to get lesson: {str(e)}")
return create_response(False, message=str(e))
async def update_lesson(lesson_id: str, updates: dict, ctx: Optional[Context] = None) -> str:
"""
Update an existing lesson.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
result = lessons_collection.update_one({"id": lesson_id}, {"$set": updates})
if result.modified_count == 1:
if 'tags' in updates:
# Invalidate the tags cache when tags are modified
invalidate_lesson_tags_cache(ctx)
return create_response(True, message=f"Lesson {lesson_id} updated.")
else:
return create_response(False, message=f"Lesson {lesson_id} not found.")
except Exception as e:
logger.error(f"Failed to update lesson: {str(e)}")
return create_response(False, message=str(e))
async def delete_lesson(lesson_id: str, ctx: Optional[Context] = None) -> str:
"""
Delete a lesson by its ID.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
result = lessons_collection.delete_one({"id": lesson_id})
if result.deleted_count == 1:
# Invalidate the tags cache when lessons are deleted
invalidate_lesson_tags_cache(ctx)
return create_response(True, message=f"Lesson {lesson_id} deleted.")
else:
return create_response(False, message=f"Lesson {lesson_id} not found.")
except Exception as e:
logger.error(f"Failed to delete lesson: {str(e)}")
return create_response(False, message=str(e))
async def search_todos(query: str, fields: Optional[list] = None, limit: int = 100, ctx: Optional[Context] = None) -> str:
"""
Search todos with text search capabilities.
"""
if fields is None:
fields = ["description", "project"]
search_query = {
"$or": [{field: {"$regex": query, "$options": "i"}} for field in fields]
}
return await query_todos(filter=search_query, limit=limit, ctx=ctx)
async def grep_lessons(pattern: str, limit: int = 20, ctx: Optional[Context] = None) -> str:
"""
Search lessons with grep-style pattern matching across topic and content.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
search_query = {
"$or": [
{"topic": {"$regex": pattern, "$options": "i"}},
{"lesson_learned": {"$regex": pattern, "$options": "i"}}
]
}
cursor = lessons_collection.find(search_query).limit(limit)
results = list(cursor)
return create_response(True, {"items": results})
except Exception as e:
logger.error(f"Failed to grep lessons: {str(e)}")
return create_response(False, message=str(e))
async def list_project_todos(project: str, limit: int = 5, ctx: Optional[Context] = None) -> str:
"""
List recent active todos for a specific project.
"""
return await query_todos(
filter={"project": project.lower(), "status": "pending"},
limit=limit,
ctx=ctx
)
async def query_todo_logs(filter_type: str = 'all', project: str = 'all',
page: int = 1, page_size: int = 20, ctx: Optional[Context] = None) -> str:
"""
Query the todo logs with filtering and pagination.
"""
from .todo_log_service import get_service_instance
service = get_service_instance()
logs = await service.get_logs(filter_type, project, page, page_size)
return create_response(True, logs)
async def list_projects(include_details: Union[bool, str] = False, madness_root: str = "/Users/d.edens/lab/madness_interactive", ctx: Optional[Context] = None) -> str:
"""
List all valid projects from the centralized project management system.
"""
# This tool now directly returns the hardcoded list of valid projects
return create_response(True, {"projects": VALID_PROJECTS})
async def add_explanation(topic: str, content: str, kind: str = "concept", author: str = "system", ctx: Optional[Context] = None) -> str:
"""
Add a new explanation to the knowledge base.
"""
explanation = {
"topic": topic,
"content": content,
"kind": kind,
"author": author,
"created_at": datetime.now(timezone.utc)
}
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
explanations_collection = collections['explanations']
explanations_collection.update_one(
{"topic": topic},
{"$set": explanation},
upsert=True
)
return create_response(True, explanation, f"Explanation for '{topic}' added/updated.")
except Exception as e:
logger.error(f"Failed to add explanation: {str(e)}")
return create_response(False, message=str(e))
async def get_explanation(topic: str, ctx: Optional[Context] = None) -> str:
"""Get an explanation for a given topic."""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
explanations_collection = collections['explanations']
explanation = explanations_collection.find_one({"topic": topic})
if explanation:
return create_response(True, explanation)
return create_response(False, message=f"Explanation for '{topic}' not found.")
except Exception as e:
logger.error(f"Failed to get explanation: {str(e)}")
return create_response(False, message=str(e))
async def update_explanation(topic: str, updates: dict, ctx: Optional[Context] = None) -> str:
"""Update an existing explanation."""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
explanations_collection = collections['explanations']
result = explanations_collection.update_one({"topic": topic}, {"$set": updates})
if result.modified_count:
return create_response(True, message="Explanation updated.")
return create_response(False, message="Explanation not found or no changes made.")
except Exception as e:
logger.error(f"Failed to update explanation: {str(e)}")
return create_response(False, message=str(e))
async def delete_explanation(topic: str, ctx: Optional[Context] = None) -> str:
"""Delete an explanation for a given topic."""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
explanations_collection = collections['explanations']
result = explanations_collection.delete_one({"topic": topic})
if result.deleted_count:
return create_response(True, message="Explanation deleted.")
return create_response(False, message="Explanation not found.")
except Exception as e:
logger.error(f"Failed to delete explanation: {str(e)}")
return create_response(False, message=str(e))
async def explain_tool(topic: str, brief: bool = False, ctx: Optional[Context] = None) -> str:
"""
Provides a detailed explanation for a project or concept.
"""
from . import explain as explain_module
explanation = await explain_module.explain(topic, brief)
return create_response(True, {"topic": topic, "explanation": explanation})
async def list_lessons(limit: int = 100, brief: bool = False, ctx: Optional[Context] = None) -> str:
"""
List all lessons, sorted by creation date.
"""
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
cursor = lessons_collection.find().sort("created_at", -1).limit(limit)
results = list(cursor)
if brief:
results = [{"id": r["id"], "topic": r["topic"], "language": r["language"]} for r in results]
return create_response(True, {"items": results})
except Exception as e:
logger.error(f"Failed to list lessons: {str(e)}")
return create_response(False, message=str(e))
async def search_lessons(query: str, fields: Optional[list] = None, limit: int = 100, brief: bool = False, ctx: Optional[Context] = None) -> str:
"""
Search lessons with text search capabilities.
"""
if fields is None:
fields = ["topic", "lesson_learned", "tags"]
search_query = {
"$or": [{field: {"$regex": query, "$options": "i"}} for field in fields]
}
try:
# Get user-scoped collections
collections = db_connection.get_collections(ctx.user if ctx else None)
lessons_collection = collections['lessons']
cursor = lessons_collection.find(search_query).limit(limit)
results = list(cursor)
if brief:
results = [{"id": r["id"], "topic": r["topic"], "language": r["language"]} for r in results]
return create_response(True, {"items": results})
except Exception as e:
logger.error(f"Failed to search lessons: {str(e)}")
return create_response(False, message=str(e))
async def point_out_obvious(observation: str, sarcasm_level: int = 5, ctx: Optional[Context] = None) -> str:
"""
Points out something obvious to the human user with varying levels of humor.
Args:
observation: The obvious thing to point out
sarcasm_level: Scale from 1-10 (1=gentle, 10=maximum sass)
ctx: Optional context
Returns:
A response highlighting the obvious with appropriate commentary
"""
import random
# Sarcasm templates based on level
templates = {
1: ["Just a friendly observation: {obs}", "I noticed that {obs}"],
2: ["It seems that {obs}", "Apparently, {obs}"],
3: ["Fun fact: {obs}", "Did you know? {obs}"],
4: ["Breaking news: {obs}", "Alert: {obs}"],
5: ["Captain Obvious reporting: {obs}", "In today's episode of 'Things We Already Know': {obs}"],
6: ["🎉 Congratulations! You've discovered that {obs}", "Achievement unlocked: Noticing that {obs}"],
7: ["*drum roll* ... {obs}", "Stop the presses! {obs}"],
8: ["I'm sure you're shocked to learn that {obs}", "Brace yourself: {obs}"],
9: ["In other groundbreaking revelations: {obs}", "Nobel Prize committee, take note: {obs}"],
10: ["🤯 Mind = Blown: {obs}", "Call the scientists, we've confirmed that {obs}"]
}
# Clamp sarcasm level
level = max(1, min(10, sarcasm_level))
# Pick a random template for the level
template_options = templates.get(level, templates[5])
template = random.choice(template_options)
# Format the response
response = template.format(obs=observation)
# Add emoji based on level
if level >= 7:
emojis = ["🙄", "😏", "🤔", "🧐", "🎭"]
response = f"{random.choice(emojis)} {response}"
# Log the obvious observation (for science)
logger.info(f"Obvious observation made (sarcasm={level}): {observation}")
# Store in a special "obvious_things" collection if we have DB
try:
# Get user-scoped collections - use a generic collection access
collections = db_connection.get_collections(ctx.user if ctx else None)
# Access the database directly for custom collections like obvious_observations
obvious_collection = collections.database["obvious_observations"]
obvious_collection.insert_one({
"observation": observation,
"sarcasm_level": level,
"timestamp": datetime.now(timezone.utc),
"user": ctx.user.get("sub") if ctx and ctx.user else "anonymous",
"response": response
})
except Exception as e:
logger.debug(f"Failed to store obvious observation: {e}")
# Publish to MQTT for other systems to enjoy the obviousness
try:
mqtt_publish("observations/obvious", {
"observation": observation,
"sarcasm_level": level,
"response": response
})
except Exception as e:
logger.debug(f"Failed to publish obvious observation: {e}")
return create_response(True, {
"response": response,
"observation": observation,
"sarcasm_level": level,
"meta": {
"obviousness_score": min(100, level * 10),
"humor_attempted": True,
"captain_obvious_mode": level >= 5
}
})
# DISABLED FOR SECURITY - DO NOT UNCOMMENT WITHOUT PROPER SANDBOXING
async def bring_your_own(tool_name: str, code: str, runtime: str = "python",
timeout: int = 30, args: Optional[Dict[str, Any]] = None,
persist: bool = False, ctx: Optional[Context] = None) -> str:
"""
DISABLED: Custom tool execution has been disabled for security reasons.
This tool previously allowed arbitrary code execution which poses significant
security risks. It has been disabled until proper sandboxing can be implemented.
Args:
tool_name: Name for the temporary tool (ignored)
code: The code to execute (ignored)
runtime: Runtime environment (ignored)
timeout: Maximum execution time (ignored)
args: Arguments to pass to the custom tool (ignored)
persist: Whether to save this tool (ignored)
ctx: Optional context
Returns:
Security error message
"""
logger.warning(f"Attempt to use disabled bring_your_own tool by user: {ctx.user.get('sub', 'anonymous') if ctx and ctx.user else 'anonymous'}")
return create_response(False,
message="SECURITY: The 'bring_your_own' tool has been disabled for security reasons. "
"This tool previously allowed arbitrary code execution which poses significant "
"security risks. Please contact an administrator if you need custom tool functionality.")