Skip to main content
Glama

Knowledge Base MCP Server

by Leon4s4
MIT License
kb_ui.py (16.7 kB)
#!/usr/bin/env python3
"""Streamlit UI for managing the Knowledge Base."""

import asyncio
from pathlib import Path

import streamlit as st

from kb_server import (
    init_database,
    kb_save,
    kb_search,
    kb_list,
    kb_delete,
)

# Initialize the database connection
init_database()

# Memory types that can be used as an explicit filter value.
_SPECIFIC_TYPES = ["environment", "code_snippet", "operational", "architectural"]
# All memory types offered when creating or editing a memory.
_MEMORY_TYPES = ["general", *_SPECIFIC_TYPES]
# Icon shown next to each memory type; unknown types fall back to _DEFAULT_ICON.
_TYPE_ICONS = {
    'environment': '🌐',
    'code_snippet': '💻',
    'operational': '⚙️',
    'architectural': '🏗️',
    'general': '📝'
}
_DEFAULT_ICON = '📝'


def run_async(coro):
    """Utility to run async functions from Streamlit callbacks."""
    return asyncio.run(coro)


def _render_sidebar() -> None:
    """Draw the sidebar action buttons and make sure an action is selected."""
    st.sidebar.markdown("## Actions")

    # Use buttons instead of selectbox
    for label in ("Add Memory", "Search", "List Memories", "Delete Memory"):
        if st.sidebar.button(label, use_container_width=True):
            st.session_state.action = label

    # Initialize default action if not set
    if "action" not in st.session_state:
        st.session_state.action = "List Memories"


def _render_add_memory() -> None:
    """Render the form that stores a new memory through ``kb_save``."""
    st.subheader("Add Memory")
    content = st.text_area("Content")
    memory_type = st.selectbox("Memory Type", _MEMORY_TYPES)
    tags_str = st.text_input("Tags (comma separated)")

    if st.button("Save"):
        if tags_str:
            tags = [t.strip() for t in tags_str.split(",") if t.strip()]
        else:
            tags = None
        result = run_async(
            kb_save(
                content=content,
                # "general" is sent as None rather than as an explicit type.
                memory_type=memory_type if memory_type != "general" else None,
                tags=tags,
            )
        )
        st.success(result)


def _render_search() -> None:
    """Render the search form and show the text returned by ``kb_search``."""
    st.subheader("Search")
    query = st.text_input("Query")
    limit = st.number_input("Limit", min_value=1, max_value=20, value=5)
    mtype = st.selectbox("Memory Type Filter", ["", *_SPECIFIC_TYPES])

    if st.button("Search"):
        result = run_async(
            kb_search(query=query, limit=int(limit), memory_type=mtype or None)
        )
        st.markdown(result)


def _render_memory_card(collection, memory_id: str, doc: str, metadata: dict) -> None:
    """Render one memory as a card with edit and delete controls.

    Deleting is a two-step flow: the first click stores a
    ``confirm_delete_<id>`` flag in the session state, and the next run shows
    a confirmation prompt for that card.
    """
    memory_type = metadata.get('memory_type', 'general')
    confirm_key = f"confirm_delete_{memory_id}"

    with st.container():
        # Card header with type badge and date
        type_col, date_col = st.columns([3, 1])
        with type_col:
            st.markdown(
                f"**{_TYPE_ICONS.get(memory_type, _DEFAULT_ICON)} {memory_type.title()}**"
            )
        with date_col:
            st.caption(metadata.get('created_at', '')[:10])

        # Card content
        content_preview = doc[:150] + ('...' if len(doc) > 150 else '')
        st.write(content_preview)

        # Card footer with ID and actions
        info_col, edit_col, delete_col = st.columns([3.5, 1, 1])
        with info_col:
            st.caption(
                f"ID: {memory_id[:8]}... | Accessed: {metadata.get('access_count', 0)} times"
            )
        with edit_col:
            if st.button("✏️ Edit", key=f"edit_{memory_id}",
                         help="View and edit memory", use_container_width=True):
                st.session_state.selected_memory_id = memory_id
                st.session_state.action = "Memory Details"
                st.rerun()
        with delete_col:
            if st.button("🗑️ Delete", key=f"delete_{memory_id}",
                         help="Delete memory", use_container_width=True):
                # Quick delete with confirmation
                if confirm_key not in st.session_state:
                    st.session_state[confirm_key] = True
                    st.rerun()

        # Show delete confirmation if needed
        if confirm_key in st.session_state:
            st.warning("⚠️ Are you sure you want to delete this memory?")
            yes_col, no_col, _spacer = st.columns([1, 1, 2])
            with yes_col:
                if st.button("Yes, Delete", key=f"confirm_yes_{memory_id}", type="primary"):
                    try:
                        collection.delete(ids=[memory_id])
                        st.success("Memory deleted!")
                        del st.session_state[confirm_key]
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error deleting memory: {str(e)}")
            with no_col:
                if st.button("Cancel", key=f"confirm_no_{memory_id}"):
                    del st.session_state[confirm_key]
                    st.rerun()


