Skip to main content
Glama
Skynotdie

MCP Localization Project

by Skynotdie
unigui_web_module.py67.1 kB
#!/usr/bin/env python3 """ UniGUI-MCP 웹 모듈 - Playwright 기반 웹 브라우저 자동화 UniGUI 아키텍처 내에서 웹 브라우저 제어를 담당하는 모듈 주요 기능: - Playwright 기반 웹 브라우저 제어 - 코드 생성 세션 관리 - 브라우저 기본 조작 - 사용자 상호작용 - 페이지 정보 및 실행 - HTTP 요청 처리 """ import asyncio import json import time import uuid import logging from typing import Dict, List, Optional, Any, Union, Tuple from dataclasses import dataclass, asdict from pathlib import Path import base64 import os try: from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Playwright except ImportError: # Playwright가 설치되지 않은 경우에도 모듈이 로드되도록 함 print("⚠️ Playwright가 설치되지 않았습니다. 웹 기능이 제한됩니다.") Browser = BrowserContext = Page = Playwright = None from unigui_base import ( UniGUIModule, ModuleType, ActionType, ActionResult, UniGUIDatabase, CodegenSession, ElementInfo ) logger = logging.getLogger(__name__) @dataclass class WebSessionConfig: """웹 세션 설정""" browser_type: str = "chromium" # chromium, firefox, webkit headless: bool = False viewport_width: int = 1280 viewport_height: int = 720 timeout: int = 30000 user_agent: str = "" downloads_dir: str = "" @dataclass class WebElementInfo: """웹 요소 정보 확장""" selector: str element_type: str text: str attributes: Dict[str, str] position: Tuple[int, int] size: Tuple[int, int] visible: bool = True tag_name: str = "" inner_html: str = "" class WebModule(UniGUIModule): """UniGUI 웹 모듈 - Playwright 기반""" def __init__(self): """웹 모듈 초기화""" super().__init__(ModuleType.WEB) # Playwright 관련 객체들 self.playwright: Optional[Playwright] = None self.browser: Optional[Browser] = None self.context: Optional[BrowserContext] = None self.page: Optional[Page] = None # 세션 설정 self.config = WebSessionConfig() # 데이터베이스 self.db = UniGUIDatabase("unigui_web.db") # 코드 생성 세션들 self.codegen_sessions: Dict[str, Dict[str, Any]] = {} # 상태 추적 self.last_action_time = time.time() self.action_count = 0 logger.info("🌐 WebModule 초기화 완료") async def initialize(self) -> bool: """웹 모듈 초기화""" try: if not async_playwright: logger.error("❌ Playwright가 설치되지 않았습니다") return False # Playwright 실행 self.playwright = await async_playwright().start() # 브라우저 타입 선택 if self.config.browser_type == "firefox": self.browser = await self.playwright.firefox.launch(headless=self.config.headless) elif self.config.browser_type == "webkit": self.browser = await self.playwright.webkit.launch(headless=self.config.headless) else: # chromium (기본값) self.browser = await self.playwright.chromium.launch(headless=self.config.headless) # 브라우저 컨텍스트 생성 context_options = { "viewport": { "width": self.config.viewport_width, "height": self.config.viewport_height } } if self.config.user_agent: context_options["user_agent"] = self.config.user_agent if self.config.downloads_dir: downloads_path = Path(self.config.downloads_dir) downloads_path.mkdir(exist_ok=True) context_options["accept_downloads"] = True self.context = await self.browser.new_context(**context_options) # 새 페이지 생성 self.page = await self.context.new_page() # 타임아웃 설정 self.page.set_default_timeout(self.config.timeout) self.active = True logger.info(f"✅ 웹 모듈 초기화 성공 ({self.config.browser_type})") return True except Exception as e: logger.error(f"❌ 웹 모듈 초기화 실패: {e}") await self.cleanup() return False async def cleanup(self): """웹 모듈 정리""" try: if self.page: await self.page.close() self.page = None if self.context: await self.context.close() self.context = None if self.browser: await self.browser.close() self.browser = None if self.playwright: await self.playwright.stop() self.playwright = None self.active = False logger.info("🔚 웹 모듈 정리 완료") except Exception as e: logger.error(f"❌ 웹 모듈 정리 실패: {e}") async def is_available(self) -> bool: """웹 모듈 사용 가능 여부 확인""" return ( self.active and self.playwright is not None and self.browser is not None and self.page is not None ) async def execute_action(self, action_type: ActionType, **kwargs) -> ActionResult: """액션 실행""" start_time = time.time() try: if not await self.is_available(): return ActionResult( success=False, data=None, message="Web module is not available", action_type=action_type, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "module_not_available"} ) # 액션 타입별 처리 if action_type == ActionType.NAVIGATE: result = await self._navigate(**kwargs) elif action_type == ActionType.CLICK: result = await self._click(**kwargs) elif action_type == ActionType.FILL: result = await self._fill(**kwargs) elif action_type == ActionType.SELECT: result = await self._select(**kwargs) elif action_type == ActionType.HOVER: result = await self._hover(**kwargs) elif action_type == ActionType.DRAG: result = await self._drag(**kwargs) elif action_type == ActionType.KEY_PRESS: result = await self._key_press(**kwargs) elif action_type == ActionType.SCREENSHOT: result = await self._screenshot(**kwargs) elif action_type == ActionType.EVALUATE: result = await self._evaluate(**kwargs) elif action_type == ActionType.HTTP_REQUEST: result = await self._http_request(**kwargs) elif action_type == ActionType.CUSTOM: result = await self._custom_action(**kwargs) else: result = ActionResult( success=False, data=None, message=f"Unsupported action type: {action_type}", action_type=action_type, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "unsupported_action"} ) # 실행 시간 업데이트 execution_time = time.time() - start_time result.execution_time = execution_time # 통계 업데이트 self.last_action_time = time.time() self.action_count += 1 return result except Exception as e: execution_time = time.time() - start_time logger.error(f"❌ 액션 실행 실패 ({action_type}): {e}") return ActionResult( success=False, data=None, message=f"Action execution failed: {str(e)}", action_type=action_type, module_type=ModuleType.WEB, execution_time=execution_time, metadata={"error": "execution_failed", "exception": str(e)} ) # ================================================================= # 코드 생성 세션 관리 (4개 기능) # ================================================================= async def start_codegen_session(self, options: Dict[str, Any]) -> ActionResult: """코드 생성 세션 시작""" try: # 필수 옵션 검증 if "outputPath" not in options: return ActionResult( success=False, data=None, message="outputPath is required", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "missing_output_path"} ) # 세션 ID 생성 session_id = str(uuid.uuid4()) # 데이터베이스에 세션 생성 db_session_id = await self.db.create_codegen_session( output_path=options["outputPath"], test_name_prefix=options.get("testNamePrefix", "GeneratedTest"), include_comments=options.get("includeComments", True) ) # 메모리에 세션 정보 저장 self.codegen_sessions[session_id] = { "db_session_id": db_session_id, "output_path": options["outputPath"], "test_name_prefix": options.get("testNamePrefix", "GeneratedTest"), "include_comments": options.get("includeComments", True), "actions": [], "active": True, "created_at": time.time() } logger.info(f"✅ 코드 생성 세션 시작: {session_id}") return ActionResult( success=True, data={"session_id": session_id}, message="Code generation session started", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"session_id": session_id} ) except Exception as e: logger.error(f"❌ 코드 생성 세션 시작 실패: {e}") return ActionResult( success=False, data=None, message=f"Failed to start codegen session: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def end_codegen_session(self, session_id: str) -> ActionResult: """코드 생성 세션 종료 및 파일 생성""" try: if session_id not in self.codegen_sessions: return ActionResult( success=False, data=None, message="Session not found", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "session_not_found"} ) session = self.codegen_sessions[session_id] # 테스트 파일 생성 test_content = await self._generate_test_file(session) # 출력 파일 경로 준비 output_path = Path(session["output_path"]) output_path.mkdir(parents=True, exist_ok=True) test_file_path = output_path / f"{session['test_name_prefix']}.spec.js" # 파일 쓰기 with open(test_file_path, 'w', encoding='utf-8') as f: f.write(test_content) # 데이터베이스 세션 종료 await self.db.close_session(session["db_session_id"]) # 메모리에서 세션 제거 del self.codegen_sessions[session_id] logger.info(f"✅ 코드 생성 세션 종료: {session_id}") return ActionResult( success=True, data={ "session_id": session_id, "output_file": str(test_file_path), "test_content": test_content }, message="Code generation session ended and test file created", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"output_file": str(test_file_path)} ) except Exception as e: logger.error(f"❌ 코드 생성 세션 종료 실패: {e}") return ActionResult( success=False, data=None, message=f"Failed to end codegen session: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def get_codegen_session(self, session_id: str) -> ActionResult: """코드 생성 세션 정보 조회""" try: if session_id not in self.codegen_sessions: return ActionResult( success=False, data=None, message="Session not found", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "session_not_found"} ) session = self.codegen_sessions[session_id] return ActionResult( success=True, data=session, message="Session information retrieved", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"session_id": session_id} ) except Exception as e: logger.error(f"❌ 코드 생성 세션 조회 실패: {e}") return ActionResult( success=False, data=None, message=f"Failed to get codegen session: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def clear_codegen_session(self, session_id: str) -> ActionResult: """코드 생성 세션 정리 (파일 생성하지 않고 삭제)""" try: if session_id not in self.codegen_sessions: return ActionResult( success=False, data=None, message="Session not found", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "session_not_found"} ) session = self.codegen_sessions[session_id] # 데이터베이스에서 세션 정리 await self.db.clear_session(session["db_session_id"]) # 메모리에서 세션 제거 del self.codegen_sessions[session_id] logger.info(f"✅ 코드 생성 세션 정리: {session_id}") return ActionResult( success=True, data={"session_id": session_id}, message="Code generation session cleared", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"session_id": session_id} ) except Exception as e: logger.error(f"❌ 코드 생성 세션 정리 실패: {e}") return ActionResult( success=False, data=None, message=f"Failed to clear codegen session: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def _generate_test_file(self, session: Dict[str, Any]) -> str: """테스트 파일 생성""" lines = [] # 헤더 추가 if session["include_comments"]: lines.extend([ f"// Generated test file: {session['test_name_prefix']}", f"// Created at: {time.strftime('%Y-%m-%d %H:%M:%S')}", f"// Total actions: {len(session['actions'])}", "", ]) # Playwright 임포트 lines.extend([ "const { test, expect } = require('@playwright/test');", "", f"test('{session['test_name_prefix']}', async ({{ page }}) => {{", ]) # 액션들을 코드로 변환 for action in session["actions"]: action_code = self._action_to_code(action, session["include_comments"]) if action_code: lines.extend([f" {line}" for line in action_code.split('\n')]) lines.append("") # 테스트 종료 lines.append("});") return '\n'.join(lines) def _action_to_code(self, action: Dict[str, Any], include_comments: bool) -> str: """액션을 Playwright 코드로 변환""" action_type = action.get("type", "") params = action.get("parameters", {}) lines = [] if include_comments: lines.append(f"// {action_type.title()} action") if action_type == "navigate": url = params.get("url", "") lines.append(f"await page.goto('{url}');") elif action_type == "click": selector = params.get("selector", "") lines.append(f"await page.click('{selector}');") elif action_type == "fill": selector = params.get("selector", "") value = params.get("value", "") lines.append(f"await page.fill('{selector}', '{value}');") elif action_type == "select": selector = params.get("selector", "") value = params.get("value", "") lines.append(f"await page.selectOption('{selector}', '{value}');") elif action_type == "hover": selector = params.get("selector", "") lines.append(f"await page.hover('{selector}');") elif action_type == "key_press": key = params.get("key", "") selector = params.get("selector") if selector: lines.append(f"await page.press('{selector}', '{key}');") else: lines.append(f"await page.keyboard.press('{key}');") elif action_type == "screenshot": path = params.get("path", "screenshot.png") lines.append(f"await page.screenshot({{ path: '{path}' }});") else: lines.append(f"// Custom action: {action_type}") if params: lines.append(f"// Parameters: {json.dumps(params)}") return '\n'.join(lines) # ================================================================= # 브라우저 기본 조작 (7개 기능) # ================================================================= async def _navigate(self, url: str, **kwargs) -> ActionResult: """페이지 내비게이션""" try: timeout = kwargs.get("timeout", self.config.timeout) wait_until = kwargs.get("waitUntil", "load") await self.page.goto(url, timeout=timeout, wait_until=wait_until) # 코드 생성 세션에 액션 기록 await self._record_action("navigate", {"url": url}) return ActionResult( success=True, data={"url": url, "title": await self.page.title()}, message=f"Navigated to {url}", action_type=ActionType.NAVIGATE, module_type=ModuleType.WEB, execution_time=0, metadata={"current_url": self.page.url} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Navigation failed: {str(e)}", action_type=ActionType.NAVIGATE, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def _screenshot(self, **kwargs) -> ActionResult: """스크린샷 촬영""" try: name = kwargs.get("name", f"screenshot_{int(time.time())}") full_page = kwargs.get("fullPage", False) selector = kwargs.get("selector") save_png = kwargs.get("savePng", True) store_base64 = kwargs.get("storeBase64", True) downloads_dir = kwargs.get("downloadsDir", self.config.downloads_dir or "downloads") # 디렉토리 생성 downloads_path = Path(downloads_dir) downloads_path.mkdir(parents=True, exist_ok=True) screenshot_data = None file_path = None if save_png: file_path = downloads_path / f"{name}.png" if selector: # 특정 요소 스크린샷 element = await self.page.locator(selector).first await element.screenshot(path=str(file_path)) else: # 페이지 스크린샷 await self.page.screenshot(path=str(file_path), full_page=full_page) if store_base64: if selector: element = await self.page.locator(selector).first screenshot_bytes = await element.screenshot() else: screenshot_bytes = await self.page.screenshot(full_page=full_page) screenshot_data = base64.b64encode(screenshot_bytes).decode() # 코드 생성 세션에 액션 기록 await self._record_action("screenshot", { "name": name, "fullPage": full_page, "selector": selector }) result_data = { "name": name, "file_path": str(file_path) if file_path else None, "base64_data": screenshot_data if store_base64 else None } return ActionResult( success=True, data=result_data, message="Screenshot captured successfully", action_type=ActionType.SCREENSHOT, module_type=ModuleType.WEB, execution_time=0, metadata={"selector": selector, "full_page": full_page} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Screenshot failed: {str(e)}", action_type=ActionType.SCREENSHOT, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def get_visible_text(self) -> ActionResult: """페이지의 보이는 텍스트 내용 가져오기""" try: # body 요소의 텍스트 추출 text_content = await self.page.locator("body").inner_text() return ActionResult( success=True, data={"text": text_content, "length": len(text_content)}, message="Visible text retrieved successfully", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"text_length": len(text_content)} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Failed to get visible text: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def get_visible_html(self) -> ActionResult: """페이지의 HTML 내용 가져오기""" try: html_content = await self.page.content() return ActionResult( success=True, data={"html": html_content, "length": len(html_content)}, message="HTML content retrieved successfully", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"html_length": len(html_content)} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Failed to get HTML content: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def go_back(self) -> ActionResult: """브라우저 뒤로 가기""" try: await self.page.go_back() return ActionResult( success=True, data={"url": self.page.url}, message="Navigated back successfully", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"current_url": self.page.url} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Go back failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def go_forward(self) -> ActionResult: """브라우저 앞으로 가기""" try: await self.page.go_forward() return ActionResult( success=True, data={"url": self.page.url}, message="Navigated forward successfully", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"current_url": self.page.url} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Go forward failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def close_browser(self) -> ActionResult: """브라우저 닫기""" try: await self.cleanup() return ActionResult( success=True, data=None, message="Browser closed successfully", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Browser close failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) # ================================================================= # 사용자 상호작용 (7개 기능) # ================================================================= async def _click(self, selector: str, **kwargs) -> ActionResult: """요소 클릭""" try: iframe_selector = kwargs.get("iframeSelector") if iframe_selector: # iframe 내부 요소 클릭 frame = self.page.frame_locator(iframe_selector) await frame.locator(selector).click() else: # 일반 요소 클릭 await self.page.click(selector) # 코드 생성 세션에 액션 기록 await self._record_action("click", { "selector": selector, "iframeSelector": iframe_selector }) return ActionResult( success=True, data={"selector": selector}, message=f"Clicked element: {selector}", action_type=ActionType.CLICK, module_type=ModuleType.WEB, execution_time=0, metadata={"selector": selector, "iframe": iframe_selector} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Click failed: {str(e)}", action_type=ActionType.CLICK, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "selector": selector} ) async def _fill(self, selector: str, value: str, **kwargs) -> ActionResult: """입력 필드 채우기""" try: await self.page.fill(selector, value) # 코드 생성 세션에 액션 기록 await self._record_action("fill", { "selector": selector, "value": value }) return ActionResult( success=True, data={"selector": selector, "value": value}, message=f"Filled input: {selector}", action_type=ActionType.FILL, module_type=ModuleType.WEB, execution_time=0, metadata={"selector": selector} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Fill failed: {str(e)}", action_type=ActionType.FILL, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "selector": selector} ) async def _select(self, selector: str, value: str, **kwargs) -> ActionResult: """선택 박스에서 옵션 선택""" try: await self.page.select_option(selector, value) # 코드 생성 세션에 액션 기록 await self._record_action("select", { "selector": selector, "value": value }) return ActionResult( success=True, data={"selector": selector, "value": value}, message=f"Selected option: {value}", action_type=ActionType.SELECT, module_type=ModuleType.WEB, execution_time=0, metadata={"selector": selector} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Select failed: {str(e)}", action_type=ActionType.SELECT, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "selector": selector} ) async def _hover(self, selector: str, **kwargs) -> ActionResult: """요소에 마우스 호버""" try: await self.page.hover(selector) # 코드 생성 세션에 액션 기록 await self._record_action("hover", {"selector": selector}) return ActionResult( success=True, data={"selector": selector}, message=f"Hovered over element: {selector}", action_type=ActionType.HOVER, module_type=ModuleType.WEB, execution_time=0, metadata={"selector": selector} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Hover failed: {str(e)}", action_type=ActionType.HOVER, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "selector": selector} ) async def _drag(self, source_selector: str, target_selector: str, **kwargs) -> ActionResult: """드래그 앤 드롭""" try: source_element = self.page.locator(source_selector) target_element = self.page.locator(target_selector) await source_element.drag_to(target_element) # 코드 생성 세션에 액션 기록 await self._record_action("drag", { "sourceSelector": source_selector, "targetSelector": target_selector }) return ActionResult( success=True, data={"source": source_selector, "target": target_selector}, message=f"Dragged from {source_selector} to {target_selector}", action_type=ActionType.DRAG, module_type=ModuleType.WEB, execution_time=0, metadata={"source": source_selector, "target": target_selector} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Drag failed: {str(e)}", action_type=ActionType.DRAG, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def _key_press(self, key: str, **kwargs) -> ActionResult: """키보드 키 누르기""" try: selector = kwargs.get("selector") if selector: # 특정 요소에 포커스하고 키 누르기 await self.page.press(selector, key) else: # 페이지 전체에서 키 누르기 await self.page.keyboard.press(key) # 코드 생성 세션에 액션 기록 await self._record_action("key_press", { "key": key, "selector": selector }) return ActionResult( success=True, data={"key": key, "selector": selector}, message=f"Pressed key: {key}", action_type=ActionType.KEY_PRESS, module_type=ModuleType.WEB, execution_time=0, metadata={"key": key, "selector": selector} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Key press failed: {str(e)}", action_type=ActionType.KEY_PRESS, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "key": key} ) async def save_as_pdf(self, **kwargs) -> ActionResult: """페이지를 PDF로 저장""" try: filename = kwargs.get("filename", "page.pdf") output_path = kwargs.get("outputPath", "downloads") format_type = kwargs.get("format", "A4") print_background = kwargs.get("printBackground", True) margin = kwargs.get("margin", {}) # 출력 디렉토리 생성 output_dir = Path(output_path) output_dir.mkdir(parents=True, exist_ok=True) pdf_path = output_dir / filename # PDF 생성 옵션 pdf_options = { "path": str(pdf_path), "format": format_type, "print_background": print_background } if margin: pdf_options["margin"] = margin await self.page.pdf(**pdf_options) return ActionResult( success=True, data={"file_path": str(pdf_path), "filename": filename}, message=f"PDF saved: {filename}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"pdf_path": str(pdf_path)} ) except Exception as e: return ActionResult( success=False, data=None, message=f"PDF save failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) # ================================================================= # 페이지 정보 및 실행 (4개 기능) # ================================================================= async def _evaluate(self, script: str, **kwargs) -> ActionResult: """JavaScript 실행""" try: result = await self.page.evaluate(script) # 코드 생성 세션에 액션 기록 await self._record_action("evaluate", {"script": script}) return ActionResult( success=True, data={"result": result, "script": script}, message="JavaScript executed successfully", action_type=ActionType.EVALUATE, module_type=ModuleType.WEB, execution_time=0, metadata={"script_length": len(script)} ) except Exception as e: return ActionResult( success=False, data=None, message=f"JavaScript execution failed: {str(e)}", action_type=ActionType.EVALUATE, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "script": script} ) async def get_console_logs(self, **kwargs) -> ActionResult: """콘솔 로그 가져오기""" try: log_type = kwargs.get("type", "all") limit = kwargs.get("limit", 100) search = kwargs.get("search", "") clear_logs = kwargs.get("clear", False) # 브라우저에서 콘솔 로그 수집 logs = [] # JavaScript로 콘솔 로그 가져오기 console_script = """ () => { if (window._consoleLogsBuffer) { return window._consoleLogsBuffer; } return []; } """ try: logs_data = await self.page.evaluate(console_script) if logs_data: logs = logs_data except: # 콘솔 로그 버퍼가 없는 경우 logs = [] # 필터링 if log_type != "all": logs = [log for log in logs if log.get("type") == log_type] if search: logs = [log for log in logs if search.lower() in str(log.get("text", "")).lower()] # 제한 if limit > 0: logs = logs[:limit] # 로그 지우기 if clear_logs: await self.page.evaluate("() => { if (window._consoleLogsBuffer) window._consoleLogsBuffer = []; }") return ActionResult( success=True, data={"logs": logs, "count": len(logs)}, message=f"Retrieved {len(logs)} console logs", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"log_type": log_type, "search": search} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Console logs retrieval failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def set_custom_user_agent(self, user_agent: str) -> ActionResult: """사용자 에이전트 설정""" try: # 새 컨텍스트에서 사용자 에이전트 설정 if self.context: await self.context.close() self.context = await self.browser.new_context( viewport={ "width": self.config.viewport_width, "height": self.config.viewport_height }, user_agent=user_agent ) # 기존 페이지 닫고 새 페이지 생성 if self.page: await self.page.close() self.page = await self.context.new_page() self.page.set_default_timeout(self.config.timeout) # 설정 업데이트 self.config.user_agent = user_agent return ActionResult( success=True, data={"user_agent": user_agent}, message="User agent set successfully", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"user_agent": user_agent} ) except Exception as e: return ActionResult( success=False, data=None, message=f"User agent setting failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def get_page_info(self) -> ActionResult: """페이지 정보 가져오기""" try: page_info = { "url": self.page.url, "title": await self.page.title(), "viewport": await self.page.viewport_size(), "user_agent": await self.page.evaluate("() => navigator.userAgent") } return ActionResult( success=True, data=page_info, message="Page information retrieved successfully", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata=page_info ) except Exception as e: return ActionResult( success=False, data=None, message=f"Page info retrieval failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) # ================================================================= # HTTP 요청 처리 (6개 기능) # ================================================================= async def _http_request(self, **kwargs) -> ActionResult: """HTTP 요청 처리""" try: method = kwargs.get("method", "GET").upper() url = kwargs.get("url", "") headers = kwargs.get("headers", {}) data = kwargs.get("data") token = kwargs.get("token") if not url: return ActionResult( success=False, data=None, message="URL is required for HTTP request", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "missing_url"} ) # 토큰이 있으면 헤더에 추가 if token: headers["Authorization"] = f"Bearer {token}" # 브라우저를 통한 HTTP 요청 if method == "GET": response = await self._http_get(url, headers) elif method == "POST": response = await self._http_post(url, data, headers) elif method == "PUT": response = await self._http_put(url, data, headers) elif method == "PATCH": response = await self._http_patch(url, data, headers) elif method == "DELETE": response = await self._http_delete(url, headers) else: return ActionResult( success=False, data=None, message=f"Unsupported HTTP method: {method}", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "unsupported_method", "method": method} ) return response except Exception as e: return ActionResult( success=False, data=None, message=f"HTTP request failed: {str(e)}", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) async def _http_get(self, url: str, headers: Dict[str, str] = None) -> ActionResult: """HTTP GET 요청""" try: # Playwright를 통한 GET 요청 response = await self.page.request.get(url, headers=headers or {}) response_data = { "status": response.status, "headers": await response.all_headers(), "body": await response.text(), "url": response.url } return ActionResult( success=response.ok, data=response_data, message=f"GET request to {url} completed", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"method": "GET", "status": response.status} ) except Exception as e: return ActionResult( success=False, data=None, message=f"GET request failed: {str(e)}", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "method": "GET"} ) async def _http_post(self, url: str, data: str = None, headers: Dict[str, str] = None) -> ActionResult: """HTTP POST 요청""" try: response = await self.page.request.post(url, data=data, headers=headers or {}) response_data = { "status": response.status, "headers": await response.all_headers(), "body": await response.text(), "url": response.url } return ActionResult( success=response.ok, data=response_data, message=f"POST request to {url} completed", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"method": "POST", "status": response.status} ) except Exception as e: return ActionResult( success=False, data=None, message=f"POST request failed: {str(e)}", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "method": "POST"} ) async def _http_put(self, url: str, data: str = None, headers: Dict[str, str] = None) -> ActionResult: """HTTP PUT 요청""" try: response = await self.page.request.put(url, data=data, headers=headers or {}) response_data = { "status": response.status, "headers": await response.all_headers(), "body": await response.text(), "url": response.url } return ActionResult( success=response.ok, data=response_data, message=f"PUT request to {url} completed", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"method": "PUT", "status": response.status} ) except Exception as e: return ActionResult( success=False, data=None, message=f"PUT request failed: {str(e)}", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "method": "PUT"} ) async def _http_patch(self, url: str, data: str = None, headers: Dict[str, str] = None) -> ActionResult: """HTTP PATCH 요청""" try: response = await self.page.request.patch(url, data=data, headers=headers or {}) response_data = { "status": response.status, "headers": await response.all_headers(), "body": await response.text(), "url": response.url } return ActionResult( success=response.ok, data=response_data, message=f"PATCH request to {url} completed", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"method": "PATCH", "status": response.status} ) except Exception as e: return ActionResult( success=False, data=None, message=f"PATCH request failed: {str(e)}", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "method": "PATCH"} ) async def _http_delete(self, url: str, headers: Dict[str, str] = None) -> ActionResult: """HTTP DELETE 요청""" try: response = await self.page.request.delete(url, headers=headers or {}) response_data = { "status": response.status, "headers": await response.all_headers(), "body": await response.text(), "url": response.url } return ActionResult( success=response.ok, data=response_data, message=f"DELETE request to {url} completed", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"method": "DELETE", "status": response.status} ) except Exception as e: return ActionResult( success=False, data=None, message=f"DELETE request failed: {str(e)}", action_type=ActionType.HTTP_REQUEST, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e), "method": "DELETE"} ) # ================================================================= # 도우미 메서드들 # ================================================================= async def _record_action(self, action_type: str, parameters: Dict[str, Any]): """코드 생성 세션에 액션 기록""" try: # 활성 세션들에 액션 추가 for session_id, session in self.codegen_sessions.items(): if session.get("active", False): session["actions"].append({ "type": action_type, "parameters": parameters, "timestamp": time.time() }) # 데이터베이스에도 업데이트 await self.db.update_session_actions( session["db_session_id"], session["actions"] ) except Exception as e: logger.warning(f"⚠️ 액션 기록 실패: {e}") async def _custom_action(self, **kwargs) -> ActionResult: """사용자 정의 액션 처리""" try: action_name = kwargs.get("action", "unknown") # 특별한 액션들 처리 if action_name == "start_codegen_session": return await self.start_codegen_session(kwargs.get("options", {})) elif action_name == "end_codegen_session": return await self.end_codegen_session(kwargs.get("session_id", "")) elif action_name == "get_codegen_session": return await self.get_codegen_session(kwargs.get("session_id", "")) elif action_name == "clear_codegen_session": return await self.clear_codegen_session(kwargs.get("session_id", "")) elif action_name == "get_visible_text": return await self.get_visible_text() elif action_name == "get_visible_html": return await self.get_visible_html() elif action_name == "go_back": return await self.go_back() elif action_name == "go_forward": return await self.go_forward() elif action_name == "close_browser": return await self.close_browser() elif action_name == "get_console_logs": return await self.get_console_logs(**kwargs) elif action_name == "set_custom_user_agent": return await self.set_custom_user_agent(kwargs.get("user_agent", "")) elif action_name == "get_page_info": return await self.get_page_info() elif action_name == "save_as_pdf": return await self.save_as_pdf(**kwargs) else: return ActionResult( success=False, data=None, message=f"Unknown custom action: {action_name}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": "unknown_action", "action": action_name} ) except Exception as e: return ActionResult( success=False, data=None, message=f"Custom action failed: {str(e)}", action_type=ActionType.CUSTOM, module_type=ModuleType.WEB, execution_time=0, metadata={"error": str(e)} ) # ================================================================= # 통합 WebModule MCP 인터페이스 # ================================================================= class WebModuleMCP: """WebModule MCP 인터페이스""" def __init__(self): self.web_module = WebModule() async def initialize(self) -> bool: """MCP 초기화""" return await self.web_module.initialize() async def cleanup(self): """MCP 정리""" await self.web_module.cleanup() # 코드 생성 세션 관리 async def start_codegen_session(self, options: Dict[str, Any]) -> Dict[str, Any]: """코드 생성 세션 시작""" result = await self.web_module.start_codegen_session(options) return asdict(result) async def end_codegen_session(self, session_id: str) -> Dict[str, Any]: """코드 생성 세션 종료""" result = await self.web_module.end_codegen_session(session_id) return asdict(result) async def get_codegen_session(self, session_id: str) -> Dict[str, Any]: """코드 생성 세션 정보 조회""" result = await self.web_module.get_codegen_session(session_id) return asdict(result) async def clear_codegen_session(self, session_id: str) -> Dict[str, Any]: """코드 생성 세션 정리""" result = await self.web_module.clear_codegen_session(session_id) return asdict(result) # 브라우저 기본 조작 async def playwright_navigate(self, url: str, **kwargs) -> Dict[str, Any]: """페이지 내비게이션""" result = await self.web_module.execute_action(ActionType.NAVIGATE, url=url, **kwargs) return asdict(result) async def playwright_screenshot(self, **kwargs) -> Dict[str, Any]: """스크린샷 촬영""" result = await self.web_module.execute_action(ActionType.SCREENSHOT, **kwargs) return asdict(result) async def playwright_get_visible_text(self) -> Dict[str, Any]: """페이지 텍스트 가져오기""" result = await self.web_module.get_visible_text() return asdict(result) async def playwright_get_visible_html(self) -> Dict[str, Any]: """페이지 HTML 가져오기""" result = await self.web_module.get_visible_html() return asdict(result) async def playwright_go_back(self) -> Dict[str, Any]: """뒤로 가기""" result = await self.web_module.go_back() return asdict(result) async def playwright_go_forward(self) -> Dict[str, Any]: """앞으로 가기""" result = await self.web_module.go_forward() return asdict(result) async def playwright_close(self) -> Dict[str, Any]: """브라우저 닫기""" result = await self.web_module.close_browser() return asdict(result) # 사용자 상호작용 async def playwright_click(self, selector: str, **kwargs) -> Dict[str, Any]: """요소 클릭""" result = await self.web_module.execute_action(ActionType.CLICK, selector=selector, **kwargs) return asdict(result) async def playwright_iframe_click(self, iframe_selector: str, selector: str) -> Dict[str, Any]: """iframe 내부 요소 클릭""" result = await self.web_module.execute_action( ActionType.CLICK, selector=selector, iframeSelector=iframe_selector ) return asdict(result) async def playwright_fill(self, selector: str, value: str) -> Dict[str, Any]: """입력 필드 채우기""" result = await self.web_module.execute_action(ActionType.FILL, selector=selector, value=value) return asdict(result) async def playwright_select(self, selector: str, value: str) -> Dict[str, Any]: """선택 박스 옵션 선택""" result = await self.web_module.execute_action(ActionType.SELECT, selector=selector, value=value) return asdict(result) async def playwright_hover(self, selector: str) -> Dict[str, Any]: """요소에 마우스 호버""" result = await self.web_module.execute_action(ActionType.HOVER, selector=selector) return asdict(result) async def playwright_drag(self, source_selector: str, target_selector: str) -> Dict[str, Any]: """드래그 앤 드롭""" result = await self.web_module.execute_action( ActionType.DRAG, source_selector=source_selector, target_selector=target_selector ) return asdict(result) async def playwright_press_key(self, key: str, selector: str = None) -> Dict[str, Any]: """키보드 키 누르기""" result = await self.web_module.execute_action(ActionType.KEY_PRESS, key=key, selector=selector) return asdict(result) async def playwright_save_as_pdf(self, **kwargs) -> Dict[str, Any]: """PDF로 저장""" result = await self.web_module.save_as_pdf(**kwargs) return asdict(result) # 페이지 정보 및 실행 async def playwright_evaluate(self, script: str) -> Dict[str, Any]: """JavaScript 실행""" result = await self.web_module.execute_action(ActionType.EVALUATE, script=script) return asdict(result) async def playwright_console_logs(self, **kwargs) -> Dict[str, Any]: """콘솔 로그 가져오기""" result = await self.web_module.get_console_logs(**kwargs) return asdict(result) async def playwright_custom_user_agent(self, user_agent: str) -> Dict[str, Any]: """사용자 에이전트 설정""" result = await self.web_module.set_custom_user_agent(user_agent) return asdict(result) # HTTP 요청 async def playwright_get(self, url: str, **kwargs) -> Dict[str, Any]: """HTTP GET 요청""" result = await self.web_module.execute_action(ActionType.HTTP_REQUEST, method="GET", url=url, **kwargs) return asdict(result) async def playwright_post(self, url: str, value: str, **kwargs) -> Dict[str, Any]: """HTTP POST 요청""" result = await self.web_module.execute_action(ActionType.HTTP_REQUEST, method="POST", url=url, data=value, **kwargs) return asdict(result) async def playwright_put(self, url: str, value: str, **kwargs) -> Dict[str, Any]: """HTTP PUT 요청""" result = await self.web_module.execute_action(ActionType.HTTP_REQUEST, method="PUT", url=url, data=value, **kwargs) return asdict(result) async def playwright_patch(self, url: str, value: str, **kwargs) -> Dict[str, Any]: """HTTP PATCH 요청""" result = await self.web_module.execute_action(ActionType.HTTP_REQUEST, method="PATCH", url=url, data=value, **kwargs) return asdict(result) async def playwright_delete(self, url: str, **kwargs) -> Dict[str, Any]: """HTTP DELETE 요청""" result = await self.web_module.execute_action(ActionType.HTTP_REQUEST, method="DELETE", url=url, **kwargs) return asdict(result) # 테스트 함수 async def test_web_module(): """웹 모듈 테스트""" print("🧪 UniGUI WebModule 테스트 시작...") web_mcp = WebModuleMCP() try: # 초기화 print("\n📱 WebModule 초기화...") initialized = await web_mcp.initialize() print(f"초기화 결과: {initialized}") if not initialized: print("❌ WebModule 초기화 실패 (Playwright 미설치 가능)") return # 코드 생성 세션 테스트 print("\n🎬 코드 생성 세션 테스트...") session_result = await web_mcp.start_codegen_session({ "outputPath": "/tmp/test_output", "testNamePrefix": "UniGUITest", "includeComments": True }) print(f"세션 생성: {session_result['success']}") if session_result['success']: session_id = session_result['data']['session_id'] print(f"세션 ID: {session_id}") # 네비게이션 테스트 print("\n🔗 네비게이션 테스트...") nav_result = await web_mcp.playwright_navigate("https://example.com") print(f"네비게이션: {nav_result['success']}") # 스크린샷 테스트 print("\n📸 스크린샷 테스트...") screenshot_result = await web_mcp.playwright_screenshot( name="test_screenshot", savePng=True, storeBase64=False ) print(f"스크린샷: {screenshot_result['success']}") # 세션 종료 print("\n🔚 세션 종료...") end_result = await web_mcp.end_codegen_session(session_id) print(f"세션 종료: {end_result['success']}") except Exception as e: print(f"❌ 테스트 실패: {e}") finally: # 정리 await web_mcp.cleanup() print("\n🎯 WebModule 테스트 완료!") if __name__ == "__main__": asyncio.run(test_web_module())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Skynotdie/mky'

If you have feedback or need assistance with the MCP directory API, please join our Discord server