Skip to main content
Glama
EmjayAhn
by EmjayAhn
main.py12.3 kB
from fastapi import FastAPI, HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from pydantic import BaseModel, EmailStr, validator from typing import List, Optional, Dict, Any from datetime import datetime, timedelta from jose import jwt import os from uuid import uuid4 import motor.motor_asyncio from passlib.context import CryptContext app = FastAPI(title="Pensieve API", version="1.0.0") # CORS 설정 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 환경 변수 JWT_SECRET = os.getenv("JWT_SECRET", "your-secret-key-change-in-production") JWT_ALGORITHM = "HS256" JWT_EXPIRATION_HOURS = 24 MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017") DATABASE_NAME = "pensieve" # MongoDB 연결 client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL) db = client[DATABASE_NAME] users_collection = db.users conversations_collection = db.conversations # 보안 security = HTTPBearer() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 모델 class UserCreate(BaseModel): email: EmailStr # 이메일 형식 검증 password: str @validator('password') def validate_password(cls, v): if len(v) < 6: raise ValueError('비밀번호는 최소 6자 이상이어야 합니다') if len(v) > 100: raise ValueError('비밀번호는 100자 이하여야 합니다') return v class UserLogin(BaseModel): email: str password: str class Message(BaseModel): role: str content: str class ConversationCreate(BaseModel): messages: List[Message] metadata: Optional[Dict[str, Any]] = None class ConversationUpdate(BaseModel): messages: List[Message] class Token(BaseModel): access_token: str token_type: str = "bearer" # 헬퍼 함수 def create_access_token(data: dict): to_encode = data.copy() expire = datetime.utcnow() + timedelta(hours=JWT_EXPIRATION_HOURS) to_encode.update({"exp": expire}) return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM) async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials try: payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) email: str = payload.get("sub") if email is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", ) except jwt.JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", ) user = await users_collection.find_one({"email": email}) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", ) return user # 인증 엔드포인트 @app.post("/auth/register", response_model=Token) async def register(user: UserCreate): # 이메일 중복 확인 if await users_collection.find_one({"email": user.email}): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) # 사용자 생성 hashed_password = pwd_context.hash(user.password) user_doc = { "_id": str(uuid4()), "email": user.email, "hashed_password": hashed_password, "created_at": datetime.utcnow() } await users_collection.insert_one(user_doc) # 토큰 생성 access_token = create_access_token(data={"sub": user.email}) return {"access_token": access_token} @app.post("/auth/login", response_model=Token) async def login(user: UserLogin): # 사용자 확인 db_user = await users_collection.find_one({"email": user.email}) if not db_user or not pwd_context.verify(user.password, db_user["hashed_password"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password" ) # 토큰 생성 access_token = create_access_token(data={"sub": user.email}) return {"access_token": access_token} # 대화 엔드포인트 @app.post("/conversations") async def create_conversation( conversation: ConversationCreate, current_user: dict = Depends(get_current_user) ): conversation_doc = { "_id": str(uuid4()), "user_id": current_user["_id"], "messages": [msg.dict() for msg in conversation.messages], "metadata": conversation.metadata or {}, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() } await conversations_collection.insert_one(conversation_doc) return {"id": conversation_doc["_id"], "message": "Conversation created successfully"} @app.get("/conversations") async def list_conversations( limit: int = 50, offset: int = 0, current_user: dict = Depends(get_current_user) ): cursor = conversations_collection.find( {"user_id": current_user["_id"]} ).skip(offset).limit(limit) conversations = [] async for conv in cursor: conversations.append({ "id": conv["_id"], "metadata": conv.get("metadata", {}), "created_at": conv["created_at"], "updated_at": conv["updated_at"], "message_count": len(conv.get("messages", [])) }) return conversations @app.get("/conversations/{conversation_id}") async def get_conversation( conversation_id: str, current_user: dict = Depends(get_current_user) ): conversation = await conversations_collection.find_one({ "_id": conversation_id, "user_id": current_user["_id"] }) if not conversation: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Conversation not found" ) return conversation @app.put("/conversations/{conversation_id}") async def update_conversation( conversation_id: str, update: ConversationUpdate, current_user: dict = Depends(get_current_user) ): result = await conversations_collection.update_one( {"_id": conversation_id, "user_id": current_user["_id"]}, { "$set": { "messages": [msg.dict() for msg in update.messages], "updated_at": datetime.utcnow() } } ) if result.matched_count == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Conversation not found" ) return {"message": "Conversation updated successfully"} @app.post("/conversations/{conversation_id}/messages") async def append_messages( conversation_id: str, messages: List[Message], current_user: dict = Depends(get_current_user) ): result = await conversations_collection.update_one( {"_id": conversation_id, "user_id": current_user["_id"]}, { "$push": {"messages": {"$each": [msg.dict() for msg in messages]}}, "$set": {"updated_at": datetime.utcnow()} } ) if result.matched_count == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Conversation not found" ) return {"message": f"Added {len(messages)} messages to conversation"} @app.delete("/conversations/{conversation_id}") async def delete_conversation( conversation_id: str, current_user: dict = Depends(get_current_user) ): result = await conversations_collection.delete_one({ "_id": conversation_id, "user_id": current_user["_id"] }) if result.deleted_count == 0: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Conversation not found" ) return {"message": "Conversation deleted successfully"} @app.get("/conversations/search") async def search_conversations( query: str, limit: int = 20, current_user: dict = Depends(get_current_user) ): # MongoDB 텍스트 검색 사용 cursor = conversations_collection.find( { "user_id": current_user["_id"], "$text": {"$search": query} } ).limit(limit) results = [] async for conv in cursor: results.append({ "id": conv["_id"], "metadata": conv.get("metadata", {}), "created_at": conv["created_at"], "message_count": len(conv.get("messages", [])) }) return results # 웹 페이지 라우트 @app.get("/", response_class=HTMLResponse) async def dashboard(): """메인 대시보드 페이지""" with open("static/index.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) @app.get("/dashboard", response_class=HTMLResponse) async def dashboard(): """대화 관리 대시보드""" with open("static/dashboard.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) @app.get("/conversation/{conversation_id}", response_class=HTMLResponse) async def conversation_detail(conversation_id: str): """대화 상세 페이지""" with open("static/conversation.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) @app.get("/features.html", response_class=HTMLResponse) async def features_page(): """기능 안내 페이지""" with open("static/features.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) @app.get("/setup.html", response_class=HTMLResponse) async def setup_page(): """MCP 설정 가이드 페이지""" with open("static/setup.html", "r", encoding="utf-8") as f: return HTMLResponse(content=f.read()) # API 라우트 (프론트엔드에서 사용) @app.get("/api/me") async def get_current_user_info(current_user: dict = Depends(get_current_user)): """현재 로그인한 사용자 정보""" return { "id": current_user["_id"], "email": current_user["email"], "created_at": current_user["created_at"] } @app.post("/api/login") async def api_login(user: UserLogin): """로그인 API""" return await login(user) @app.post("/api/register") async def api_register(user: UserCreate): """회원가입 API""" return await register(user) @app.get("/api/conversations") async def api_get_conversations( limit: int = 20, skip: int = 0, current_user: dict = Depends(get_current_user) ): """사용자의 대화 목록 조회""" cursor = conversations_collection.find( {"user_id": current_user["_id"]} ).sort("created_at", -1).limit(limit).skip(skip) conversations = [] async for conv in cursor: conversations.append({ "id": conv["_id"], "messages": conv.get("messages", []), "metadata": conv.get("metadata", {}), "created_at": conv["created_at"], "updated_at": conv["updated_at"] }) return conversations @app.get("/api/conversations/{conversation_id}") async def api_get_conversation( conversation_id: str, current_user: dict = Depends(get_current_user) ): """특정 대화 상세 조회""" return await get_conversation(conversation_id, current_user) @app.delete("/api/conversations/{conversation_id}") async def api_delete_conversation( conversation_id: str, current_user: dict = Depends(get_current_user) ): """대화 삭제""" return await delete_conversation(conversation_id, current_user) # 기존 API 호환성을 위한 라우트 @app.get("/health") async def health_check(): return {"message": "Pensieve API", "version": "1.0.0", "status": "healthy"} # 정적 파일 서빙 (라우터들 이후에 마운트) app.mount("/static", StaticFiles(directory="static"), name="static") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/EmjayAhn/pensieve-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server