def _render_list_memories() -> None:
    """Render the paginated, filterable list of stored memories."""
    st.subheader("All Memories")

    # Filtering options in columns
    filter_col, size_col = st.columns(2)
    with filter_col:
        mtype = st.selectbox("Filter by Type", ["All", *_SPECIFIC_TYPES], index=0)
    with size_col:
        page_size = st.selectbox("Items per page", [10, 20, 50], index=0)

    # Initialize pagination state
    if "page_number" not in st.session_state:
        st.session_state.page_number = 1

    # A page number is only meaningful for the filter/page size it was reached
    # with, so start again from the first page whenever either one changes.
    view_signature = (mtype, page_size)
    if st.session_state.get("list_view_signature") != view_signature:
        st.session_state.list_view_signature = view_signature
        st.session_state.page_number = 1

    # Imported here rather than at module level so the value is read after
    # init_database() has run (presumably that call sets it -- confirm in
    # kb_server).
    from kb_server import collection

    has_next_page = False
    if collection:
        try:
            # Build where clause for filtering; None means "no filter".
            where_clause = {"memory_type": mtype} if mtype != "All" else None
            offset = (st.session_state.page_number - 1) * page_size

            # Ask for one entry more than a page holds: the extra row is never
            # displayed, it only tells us whether a following page exists.
            results = collection.get(
                where=where_clause,
                limit=page_size + 1,
                offset=offset,
                include=["documents", "metadatas"],
            )
            has_next_page = len(results["ids"]) > page_size
            page_ids = results["ids"][:page_size]

            if page_ids:
                # Display memories as cards
                st.markdown("---")
                documents = results["documents"] or []
                metadatas = results["metadatas"] or []
                for idx, memory_id in enumerate(page_ids):
                    # Tolerate missing/None entries instead of failing the page.
                    doc = (documents[idx] if idx < len(documents) else "") or ""
                    metadata = (metadatas[idx] if idx < len(metadatas) else {}) or {}
                    _render_memory_card(collection, memory_id, doc, metadata)
                    # Visual separator between cards
                    st.divider()
            elif st.session_state.page_number > 1:
                # The current page no longer has entries (for example after
                # deletions), so step back instead of reporting an empty store.
                st.session_state.page_number -= 1
                st.rerun()
            else:
                st.info("No memories found. Click 'Add Memory' to get started!")
        except Exception as e:
            st.error(f"Error loading memories: {str(e)}")
    else:
        st.error("Database not initialized")

    # Pagination controls with proper spacing
    st.markdown("---")
    prev_col, label_col, next_col = st.columns([2, 1, 2])
    with prev_col:
        if st.button("← Previous Page",
                     disabled=st.session_state.page_number <= 1,
                     use_container_width=True):
            st.session_state.page_number -= 1
            st.rerun()
    with label_col:
        st.markdown(
            f"<div style='text-align: center; padding: 8px; font-weight: bold;'>Page {st.session_state.page_number}</div>",
            unsafe_allow_html=True,
        )
    with next_col:
        if st.button("Next Page →", disabled=not has_next_page, use_container_width=True):
            st.session_state.page_number += 1
            st.rerun()


def _render_delete_memory() -> None:
    """Render the delete-by-ID form backed by ``kb_delete``."""
    st.subheader("Delete Memory")
    mem_id = st.text_input("Memory ID (full or partial)")

    if st.button("Delete"):
        if not mem_id.strip():
            # The field accepts partial IDs; how kb_delete treats an empty
            # string is not visible here, so never forward a blank ID.
            st.warning("Please enter a memory ID.")
            return
        result = run_async(kb_delete(mem_id))
        st.success(result)


def _replace_memory(collection, memory_id: str, new_content: str,
                    new_memory_type: str, tags_str: str, metadata: dict) -> None:
    """Replace the stored memory with edited content, keeping the same ID.

    The replacement metadata is built before the old entry is deleted, so a
    failure while building it cannot cost the user the original memory.
    """
    from kb_server import extract_metadata

    new_metadata = extract_metadata(new_content)
    new_metadata["memory_type"] = new_memory_type
    if tags_str:
        new_metadata["tags"] = tags_str

    # Preserve some original metadata. created_at is copied only when present,
    # so a missing value does not overwrite whatever extract_metadata produced.
    created_at = metadata.get("created_at")
    if created_at is not None:
        new_metadata["created_at"] = created_at
    new_metadata["access_count"] = metadata.get("access_count", 0)

    # Delete old memory and create new one with the same ID
    collection.delete(ids=[memory_id])
    collection.add(
        ids=[memory_id],
        documents=[new_content],
        metadatas=[new_metadata],
    )


