import logging
import json
import xml.etree.ElementTree as ET
from typing import Any, Callable, Optional, Dict, List
from pathlib import Path
from mcp.types import TextContent
logger = logging.getLogger("mcp-opendart")
def _normalize_lifespan_context(lifespan_context: Any) -> Any:
"""
lifespan_context를 정규화합니다.
dict 형태로 래핑된 경우 실제 컨텍스트를 추출합니다.
Args:
lifespan_context: 원본 lifespan_context
Returns:
정규화된 OpenDartContext
"""
# dict 형태로 래핑된 경우 처리
if isinstance(lifespan_context, dict):
# 일반적인 키 이름들 확인
for key in ['app_lifespan_context', 'lifespan_context', 'context', 'ctx']:
if key in lifespan_context:
return lifespan_context[key]
# dict 자체가 컨텍스트인 경우 (dict에 필요한 속성이 있는지 확인)
if hasattr(lifespan_context, 'client') or 'client' in lifespan_context:
return lifespan_context
return lifespan_context
def _get_context_from_ctx(ctx: Any) -> Optional[Any]:
"""
MCPContext에서 OpenDartContext를 추출합니다.
Args:
ctx: MCPContext or None
Returns:
OpenDartContext or None
"""
if ctx is None:
return None
try:
# ctx.request_context.lifespan_context 접근 시도
if hasattr(ctx, 'request_context'):
request_ctx = ctx.request_context
if hasattr(request_ctx, 'lifespan_context'):
lifespan_ctx = request_ctx.lifespan_context
return _normalize_lifespan_context(lifespan_ctx)
except Exception as e:
logger.debug(f"Context 추출 실패: {e}")
return None
return None
def as_json_text(payload) -> TextContent:
"""
다양한 타입의 데이터를 JSON TextContent로 변환합니다.
Args:
payload: 변환할 데이터 (dict, list, str, 기타)
Returns:
TextContent: JSON 문자열로 변환된 TextContent
"""
# 1) dict/list -> JSON 문자열
if isinstance(payload, (dict, list)):
txt = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
# 2) 이미 문자열인 경우
elif isinstance(payload, str):
# 이미 JSON이면 그대로 통과 (이중 인코딩 방지)
try:
json.loads(payload)
txt = payload
except json.JSONDecodeError:
# JSON이 아니면 안전하게 래핑
txt = json.dumps({"raw": payload}, ensure_ascii=False, separators=(",", ":"))
# 3) 기타 타입은 직렬화 규칙 지정
else:
def _default(o):
# bytes 타입 처리
if isinstance(o, bytes):
try:
return o.decode('utf-8')
except UnicodeDecodeError:
return f"<bytes: {len(o)} bytes>"
# pydantic/dataclass 지원 등
try:
return o.model_dump()
except Exception:
return str(o)
txt = json.dumps(payload, ensure_ascii=False, default=_default, separators=(",", ":"))
return TextContent(type="text", text=txt)
def xml_to_dict(element: ET.Element) -> Dict[str, Any]:
"""
XML Element를 딕셔너리로 변환합니다.
Args:
element: XML Element
Returns:
Dict: 변환된 딕셔너리
"""
result: Dict[str, Any] = {}
# 태그명을 키로 사용
tag = element.tag.split('}')[-1] if '}' in element.tag else element.tag
# 속성이 있으면 추가 (불필요한 속성 필터링)
if element.attrib:
filtered_attrs = _filter_attributes(element.attrib)
if filtered_attrs:
result['@attributes'] = filtered_attrs
# 텍스트 내용이 있으면 추가
if element.text and element.text.strip():
if len(element) == 0: # 자식 요소가 없으면 텍스트만
result['#text'] = element.text.strip()
else: # 자식 요소와 텍스트가 모두 있으면
result['#text'] = element.text.strip()
# 자식 요소들 처리
children: Dict[str, Any] = {}
for child in element:
child_tag = child.tag.split('}')[-1] if '}' in child.tag else child.tag
child_dict = xml_to_dict(child)
if child_tag in children:
# 같은 태그가 여러 개 있으면 리스트로 변환
if not isinstance(children[child_tag], list):
children[child_tag] = [children[child_tag]]
children[child_tag].append(child_dict)
else:
children[child_tag] = child_dict
# 자식 요소들을 결과에 추가
result.update(children)
return result
def _filter_attributes(attributes: Dict[str, str]) -> Dict[str, str]:
"""
XML 속성에서 의미있는 속성만 필터링합니다.
Args:
attributes: 원본 속성 딕셔너리
Returns:
Dict: 필터링된 속성 딕셔너리
"""
# 의미있는 속성들만 유지
meaningful_attrs = {
'ACODE', 'ADATE', 'AUNIT', 'AUNITVALUE', 'ACLASS', 'ATOC', 'AASSOCNOTE', 'ATOCID',
'ACOPY', 'ADELETE', 'AMOVECOL', 'AUPDATECONT', 'ACOPYCOL', 'ADELETECOL',
'WIDTH', 'HEIGHT', 'ALIGN', 'VALIGN', 'COLSPAN', 'ROWSPAN',
'REFNO', 'USERMARK', 'BORDER', 'AFIXTABLE', 'ACLASS'
}
filtered = {}
for key, value in attributes.items():
# 네임스페이스 제거
clean_key = key.split('}')[-1] if '}' in key else key
# 의미있는 속성만 유지
if clean_key in meaningful_attrs:
filtered[clean_key] = value
# 특별한 경우들
elif clean_key in ['noNamespaceSchemaLocation', 'xmlns:xsi']:
filtered[clean_key] = value
return filtered
def parse_disclosure_xml(xml_path: str) -> Dict[str, Any]:
"""
공시서류 XML 파일을 파싱하여 JSON 형태로 변환합니다.
Args:
xml_path: XML 파일 경로
Returns:
Dict: 파싱된 공시서류 데이터
"""
try:
# 파일 크기 확인
file_size = Path(xml_path).stat().st_size
logger.info(f"XML 파일 크기: {file_size} bytes")
# 큰 파일의 경우 스트리밍 파싱 사용
if file_size > 10 * 1024 * 1024: # 10MB 이상
logger.info("대용량 XML 파일 - 스트리밍 파싱 사용")
return parse_large_xml(xml_path)
# XML 파싱 시도 (여러 방법으로 시도)
xml_dict = None
# 방법 1: 표준 파싱
try:
tree = ET.parse(xml_path)
root = tree.getroot()
xml_dict = xml_to_dict(root)
except ET.ParseError as e:
logger.warning(f"표준 XML 파싱 실패: {e}")
# 방법 2: 오류 복구 파싱
try:
xml_dict = parse_xml_with_recovery(xml_path)
except Exception as e2:
logger.error(f"복구 파싱도 실패: {e2}")
# 방법 3: 기본 정보만 추출
xml_dict = extract_basic_xml_info(xml_path)
# 메타데이터 추가
result = {
"file_path": xml_path,
"file_size": file_size,
"parsed_at": json.dumps({"timestamp": str(Path(xml_path).stat().st_mtime)}),
"document_data": xml_dict
}
return result
except Exception as e:
logger.error(f"공시서류 파싱 오류: {e}")
return {"error": f"파일 처리 실패: {str(e)}", "file_path": xml_path}
def parse_xml_with_recovery(xml_path: str) -> Dict[str, Any]:
"""
XML 파싱 오류가 발생했을 때 복구를 시도합니다.
Args:
xml_path: XML 파일 경로
Returns:
Dict: 복구된 XML 데이터
"""
try:
# 파일을 읽어서 문제가 있는 부분을 수정
with open(xml_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# 일반적인 XML 오류 수정
content = content.replace('&', '&') # & 문자 이스케이프
content = content.replace('<', '<').replace('>', '>') # 잘못된 태그 수정
# 임시 파일로 저장 후 파싱
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.xml', delete=False, encoding='utf-8') as temp_file:
temp_file.write(content)
temp_path = temp_file.name
try:
tree = ET.parse(temp_path)
root = tree.getroot()
result = xml_to_dict(root)
return result
finally:
# 임시 파일 삭제
Path(temp_path).unlink(missing_ok=True)
except Exception as e:
logger.error(f"XML 복구 파싱 실패: {e}")
raise e
def extract_basic_xml_info(xml_path: str) -> Dict[str, Any]:
"""
XML 파싱이 완전히 실패했을 때 기본 정보만 추출합니다.
Args:
xml_path: XML 파일 경로
Returns:
Dict: 기본 정보만 포함된 데이터
"""
try:
with open(xml_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# 정규표현식으로 기본 정보 추출
import re
# 문서명 추출
doc_name_match = re.search(r'<DOCUMENT-NAME[^>]*>([^<]+)</DOCUMENT-NAME>', content)
doc_name = doc_name_match.group(1) if doc_name_match else "알 수 없음"
# 회사명 추출
company_match = re.search(r'<COMPANY-NAME[^>]*>([^<]+)</COMPANY-NAME>', content)
company_name = company_match.group(1) if company_match else "알 수 없음"
# 버전 정보 추출
version_match = re.search(r'<FORMULA-VERSION[^>]*ADATE="([^"]+)"', content)
version_date = version_match.group(1) if version_match else ""
return {
"root_tag": "DOCUMENT",
"summary": "XML 파싱 오류로 인한 기본 정보만 제공",
"extracted_info": {
"document_name": doc_name,
"company_name": company_name,
"version_date": version_date
},
"note": "전체 파싱을 위해서는 XML 파일을 수동으로 확인하세요"
}
except Exception as e:
logger.error(f"기본 정보 추출 실패: {e}")
return {
"root_tag": "DOCUMENT",
"summary": "파일 읽기 실패",
"error": str(e)
}
def search_xml_text(xml_path: str, search_term: str, case_sensitive: bool = False, exact_match: bool = False, strip_tags: bool = True) -> List[Dict[str, Any]]:
"""
XML 파일에서 텍스트 검색
Args:
xml_path: XML 파일 경로
search_term: 검색할 텍스트
case_sensitive: 대소문자 구분 여부
exact_match: 정확한 매치 여부
strip_tags: XML 태그 제거 여부 (기본값: True)
Returns:
List[Dict]: 검색 결과 리스트
"""
import re
try:
with open(xml_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# XML 태그 제거 (선택적)
if strip_tags:
# XML 태그 제거하고 순수 텍스트만 추출
text_content = re.sub(r'<[^>]+>', '', content)
# 연속된 공백을 하나로 정리
text_content = re.sub(r'\s+', ' ', text_content)
lines = text_content.split('\n')
else:
lines = content.split('\n')
# 검색 옵션 설정
flags = 0 if case_sensitive else re.IGNORECASE
if exact_match:
pattern = re.escape(search_term)
else:
pattern = search_term
# 검색 실행
matches = []
for i, line in enumerate(lines, 1):
if re.search(pattern, line, flags):
# 매치된 라인에서 컨텍스트 추출
start = max(0, i - 3)
end = min(len(lines), i + 4)
context_lines = lines[start:end]
# 매치된 부분 하이라이트
highlighted_line = re.sub(
f'({re.escape(search_term)})',
r'**\1**',
line,
flags=flags
)
matches.append({
"line_number": i,
"matched_text": line.strip(),
"highlighted_text": highlighted_line,
"context": {
"start_line": start + 1,
"end_line": end,
"lines": context_lines
}
})
return matches
except Exception as e:
logger.error(f"XML 텍스트 검색 오류: {e}")
return []
def search_xml_semantic(xml_path: str, search_term: str, similarity_threshold: float = 0.7, max_results: int = 20, strip_tags: bool = True) -> List[Dict[str, Any]]:
"""
XML 파일에서 의미 기반 검색
Args:
xml_path: XML 파일 경로
search_term: 검색할 텍스트
similarity_threshold: 유사도 임계값
max_results: 최대 결과 수
strip_tags: XML 태그 제거 여부 (기본값: True)
Returns:
List[Dict]: 검색 결과 리스트
"""
import re
from difflib import SequenceMatcher
try:
with open(xml_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# XML 태그 제거 (선택적)
if strip_tags:
# XML 태그 제거하고 순수 텍스트만 추출
text_content = re.sub(r'<[^>]+>', '', content)
# 연속된 공백을 하나로 정리
text_content = re.sub(r'\s+', ' ', text_content)
else:
text_content = content
# 텍스트를 문장 단위로 분리
sentences = re.split(r'[.!?。!?]', text_content)
results = []
for i, sentence in enumerate(sentences):
if len(sentence.strip()) < 10: # 너무 짧은 문장 제외
continue
# 유사도 계산
similarity = SequenceMatcher(None, search_term.lower(), sentence.lower()).ratio()
if similarity >= similarity_threshold:
# 매치된 문장에서 컨텍스트 추출
line_start = text_content.find(sentence)
line_number = text_content[:line_start].count('\n') + 1
results.append({
"line_number": line_number,
"matched_text": sentence.strip(),
"similarity_score": round(similarity, 3),
"context": {
"before": sentences[i-1].strip() if i > 0 else "",
"after": sentences[i+1].strip() if i < len(sentences)-1 else ""
}
})
# 유사도 순으로 정렬하고 최대 결과 수만큼 반환
results.sort(key=lambda x: x['similarity_score'], reverse=True)
return results[:max_results]
except Exception as e:
logger.error(f"XML 의미 검색 오류: {e}")
return []
def get_xml_sections(xml_path: str, section_name: Optional[str] = None) -> Dict[str, Any]:
"""
XML 파일의 섹션별 내용 조회
Args:
xml_path: XML 파일 경로
section_name: 조회할 섹션명 (None이면 전체 섹션 목록)
Returns:
Dict: 섹션 내용 또는 섹션 목록
"""
import re
try:
with open(xml_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
if section_name is None:
# 전체 섹션 목록 추출
section_pattern = r'<TITLE[^>]*>([^<]+)</TITLE>'
sections = re.findall(section_pattern, content)
return {
"total_sections": len(sections),
"sections": sections
}
else:
# 특정 섹션 내용 추출
# 섹션 제목을 찾아서 다음 섹션까지의 내용 추출
section_pattern = rf'<TITLE[^>]*>{re.escape(section_name)}</TITLE>'
match = re.search(section_pattern, content)
if not match:
return {
"error": f"섹션을 찾을 수 없습니다: {section_name}",
"available_sections": re.findall(r'<TITLE[^>]*>([^<]+)</TITLE>', content)
}
# 섹션 시작 위치
start_pos = match.start()
# 다음 섹션 찾기
next_section_pattern = r'<TITLE[^>]*>([^<]+)</TITLE>'
next_match = re.search(next_section_pattern, content[start_pos + 1:])
if next_match:
end_pos = start_pos + 1 + next_match.start()
section_content = content[start_pos:end_pos]
else:
section_content = content[start_pos:]
return {
"section_name": section_name,
"content": section_content,
"content_length": len(section_content)
}
except Exception as e:
logger.error(f"XML 섹션 조회 오류: {e}")
return {"error": str(e)}
def parse_large_xml(xml_path: str) -> Dict[str, Any]:
"""
대용량 XML 파일을 스트리밍 방식으로 파싱합니다.
Args:
xml_path: XML 파일 경로
Returns:
Dict: 파싱된 공시서류 데이터 (요약 정보)
"""
try:
import xml.etree.ElementTree as ET
# 스트리밍 파서 사용
context = ET.iterparse(xml_path, events=("start", "end"))
context = iter(context)
event, root = next(context)
# 기본 정보 추출
result = {
"file_path": xml_path,
"file_size": Path(xml_path).stat().st_size,
"parsed_at": json.dumps({"timestamp": str(Path(xml_path).stat().st_mtime)}),
"document_data": {
"root_tag": root.tag,
"attributes": root.attrib,
"summary": "대용량 XML 파일 - 요약 정보만 제공",
"note": "전체 파싱을 위해서는 별도의 XML 처리 도구 사용 권장"
}
}
# 주요 태그들만 추출
important_tags = ['DOCUMENT-NAME', 'COMPANY-NAME', 'SUMMARY']
for event, elem in context:
if event == "end" and elem.tag in important_tags:
if elem.tag not in result["document_data"]:
result["document_data"][elem.tag] = []
result["document_data"][elem.tag].append({
"text": elem.text.strip() if elem.text else "",
"attributes": elem.attrib
})
elem.clear() # 메모리 절약
return result
except Exception as e:
logger.error(f"대용량 XML 파싱 오류: {e}")
return {"error": f"대용량 XML 파싱 실패: {str(e)}", "file_path": xml_path}
def save_disclosure_cache(rcp_no: str, data: Dict[str, Any]) -> str:
"""
공시서류 데이터를 캐시 파일로 저장합니다.
Args:
rcp_no: 접수번호
data: 저장할 데이터
Returns:
str: 저장된 캐시 파일 경로
"""
try:
# 캐시 디렉토리 생성
project_root = Path(__file__).parent.parent.parent
cache_dir = project_root / 'mcp_opendart' / 'utils' / 'data' / 'disclosure_cache'
cache_dir.mkdir(parents=True, exist_ok=True)
# 캐시 파일 경로
cache_file = cache_dir / f'disclosure_{rcp_no}.json'
# JSON으로 저장
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"공시서류 캐시 저장 완료: {cache_file}")
return str(cache_file)
except Exception as e:
logger.error(f"캐시 저장 오류: {e}")
raise
def load_disclosure_cache(rcp_no: str) -> Optional[Dict[str, Any]]:
"""
공시서류 캐시 파일을 로드합니다.
Args:
rcp_no: 접수번호
Returns:
Optional[Dict]: 캐시된 데이터 또는 None
"""
try:
project_root = Path(__file__).parent.parent.parent
cache_file = project_root / 'mcp_opendart' / 'utils' / 'data' / 'disclosure_cache' / f'disclosure_{rcp_no}.json'
if cache_file.exists():
with open(cache_file, 'r', encoding='utf-8') as f:
data: Dict[str, Any] = json.load(f)
logger.info(f"공시서류 캐시 로드 완료: {cache_file}")
return data
else:
return None
except Exception as e:
logger.error(f"캐시 로드 오류: {e}")
return None
# Human readable key 매핑 사전
KEY_MAPPING = {
# 기본 응답 필드
"result": "response_result",
"status": "status_code",
"message": "status_message",
"list": "data_list",
# 기업 기본 정보
"corp_name": "corporation_name",
"corp_name_eng": "corporation_name_english",
"corp_code": "corporation_code",
"corp_cls": "corporation_class",
"stock_name": "stock_name",
"stock_code": "stock_code",
"ceo_nm": "ceo_name",
"jurir_no": "corporate_registration_number",
"bizr_no": "business_registration_number",
"adres": "address",
"hm_url": "homepage_url",
"ir_url": "investor_relations_url",
"phn_no": "phone_number",
"fax_no": "fax_number",
"induty_code": "industry_code",
"est_dt": "establishment_date",
"acc_mt": "accounting_month",
# 공시 관련
"rcept_no": "receipt_number",
"flr_nm": "filer_name",
"report_nm": "report_name",
"rcept_dt": "receipt_date",
"rm": "remarks",
# 재무 관련
"bsns_year": "business_year",
"reprt_code": "report_code",
"fs_div": "financial_statement_division",
"sj_div": "subject_division",
"account_nm": "account_name",
"account_detail": "account_detail",
"account_id": "account_id",
"thstrm_nm": "this_term_name",
"thstrm_amount": "this_term_amount",
"thstrm_add_amount": "this_term_additional_amount",
"frmtrm_nm": "former_term_name",
"frmtrm_amount": "former_term_amount",
"frmtrm_q_nm": "former_quarter_name",
"frmtrm_q_amount": "former_quarter_amount",
"frmtrm_add_amount": "former_term_additional_amount",
"bfefrmtrm_nm": "before_former_term_name",
"bfefrmtrm_amount": "before_former_term_amount",
# 주주 및 지분 관련
"nm": "name",
"relate": "relationship",
"stock_knd": "stock_kind",
"bsis_posesn_stock_co": "basis_possession_stock_count",
"bsis_posesn_stock_qota_rt": "basis_possession_stock_quota_rate",
"trmend_posesn_stock_co": "term_end_possession_stock_count",
"trmend_posesn_stock_qota_rt": "term_end_possession_stock_quota_rate",
"change_posesn_stock_co": "change_possession_stock_count",
"change_posesn_stock_qota_rt": "change_possession_stock_quota_rate",
# 임원 관련
"nmpr": "number_of_people",
"mendng_totamt": "total_compensation_amount",
"jan_avrg_mendng_am": "average_compensation_per_person",
"fyer_salary_totamt": "annual_salary_total_amount",
"jan_salary_am": "average_salary_per_person",
"psn1_avrg_pymntamt": "average_payment_per_person",
"pymnt_totamt": "total_payment_amount",
"se": "classification",
# 직원 관련
"sexdstn": "gender_classification",
"fo_bbm": "business_division",
"rgllbr_co": "regular_employee_count",
"cnttk_co": "contract_employee_count",
"rgllbr_abacpt_labrr_co": "regular_part_time_worker_count",
"cnttk_abacpt_labrr_co": "contract_part_time_worker_count",
"avrg_cnwk_sdytrn": "average_service_years",
"reform_bfe_emp_co_rgllbr": "before_reform_regular_employee_count",
"reform_bfe_emp_co_cnttk": "before_reform_contract_employee_count",
"reform_bfe_emp_co_etc": "before_reform_other_employee_count",
# 날짜 관련
"stlm_dt": "settlement_date",
"bgn_de": "begin_date",
"end_de": "end_date",
# 기타 일반적인 약어들
"prv_mt": "previous_month",
"thstrm_dt": "this_term_date",
"frmtrm_dt": "former_term_date",
"ord_stock": "ordinary_stock",
"pref_stock": "preferred_stock",
"etc_stock": "other_stock",
"tot_stock": "total_stock",
}
# 금액 관련 필드들 (숫자형으로 변환해야 하는 필드들)
AMOUNT_FIELDS = {
# 재무제표 금액 필드들
"thstrm_amount", "before_this_term_amount", "frmtrm_amount", "before_former_term_amount",
"bfefrmtrm_amount", "before_before_former_term_amount",
"mendng_totamt", "total_compensation_amount", "jan_avrg_mendng_am", "average_compensation_per_person",
"fyer_salary_totamt", "annual_salary_total_amount", "jan_salary_am", "average_salary_per_person",
"psn1_avrg_pymntamt", "average_payment_per_person", "pymnt_totamt", "total_payment_amount",
# 기타 금액 필드들 (원래 키 이름들도 포함)
"thstrm_amount", "frmtrm_amount", "bfefrmtrm_amount",
"mendng_totamt", "jan_avrg_mendng_am", "fyer_salary_totamt",
"jan_salary_am", "psn1_avrg_pymntamt", "pymnt_totamt",
# 일반적인 금액 필드명들
"amount", "amt", "money", "price", "cost", "value", "total", "sum",
"capital", "asset", "liability", "equity", "revenue", "income", "expense",
"sales", "profit", "loss", "debt", "cash", "investment", "dividend"
}
def convert_to_numeric(value: Any) -> Any:
"""
문자열 금액을 숫자형으로 변환합니다.
Args:
value: 변환할 값
Returns:
변환된 숫자 값 (변환 실패시 원본 반환)
"""
if not isinstance(value, str):
return value
# 빈 문자열이나 '-' 처리
if not value or value.strip() in ['-', '', 'null', 'NULL', 'None']:
return 0
try:
# 쉼표 제거하고 숫자로 변환
cleaned_value = value.replace(',', '').replace(' ', '').strip()
# 음수 처리
if cleaned_value.startswith('(') and cleaned_value.endswith(')'):
cleaned_value = '-' + cleaned_value[1:-1]
# 정수 또는 실수로 변환
if '.' in cleaned_value:
return float(cleaned_value)
else:
return int(cleaned_value)
except (ValueError, TypeError):
# 변환 실패시 원본 반환
return value
def transform_keys(data: Any) -> Any:
"""
데이터의 키를 human readable 형태로 변환하고, 금액 필드를 숫자형으로 변환합니다.
Args:
data: 변환할 데이터 (dict, list, 또는 기타)
Returns:
키가 변환되고 금액이 숫자형으로 변환된 데이터
"""
if isinstance(data, dict):
transformed = {}
for key, value in data.items():
# 키 변환
new_key = KEY_MAPPING.get(key, key)
# 금액 필드인 경우 숫자형으로 변환
if new_key in AMOUNT_FIELDS or key in AMOUNT_FIELDS:
if isinstance(value, str):
value = convert_to_numeric(value)
# 재귀적으로 값도 변환
transformed[new_key] = transform_keys(value)
return transformed
elif isinstance(data, list):
return [transform_keys(item) for item in data]
else:
return data
def with_context(
ctx: Optional[Any],
tool_name: str,
fallback_func: Callable[[Any], Any],
transform_response: bool = True
) -> Any:
"""
MCP context를 안전하게 처리합니다.
ctx 주입이 있으면 사용하고, 없으면 전역 컨텍스트로 fallback합니다.
응답 데이터의 키를 human readable 형태로 변환합니다.
Args:
ctx: MCPContext or None
tool_name: 도구명 (로깅용)
fallback_func: context.ds001.get_something 등 context 의존 로직
transform_response: 응답 키를 변환할지 여부 (기본값: True)
Returns:
fallback_func 실행 결과 (키가 변환된 데이터)
Raises:
ValueError: 전역 컨텍스트도 없을 때
"""
logger.info(f"📌 Tool: {tool_name} 호출됨")
# 1. ctx에서 컨텍스트 추출 시도
opendart_ctx = _get_context_from_ctx(ctx)
if opendart_ctx is not None:
try:
result = fallback_func(opendart_ctx)
logger.info("✅ MCP 내부 컨텍스트 사용")
return transform_keys(result) if transform_response else result
except Exception as e:
logger.warning(f"⚠️ MCPContext 접근 실패, 전역 컨텍스트로 fallback: {e}")
# fallback으로 계속 진행
# 2. 전역 컨텍스트로 fallback
try:
from mcp_opendart.server import get_global_context
global_ctx = get_global_context()
if global_ctx is not None:
result = fallback_func(global_ctx)
logger.info("✅ 전역 컨텍스트 사용 (fallback)")
return transform_keys(result) if transform_response else result
except Exception as e:
logger.error(f"⚠️ 전역 컨텍스트 접근 실패: {e}")
# 3. 둘 다 실패
raise ValueError("OpenDART context is required but not provided. Lifespan context not initialized.")
async def with_context_async(
ctx: Optional[Any],
tool_name: str,
fallback_func: Callable[[Any], Any],
transform_response: bool = True
) -> Any:
"""
비동기 버전의 with_context 함수.
ctx 주입이 있으면 사용하고, 없으면 전역 컨텍스트로 fallback합니다.
응답 데이터의 키를 human readable 형태로 변환합니다.
Args:
ctx: MCPContext or None
tool_name: 도구명 (로깅용)
fallback_func: context.ds001.get_something 등 context 의존 로직
transform_response: 응답 키를 변환할지 여부 (기본값: True)
Returns:
fallback_func 실행 결과 (키가 변환된 데이터)
Raises:
ValueError: 전역 컨텍스트도 없을 때
"""
logger.info(f"📌 Tool: {tool_name} 호출됨 (async)")
# 1. ctx에서 컨텍스트 추출 시도
opendart_ctx = _get_context_from_ctx(ctx)
if opendart_ctx is not None:
try:
result = fallback_func(opendart_ctx)
logger.info("✅ MCP 내부 컨텍스트 사용 (async)")
return transform_keys(result) if transform_response else result
except Exception as e:
logger.warning(f"⚠️ MCPContext 접근 실패, 전역 컨텍스트로 fallback (async): {e}")
# fallback으로 계속 진행
# 2. 전역 컨텍스트로 fallback
try:
from mcp_opendart.server import get_global_context
global_ctx = get_global_context()
if global_ctx is not None:
result = fallback_func(global_ctx)
logger.info("✅ 전역 컨텍스트 사용 (fallback, async)")
return transform_keys(result) if transform_response else result
except Exception as e:
logger.error(f"⚠️ 전역 컨텍스트 접근 실패 (async): {e}")
# 3. 둘 다 실패
raise ValueError("OpenDART context is required but not provided. Lifespan context not initialized.")