"""
재무제표 주석 추출 모듈
연결재무제표 주석, 재무제표 주석, 사업의 내용을 동적으로 추출
하드코딩 최소화
"""
import re
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional
def detect_financial_notes_sections(xml_content: str) -> Dict[str, Dict[str, Any]]:
"""
연결재무제표 주석과 재무제표 주석 섹션을 동적으로 탐지
하드코딩 없이 패턴 매칭으로 처리
"""
sections = {}
    # Title patterns for the consolidated financial statement notes
consolidated_patterns = [
r'<TITLE[^>]*>.*?연결재무제표\s*주석.*?</TITLE>',
r'<TITLE[^>]*>.*?Notes\s*to\s*the\s*consolidated.*?</TITLE>',
r'<TITLE[^>]*>.*?3\.\s*연결재무제표\s*주석.*?</TITLE>'
]
    # Title patterns for the separate financial statement notes
    # (the negative lookbehind avoids re-matching the consolidated title)
    separate_patterns = [
        r'<TITLE[^>]*>.*?(?<!연결)재무제표\s*주석.*?</TITLE>',
r'<TITLE[^>]*>.*?Notes\s*to\s*the\s*separate.*?</TITLE>',
r'<TITLE[^>]*>.*?5\.\s*재무제표\s*주석.*?</TITLE>'
]
lines = xml_content.split('\n')
    # Locate the consolidated notes section
for i, line in enumerate(lines):
for pattern in consolidated_patterns:
if re.search(pattern, line, re.IGNORECASE):
title_match = re.search(r'<TITLE[^>]*>(.*?)</TITLE>', line)
sections['consolidated_notes'] = {
'start_line': i + 1,
'title': title_match.group(1).strip() if title_match else line.strip(),
'pattern_used': pattern
}
break
if 'consolidated_notes' in sections:
break
    # Locate the separate notes section (only after the consolidated notes)
consolidated_start = sections.get('consolidated_notes', {}).get('start_line', 0)
for i, line in enumerate(lines):
        if i + 1 <= consolidated_start:  # skip lines at or before the consolidated-notes title
continue
for pattern in separate_patterns:
if re.search(pattern, line, re.IGNORECASE):
title_match = re.search(r'<TITLE[^>]*>(.*?)</TITLE>', line)
sections['separate_notes'] = {
'start_line': i + 1,
'title': title_match.group(1).strip() if title_match else line.strip(),
'pattern_used': pattern
}
break
if 'separate_notes' in sections:
break
return sections
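# Usage sketch for detect_financial_notes_sections. The fragment below is
# hypothetical (the ATOC attribute and titles are illustrative, not from a
# real filing) and only shows the expected return shape:
#
#   sample = '\n'.join([
#       '<TITLE ATOC="Y">3. 연결재무제표 주석</TITLE>',
#       '<P>...</P>',
#       '<TITLE ATOC="Y">5. 재무제표 주석</TITLE>',
#   ])
#   detect_financial_notes_sections(sample)
#   # -> {'consolidated_notes': {'start_line': 1, 'title': '3. 연결재무제표 주석', ...},
#   #     'separate_notes':     {'start_line': 3, 'title': '5. 재무제표 주석', ...}}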
def detect_company_overview_sections(xml_content: str) -> Dict[str, Dict[str, Any]]:
"""
I. 회사의 개요 섹션을 다중 패턴으로 탐지
TITLE 태그와 TD 태그(목차) 모두 지원
"""
sections: Dict[str, Dict[str, Any]] = {}
lines = xml_content.split('\n')
    # Pattern 1: TITLE tag
overview_start = None
for i, line in enumerate(lines):
if re.search(r'<TITLE[^>]*>.*?I\.\s*회사의\s*개요.*?</TITLE>', line):
overview_start = i
break
    # Pattern 2: TD tag (table-of-contents layout)
    if overview_start is None:
for i, line in enumerate(lines):
if re.search(r'<TD[^>]*>.*?I\.\s*회사의\s*개요.*?</TD>', line):
overview_start = i
break
    if overview_start is None:
return sections
    # Use "II. 사업의 내용" as the end boundary (multiple patterns)
    overview_end = len(lines)  # default: end of file
    # Look for "II. 사업의 내용" via TITLE tags
for i in range(overview_start + 1, len(lines)):
if re.search(r'<TITLE[^>]*>.*?II\.\s*사업의\s*내용.*?</TITLE>', lines[i]):
overview_end = i
break
    # Fall back to TD tags when no TITLE match was found
if overview_end == len(lines):
for i in range(overview_start + 1, len(lines)):
if re.search(r'<TD[^>]*>.*?II\.\s*사업의\s*내용.*?</TD>', lines[i]):
overview_end = i
break
sections['company_overview'] = {
'start_line': overview_start + 1,
'end_line': overview_end,
'title': 'I. 회사의 개요'
}
return sections
def detect_business_content_sections(xml_content: str) -> Dict[str, Dict[str, Any]]:
"""
II. 사업의 내용 섹션을 다중 패턴으로 탐지
TITLE 태그와 TD 태그(목차) 모두 지원
"""
sections: Dict[str, Dict[str, Any]] = {}
lines = xml_content.split('\n')
    # Pattern 1: TITLE tag
business_start = None
for i, line in enumerate(lines):
if re.search(r'<TITLE[^>]*>.*?II\.\s*사업의\s*내용.*?</TITLE>', line):
business_start = i
break
    # Pattern 2: TD tag (table-of-contents layout, e.g. Samsung Electronics filings)
    if business_start is None:
for i, line in enumerate(lines):
if re.search(r'<TD[^>]*>.*?II\.\s*사업의\s*내용.*?</TD>', line):
business_start = i
break
    if business_start is None:
return sections
    # Use "III. 재무에 관한 사항" as the end boundary (multiple patterns)
    business_end = len(lines)  # default: end of file
    # Look for "III. 재무에 관한 사항" via TITLE tags
for i in range(business_start + 1, len(lines)):
if re.search(r'<TITLE[^>]*>.*?III\.\s*재무에\s*관한\s*사항.*?</TITLE>', lines[i]):
business_end = i
break
    # Fall back to TD tags when no TITLE match was found
if business_end == len(lines):
for i in range(business_start + 1, len(lines)):
if re.search(r'<TD[^>]*>.*?III\.\s*재무에\s*관한\s*사항.*?</TD>', lines[i]):
business_end = i
break
sections['business_content'] = {
'start_line': business_start + 1,
'end_line': business_end,
'title': 'II. 사업의 내용'
}
return sections
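# Usage sketch for the two detectors above (hypothetical fragment). Both
# return the same shape, with the section spanning start_line up to but not
# including end_line:
#
#   sample = '\n'.join([
#       '<TITLE ATOC="Y">I. 회사의 개요</TITLE>',
#       '<P>...</P>',
#       '<TITLE ATOC="Y">II. 사업의 내용</TITLE>',
#       '<P>...</P>',
#       '<TITLE ATOC="Y">III. 재무에 관한 사항</TITLE>',
#   ])
#   detect_company_overview_sections(sample)
#   # -> {'company_overview': {'start_line': 1, 'end_line': 2, 'title': 'I. 회사의 개요'}}
#   detect_business_content_sections(sample)
#   # -> {'business_content': {'start_line': 3, 'end_line': 4, 'title': 'II. 사업의 내용'}}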
def detect_company_overview_subsections(xml_content: str, overview_section: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""
회사의 개요 내 모든 하위 섹션을 동적으로 탐지
II. 사업의 내용 이전까지의 모든 SECTION-2를 찾기
"""
lines = xml_content.split('\n')
start_line = overview_section['start_line'] - 1
end_line = overview_section['end_line']
subsections = {}
current_section = None
section_start_line = None
    for i in range(start_line, end_line):
        line = lines[i]
        # Detect the start of a SECTION-2 block
        if re.search(r'<SECTION-2[^>]*>', line):
            # Close off the previous subsection, if any
            if current_section and section_start_line:
                subsections[current_section]['end_line'] = i
                subsections[current_section]['line_count'] = i - section_start_line + 1
            # Start a new subsection; reset first so a block without a TITLE
            # cannot re-close the previous subsection with wrong boundaries
            current_section = None
            section_start_line = i + 1
            # Look for the TITLE within the next 5 lines
            for j in range(i, min(i + 5, end_line)):
                title_match = re.search(r'<TITLE[^>]*>(.*?)</TITLE>', lines[j])
                if title_match:
                    current_section = title_match.group(1).strip()
                    subsections[current_section] = {
                        'start_line': j + 1,
                        'title': current_section,
                        'section_type': 'subsection'
                    }
                    break
    # Close off the last subsection
    if current_section and section_start_line:
        subsections[current_section]['end_line'] = end_line
        subsections[current_section]['line_count'] = end_line - section_start_line + 1
return subsections
def detect_business_subsections(xml_content: str, business_section: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""
사업의 내용 내 모든 하위 섹션을 동적으로 탐지
III. 재무에 관한 사항 이전까지의 모든 SECTION-2를 찾기
"""
lines = xml_content.split('\n')
start_line = business_section['start_line'] - 1
end_line = business_section['end_line']
subsections = {}
current_section = None
for i in range(start_line, end_line):
line = lines[i]
        # Detect the start of a SECTION-2 block
        if re.search(r'<SECTION-2[^>]*>', line):
            # Look for the TITLE within the next 5 lines
for j in range(i, min(i + 5, end_line)):
title_match = re.search(r'<TITLE[^>]*>(.*?)</TITLE>', lines[j])
if title_match:
current_section = title_match.group(1).strip()
subsections[current_section] = {
'start_line': j + 1,
'title': current_section,
'section_type': 'subsection'
}
break
        # Detect the end of the SECTION-2 block
elif re.search(r'</SECTION-2>', line) and current_section:
subsections[current_section]['end_line'] = i + 1
current_section = None
return subsections
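# Usage sketch for the subsection detectors above (hypothetical fragment).
# Each SECTION-2 block becomes one entry keyed by its TITLE text:
#
#   sample = '\n'.join([
#       '<TITLE ATOC="Y">II. 사업의 내용</TITLE>',
#       '<SECTION-2 ATOC="Y">',
#       '<TITLE ATOC="Y">1. 사업의 개요</TITLE>',
#       '<P>...</P>',
#       '</SECTION-2>',
#   ])
#   detect_business_subsections(sample, {'start_line': 1, 'end_line': 5})
#   # -> {'1. 사업의 개요': {'start_line': 3, 'title': '1. 사업의 개요',
#   #                        'section_type': 'subsection', 'end_line': 5}}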
def safe_filename(name: str) -> str:
"""
한글/특수문자를 안전한 파일명으로 변환
"""
# 한글, 영문, 숫자, 하이픈, 언더스코어만 허용
safe_name = re.sub(r'[^\w\-_가-힣]', '_', name)
# 연속된 언더스코어 제거
safe_name = re.sub(r'_+', '_', safe_name)
# 앞뒤 언더스코어 제거
safe_name = safe_name.strip('_')
return safe_name or 'unnamed_section'
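# Examples (hypothetical subsection titles):
#   safe_filename('1. 사업의 개요')          # -> '1_사업의_개요'
#   safe_filename('가. 업계의 현황 (요약)')  # -> '가_업계의_현황_요약'
#   safe_filename('***')                     # -> 'unnamed_section'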
def should_regenerate_cache(cache_path: Path) -> bool:
"""
심플한 캐시 무효화 조건 검증
"""
metadata_path = cache_path / 'metadata.json'
if not metadata_path.exists():
return True
try:
with open(metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
        # Condition 1: too few sections (older cache format)
sections_found = metadata.get('sections_found', [])
if len(sections_found) < 2:
return True
        # Condition 2: business-content flag missing (newer feature)
if 'business_content_found' not in metadata:
return True
        # Condition 3: company-overview flag missing (newer feature)
if 'company_overview_found' not in metadata:
return True
        # Condition 4: a section with suspiciously few lines
for section in ['consolidated_notes', 'separate_notes']:
section_meta_path = cache_path / section / 'metadata.json'
if section_meta_path.exists():
with open(section_meta_path, 'r', encoding='utf-8') as f:
section_meta = json.load(f)
                if section_meta.get('line_count', 0) < 100:  # require at least 100 lines
return True
return False
    except Exception:
return True
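# Usage sketch: the directory layout checked here is the one produced by
# save_extracted_data(); the path itself is a hypothetical example.
#
#   cache_dir = Path('disclosure_cache/20240101000001')
#   if should_regenerate_cache(cache_dir):
#       # re-run extract_financial_notes() and save_extracted_data()
#       ...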
def detect_section_boundaries(xml_content: str, sections: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
각 섹션의 끝을 견고하게 탐지 (다중 fallback 전략)
"""
lines = xml_content.split('\n')
for section_name, section_info in sections.items():
start_line = section_info['start_line'] - 1 # 0-based index
        # Detect the end point with multiple fallback strategies
        end_line = len(lines)  # default: end of file
        # Strategy 1: section-specific end-title patterns (seen in e.g. Samsung Electronics filings)
if section_name == 'consolidated_notes':
            # The consolidated notes end at "4. 재무제표"
for i in range(start_line + 1, len(lines)):
if re.search(r'<TITLE[^>]*>.*?4\.\s*재무제표.*?</TITLE>', lines[i]):
end_line = i
break
elif section_name == 'separate_notes':
            # The separate notes end at "6. 배당에 관한 사항"
for i in range(start_line + 1, len(lines)):
if re.search(r'<TITLE[^>]*>.*?6\.\s*배당에\s*관한\s*사항.*?</TITLE>', lines[i]):
end_line = i
break
        # Strategy 2: the next numbered TITLE tag (Arabic or Roman numerals)
if end_line == len(lines):
for i in range(start_line + 1, len(lines)):
if re.search(r'<TITLE[^>]*>.*?\d+\.\s*[^<]*</TITLE>', lines[i]):
end_line = i
break
elif re.search(r'<TITLE[^>]*>.*?[IVX]+\.\s*[^<]*</TITLE>', lines[i]):
end_line = i
break
        # Strategy 3: the closing SECTION-2 tag or the next SECTION-1 tag
if end_line == len(lines):
for i in range(start_line + 1, len(lines)):
                # A closing SECTION-2 tag ends the section
if re.search(r'</SECTION-2>', lines[i]):
end_line = i + 1
break
                # A new SECTION-1 tag also ends it
elif re.search(r'<SECTION-1[^>]*>', lines[i]):
end_line = i
break
        # Strategy 4: any structural tag within a wider window
if end_line == len(lines):
for i in range(start_line + 1, min(start_line + 1000, len(lines))):
line = lines[i]
if any(keyword in line for keyword in ['<TITLE', '<SECTION-1', '<SECTION-2']):
end_line = i
break
        # Record the boundary; the section spans [start_line, end_line)
        sections[section_name]['end_line'] = end_line
        sections[section_name]['line_count'] = end_line - start_line
return sections
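# The strategies above are tried in order and the first hit wins. For a
# hypothetical filing where the consolidated notes are followed by a
# '<TITLE ...>4. 재무제표</TITLE>' line, strategy 1 already resolves the end:
#
#   sections = detect_financial_notes_sections(xml_content)
#   sections = detect_section_boundaries(xml_content, sections)
#   sections['consolidated_notes']['end_line']    # index of the '4. 재무제표' line
#   sections['consolidated_notes']['line_count']  # end_line - (start_line - 1)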
def extract_tables_from_section(xml_content: str, section_info: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
섹션에서 모든 테이블을 안전하게 추출
"""
lines = xml_content.split('\n')
start_line = section_info['start_line'] - 1
    end_line = section_info.get('end_line', len(lines))  # safe default
tables: List[Dict[str, Any]] = []
current_table: Optional[Dict[str, Any]] = None
in_table = False
current_row: List[str] = []
for i in range(start_line, end_line):
line = lines[i]
        # Table start
if re.search(r'<TABLE[^>]*>', line):
if current_table:
tables.append(current_table)
current_table = {
'start_line': i + 1,
'content': [line],
'headers': [],
'rows': [],
'table_id': f"table_{len(tables) + 1:03d}"
}
in_table = True
current_row = []
        # Inside a table
elif in_table and current_table:
current_table['content'].append(line)
            # Extract header cells
if re.search(r'<TH[^>]*>', line):
header_text = re.sub(r'<[^>]+>', '', line).strip()
if header_text:
current_table['headers'].append(header_text)
            # Extract data cells
elif re.search(r'<TD[^>]*>', line):
cell_text = re.sub(r'<[^>]+>', '', line).strip()
if cell_text:
current_row.append(cell_text)
            # End of row
elif re.search(r'</TR>', line):
if current_row:
current_table['rows'].append(current_row)
current_row = []
            # Table end (the line was already appended above)
            elif re.search(r'</TABLE>', line):
                current_table['end_line'] = i + 1
                # Keep only tables that actually captured headers or rows
                if current_table.get('headers') or current_table.get('rows'):
                    tables.append(current_table)
                current_table = None
                in_table = False
                current_row = []
    # Handle a table left open at the section boundary
if current_table:
tables.append(current_table)
return tables
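# Usage sketch on a minimal, hypothetical one-tag-per-line table:
#
#   sample = '\n'.join([
#       '<TABLE BORDER="1">',
#       '<TR>', '<TH>항목</TH>', '</TR>',
#       '<TR>', '<TD>매출액</TD>', '<TD>100</TD>', '</TR>',
#       '</TABLE>',
#   ])
#   extract_tables_from_section(sample, {'start_line': 1, 'end_line': 9})
#   # -> [{'table_id': 'table_001', 'headers': ['항목'],
#   #      'rows': [['매출액', '100']], ...}]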
def extract_paragraphs_from_section(xml_content: str, section_info: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
섹션에서 모든 문단을 견고하게 추출
"""
lines = xml_content.split('\n')
start_line = section_info['start_line'] - 1
    end_line = section_info.get('end_line', len(lines))  # safe default
paragraphs: List[Dict[str, Any]] = []
current_paragraph = ""
in_paragraph = False
para_start_line = 0
for i in range(start_line, end_line):
line = lines[i]
        # Detect an opening P tag that is not closed on the same line
        # (the (?:\s[^>]*)? guard keeps tags such as <PART> from matching)
        if re.search(r'<P(?:\s[^>]*)?>', line) and not re.search(r'</P>', line):
            in_paragraph = True
            para_start_line = i + 1
            # Strip nested tags from the opening line
            para_content = re.sub(r'<[^>]+>', '', line).strip()
            current_paragraph = para_content
        elif in_paragraph:
            # Inside a multi-line P tag
            if re.search(r'</P>', line):
                # Closing tag: flush the accumulated paragraph
                para_content = re.sub(r'<[^>]+>', '', line).strip()
                current_paragraph += " " + para_content
                # Keep only non-trivial paragraphs
                if current_paragraph and len(current_paragraph) > 10:
                    paragraphs.append({
                        'line_number': para_start_line,
                        'content': current_paragraph,
                        'para_id': f"para_{len(paragraphs) + 1:03d}"
                    })
                in_paragraph = False
                current_paragraph = ""
            else:
                # Accumulate inner content across lines
                para_content = re.sub(r'<[^>]+>', '', line).strip()
                if para_content:
                    current_paragraph += " " + para_content
        elif re.search(r'<P(?:\s[^>]*)?>.*?</P>', line):
            # Complete P tag on a single line
para_content = re.sub(r'<[^>]+>', '', line).strip()
if para_content and len(para_content) > 10:
paragraphs.append({
'line_number': i + 1,
'content': para_content,
'para_id': f"para_{len(paragraphs) + 1:03d}"
})
return paragraphs
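# Usage sketch (hypothetical fragment). Single-line and multi-line P tags
# are both collected; fragments of 10 characters or fewer are dropped:
#
#   sample = '\n'.join([
#       '<P>당사는 반도체 제품의 제조 및 판매를 주요 사업으로 영위하고 있습니다.</P>',
#       '<P>짧음</P>',
#   ])
#   extract_paragraphs_from_section(sample, {'start_line': 1, 'end_line': 2})
#   # -> [{'para_id': 'para_001', 'line_number': 1, 'content': '당사는 반도체 ...'}]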
def extract_financial_notes(xml_content: str, corp_name: str, rcp_no: str) -> Dict[str, Any]:
"""
연결재무제표 주석, 재무제표 주석, 사업의 내용, 회사의 개요를 추출
하드코딩 최소화
"""
    # 1. Detect the financial-note sections
sections = detect_financial_notes_sections(xml_content)
    # 2. Detect the business-content section
business_sections = detect_business_content_sections(xml_content)
    # 3. Detect the company-overview section
overview_sections = detect_company_overview_sections(xml_content)
if not sections and not business_sections and not overview_sections:
return {
'status': 'error',
            'message': 'Could not find the financial notes, business content or company overview sections.'
}
    # 4. Detect the end boundaries of the note sections
if sections:
sections = detect_section_boundaries(xml_content, sections)
    # 5. Detect business-content subsections
business_data = {}
if business_sections:
business_section = business_sections['business_content']
business_subsections = detect_business_subsections(xml_content, business_section)
for subsection_name, subsection_info in business_subsections.items():
tables = extract_tables_from_section(xml_content, subsection_info)
paragraphs = extract_paragraphs_from_section(xml_content, subsection_info)
business_data[subsection_name] = {
'title': subsection_info['title'],
'start_line': subsection_info['start_line'],
'end_line': subsection_info['end_line'],
'tables': tables,
'paragraphs': paragraphs,
'table_count': len(tables),
'paragraph_count': len(paragraphs)
}
    # 6. Detect company-overview subsections
overview_data = {}
if overview_sections:
overview_section = overview_sections['company_overview']
overview_subsections = detect_company_overview_subsections(xml_content, overview_section)
for subsection_name, subsection_info in overview_subsections.items():
tables = extract_tables_from_section(xml_content, subsection_info)
paragraphs = extract_paragraphs_from_section(xml_content, subsection_info)
overview_data[subsection_name] = {
'title': subsection_info['title'],
'start_line': subsection_info['start_line'],
'end_line': subsection_info['end_line'],
'tables': tables,
'paragraphs': paragraphs,
'table_count': len(tables),
'paragraph_count': len(paragraphs)
}
    # 7. Extract data from each section
extracted_data = {
'metadata': {
'corp_name': corp_name,
'rcp_no': rcp_no,
'extraction_date': datetime.now().isoformat(),
'sections_found': list(sections.keys()) if sections else [],
'business_content_found': bool(business_data),
'company_overview_found': bool(overview_data)
},
'sections': {}
}
    # Process the financial-note sections
for section_name, section_info in sections.items():
tables = extract_tables_from_section(xml_content, section_info)
paragraphs = extract_paragraphs_from_section(xml_content, section_info)
extracted_data['sections'][section_name] = {
'title': section_info['title'],
'start_line': section_info['start_line'],
'end_line': section_info['end_line'],
'line_count': section_info['line_count'],
'tables': tables,
'paragraphs': paragraphs,
'table_count': len(tables),
'paragraph_count': len(paragraphs)
}
    # Attach the business content
if business_data:
extracted_data['business_content'] = business_data # type: ignore
    # Attach the company overview
if overview_data:
extracted_data['company_overview'] = overview_data # type: ignore
return extracted_data
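# End-to-end sketch (hypothetical identifiers):
#
#   result = extract_financial_notes(xml_content, '샘플전자', '20240101000001')
#   if result.get('status') != 'error':
#       result['metadata']['sections_found']  # e.g. ['consolidated_notes', 'separate_notes']
#       result['sections']['consolidated_notes']['table_count']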
def save_extracted_data(extracted_data: Dict[str, Any], base_path: str) -> str:
"""
추출된 데이터를 분할하여 저장
disclosure_cache 디렉토리에 저장
"""
base_path_obj = Path(base_path)
base_path_obj.mkdir(parents=True, exist_ok=True)
    # Save the top-level metadata
with open(base_path_obj / 'metadata.json', 'w', encoding='utf-8') as f:
json.dump(extracted_data['metadata'], f, ensure_ascii=False, indent=2)
    # Build the cache metadata (kept consistent with the existing cache layout)
cache_metadata = {
"file_path": f"disclosure_{extracted_data['metadata']['rcp_no']}.xml",
"file_size": 0, # 실제 파일 크기는 별도로 계산
"parsed_at": f"{{\"timestamp\": \"{datetime.now().timestamp()}\"}}",
"document_data": {
"root_tag": "DOCUMENT",
"summary": f"재무제표 주석 추출 완료 - {extracted_data['metadata']['corp_name']}",
"extracted_info": {
"document_name": "재무제표 주석",
"company_name": extracted_data['metadata']['corp_name'],
"rcp_no": extracted_data['metadata']['rcp_no'],
"extraction_date": extracted_data['metadata']['extraction_date'],
"sections_found": extracted_data['metadata']['sections_found']
},
"financial_notes_summary": {
"total_tables": sum(section['table_count'] for section in extracted_data['sections'].values()),
"total_paragraphs": sum(section['paragraph_count'] for section in extracted_data['sections'].values()),
"sections": {name: {
"title": data['title'],
"table_count": data['table_count'],
"paragraph_count": data['paragraph_count']
} for name, data in extracted_data['sections'].items()}
},
"note": "재무제표 주석이 성공적으로 추출되었습니다. 상세 데이터는 하위 디렉토리를 참조하세요."
}
}
    # Write the cache metadata
with open(base_path_obj / f"disclosure_{extracted_data['metadata']['rcp_no']}.json", 'w', encoding='utf-8') as f:
json.dump(cache_metadata, f, ensure_ascii=False, indent=2)
    # Save each note section
for section_name, section_data in extracted_data['sections'].items():
section_path = base_path_obj / section_name
section_path.mkdir(parents=True, exist_ok=True)
        # Section metadata
section_meta = {
'title': section_data['title'],
'start_line': section_data['start_line'],
'end_line': section_data['end_line'],
'line_count': section_data['line_count'],
'table_count': section_data['table_count'],
'paragraph_count': section_data['paragraph_count']
}
with open(section_path / 'metadata.json', 'w', encoding='utf-8') as f:
json.dump(section_meta, f, ensure_ascii=False, indent=2)
        # Save each table
tables_path = section_path / 'tables'
tables_path.mkdir(exist_ok=True)
for table in section_data['tables']:
table_file = tables_path / f"{table['table_id']}.json"
with open(table_file, 'w', encoding='utf-8') as f:
json.dump(table, f, ensure_ascii=False, indent=2)
        # Save each paragraph
paragraphs_path = section_path / 'paragraphs'
paragraphs_path.mkdir(exist_ok=True)
for para in section_data['paragraphs']:
para_file = paragraphs_path / f"{para['para_id']}.json"
with open(para_file, 'w', encoding='utf-8') as f:
json.dump(para, f, ensure_ascii=False, indent=2)
    # Save the business content and company overview (identical layout,
    # so both groups are handled by one loop)
    for group_key in ('business_content', 'company_overview'):
        if group_key not in extracted_data:
            continue
        group_data = extracted_data[group_key]
        group_path = base_path_obj / group_key
        group_path.mkdir(parents=True, exist_ok=True)
        # Group metadata
        group_meta = {
            'section_type': group_key,
            'extraction_date': extracted_data['metadata']['extraction_date'],
            'subsections_found': list(group_data.keys()),
            'total_tables': sum(sub.get('table_count', 0) for sub in group_data.values()),
            'total_paragraphs': sum(sub.get('paragraph_count', 0) for sub in group_data.values())
        }
        with open(group_path / 'metadata.json', 'w', encoding='utf-8') as f:
            json.dump(group_meta, f, ensure_ascii=False, indent=2)
        # Save each subsection
        subsections_path = group_path / 'subsections'
        subsections_path.mkdir(exist_ok=True)
        for subsection_name, subsection_data in group_data.items():
            # Convert to a filesystem-safe name
            safe_name = safe_filename(subsection_name)
            subsection_path = subsections_path / safe_name
            subsection_path.mkdir(parents=True, exist_ok=True)
            # Subsection metadata
            section_meta = {
                'original_name': subsection_name,
                'safe_name': safe_name,
                'title': subsection_data.get('title', subsection_name),
                'start_line': subsection_data.get('start_line', 0),
                'end_line': subsection_data.get('end_line', 0),
                'table_count': subsection_data.get('table_count', 0),
                'paragraph_count': subsection_data.get('paragraph_count', 0)
            }
            with open(subsection_path / 'metadata.json', 'w', encoding='utf-8') as f:
                json.dump(section_meta, f, ensure_ascii=False, indent=2)
            # Save tables
            if 'tables' in subsection_data:
                tables_path = subsection_path / 'tables'
                tables_path.mkdir(exist_ok=True)
                for table in subsection_data['tables']:
                    table_file = tables_path / f"{table.get('table_id', 'table_001')}.json"
                    with open(table_file, 'w', encoding='utf-8') as f:
                        json.dump(table, f, ensure_ascii=False, indent=2)
            # Save paragraphs
            if 'paragraphs' in subsection_data:
                paragraphs_path = subsection_path / 'paragraphs'
                paragraphs_path.mkdir(exist_ok=True)
                for para in subsection_data['paragraphs']:
                    para_file = paragraphs_path / f"{para.get('para_id', 'para_001')}.json"
                    with open(para_file, 'w', encoding='utf-8') as f:
                        json.dump(para, f, ensure_ascii=False, indent=2)
return str(base_path_obj)
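# Minimal end-to-end demo. The path and identifiers below are placeholders,
# not real DART values; point them at an actual downloaded filing to run.
if __name__ == '__main__':
    xml_path = Path('disclosure_cache/disclosure_SAMPLE.xml')  # hypothetical path
    cache_dir = Path('disclosure_cache/SAMPLE_notes')          # hypothetical cache dir
    if xml_path.exists() and should_regenerate_cache(cache_dir):
        content = xml_path.read_text(encoding='utf-8')
        result = extract_financial_notes(content, corp_name='샘플전자', rcp_no='SAMPLE')
        if result.get('status') != 'error':
            print(f"saved to {save_extracted_data(result, str(cache_dir))}")
        else:
            print(result['message'])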