database.pyā¢9.77 kB
"""
Database operations for Skills Registry
"""
import os
from typing import List, Optional, Dict, Any, Tuple
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import RealDictCursor
import json
from .models import Skill, SkillMetadata, SkillRating, SkillFavorite
class Database:
"""PostgreSQL database operations"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
self._ensure_connection()
def _ensure_connection(self):
"""Ensure database connection is available"""
try:
conn = psycopg2.connect(self.connection_string)
conn.close()
except Exception as e:
print(f"Database connection failed: {e}")
raise
def _get_conn(self):
"""Get a new database connection"""
return psycopg2.connect(self.connection_string, cursor_factory=RealDictCursor)
async def get_skill(self, skill_id: str) -> Optional[Skill]:
"""Fetch a skill by ID"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute("""
SELECT s.*,
COALESCE(ss.rating_avg, 0) as rating_avg,
COALESCE(ss.rating_count, 0) as rating_count
FROM skills s
LEFT JOIN skill_stats ss ON s.skill_id = ss.skill_id
WHERE s.skill_id = %s
""", (skill_id,))
row = cur.fetchone()
if not row:
return None
# Convert to Skill model
return Skill(**dict(row))
finally:
conn.close()
async def create_skill(
self,
name: str,
description: str,
skill_md_content: str,
category: str,
tags: List[str],
author_id: str,
visibility: str = "private",
ai_generated: bool = False
) -> str:
"""Create a new skill"""
import uuid
from slugify import slugify
# Generate skill_id from name
base_id = slugify(name)
skill_id = f"{base_id}-{uuid.uuid4().hex[:8]}"
# Store file locally
storage_path = os.getenv("SKILLS_STORAGE_PATH", "/app/skills_storage")
os.makedirs(storage_path, exist_ok=True)
file_path = os.path.join(storage_path, f"{skill_id}.md")
with open(file_path, 'w') as f:
f.write(skill_md_content)
skill_md_url = f"file://{file_path}"
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO skills (
skill_id, name, description, category, tags,
author_id, visibility, ai_generated,
skill_md_url, skill_md_content, version
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
skill_id, name, description, category, tags,
author_id, visibility, ai_generated,
skill_md_url, skill_md_content, "1.0.0"
))
conn.commit()
return skill_id
finally:
conn.close()
async def get_categories(self) -> List[Dict[str, Any]]:
"""Get all categories with counts"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute("""
SELECT category, COUNT(*) as count
FROM skills
WHERE visibility = 'public'
GROUP BY category
ORDER BY count DESC
""")
rows = cur.fetchall()
return [dict(row) for row in rows]
finally:
conn.close()
async def add_favorite(self, skill_id: str, user_id: str) -> bool:
"""Add skill to favorites"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
# Check if skill exists
cur.execute("SELECT 1 FROM skills WHERE skill_id = %s", (skill_id,))
if not cur.fetchone():
return False
# Insert favorite (ignore if already exists)
cur.execute("""
INSERT INTO skill_favorites (skill_id, user_id)
VALUES (%s, %s)
ON CONFLICT (skill_id, user_id) DO NOTHING
""", (skill_id, user_id))
conn.commit()
return True
finally:
conn.close()
async def remove_favorite(self, skill_id: str, user_id: str) -> bool:
"""Remove skill from favorites"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute("""
DELETE FROM skill_favorites
WHERE skill_id = %s AND user_id = %s
""", (skill_id, user_id))
conn.commit()
return cur.rowcount > 0
finally:
conn.close()
async def get_favorites(self, user_id: str) -> List[SkillMetadata]:
"""Get user's favorite skills"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute("""
SELECT s.*,
COALESCE(ss.rating_avg, 0) as rating_avg,
COALESCE(ss.rating_count, 0) as rating_count
FROM skills s
JOIN skill_favorites sf ON s.skill_id = sf.skill_id
LEFT JOIN skill_stats ss ON s.skill_id = ss.skill_id
WHERE sf.user_id = %s
ORDER BY sf.created_at DESC
""", (user_id,))
rows = cur.fetchall()
return [SkillMetadata(**dict(row)) for row in rows]
finally:
conn.close()
async def rate_skill(
self,
skill_id: str,
user_id: str,
rating: int,
review: Optional[str] = None
) -> Tuple[bool, float]:
"""Rate a skill, returns (success, new_avg_rating)"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
# Check if skill exists
cur.execute("SELECT 1 FROM skills WHERE skill_id = %s", (skill_id,))
if not cur.fetchone():
return False, 0.0
# Upsert rating
cur.execute("""
INSERT INTO skill_ratings (skill_id, user_id, rating, review)
VALUES (%s, %s, %s, %s)
ON CONFLICT (skill_id, user_id)
DO UPDATE SET rating = EXCLUDED.rating, review = EXCLUDED.review
""", (skill_id, user_id, rating, review))
# Get new average
cur.execute("""
SELECT AVG(rating) as avg_rating
FROM skill_ratings
WHERE skill_id = %s
""", (skill_id,))
result = cur.fetchone()
avg_rating = float(result['avg_rating']) if result else 0.0
conn.commit()
return True, avg_rating
finally:
conn.close()
async def track_usage(self, skill_id: str, user_id: str):
"""Track skill usage for analytics"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO skill_usage (skill_id, user_id)
VALUES (%s, %s)
""", (skill_id, user_id))
# Increment download counter
cur.execute("""
UPDATE skills SET downloads = downloads + 1
WHERE skill_id = %s
""", (skill_id,))
conn.commit()
finally:
conn.close()
async def get_trending(self, limit: int, timeframe: str) -> List[Dict[str, Any]]:
"""Get trending skills"""
# Calculate time window
windows = {
"day": timedelta(days=1),
"week": timedelta(weeks=1),
"month": timedelta(days=30)
}
time_threshold = datetime.now() - windows[timeframe]
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute("""
SELECT
s.skill_id,
s.name,
s.description,
s.category,
s.tags,
COUNT(su.usage_id) as usage_count,
COALESCE(ss.rating_avg, 0) as rating_avg
FROM skills s
JOIN skill_usage su ON s.skill_id = su.skill_id
LEFT JOIN skill_stats ss ON s.skill_id = ss.skill_id
WHERE su.used_at >= %s AND s.visibility = 'public'
GROUP BY s.skill_id, s.name, s.description, s.category, s.tags, ss.rating_avg
ORDER BY usage_count DESC
LIMIT %s
""", (time_threshold, limit))
rows = cur.fetchall()
return [dict(row) for row in rows]
finally:
conn.close()