Skip to main content
Glama
proxy.py22.9 kB
""" Augments injection proxy using mitmproxy. Intercepts requests to Anthropic API and replaces __NISABA_AUGMENTS_PLACEHOLDER__ with actual augments content. Supports checkpoint-based context compression. """ import datetime import importlib import json import logging import os import tiktoken from logging.handlers import RotatingFileHandler from mitmproxy import http from nisaba.structured_file import JsonStructuredFile, StructuredFileCache from nisaba.workspace_files import WorkspaceFiles from nisaba.wrapper.request_modifier import RequestModifier from pathlib import Path from typing import Optional, List, TYPE_CHECKING if TYPE_CHECKING: from nisaba.augments import AugmentManager # Setup logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # Ensure log directory exists log_dir = Path(".nisaba/logs") log_dir.mkdir(parents=True, exist_ok=True) # Add file handler for proxy logs if not any(isinstance(h, RotatingFileHandler) for h in logger.handlers): file_handler = RotatingFileHandler( log_dir / "proxy.log", maxBytes=1*1024*1024, # 10MB backupCount=3 ) file_handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) file_handler.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.info("Proxy logging initialized to .nisaba/logs/proxy.log") # Module-level singleton for RequestModifier access _REQUEST_MODIFIER_INSTANCE = None def _set_request_modifier(instance): """Store the RequestModifier instance for tool access.""" global _REQUEST_MODIFIER_INSTANCE _REQUEST_MODIFIER_INSTANCE = instance def get_request_modifier(): """Get the active RequestModifier instance.""" assert(_REQUEST_MODIFIER_INSTANCE) return _REQUEST_MODIFIER_INSTANCE class AugmentInjector: """ mitmproxy addon that injects augments content into Anthropic API requests. Intercepts POST requests to api.anthropic.com, parses the JSON body, finds system prompt blocks containing __NISABA_AUGMENTS_PLACEHOLDER__, and replaces: - First occurrence: replaced with augments content from file - Remaining occurrences: deleted (replaced with empty string) """ def __init__( self, augment_manager: Optional["AugmentManager"] = None ): """ Initialize augments injector. Can operate in two modes: 1. File-based (legacy): reads from augments_file 2. Shared-state (unified): uses shared AugmentManager Args: augments_file: Path to file containing augments content. If None, reads from NISABA_AUGMENTS_FILE env var. augment_manager: Shared AugmentManager instance (unified mode) """ # Unified mode (shared AugmentManager) self.augment_manager = augment_manager # Get shared workspace files singleton self.files = WorkspaceFiles.instance() # Aliases for backward compatibility (existing code uses these names) self.augments_cache = self.files.augments self.system_prompt_cache = self.files.system_prompt self.core_system_prompt_cache = self.files.core_system_prompt self.transcript_cache = self.files.transcript self.structural_view_cache = self.files.structural_view self.todos_cache = self.files.todos self.notifications_cache = self.files.notifications self.checkpoint_file = self.files.notification_state self.filtered_tools:set[str] = { "TodoWrite" } self.core_system_prompt_tokens:int = 0 # Store reference globally for tool access self.request_modifier:RequestModifier = RequestModifier() _set_request_modifier(self.request_modifier) # Initial load if augment_manager is None: self.augments_cache.load() else: # Unified mode - get content from AugmentManager logger.info("AugmentInjector initialized in unified mode (shared AugmentManager)") self.system_prompt_cache.load() self.transcript_cache.load() self.structural_view_cache.load() self.todos_cache.load() self.notifications_cache.load() def request(self, flow: http.HTTPFlow) -> None: """ Intercept and modify requests. Called by mitmproxy for each HTTP request. If the request is to Anthropic API, we parse the body, replace the placeholder, and modify the request. In unified mode, also checks for checkpoint and applies compression. Args: flow: mitmproxy HTTPFlow object """ # Only intercept Anthropic API requests if not self._is_anthropic_request(flow): return try: # Parse request body as JSON body = json.loads(flow.request.content) self.request_modifier.clear_visible_tools() body = self.request_modifier.process_request(body) # Detect delta and generate notifications self._process_notifications(body) # Process system prompt blocks if self._inject_augments(body): flow.request.content = json.dumps(body).encode('utf-8') except json.JSONDecodeError as e: logger.error(f"Failed to parse request JSON: {e}") except Exception as e: logger.error(f"Error processing request: {e}") def _is_anthropic_request(self, flow: http.HTTPFlow) -> bool: """ Check if this is an Anthropic API request. Args: flow: mitmproxy HTTPFlow object Returns: True if this is a request to Anthropic API """ return ( flow.request.method == "POST" and "api.anthropic.com" in flow.request.pretty_host ) def _inject_augments(self, body: dict) -> bool: """ Inject augments content into request body. Finds __NISABA_AUGMENTS_PLACEHOLDER__ in system blocks: - First occurrence: replaced with augments content - Remaining occurrences: deleted Args: body: Parsed JSON request body (modified in place) Returns: True if any replacements were made """ # Filter native tools on every request if "tools" in body: filtered_tools = [] for tool in body["tools"]: if "name" in tool and tool["name"] in self.filtered_tools: continue filtered_tools.append(tool) body["tools"] = filtered_tools if "system" in body: if len(body["system"]) < 2: body["system"].append( { "type": "text", "text": ( f"\n{self.system_prompt_cache.load()}" f"\n{self.augments_cache.load()}" f"\n{self.transcript_cache.load()}" ), "cache_control": { "type": "ephemeral" } } ) elif "text" in body["system"][1]: # Generate status bar from current state if not self.core_system_prompt_cache.file_path.exists() or self.core_system_prompt_cache.content != body["system"][1]["text"]: self.core_system_prompt_cache.write(body["system"][1]["text"]) body["system"][1]["text"] = ( f"\n{self.system_prompt_cache.load()}" f"\n{self.core_system_prompt_cache.load()}" f"\n{self.augments_cache.load()}" f"\n{self.transcript_cache.load()}" ) if 'messages' in body and len(body["messages"]) > 2: visible_tools = self.request_modifier.get_and_visible_tool_result_views() if visible_tools: visible_tools = f"\n---RESULTS_END\n{visible_tools}\n---RESULTS_END" status_bar = f"{self._generate_status_bar(body, visible_tools)}" workspace_text = ( f"<system-reminder>\n--- WORKSPACE ---" f"\n{status_bar}" f"\n{self.structural_view_cache.load()}" # f"{visible_tools}" # this has a newline when populated f"\n{self.notifications_cache.load()}" f"\n{self.todos_cache.load()}" f"\n</system-reminder>" ) body['messages'].append( { "role": "user", "content": [ { "type": "text", "text": workspace_text } ] } ) # TODO: this is mostly for development - it needs to bne switched off self._write_to_file(Path(os.getcwd()) / '.nisaba/workspace.md', workspace_text, "Workspace markdow written") self._write_to_file(Path(os.getcwd()) / '.nisaba/modified_context.json', json.dumps(body, indent=2, ensure_ascii=False), "Modified request written") return True return "tools" in body or "system" in body def _write_to_file(self, file_path:Path, content: str, log_message: str | None = None) -> None: """ write to file file_path the content optionally displaying log_message """ try: # Create/truncate file (only last message) with open(file_path, "w", encoding="utf-8") as f: f.write(content) if log_message: logger.debug(log_message) except Exception as e: # Don't crash proxy if logging fails logger.error(f"Failed to log context: {e}") def _process_notifications(self, body: dict) -> None: """ Detect delta in messages and generate notifications for new tool calls. Uses session-aware checkpoint to avoid duplicate notifications on restart. Args: body: Parsed JSON request body """ # Extract current session ID try: user_id = body.get('metadata', {}).get('user_id', '') if '_session_' in user_id: current_session_id = user_id.split('_session_')[1] else: current_session_id = None except Exception as e: logger.warning(f"Failed to extract session ID: {e}") current_session_id = None # Load checkpoint checkpoint = self.checkpoint_file.load_json() last_session_id = checkpoint.get('session_id') last_tool_id_seen = checkpoint.get('last_tool_id_seen') # Determine if this is a new session is_new_session = (current_session_id != last_session_id) messages = body.get('messages', []) # Collect all tool_use blocks with their IDs (in order) all_tool_calls = [] for msg in messages: if not isinstance(msg, dict): continue content = msg.get('content', []) if isinstance(content, str): continue for block in content: if not isinstance(block, dict): continue if block.get('type') == 'tool_use': tool_id = block.get('id') if tool_id: all_tool_calls.append({ 'id': tool_id, 'name': block.get('name', 'unknown'), 'input': block.get('input', {}) }) # Determine which tool calls are "new" for notification new_tool_calls = [] if is_new_session: # New session: notify all tools new_tool_calls = all_tool_calls else: # Same session: notify only tools after last_tool_id_seen found_last = (last_tool_id_seen is None) # If None, include all for call in all_tool_calls: if found_last: new_tool_calls.append(call) elif call['id'] == last_tool_id_seen: found_last = True # Start including from next tool # Build map of tool results (scan all messages) tool_results = {} for msg in messages: if not isinstance(msg, dict): continue content = msg.get('content', []) if isinstance(content, str): continue for block in content: if not isinstance(block, dict): continue if block.get('type') == 'tool_result': tool_use_id = block.get('tool_use_id') if tool_use_id: tool_results[tool_use_id] = block # Generate notifications notifications = [] for call in new_tool_calls: result = tool_results.get(call['id']) if result: icon = '✗' if result.get('is_error') else '✓' summary = self._create_summary(call['name'], call, result) notifications.append(f"{icon} {call['name']}() → {summary}") # Write to file if notifications: self._write_notifications(notifications) logger.info(f"Generated {len(notifications)} notification(s)") # Update checkpoint with latest tool ID (if any tools exist) if all_tool_calls and current_session_id: last_tool_id = all_tool_calls[-1]['id'] self.checkpoint_file.write_json({"session_id": current_session_id, "last_tool_id_seen": last_tool_id}) def _create_summary(self, tool_name: str, call: dict, result: dict) -> str: """ Create concise summary from tool result. Parses nisaba standard tool return format to extract semantic message. Args: tool_name: Name of the tool call: Tool call block (contains input parameters) result: Tool result block Returns: Concise summary string """ import re try: content = result.get('content', '') # Parse JSON response from content if isinstance(content, list) and content: data = json.loads(content[0].get('text', '{}')) elif isinstance(content, str): data = json.loads(content) else: return "ok" # Extract 'data' field (contains markdown in nisaba standard format) data_field = data.get('data', '') # Parse markdown for **message**: field if isinstance(data_field, str): match = re.search(r'\*\*message\*\*:\s*\n\s*(.+)', data_field) if match: return match.group(1).strip() # Fallback: use 'message' from root (also nisaba standard) if 'message' in data: return data['message'] except Exception: pass return "ok" def _write_notifications(self, notifications: List[str]) -> None: """ Write notifications via WorkspaceFiles singleton. Args: notifications: List of notification strings """ content = "" if notifications: content += "Recent activity:\n" content += "\n".join(notifications) + "\n" else: content += "(no recent activity)\n" # Write via singleton (updates cache + file atomically) self.files.notifications.write(content) def _estimate_tokens(self, text: str) -> int: """ Accurate token estimate using tiktoken. Args: text: Text to estimate Returns: Exact token count using cl100k_base encoding """ try: enc = tiktoken.get_encoding("cl100k_base") return len(enc.encode(text)) except Exception as e: logger.warning(f"tiktoken encoding failed, using fallback: {e}") # Fallback to rough estimate if tiktoken fails return len(text) // 4 def _generate_status_bar(self, body: dict, tool_results:str = "") -> str: """ Generate status bar with segmented token usage. Calculates workspace, messages, and total token counts. Shows window counts for context awareness. Exports JSON for external status line tools. Args: body: Request body dict Returns: Formatted status bar with tags """ # Extract model name model_name = body.get('model', 'unknown') # Load all caches (triggers mtime-based updates) self.system_prompt_cache.load() self.todos_cache.load() self.notifications_cache.load() self.augments_cache.load() self.structural_view_cache.load() self.transcript_cache.load() # Use cached token counts (no recalculation!) structural_view_tokens = self.structural_view_cache.token_count workspace_tokens = self.todos_cache.token_count + self.notifications_cache.token_count + structural_view_tokens # Count message tokens (properly, without JSON overhead) messages = body.get('messages', []) messages_tokens = 0 for msg in messages: if isinstance(msg, dict): # Count role messages_tokens += self._estimate_tokens(msg.get('role', '')) # Count content content = msg.get('content', '') if isinstance(content, str): messages_tokens += self._estimate_tokens(content) elif isinstance(content, list): for block in content: if isinstance(block, dict): # Count actual content based on block type block_type = block.get('type', '') if block_type == 'text': messages_tokens += self._estimate_tokens(block.get('text', '')) elif block_type == 'tool_use': messages_tokens += self._estimate_tokens(block.get('name', '')) messages_tokens += self._estimate_tokens(str(block.get('input', {}))) elif block_type == 'tool_result': result_content = block.get('content', '') if isinstance(result_content, str): messages_tokens += self._estimate_tokens(result_content) else: messages_tokens += self._estimate_tokens(str(result_content)) elif block_type == 'thinking': messages_tokens += self._estimate_tokens(block.get('thinking', '')) else: # Fallback for unknown block types messages_tokens += self._estimate_tokens(str(block)) tool_tokens = self._estimate_tokens(json.dumps(body.get('tools', []))) tool_result_tokens = self._estimate_tokens(tool_results) # Total usage total_tokens = workspace_tokens + self.system_prompt_cache.token_count + self.core_system_prompt_cache.token_count + tool_tokens + self.augments_cache.token_count + tool_result_tokens + messages_tokens budget = 200 # 200k token budget # Export JSON for external status line tools status_data = { "model": model_name, "workspace": { "total": workspace_tokens, "system": { "prompt": self.system_prompt_cache.token_count + self.core_system_prompt_cache.token_count, "tools": tool_tokens, "transcript": self.transcript_cache.token_count }, "augments": self.augments_cache.token_count, "view": structural_view_tokens, "tool_results": tool_result_tokens, "notifications": self.notifications_cache.line_count - 1, # header + endl compensation, "todos": self.todos_cache.line_count - 1 # header + endl compensation }, "messages": messages_tokens, "total": total_tokens, "budget": budget * 1000 } ws = status_data['workspace'] parts = [ f"SYSTEM({ws['system']['prompt']//1000}k)", f"TOOLS({ws['system']['tools']//1000}k)", f"AUG({ws['augments']//1000}k)", f"COMPTRANS({ws['system']['transcript']//1000}k)", f"MSG({status_data['messages']//1000}k)", f"WORKPACE({ws['total']//1000}k)", f"STVIEW({ws['view']//1000}k)", f"RESULTS({ws['tool_results']//1000}k)", f"MODEL({model_name})", f"{status_data['total']//1000}k/{status_data['budget']//1000}k" ] SPLIT = 4 status = "\n".join([ ' | '.join(parts[0:SPLIT]), ' | '.join(parts[SPLIT:-2]), ' | '.join(parts[-2:]), ]) try: status_file = Path("./.nisaba/tui/status_bar_live.txt") status_file.parent.mkdir(parents=True, exist_ok=True) with open(status_file, 'w') as f: status_file.write_text(status) except Exception as e: logger.warning(f"Failed to write status JSON: {e}") return f"---STATUS_BAR\n{status}\n---STATUS_BAR_END" def load(loader): """ Called when mitmproxy loads this script. This is the mitmproxy addon interface for script-based addons. """ # Augments file comes from environment variable addon = AugmentInjector() # Add option for documentation loader.add_option( name="nisaba_augments_file", typespec=str, default="./.nisaba/tui/augment_view.md", help="Path to augments content file", ) # Register the addon return addon # Instantiate addon for mitmproxy addons = [AugmentInjector()]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server