def _render_memory_summary(memory_id: str, metadata: dict) -> str:
    """Show the read-only fields of a memory and return its memory type."""
    memory_type = metadata.get('memory_type', 'general')
    st.markdown(f"**Type:** {_TYPE_ICONS.get(memory_type, _DEFAULT_ICON)} {memory_type}")
    st.markdown(f"**ID:** `{memory_id}`")
    st.markdown(f"**Created:** {metadata.get('created_at', 'Unknown')}")
    st.markdown(f"**Last Accessed:** {metadata.get('last_accessed', 'Unknown')}")
    st.markdown(f"**Access Count:** {metadata.get('access_count', 0)}")

    if metadata.get('technologies'):
        techs = metadata['technologies'].split(',')
        st.markdown(f"**Technologies:** {', '.join(techs)}")

    if metadata.get('urls'):
        st.markdown("**URLs:**")
        for url in metadata['urls'].split(','):
            st.markdown(f"- {url}")

    return memory_type


def _render_memory_details() -> None:
    """Render the detail/edit page for the memory selected in the list view."""
    if "selected_memory_id" not in st.session_state:
        st.error("No memory selected")
        st.session_state.action = "List Memories"
        st.rerun()
        return

    memory_id = st.session_state.selected_memory_id

    # Get memory details
    from kb_server import collection

    if not collection:
        st.error("Database not initialized")
        return

    try:
        # Get the specific memory
        results = collection.get(ids=[memory_id], include=["documents", "metadatas"])

        if not results["ids"]:
            st.error("Memory not found")
            st.session_state.action = "List Memories"
            st.rerun()
            return

        doc = results["documents"][0] or ""
        metadata = results["metadatas"][0] or {}

        # Header with back button
        back_col, title_col = st.columns([2, 6])
        with back_col:
            if st.button("← Back to List", use_container_width=True):
                st.session_state.action = "List Memories"
                st.rerun()
        with title_col:
            st.subheader("Memory Details")

        # Memory info
        memory_type = _render_memory_summary(memory_id, metadata)

        st.markdown("---")

        # Edit form
        st.subheader("Edit Memory")
        new_content = st.text_area("Content", value=doc, height=200)

        type_col, tags_col = st.columns(2)
        with type_col:
            # Pre-select the stored type; unknown types fall back to "general".
            if memory_type in _MEMORY_TYPES:
                type_index = _MEMORY_TYPES.index(memory_type)
            else:
                type_index = 0
            new_memory_type = st.selectbox("Memory Type", _MEMORY_TYPES, index=type_index)
        with tags_col:
            tags_str = st.text_input(
                "Tags (comma separated)", value=metadata.get('tags', '')
            )

        # Action buttons
        st.markdown("---")
        st.markdown("### Actions")

        update_col, delete_col, copy_col = st.columns(3)
        with update_col:
            if st.button("💾 Update Memory", use_container_width=True):
                try:
                    _replace_memory(
                        collection, memory_id, new_content,
                        new_memory_type, tags_str, metadata,
                    )
                    st.success("Memory updated successfully!")
                    st.rerun()
                except Exception as e:
                    st.error(f"Error updating memory: {str(e)}")

        with delete_col:
            if st.button("🗑️ Delete Memory", use_container_width=True):
                try:
                    collection.delete(ids=[memory_id])
                    st.success("Memory deleted successfully!")
                    st.session_state.action = "List Memories"
                    if "selected_memory_id" in st.session_state:
                        del st.session_state.selected_memory_id
                    st.rerun()
                except Exception as e:
                    st.error(f"Error deleting memory: {str(e)}")

        with copy_col:
            if st.button("📋 Copy ID", use_container_width=True):
                st.code(memory_id)
                st.info("Memory ID copied above!")
    except Exception as e:
        st.error(f"Error loading memory: {str(e)}")


# Maps the action stored in the session state to the function drawing its view.
# "Memory Details" has no sidebar button; it is entered from a card's Edit button.
_VIEWS = {
    "Add Memory": _render_add_memory,
    "Search": _render_search,
    "List Memories": _render_list_memories,
    "Delete Memory": _render_delete_memory,
    "Memory Details": _render_memory_details,
}


def main() -> None:
    """Entry point: draw the title and sidebar, then the currently active view."""
    st.title("Knowledge Base MCP")
    _render_sidebar()

    render_view = _VIEWS.get(st.session_state.action)
    if render_view is not None:
        render_view()


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Leon4s4/knowledge-base-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.