#!/usr/bin/env python3
"""
Agent Genesis Enhanced Extraction using MKG semantic analysis.
Processes conversations with Qwen3-Coder for high-quality knowledge extraction.
"""
import asyncio
import sys
import json
import httpx
from pathlib import Path
from typing import List, Dict, Optional
import time
import re
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent.parent))
from mcp_server.mcp_tools import add_decision, add_pattern, add_failure
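
# add_decision / add_pattern / add_failure are async MCP tool wrappers; as used
# below, each persists a node to the knowledge graph and returns a dict with
# the new node's id ('decision_id', 'pattern_id', or 'failure_id').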
class AgentGenesisMKGExtractor:
"""Enhanced Agent Genesis extractor using MKG for semantic analysis."""
def __init__(self, queries_file: str, batch_size: int = 20, use_mkg: bool = True):
self.queries_file = Path(queries_file)
self.batch_size = batch_size
self.use_mkg = use_mkg
# Statistics
self.decisions_added = 0
self.patterns_added = 0
self.failures_added = 0
self.skipped = 0
self.errors = 0
self.total_processed = 0
# MKG configuration - Qwen3-Coder-30B local model
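        # The endpoint is assumed to expose an OpenAI-compatible API (the code
        # below calls /v1/models and /v1/chat/completions), e.g. an LM Studio
        # or vLLM server hosting the model.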
self.mkg_url = "http://100.80.229.35:1234"
def load_queries(self) -> List[str]:
"""Load search queries from file."""
queries = []
with open(self.queries_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
queries.append(line)
return queries
async def search_agent_genesis(self, query: str, limit: int = 50) -> List[Dict]:
"""Search Agent Genesis conversations."""
print(f" š Searching: '{query}' (limit: {limit})")
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
"http://localhost:8080/search",
json={"query": query, "limit": limit}
)
response.raise_for_status()
result = response.json()
# Extract conversations
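                # The search service returns a doubly nested payload; the shape
                # inferred from the transform below is:
                #   {"results": {"results": [{"id", "document", "metadata", "distance"}, ...]}}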
nested_results = result.get("results", {})
conversations = nested_results.get("results", [])
print(f" ā
Found {len(conversations)} conversations")
# Transform to expected format
transformed = []
for conv in conversations:
transformed.append({
'conversation_id': conv.get('id', 'unknown'),
'content': conv.get('document', ''),
'metadata': conv.get('metadata', {}),
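                        # Convert vector distance to a similarity-style score
                        # (assumes the distance is normalized to [0, 1])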
'relevance_score': 1.0 - conv.get('distance', 0.5)
})
return transformed
except Exception as e:
print(f" ā Search failed: {e}")
return []
async def extract_with_mkg(self, conversation: Dict) -> Optional[Dict]:
"""Extract knowledge using MKG semantic analysis."""
content = conversation.get('content', '')
if len(content) < 100:
return None
# Optimized prompt for Qwen3-Coder
prompt = f"""Analyze this technical conversation and extract EXACTLY ONE insight:
TYPE 1 - TECHNICAL DECISION: Deliberate choice between alternatives
Examples: "Chose Redis over MongoDB", "Decided TypeScript over JavaScript"
Format: {{"type": "decision", "description": "brief summary", "rationale": "why", "alternatives": ["opt1", "opt2"]}}
TYPE 2 - RECURRING PATTERN: Repeated solution approach
Examples: "Always implement health checks", "Use dependency injection"
Format: {{"type": "pattern", "name": "pattern name", "context": "when", "implementation": "how"}}
TYPE 3 - SYSTEMATIC FAILURE: Consistent problem/anti-pattern
Examples: "Timeouts during cache invalidation", "Memory leaks in handlers"
Format: {{"type": "failure", "attempt": "what tried", "reason": "why failed", "lesson": "learned"}}
Conversation:
{content[:2000]}
Respond with ONLY valid JSON. If none match, return {{"type": "none"}}.
No markdown, no explanation, just JSON."""
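        # The conversation text is truncated to 2000 chars in the prompt above,
        # presumably to keep the request within the local model's context window.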
try:
# Use MKG local model
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
f"{self.mkg_url}/v1/chat/completions",
json={
"model": "local",
"messages": [
{"role": "system", "content": "You are a JSON extraction assistant. Respond with ONLY valid JSON, no markdown, no explanation."},
{"role": "user", "content": prompt}
],
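                        # Low temperature biases the model toward deterministic,
                        # parseable JSON output.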
"temperature": 0.1,
"max_tokens": 500
},
headers={"Content-Type": "application/json"}
)
if response.status_code != 200:
return None
result = response.json()
choices = result.get('choices', [])
if not choices:
return None
answer = choices[0].get('message', {}).get('content', '').strip()
# Debug: print first response
if not hasattr(self, '_debug_shown'):
print(f"\n [DEBUG] Sample response: {answer[:200]}...")
self._debug_shown = True
# Robust JSON cleaning
cleaned = answer
# Remove thinking tags (if any)
cleaned = re.sub(r'<think>.*?</think>', '', cleaned, flags=re.DOTALL | re.IGNORECASE)
cleaned = re.sub(r'<think>.*', '', cleaned, flags=re.DOTALL | re.IGNORECASE)
# Remove markdown code blocks
cleaned = re.sub(r'```json\s*', '', cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r'```\s*', '', cleaned)
cleaned = re.sub(r'`+', '', cleaned)
# Remove leading/trailing non-JSON text
cleaned = re.sub(r'^[^{\[]*', '', cleaned)
                cleaned = re.sub(r'[^}\]]*$', '', cleaned)  # strip trailing text after the last closing brace/bracket
cleaned = cleaned.strip()
# Try to parse cleaned JSON
try:
extracted = json.loads(cleaned)
if extracted.get('type') in ['decision', 'pattern', 'failure']:
return extracted
if extracted.get('type') == 'none':
return None
except json.JSONDecodeError:
pass
                # Fallback: scan the raw answer for JSON objects (the regex
                # tolerates one level of nesting) and try the last match first
                json_matches = re.findall(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', answer)
for json_str in reversed(json_matches):
try:
extracted = json.loads(json_str)
if extracted.get('type') in ['decision', 'pattern', 'failure']:
return extracted
except json.JSONDecodeError:
continue
return None
        except Exception:
            # Treat any transport or parsing error as "no extraction"
            return None
async def process_conversation(self, conversation: Dict, query: str) -> bool:
"""Process a single conversation and add to knowledge base."""
# Extract knowledge
extracted = await self.extract_with_mkg(conversation) if self.use_mkg else None
if not extracted or extracted.get('type') == 'none':
self.skipped += 1
return False
try:
conv_id = conversation.get('conversation_id', 'unknown')
# Add to knowledge base based on type
if extracted['type'] == 'decision':
result = await add_decision(
description=extracted.get('description', '')[:200],
rationale=extracted.get('rationale', '')[:500],
alternatives=extracted.get('alternatives', [])[:5],
related_to=[]
)
self.decisions_added += 1
node_id = result['decision_id']
print(f" ā
D: {node_id[:12]}...")
elif extracted['type'] == 'pattern':
result = await add_pattern(
name=extracted.get('name', '')[:100],
context=extracted.get('context', '')[:200],
implementation=extracted.get('implementation', '')[:1000],
use_cases=[extracted.get('context', '')[:200]]
)
self.patterns_added += 1
node_id = result['pattern_id']
print(f" ā
P: {node_id[:12]}...")
elif extracted['type'] == 'failure':
result = await add_failure(
attempt=extracted.get('attempt', '')[:200],
reason_failed=extracted.get('reason', '')[:500],
lesson_learned=extracted.get('lesson', '')[:500],
alternative_solution=""
)
self.failures_added += 1
node_id = result['failure_id']
print(f" ā
F: {node_id[:12]}...")
return True
except Exception as e:
print(f" ā Failed to add: {e}")
self.errors += 1
return False
async def process_query_batch(self, query: str, limit: int = 50):
"""Process all conversations for a query in batches."""
print(f"\n{'='*60}")
print(f"Processing: {query}")
print(f"{'='*60}")
# Search
conversations = await self.search_agent_genesis(query, limit)
if not conversations:
print(f" ā¹ļø No conversations found")
return
print(f" ā
Found {len(conversations)} conversations")
print(f" š¤ Using: MKG Qwen3-Coder")
# Process in batches
for i in range(0, len(conversations), self.batch_size):
batch = conversations[i:i+self.batch_size]
batch_num = i // self.batch_size + 1
total_batches = (len(conversations) + self.batch_size - 1) // self.batch_size
print(f"\n š¦ Batch {batch_num}/{total_batches} ({len(batch)} conversations)")
for j, conv in enumerate(batch):
success = await self.process_conversation(conv, query)
self.total_processed += 1
                # Throttle only after a successful knowledge-base write
                if success:
                    await asyncio.sleep(0.5)
            # Batch summary
            total = self.decisions_added + self.patterns_added + self.failures_added
            success_rate = (total / self.total_processed * 100) if self.total_processed > 0 else 0
            print(f"\n 📊 Progress: {total} nodes | {success_rate:.1f}% success | {self.skipped} skipped | {self.errors} errors")
async def run_enhanced_extraction(self, max_queries: Optional[int] = None):
"""Execute enhanced batch extraction with MKG."""
print("="*60)
print("AGENT GENESIS ENHANCED EXTRACTION (MKG)")
print("="*60)
print(f"\nStarted: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Batch Size: {self.batch_size}")
# Check MKG availability
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(f"{self.mkg_url}/v1/models")
                models = response.json()
                data = models.get('data') or [{}]  # guard against an empty model list
                model_name = data[0].get('id', 'qwen3-coder')
print(f"\nā
MKG Model: {model_name} at {self.mkg_url}")
except Exception as e:
print(f"\nā ļø MKG not available: {e}")
return
# Load queries
queries = self.load_queries()
if max_queries:
queries = queries[:max_queries]
print(f"ā
Loaded {len(queries)} search queries")
start_time = time.time()
# Process each query
for i, query in enumerate(queries):
print(f"\n{'='*60}")
print(f"QUERY {i+1}/{len(queries)}")
print(f"{'='*60}")
await self.process_query_batch(query, limit=self.batch_size)
# Final summary
elapsed = time.time() - start_time
total = self.decisions_added + self.patterns_added + self.failures_added
success_rate = (total / self.total_processed * 100) if self.total_processed > 0 else 0
print("\n" + "="*60)
print("ā
ENHANCED EXTRACTION COMPLETE")
print("="*60)
print(f"\nResults:")
print(f" Conversations processed: {self.total_processed}")
print(f" Decisions: {self.decisions_added}")
print(f" Patterns: {self.patterns_added}")
print(f" Failures: {self.failures_added}")
print(f" Total nodes: {total}")
print(f" Success rate: {success_rate:.1f}%")
print(f" Skipped: {self.skipped}")
print(f" Errors: {self.errors}")
print(f"\nTime: {elapsed/60:.1f} minutes")
print(f"Avg per conversation: {elapsed/self.total_processed:.1f}s")
print(f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
async def main():
import argparse
parser = argparse.ArgumentParser(
description="Enhanced Agent Genesis extraction with MKG"
)
parser.add_argument(
'--queries-file',
default='ingestion/agent_genesis_queries.txt',
help='Search queries file'
)
parser.add_argument(
'--batch-size',
type=int,
default=20,
help='Conversations per batch'
)
parser.add_argument(
'--max-queries',
type=int,
help='Maximum queries to process (for testing)'
)
args = parser.parse_args()
extractor = AgentGenesisMKGExtractor(
queries_file=args.queries_file,
batch_size=args.batch_size,
use_mkg=True
)
await extractor.run_enhanced_extraction(max_queries=args.max_queries)
if __name__ == "__main__":
asyncio.run(main())