Skip to main content
Glama
SeoNaRu

Korean Company Information MCP Server

by SeoNaRu
test_gemini.py25.6 kB
#!/usr/bin/env python3 """ Gemini Function Calling 테스트 스크립트 이 스크립트는 company-info-mcp 서버의 도구들을 Gemini API를 통해 테스트합니다. """ import os import json import asyncio import time import re from typing import Optional from dotenv import load_dotenv import google.generativeai as genai from google.generativeai.generative_models import GenerativeModel from google.api_core import exceptions as google_exceptions # .env 파일 로드 load_dotenv() # Gemini API 키 설정 GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") DART_API_KEY = os.environ.get("DART_API_KEY") if not GEMINI_API_KEY: print("❌ GEMINI_API_KEY 환경 변수를 설정해주세요.") print(" .env 파일에 GEMINI_API_KEY=your_gemini_api_key 추가") exit(1) if not DART_API_KEY: print("⚠️ DART_API_KEY가 설정되지 않았습니다. 일부 기능이 작동하지 않을 수 있습니다.") print(" .env 파일에 DART_API_KEY=your_dart_api_key 추가") # Gemini API 키 설정 # Gemini API 키 설정 # 방법 1: 환경 변수 설정 if GEMINI_API_KEY: os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY # 방법 2: genai.configure() 시도 (런타임에 동적으로 호출) if GEMINI_API_KEY: try: # getattr을 사용하여 런타임에 configure 함수 확인 configure_func = getattr(genai, 'configure', None) if configure_func: configure_func(api_key=GEMINI_API_KEY) print("✅ genai.configure()로 API 키 설정 완료") except Exception as e: # configure가 없거나 실패하면 환경 변수만 사용 pass def get_mcp_tools() -> Optional[list]: """ MCP 서버에서 도구 목록을 가져와서 Gemini Function Calling 형식으로 변환합니다. """ import requests try: response = requests.get("http://localhost:8097/tools", timeout=10) response.raise_for_status() tools_list = response.json() # Gemini Function Calling 형식으로 변환 function_declarations = [] for tool in tools_list: tool_name = tool.get("name", "") description = tool.get("description", "") parameters = tool.get("parameters", {}) # health 도구는 제외 (테스트용이 아님) if tool_name == "health": continue # Gemini 형식으로 변환 function_declaration = { "name": tool_name, "description": description, "parameters": parameters } function_declarations.append(function_declaration) return [{ "function_declarations": function_declarations }] except Exception as e: print(f"⚠️ MCP 서버에서 도구 목록을 가져오지 못했습니다: {str(e)}") print(" 하드코딩된 도구 목록을 사용합니다.") return None # Function Calling을 위한 도구 정의 (Gemini 형식) # MCP 서버에서 동적으로 가져오거나, fallback으로 하드코딩된 목록 사용 TOOLS = [ { "function_declarations": [ { "name": "search_company_tool", "description": "기업을 회사명으로 검색합니다.", "parameters": { "type": "object", "properties": { "company_name": { "type": "string", "description": "검색할 회사명 (예: '삼성전자', '네이버')" } }, "required": ["company_name"] } }, { "name": "get_company_overview_tool", "description": "기업의 기본정보를 조회합니다.", "parameters": { "type": "object", "properties": { "corp_code": { "type": "string", "description": "기업 고유번호" }, "company_name": { "type": "string", "description": "회사명 (corp_code가 없을 경우 사용)" } } } }, { "name": "get_financial_statement_tool", "description": "기업의 재무제표를 조회합니다.", "parameters": { "type": "object", "properties": { "corp_code": { "type": "string", "description": "기업 고유번호" }, "company_name": { "type": "string", "description": "회사명 (corp_code가 없을 경우 사용)" }, "bsns_year": { "type": "string", "description": "사업연도 (YYYY 형식, 기본값: 최근 연도)" }, "reprt_code": { "type": "string", "description": "보고서 코드 (11011: 사업보고서, 11013: 분기보고서, 기본값: 11011)" } } } }, { "name": "analyze_financial_trend_tool", "description": "기업의 재무 추이를 분석합니다. (최근 N년)", "parameters": { "type": "object", "properties": { "corp_code": { "type": "string", "description": "기업 고유번호" }, "years": { "type": "integer", "description": "분석할 연수 (기본값: 5, 최대: 10)" } }, "required": ["corp_code"] } }, { "name": "get_public_disclosure_tool", "description": "기업의 공시정보를 조회합니다.", "parameters": { "type": "object", "properties": { "corp_code": { "type": "string", "description": "기업 고유번호" }, "bgn_de": { "type": "string", "description": "시작일 (YYYYMMDD 형식, 기본값: 최근 1개월)" }, "end_de": { "type": "string", "description": "종료일 (YYYYMMDD 형식, 기본값: 오늘)" }, "page_no": { "type": "integer", "description": "페이지 번호 (기본값: 1)" }, "page_count": { "type": "integer", "description": "페이지당 건수 (기본값: 10)" } }, "required": ["corp_code"] } }, { "name": "get_executives_tool", "description": "기업의 임원정보를 조회합니다.", "parameters": { "type": "object", "properties": { "corp_code": { "type": "string", "description": "기업 고유번호" }, "company_name": { "type": "string", "description": "회사명 (corp_code가 없을 경우 사용)" }, "bsns_year": { "type": "string", "description": "사업연도 (YYYY 형식, 기본값: 최근 연도)" }, "reprt_code": { "type": "string", "description": "보고서 코드 (11011: 사업보고서, 기본값: 11011)" } } } }, { "name": "get_shareholders_tool", "description": "지분보고서를 조회합니다.", "parameters": { "type": "object", "properties": { "corp_code": { "type": "string", "description": "기업 고유번호" }, "company_name": { "type": "string", "description": "회사명 (corp_code가 없을 경우 사용)" }, "bsns_year": { "type": "string", "description": "사업연도 (YYYY 형식, 기본값: 최근 연도)" }, "reprt_code": { "type": "string", "description": "보고서 코드 (11011: 사업보고서, 11013: 분기보고서, 기본값: 11011)" } } } } ] } ] def check_server_health() -> bool: """ MCP 서버가 실행 중인지 확인합니다. """ import requests try: response = requests.get("http://localhost:8097/health", timeout=5) return response.status_code == 200 except: return False def call_mcp_tool(tool_name: str, **kwargs) -> dict: """ MCP 서버의 도구를 호출합니다. HTTP 모드로 실행 중인 서버에 요청을 보냅니다. """ import requests url = f"http://localhost:8097/tools/{tool_name}" # env에 DART_API_KEY 포함 payload = { **kwargs, "env": { "DART_API_KEY": DART_API_KEY } } try: response = requests.post(url, json=payload, timeout=60) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: return {"error": f"도구 호출 실패: {str(e)}"} def handle_function_call(function_name: str, args: dict) -> dict: """ Gemini가 요청한 함수를 실행하고 결과를 반환합니다. """ print(f"\n🔧 함수 호출: {function_name}") # 타입 변환: Gemini가 float로 전달하는 정수 파라미터를 정수로 변환 integer_params = { "get_public_disclosure_tool": ["page_no", "page_count"], "analyze_financial_trend_tool": ["years"], } if function_name in integer_params: for param_name in integer_params[function_name]: if param_name in args and isinstance(args[param_name], float): args[param_name] = int(args[param_name]) # 기본값 설정 if function_name == "get_financial_statement_tool": if "reprt_code" not in args: args["reprt_code"] = "11011" elif function_name == "analyze_financial_trend_tool": if "years" not in args: args["years"] = 5 elif isinstance(args.get("years"), float): args["years"] = int(args["years"]) elif function_name == "get_public_disclosure_tool": if "page_no" not in args: args["page_no"] = 1 elif isinstance(args.get("page_no"), float): args["page_no"] = int(args["page_no"]) if "page_count" not in args: args["page_count"] = 10 elif isinstance(args.get("page_count"), float): args["page_count"] = int(args["page_count"]) elif function_name == "get_executives_tool": if "reprt_code" not in args: args["reprt_code"] = "11011" elif function_name == "get_shareholders_tool": if "reprt_code" not in args: args["reprt_code"] = "11011" print(f" 파라미터: {json.dumps(args, ensure_ascii=False, indent=2)}") # MCP 서버의 도구 호출 result = call_mcp_tool(function_name, **args) print(f" 결과: {json.dumps(result, ensure_ascii=False, indent=2)[:500]}...") return result # 전역 변수로 대화 컨텍스트 유지 _chat = None _model = None _current_model_name = None def get_or_create_chat(model_name: str = "gemini-2.0-flash-exp"): """ 대화 컨텍스트를 유지하면서 chat 객체를 가져오거나 생성합니다. """ global _chat, _model, _current_model_name # 모델이 변경되었거나 아직 생성되지 않은 경우 if _model is None or _current_model_name != model_name: # API 키 확인 및 설정 if not os.environ.get("GOOGLE_API_KEY") and GEMINI_API_KEY: os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY # genai.configure() 재시도 (모델 생성 전) if GEMINI_API_KEY: try: configure_func = getattr(genai, 'configure', None) if configure_func: configure_func(api_key=GEMINI_API_KEY) except Exception: os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY try: _model = GenerativeModel( model_name=model_name, tools=TOOLS ) _current_model_name = model_name _chat = _model.start_chat() print(f"✅ 새로운 대화 세션 시작 (모델: {model_name})") except Exception as e: print(f"❌ 모델 초기화 실패: {str(e)}") try: _model = GenerativeModel( model_name="gemini-1.5-pro", tools=TOOLS ) _current_model_name = "gemini-1.5-pro" _chat = _model.start_chat() print(" 기본 모델(gemini-1.5-pro)로 재시도 중...") except: print(" 모델 초기화에 실패했습니다.") return None return _chat def test_gemini_function_calling(user_query: str, model_name: str = "gemini-2.0-flash-exp"): """ Gemini Function Calling을 테스트합니다. """ print(f"\n🤖 사용자 질문: {user_query}") print(f"📱 사용 모델: {model_name}") print("=" * 60) # 대화 컨텍스트 가져오기 또는 생성 chat = get_or_create_chat(model_name) if chat is None: return None try: # 사용자 질문 전송 (더 명확한 프롬프트 추가) enhanced_query = user_query # 사용자가 명확한 요청을 했는지 확인하고, 필요시 프롬프트 강화 if any(keyword in user_query.lower() for keyword in ["분석", "조회", "검색", "정보", "재무", "공시", "임원", "지분", "기업"]): enhanced_query = f"{user_query}\n\n중요: 사용 가능한 도구(function)를 반드시 사용하여 정보를 조회하고 분석해주세요. 질문만 하지 말고 실제로 도구를 호출해주세요. 필요한 파라미터가 없으면 기본값을 사용하거나 회사명으로 먼저 검색하세요." # Gemini에 메시지 전송 (rate limit 처리 포함) max_retries = 3 retry_count = 0 while retry_count < max_retries: try: response = chat.send_message(enhanced_query) break # 성공하면 루프 탈출 except google_exceptions.ResourceExhausted as e: retry_count += 1 if retry_count >= max_retries: print(f"\n❌ Rate limit 오류: API 할당량을 초과했습니다.") print(f" 오류 메시지: {str(e)[:200]}...") print(f" 해결 방법: 잠시 후 다시 시도하거나 다른 모델을 사용하세요.") return None # retry_delay 추출 (기본값 60초) retry_delay = 60 error_str = str(e) if "retry_delay" in error_str or "seconds:" in error_str: try: match = re.search(r'seconds:\s*(\d+)', error_str) if match: retry_delay = int(match.group(1)) except: pass print(f"\n⚠️ Rate limit 오류 발생 (시도 {retry_count}/{max_retries})") print(f" {retry_delay}초 후 재시도합니다...") time.sleep(retry_delay) continue except Exception as e: if "429" in str(e) or "quota" in str(e).lower(): retry_count += 1 if retry_count >= max_retries: print(f"\n❌ Rate limit 오류: API 할당량을 초과했습니다.") print(f" 잠시 후 다시 시도하세요.") return None retry_delay = 60 print(f"\n⚠️ Rate limit 오류 발생 (시도 {retry_count}/{max_retries})") print(f" {retry_delay}초 후 재시도합니다...") time.sleep(retry_delay) continue else: raise # Function calling이 필요한 경우 처리 max_iterations = 10 # 무한 루프 방지 iteration = 0 while iteration < max_iterations: iteration += 1 # 응답 확인 if not response.candidates: break parts = response.candidates[0].content.parts if not parts: break # Function call 확인 has_function_call = False for part in parts: if hasattr(part, 'function_call') and part.function_call: has_function_call = True function_call = part.function_call function_name = function_call.name args = dict(function_call.args) # 함수 실행 function_result = handle_function_call(function_name, args) # 결과를 Gemini에 전달 (rate limit 처리 포함) max_retries_func = 3 retry_count_func = 0 while retry_count_func < max_retries_func: try: response = chat.send_message({ "function_response": { "name": function_name, "response": function_result } }) break # 성공하면 루프 탈출 except google_exceptions.ResourceExhausted as e: retry_count_func += 1 if retry_count_func >= max_retries_func: print(f"\n❌ Rate limit 오류: 함수 응답 전송 실패") print(f" 잠시 후 다시 시도하세요.") return None retry_delay = 60 error_str = str(e) if "retry_delay" in error_str or "seconds:" in error_str: try: match = re.search(r'seconds:\s*(\d+)', error_str) if match: retry_delay = int(match.group(1)) except: pass print(f"\n⚠️ Rate limit 오류 (함수 응답 전송, 시도 {retry_count_func}/{max_retries_func})") print(f" {retry_delay}초 후 재시도합니다...") time.sleep(retry_delay) continue except Exception as e: if "429" in str(e) or "quota" in str(e).lower(): retry_count_func += 1 if retry_count_func >= max_retries_func: print(f"\n❌ Rate limit 오류: 함수 응답 전송 실패") return None retry_delay = 60 print(f"\n⚠️ Rate limit 오류 (함수 응답 전송, 시도 {retry_count_func}/{max_retries_func})") print(f" {retry_delay}초 후 재시도합니다...") time.sleep(retry_delay) continue else: raise break if not has_function_call: break # 최종 응답 출력 print("\n" + "=" * 60) print("💬 Gemini 응답:") print(response.text) print("=" * 60) return response.text except Exception as e: print(f"\n❌ 오류 발생: {str(e)}") import traceback traceback.print_exc() return None def main(): """ 메인 함수 """ global TOOLS print("🚀 Gemini Function Calling 테스트 시작") print("=" * 60) # 서버 상태 확인 if not check_server_health(): print("❌ MCP 서버가 실행 중이지 않습니다!") print(" 다음 명령어로 서버를 실행하세요:") print(" HTTP_MODE=1 python -m src.main") print("=" * 60) return print("✅ MCP 서버 연결 확인됨") print("=" * 60) # MCP 서버에서 도구 목록 가져오기 print("\n📋 MCP 서버에서 도구 목록을 가져오는 중...") mcp_tools = get_mcp_tools() if mcp_tools: TOOLS = mcp_tools print(f"✅ {len(mcp_tools[0]['function_declarations'])}개의 도구를 가져왔습니다:") for tool in mcp_tools[0]['function_declarations']: print(f" - {tool['name']}") else: print("⚠️ 하드코딩된 도구 목록을 사용합니다.") print("=" * 60) # 테스트 쿼리들 test_queries = [ "삼성전자 회사 정보를 검색해줘", "네이버의 최근 재무제표를 조회해줘", "카카오의 기본 정보를 알려줘" ] # 모델 선택 print("\n사용할 Gemini 모델을 선택하세요:") print("1. gemini-2.0-flash-exp (최신, 권장)") print("2. gemini-1.5-pro") print("3. gemini-1.5-flash") model_choice = input("\n모델 선택 (1-3, 기본값: 1): ").strip() or "1" model_map = { "1": "gemini-2.0-flash-exp", "2": "gemini-1.5-pro", "3": "gemini-1.5-flash" } model_name = model_map.get(model_choice, "gemini-2.0-flash-exp") # 사용자 입력 받기 print("\n테스트 쿼리를 선택하거나 직접 입력하세요:") print("1. 삼성전자 회사 정보를 검색해줘") print("2. 네이버의 최근 재무제표를 조회해줘") print("3. 카카오의 기본 정보를 알려줘") print("4. 직접 입력") choice = input("\n선택 (1-4): ").strip() if choice == "1": query = test_queries[0] elif choice == "2": query = test_queries[1] elif choice == "3": query = test_queries[2] elif choice == "4": query = input("질문을 입력하세요: ").strip() else: query = test_queries[0] if not query: print("❌ 질문을 입력해주세요.") return # 대화형 루프 시작 print("\n💡 대화형 모드: 'quit', 'exit', 'q'를 입력하면 종료됩니다.") print("=" * 60) while True: # 테스트 실행 result = test_gemini_function_calling(query, model_name) if result is None: print("\n⚠️ 응답을 받지 못했습니다.") # 다음 질문 입력 print("\n" + "=" * 60) next_query = input("\n다음 질문을 입력하세요 (종료: quit/exit/q): ").strip() if not next_query: continue # 종료 명령 확인 if next_query.lower() in ['quit', 'exit', 'q', '종료']: print("\n👋 테스트를 종료합니다.") break query = next_query if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SeoNaRu/company-info-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server