import requests
import os
import json
import schedule
import threading
import time
from datetime import datetime
from pathlib import Path
# Notion API 도구 (API 토큰 인증 전용)
def convert_rich_text_to_markdown(rich_text_list):
"""Rich text 배열을 마크다운으로 변환"""
if not rich_text_list:
return ""
result = ""
for text_obj in rich_text_list:
text = text_obj.get('plain_text', '')
annotations = text_obj.get('annotations', {})
href = text_obj.get('href')
# 링크 처리
if href:
text = f"[{text}]({href})"
# 텍스트 스타일 적용
if annotations.get('bold'):
text = f"**{text}**"
if annotations.get('italic'):
text = f"*{text}*"
if annotations.get('strikethrough'):
text = f"~~{text}~~"
if annotations.get('underline'):
text = f"<u>{text}</u>"
if annotations.get('code'):
text = f"`{text}`"
result += text
return result
def find_archive_database(notion_token):
"""
노션에서 문서 아카이브 데이터베이스를 찾습니다.
'아카이브', 'archive', '문서', 'document' 등의 키워드로 검색합니다.
"""
try:
headers = {
"Authorization": f"Bearer {notion_token}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# 아카이브 관련 키워드로 검색
archive_keywords = ["아카이브", "archive", "문서", "document", "docs", "knowledge"]
found_databases = []
for keyword in archive_keywords:
search_url = "https://api.notion.com/v1/search"
search_data = {
"query": keyword,
"filter": {
"value": "database",
"property": "object"
},
"page_size": 10
}
response = requests.post(search_url, headers=headers, json=search_data, timeout=10)
if response.status_code == 200:
results = response.json().get('results', [])
for db in results:
# 데이터베이스 제목 추출
title = ""
if 'title' in db:
title = ''.join([text.get('plain_text', '') for text in db['title']])
found_databases.append({
'id': db['id'],
'title': title,
'url': db.get('url', ''),
'created_time': db.get('created_time', ''),
'last_edited_time': db.get('last_edited_time', '')
})
# 중복 제거 (ID 기준)
unique_databases = []
seen_ids = set()
for db in found_databases:
if db['id'] not in seen_ids:
unique_databases.append(db)
seen_ids.add(db['id'])
return unique_databases
except Exception as e:
print(f"아카이브 데이터베이스 검색 중 오류: {e}")
return []
def get_archive_pages(notion_token, database_id=None):
"""
아카이브 데이터베이스의 페이지들을 조회합니다.
database_id가 없으면 자동으로 아카이브 데이터베이스를 찾습니다.
"""
try:
headers = {
"Authorization": f"Bearer {notion_token}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# 데이터베이스 ID가 없으면 아카이브 데이터베이스 찾기
if not database_id:
archive_dbs = find_archive_database(notion_token)
if not archive_dbs:
return {"error": "아카이브 데이터베이스를 찾을 수 없습니다."}
database_id = archive_dbs[0]['id'] # 첫 번째 아카이브 데이터베이스 사용
# 데이터베이스 페이지들 조회
query_url = f"https://api.notion.com/v1/databases/{database_id}/query"
query_data = {"page_size": 100}
query_response = requests.post(query_url, headers=headers, json=query_data, timeout=10)
if query_response.status_code != 200:
return {"error": f"데이터베이스 쿼리 실패: {query_response.status_code}"}
query_data = query_response.json()
pages = []
for page in query_data.get('results', []):
# 페이지 제목 추출
title = ""
if 'properties' in page:
for prop_name, prop_data in page['properties'].items():
if prop_data.get('type') == 'title' and prop_data.get('title'):
title = ''.join([text.get('plain_text', '') for text in prop_data['title']])
break
pages.append({
'id': page['id'],
'title': title,
'url': page.get('url', ''),
'created_time': page.get('created_time', ''),
'last_edited_time': page.get('last_edited_time', ''),
'properties': page.get('properties', {})
})
return {
"database_id": database_id,
"total_pages": len(pages),
"pages": pages
}
except Exception as e:
return {"error": f"아카이브 페이지 조회 중 오류: {e}"}
def sync_archive_pages(app, notion_token, data_dir):
"""
아카이브 페이지들을 동기화합니다.
각 페이지의 내용을 마크다운 파일로도 저장합니다.
"""
try:
# 아카이브 페이지들 조회
archive_data = get_archive_pages(notion_token)
if "error" in archive_data:
app.logger.info(f"아카이브 동기화 실패: {archive_data['error']}")
return False
# 동기화 결과 저장 (JSON)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
result_path = os.path.join(data_dir, f'archive_sync_{timestamp}.json')
with open(result_path, 'w', encoding='utf-8') as f:
json.dump(archive_data, f, indent=2, ensure_ascii=False)
# 각 페이지의 내용을 마크다운으로 저장
markdown_files = []
for page in archive_data['pages']:
page_id = page['id']
page_title = page['title']
# 안전한 파일명 생성
safe_title = page_title.replace(' ', '_').replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_')
if not safe_title or safe_title == '_':
safe_title = f"페이지_{page_id[:8]}"
# 마크다운 파일 경로 (날짜 제거)
markdown_path = os.path.join(data_dir, f'{safe_title}.md')
# 페이지 내용을 마크다운으로 가져오기
get_notion_page_content(page_id, notion_token, markdown_path)
if os.path.exists(markdown_path):
markdown_files.append(markdown_path)
app.logger.info(f"마크다운 파일 생성: {markdown_path}")
app.logger.info(f"아카이브 동기화 완료: {archive_data['total_pages']}개 페이지")
app.logger.info(f"JSON 결과 저장: {result_path}")
app.logger.info(f"마크다운 파일 {len(markdown_files)}개 생성 완료")
return True
except Exception as e:
app.logger.info(f"아카이브 동기화 중 오류: {e}")
return False
def start_daily_sync(app, notion_token):
"""
매일 오전 9시에 아카이브 동기화를 실행하는 스케줄러를 시작합니다.
"""
def sync_job():
app.logger.info(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 일일 아카이브 동기화 시작")
# 데이터 디렉토리 생성
data_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data'))
os.makedirs(data_dir, exist_ok=True)
# 아카이브 동기화 실행
sync_archive_pages(app, notion_token, data_dir)
app.logger.info(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 일일 아카이브 동기화 완료")
# 매일 오전 9시에 실행
schedule.every().day.at("09:00").do(sync_job)
app.logger.info("일일 아카이브 동기화 스케줄러가 설정되었습니다. (매일 오전 9시)")
# 스케줄러 실행 (별도 스레드에서)
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(60) # 1분마다 체크
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
return scheduler_thread
def get_notion_page_content(page_id, notion_token, data_path):
"""
Notion 페이지의 내용을 마크다운 형태로 가져와서 파일에 저장합니다.
"""
try:
headers = {
"Authorization": f"Bearer {notion_token}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# 페이지 정보 가져오기
page_url = f"https://api.notion.com/v1/pages/{page_id}"
page_response = requests.get(page_url, headers=headers, timeout=10)
if page_response.status_code != 200:
with open(data_path, 'w', encoding='utf-8') as f:
f.write(f"페이지 정보 조회 실패: {page_response.status_code}\n")
f.write(f"오류 내용: {page_response.text}\n")
return
page_data = page_response.json()
# 페이지 블록들 가져오기 (모든 페이지 가져오기)
all_blocks = []
next_cursor = None
while True:
blocks_url = f"https://api.notion.com/v1/blocks/{page_id}/children"
params = {"page_size": 100}
if next_cursor:
params["start_cursor"] = next_cursor
blocks_response = requests.get(blocks_url, headers=headers, params=params, timeout=10)
if blocks_response.status_code != 200:
with open(data_path, 'w', encoding='utf-8') as f:
f.write(f"페이지 블록 조회 실패: {blocks_response.status_code}\n")
f.write(f"오류 내용: {blocks_response.text}\n")
return
blocks_data = blocks_response.json()
all_blocks.extend(blocks_data.get('results', []))
if not blocks_data.get('has_more'):
break
next_cursor = blocks_data.get('next_cursor')
# 결과 파일에 저장
with open(data_path, 'w', encoding='utf-8') as f:
# 페이지 제목 (properties에서 추출)
page_title = ""
if 'properties' in page_data:
for prop_name, prop_data in page_data['properties'].items():
if prop_data.get('type') == 'title' and prop_data.get('title'):
page_title = convert_rich_text_to_markdown(prop_data['title'])
break
# 마크다운 헤더
f.write(f"# {page_title}\n\n")
f.write(f"*페이지 ID: {page_id}* \n")
f.write(f"*조회 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}* \n\n")
# 페이지 속성 정보를 YAML front matter로 추가
if 'properties' in page_data:
yaml_properties = {}
for prop_name, prop_data in page_data['properties'].items():
if prop_name != 'title': # 제목은 이미 위에 표시됨
prop_type = prop_data.get('type', 'unknown')
if prop_type == 'rich_text':
text_content = convert_rich_text_to_markdown(prop_data.get('rich_text', []))
if text_content.strip():
yaml_properties[prop_name] = text_content.strip()
elif prop_type == 'url':
url_value = prop_data.get('url', '')
if url_value:
yaml_properties[prop_name] = url_value
elif prop_type == 'select':
select_data = prop_data.get('select')
if select_data and select_data.get('name'):
yaml_properties[prop_name] = select_data.get('name')
elif prop_type == 'people':
people_data = prop_data.get('people', [])
if people_data:
names = [person.get('name', '') for person in people_data if person.get('name')]
if names:
yaml_properties[prop_name] = names if len(names) > 1 else names[0]
elif prop_type == 'created_time':
created_time = prop_data.get('created_time', '')
if created_time:
# 날짜 형식을 더 읽기 쉽게 변환
try:
dt = datetime.fromisoformat(created_time.replace('Z', '+00:00'))
yaml_properties[prop_name] = dt.strftime('%Y-%m-%d')
except:
yaml_properties[prop_name] = created_time
elif prop_type == 'checkbox':
yaml_properties[prop_name] = prop_data.get('checkbox', False)
elif prop_type == 'number':
number_value = prop_data.get('number')
if number_value is not None:
yaml_properties[prop_name] = number_value
elif prop_type == 'email':
email_value = prop_data.get('email', '')
if email_value:
yaml_properties[prop_name] = email_value
elif prop_type == 'phone_number':
phone_value = prop_data.get('phone_number', '')
if phone_value:
yaml_properties[prop_name] = phone_value
# YAML front matter 생성
if yaml_properties:
import yaml
yaml_content = yaml.dump(yaml_properties, default_flow_style=False, allow_unicode=True, sort_keys=False)
f.write(f"---\n{yaml_content}---\n\n")
# 페이지 내용 (블록들)
list_number = 1
in_list = False
for block in all_blocks:
block_type = block.get('type', '')
block_content = block.get(block_type, {})
block_id = block.get('id', '')
# 리스트 번호 초기화
if block_type not in ['numbered_list_item', 'bulleted_list_item']:
list_number = 1
if in_list:
f.write("\n")
in_list = False
if block_type == 'paragraph':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
# 빈 내용도 표시하도록 수정
f.write(f"{text_content}\n\n")
elif block_type == 'heading_1':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
f.write(f"# {text_content}\n\n")
elif block_type == 'heading_2':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
f.write(f"## {text_content}\n\n")
elif block_type == 'heading_3':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
f.write(f"### {text_content}\n\n")
elif block_type == 'bulleted_list_item':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
f.write(f"- {text_content}\n")
in_list = True
elif block_type == 'numbered_list_item':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
f.write(f"{list_number}. {text_content}\n")
list_number += 1
in_list = True
elif block_type == 'code':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
language = block_content.get('language', '')
# 마크다운 코드 블록인지 확인
if language.lower() in ['markdown', 'md'] or 'markdown' in text_content.lower()[:50]:
# 마크다운 내용을 그대로 출력
f.write(f"{text_content}\n\n")
else:
f.write(f"```{language}\n{text_content}\n```\n\n")
elif block_type == 'quote':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
f.write(f"> {text_content}\n\n")
elif block_type == 'divider':
f.write("---\n\n")
elif block_type == 'to_do':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
checked = block_content.get('checked', False)
checkbox = "- [x]" if checked else "- [ ]"
f.write(f"{checkbox} {text_content}\n")
elif block_type == 'callout':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
icon = block_content.get('icon', {}).get('emoji', '💡')
f.write(f"> {icon} {text_content}\n\n")
elif block_type == 'toggle':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
f.write(f"<details>\n<summary>{text_content}</summary>\n\n")
# 토글 내용은 별도로 처리해야 함 (현재는 간단히 처리)
f.write("</details>\n\n")
elif block_type == 'table':
# 테이블 처리 (간단한 형태)
f.write(f"| {block_content.get('table_width', 1)} |\n")
f.write("| --- |\n")
f.write("| 내용 |\n\n")
elif block_type == 'image':
image_url = ""
if block_content.get('type') == 'external':
image_url = block_content.get('external', {}).get('url', '')
elif block_content.get('type') == 'file':
image_url = block_content.get('file', {}).get('url', '')
caption = convert_rich_text_to_markdown(block_content.get('caption', []))
if image_url:
f.write(f"\n\n")
elif block_type == 'video':
video_url = ""
if block_content.get('type') == 'external':
video_url = block_content.get('external', {}).get('url', '')
elif block_content.get('type') == 'file':
video_url = block_content.get('file', {}).get('url', '')
caption = convert_rich_text_to_markdown(block_content.get('caption', []))
if video_url:
f.write(f"\n\n")
elif block_type == 'file':
file_url = ""
if block_content.get('type') == 'external':
file_url = block_content.get('external', {}).get('url', '')
elif block_content.get('type') == 'file':
file_url = block_content.get('file', {}).get('url', '')
caption = convert_rich_text_to_markdown(block_content.get('caption', []))
if file_url:
f.write(f"[{caption}]({file_url})\n\n")
elif block_type == 'child_database':
title = block_content.get('title', '')
f.write(f"## 📊 {title}\n\n")
f.write(f"*데이터베이스 ID: {block_id}*\n\n")
elif block_type == 'child_page':
title = convert_rich_text_to_markdown(block_content.get('title', []))
f.write(f"## 📄 {title}\n\n")
f.write(f"*페이지 ID: {block_id}*\n\n")
else:
# 기타 블록 타입은 상세히 표시
f.write(f"<!-- {block_type.upper()}: {json.dumps(block_content, ensure_ascii=False)} -->\n\n")
# 모든 블록 타입에 대해 텍스트 내용이 있는지 확인
if 'rich_text' in block_content:
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
if text_content.strip():
f.write(f"**{block_type} 내용**: {text_content}\n\n")
# 원본 JSON 데이터 제거 (깔끔한 마크다운만 유지)
except Exception as e:
with open(data_path, 'w', encoding='utf-8') as f:
f.write(f"Notion 페이지 조회 중 오류 발생: {str(e)}\n")
def get_notion_database_content(database_id, notion_token, data_path):
"""
Notion 데이터베이스의 내용을 가져와서 파일에 저장합니다.
"""
try:
headers = {
"Authorization": f"Bearer {notion_token}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# 데이터베이스 정보 가져오기
db_url = f"https://api.notion.com/v1/databases/{database_id}"
db_response = requests.get(db_url, headers=headers, timeout=10)
if db_response.status_code != 200:
with open(data_path, 'w', encoding='utf-8') as f:
f.write(f"데이터베이스 정보 조회 실패: {db_response.status_code}\n")
f.write(f"오류 내용: {db_response.text}\n")
return
db_data = db_response.json()
# 데이터베이스 페이지들 가져오기
query_url = f"https://api.notion.com/v1/databases/{database_id}/query"
query_data = {"page_size": 100}
query_response = requests.post(query_url, headers=headers, json=query_data, timeout=10)
if query_response.status_code != 200:
with open(data_path, 'w', encoding='utf-8') as f:
f.write(f"데이터베이스 쿼리 실패: {query_response.status_code}\n")
f.write(f"오류 내용: {query_response.text}\n")
return
query_data = query_response.json()
# 결과 파일에 저장
with open(data_path, 'w', encoding='utf-8') as f:
f.write(f"=== Notion 데이터베이스 정보 ===\n")
f.write(f"데이터베이스 ID: {database_id}\n")
f.write(f"조회 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
# 데이터베이스 제목
if 'title' in db_data:
title_text = ''.join([text.get('plain_text', '') for text in db_data['title']])
f.write(f"데이터베이스 제목: {title_text}\n\n")
# 데이터베이스 설명
if 'description' in db_data and db_data['description']:
desc_text = ''.join([text.get('plain_text', '') for text in db_data['description']])
f.write(f"설명: {desc_text}\n\n")
# 속성 정보
f.write("=== 데이터베이스 속성 ===\n")
for prop_name, prop_data in db_data.get('properties', {}).items():
prop_type = prop_data.get('type', 'unknown')
f.write(f"- {prop_name}: {prop_type}\n")
f.write("\n")
# 페이지들
f.write(f"=== 데이터베이스 페이지들 (총 {len(query_data.get('results', []))}개) ===\n")
for i, page in enumerate(query_data.get('results', []), 1):
f.write(f"\n--- 페이지 {i} ---\n")
f.write(f"페이지 ID: {page.get('id', 'N/A')}\n")
f.write(f"페이지 URL: {page.get('url', 'N/A')}\n")
f.write(f"생성일: {page.get('created_time', 'N/A')}\n")
f.write(f"수정일: {page.get('last_edited_time', 'N/A')}\n")
# 페이지 속성들
f.write("속성:\n")
for prop_name, prop_data in page.get('properties', {}).items():
prop_type = prop_data.get('type', 'unknown')
f.write(f" - {prop_name} ({prop_type}): ")
if prop_type == 'title':
title_text = ''.join([text.get('plain_text', '') for text in prop_data.get('title', [])])
f.write(f"{title_text}\n")
elif prop_type == 'rich_text':
text_content = ''.join([text.get('plain_text', '') for text in prop_data.get('rich_text', [])])
f.write(f"{text_content}\n")
elif prop_type == 'select':
select_data = prop_data.get('select')
if select_data:
select_name = select_data.get('name', '')
f.write(f"{select_name}\n")
else:
f.write("선택 없음\n")
elif prop_type == 'multi_select':
multi_select_data = prop_data.get('multi_select', [])
if multi_select_data:
select_names = [item.get('name', '') for item in multi_select_data]
f.write(f"{', '.join(select_names)}\n")
else:
f.write("선택 없음\n")
elif prop_type == 'date':
date_info = prop_data.get('date')
if date_info:
f.write(f"{date_info.get('start', '')} ~ {date_info.get('end', '')}\n")
else:
f.write("날짜 없음\n")
elif prop_type == 'checkbox':
f.write(f"{'체크됨' if prop_data.get('checkbox') else '체크 안됨'}\n")
elif prop_type == 'number':
f.write(f"{prop_data.get('number', 'N/A')}\n")
elif prop_type == 'url':
f.write(f"{prop_data.get('url', 'N/A')}\n")
elif prop_type == 'email':
f.write(f"{prop_data.get('email', 'N/A')}\n")
elif prop_type == 'phone_number':
f.write(f"{prop_data.get('phone_number', 'N/A')}\n")
else:
f.write(f"{json.dumps(prop_data, ensure_ascii=False)}\n")
# 원본 JSON 데이터도 저장 (디버깅용)
f.write("\n=== 원본 데이터베이스 데이터 (JSON) ===\n")
f.write(json.dumps(db_data, ensure_ascii=False, indent=2))
except Exception as e:
with open(data_path, 'w', encoding='utf-8') as f:
f.write(f"Notion 데이터베이스 조회 중 오류 발생: {str(e)}\n")
def register(app):
# 스케줄러 시작 (앱 시작 시)
notion_token = os.getenv('NOTION_TOKEN')
if notion_token:
start_daily_sync(app, notion_token)
@app.resource("notion://data/files")
async def get_data_files_resource():
"""동기화된 데이터 파일들 리소스 (마크다운 형식)"""
data_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data'))
if not os.path.exists(data_dir):
return "# 데이터 파일 목록\n\n> 데이터 디렉토리가 존재하지 않습니다.\n"
files = []
for file_path in os.listdir(data_dir):
full_path = os.path.join(data_dir, file_path)
if os.path.isfile(full_path):
files.append({
"name": file_path,
"path": full_path,
"size": os.path.getsize(full_path),
"modified": os.path.getmtime(full_path)
})
# 마크다운 형식으로 변환
markdown_content = f"# 동기화된 데이터 파일 목록\n\n"
markdown_content += f"**디렉토리**: `{data_dir}`\n"
markdown_content += f"**총 파일 수**: {len(files)}개\n"
markdown_content += f"**조회 시간**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
if files:
# 파일 유형별로 분류
json_files = [f for f in files if f['name'].endswith('.json')]
md_files = [f for f in files if f['name'].endswith('.md')]
txt_files = [f for f in files if f['name'].endswith('.txt')]
other_files = [f for f in files if not f['name'].endswith(('.json', '.md', '.txt'))]
if json_files:
markdown_content += "## 📊 JSON 파일\n\n"
for file_info in json_files:
size_kb = file_info['size'] / 1024
modified_time = datetime.fromtimestamp(file_info['modified']).strftime('%Y-%m-%d %H:%M:%S')
markdown_content += f"- **{file_info['name']}** ({size_kb:.1f}KB) - {modified_time}\n"
markdown_content += "\n"
if md_files:
markdown_content += "## 📝 마크다운 파일\n\n"
for file_info in md_files:
size_kb = file_info['size'] / 1024
modified_time = datetime.fromtimestamp(file_info['modified']).strftime('%Y-%m-%d %H:%M:%S')
markdown_content += f"- **{file_info['name']}** ({size_kb:.1f}KB) - {modified_time}\n"
markdown_content += "\n"
if txt_files:
markdown_content += "## 📄 텍스트 파일\n\n"
for file_info in txt_files:
size_kb = file_info['size'] / 1024
modified_time = datetime.fromtimestamp(file_info['modified']).strftime('%Y-%m-%d %H:%M:%S')
markdown_content += f"- **{file_info['name']}** ({size_kb:.1f}KB) - {modified_time}\n"
markdown_content += "\n"
if other_files:
markdown_content += "## 📁 기타 파일\n\n"
for file_info in other_files:
size_kb = file_info['size'] / 1024
modified_time = datetime.fromtimestamp(file_info['modified']).strftime('%Y-%m-%d %H:%M:%S')
markdown_content += f"- **{file_info['name']}** ({size_kb:.1f}KB) - {modified_time}\n"
markdown_content += "\n"
else:
markdown_content += "> 데이터 파일이 없습니다.\n"
return markdown_content
@app.prompt()
async def notion_content_search():
"""노션 콘텐츠 검색 프롬프트 - 특정 키워드나 주제로 아카이브를 검색합니다."""
return {
"description": "노션 아카이브에서 특정 키워드나 주제와 관련된 페이지를 찾아 관련 정보를 제공합니다.",
"arguments": [
{
"name": "search_keyword",
"description": "검색할 키워드나 주제",
"required": True
},
{
"name": "search_scope",
"description": "검색 범위 (title, content, all)",
"required": False
}
]
}
@app.prompt()
async def notion_sync_status():
"""노션 동기화 상태 확인 프롬프트 - 동기화 상태와 최근 변경사항을 확인합니다."""
return {
"description": "노션 아카이브의 동기화 상태를 확인하고, 최근 변경된 페이지나 동기화 이력을 제공합니다.",
"arguments": [
{
"name": "time_range",
"description": "확인할 시간 범위 (today, week, month)",
"required": False
}
]
}
@app.tool()
async def notion_sync_archive(notion_token: str = None):
"""
지금 즉시 아카이브 페이지들을 동기화합니다.
Args:
notion_token: Notion API 토큰 (환경변수 NOTION_TOKEN에서 자동 로드)
"""
try:
# 환경변수에서 토큰 가져오기
if not notion_token:
notion_token = os.getenv('NOTION_TOKEN')
if not notion_token:
return {"content": [{"type": "text", "text": "Notion API 토큰이 필요합니다. NOTION_TOKEN 환경변수를 설정하거나 notion_token 파라미터를 제공하세요."}]}
# 데이터 디렉토리 생성
data_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data'))
os.makedirs(data_dir, exist_ok=True)
# 아카이브 동기화 실행
success = sync_archive_pages(app, notion_token, data_dir)
if success:
return {"content": [{"type": "text", "text": "아카이브 동기화가 성공적으로 완료되었습니다!"}]}
else:
return {"content": [{"type": "text", "text": "아카이브 동기화에 실패했습니다. 로그를 확인해주세요."}]}
except Exception as e:
app.logger.error(f"[NOTION_ARCHIVE_SYNC_ERR] {str(e)}")
return {"content": [{"type": "text", "text": f"아카이브 동기화 중 오류 발생: {str(e)}"}]}
@app.tool()
async def notion_search_pages(query: str, notion_token: str = None):
"""
Notion에서 페이지를 검색합니다.
Args:
query: 검색할 키워드
notion_token: Notion API 토큰 (환경변수 NOTION_TOKEN에서 자동 로드)
"""
try:
# 환경변수에서 토큰 가져오기
if not notion_token:
notion_token = os.getenv('NOTION_TOKEN')
if not notion_token:
return {"content": [{"type": "text", "text": "Notion API 토큰이 필요합니다. NOTION_TOKEN 환경변수를 설정하거나 notion_token 파라미터를 제공하세요."}]}
app.logger.info("notion_token: " + notion_token)
headers = {
"Authorization": f"Bearer {notion_token}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# 검색 요청
search_url = "https://api.notion.com/v1/search"
search_data = {
"query": query,
"page_size": 10
}
response = requests.post(search_url, headers=headers, json=search_data, timeout=10)
if response.status_code != 200:
return {"content": [{"type": "text", "text": f"Notion 검색 실패: {response.status_code} - {response.text}"}]}
search_results = response.json()
# 결과 저장
data_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data'))
os.makedirs(data_dir, exist_ok=True)
result_path = os.path.join(data_dir, f'notion_search_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
with open(result_path, 'w', encoding='utf-8') as f:
f.write(f"=== Notion 검색 결과 ===\n")
f.write(f"검색어: {query}\n")
f.write(f"검색 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"총 결과 수: {len(search_results.get('results', []))}\n\n")
for i, page in enumerate(search_results.get('results', []), 1):
f.write(f"--- 결과 {i} ---\n")
f.write(f"페이지 ID: {page.get('id', 'N/A')}\n")
f.write(f"페이지 URL: {page.get('url', 'N/A')}\n")
# 페이지 제목 추출
if 'properties' in page:
for prop_name, prop_data in page['properties'].items():
if prop_data.get('type') == 'title' and prop_data.get('title'):
title_text = ''.join([text.get('plain_text', '') for text in prop_data['title']])
f.write(f"제목: {title_text}\n")
break
f.write(f"생성일: {page.get('created_time', 'N/A')}\n")
f.write(f"수정일: {page.get('last_edited_time', 'N/A')}\n")
f.write("\n")
return {"content": [{"type": "text", "text": f"Notion 검색 완료! '{query}'에 대한 {len(search_results.get('results', []))}개 결과를 찾았습니다.\n결과는 {result_path}에 저장됩니다."}]}
except Exception as e:
app.logger.info(f"[NOTION_SEARCH_ERR] {str(e)}")
return {"content": [{"type": "text", "text": f"Notion 검색 중 오류 발생: {str(e)}"}]}
@app.tool()
async def notion_get_database(database_id: str, notion_token: str = None):
"""
Notion 데이터베이스의 내용을 가져옵니다.
Args:
database_id: Notion 데이터베이스 ID (URL에서 추출)
notion_token: Notion API 토큰 (환경변수 NOTION_TOKEN에서 자동 로드)
"""
try:
# 환경변수에서 토큰 가져오기
if not notion_token:
notion_token = os.getenv('NOTION_TOKEN')
if not notion_token:
return {"content": [{"type": "text", "text": "Notion API 토큰이 필요합니다. NOTION_TOKEN 환경변수를 설정하거나 notion_token 파라미터를 제공하세요."}]}
# 데이터베이스 ID 정리 (URL에서 ID만 추출)
if 'notion.so' in database_id:
# URL에서 데이터베이스 ID 추출
database_id = database_id.split('/')[-1].split('?')[0].split('#')[0]
# 하이픈 제거
database_id = database_id.replace('-', '')
# 결과 저장 디렉토리 생성
data_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data'))
os.makedirs(data_dir, exist_ok=True)
# 결과 파일 경로
result_path = os.path.join(data_dir, f'notion_database_{database_id}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt')
# 비동기로 데이터베이스 내용 가져오기
import threading
threading.Thread(
target=get_notion_database_content,
args=(database_id, notion_token, result_path),
daemon=True
).start()
return {"content": [{"type": "text", "text": f"Notion 데이터베이스 조회 시작! (데이터베이스 ID: {database_id})\n결과는 {result_path}에 저장됩니다."}]}
except Exception as e:
app.logger.info(f"[NOTION_DB_ERR] {str(e)}")
return {"content": [{"type": "text", "text": f"Notion 데이터베이스 조회 중 오류 발생: {str(e)}"}]}
@app.tool()
async def notion_get_page(page_id: str, notion_token: str = None):
"""
Notion 페이지에서 마크다운 형태의 내용을 추출합니다.
Args:
page_id: Notion 페이지 ID (URL에서 추출)
notion_token: Notion API 토큰 (환경변수 NOTION_TOKEN에서 자동 로드)
"""
try:
# 환경변수에서 토큰 가져오기
if not notion_token:
notion_token = os.getenv('NOTION_TOKEN')
if not notion_token:
return {"content": [{"type": "text", "text": "Notion API 토큰이 필요합니다. NOTION_TOKEN 환경변수를 설정하거나 notion_token 파라미터를 제공하세요."}]}
# 페이지 ID 정리 (URL에서 ID만 추출)
if 'notion.so' in page_id:
page_id = page_id.split('/')[-1].split('?')[0].split('#')[0]
page_id = page_id.replace('-', '')
headers = {
"Authorization": f"Bearer {notion_token}",
"Notion-Version": "2022-06-28",
"Content-Type": "application/json"
}
# 페이지 정보 가져오기
page_url = f"https://api.notion.com/v1/pages/{page_id}"
page_response = requests.get(page_url, headers=headers, timeout=10)
if page_response.status_code != 200:
return {"content": [{"type": "text", "text": f"페이지 정보 조회 실패: {page_response.status_code} - {page_response.text}"}]}
page_data = page_response.json()
# 페이지 블록들 가져오기
all_blocks = []
next_cursor = None
while True:
blocks_url = f"https://api.notion.com/v1/blocks/{page_id}/children"
params = {"page_size": 100}
if next_cursor:
params["start_cursor"] = next_cursor
blocks_response = requests.get(blocks_url, headers=headers, params=params, timeout=10)
if blocks_response.status_code != 200:
return {"content": [{"type": "text", "text": f"페이지 블록 조회 실패: {blocks_response.status_code} - {blocks_response.text}"}]}
blocks_data = blocks_response.json()
all_blocks.extend(blocks_data.get('results', []))
if not blocks_data.get('has_more'):
break
next_cursor = blocks_data.get('next_cursor')
# 마크다운 내용 추출
markdown_content = ""
# 페이지 제목 추출
page_title = ""
if 'properties' in page_data:
for prop_name, prop_data in page_data['properties'].items():
if prop_data.get('type') == 'title' and prop_data.get('title'):
page_title = convert_rich_text_to_markdown(prop_data['title'])
break
markdown_content += f"# {page_title}\n\n"
# 페이지 속성 정보를 YAML front matter로 추가
if 'properties' in page_data:
yaml_properties = {}
for prop_name, prop_data in page_data['properties'].items():
if prop_name != 'title': # 제목은 이미 위에 표시됨
prop_type = prop_data.get('type', 'unknown')
if prop_type == 'rich_text':
text_content = convert_rich_text_to_markdown(prop_data.get('rich_text', []))
if text_content.strip():
yaml_properties[prop_name] = text_content.strip()
elif prop_type == 'url':
url_value = prop_data.get('url', '')
if url_value:
yaml_properties[prop_name] = url_value
elif prop_type == 'select':
select_data = prop_data.get('select')
if select_data and select_data.get('name'):
yaml_properties[prop_name] = select_data.get('name')
elif prop_type == 'people':
people_data = prop_data.get('people', [])
if people_data:
names = [person.get('name', '') for person in people_data if person.get('name')]
if names:
yaml_properties[prop_name] = names if len(names) > 1 else names[0]
elif prop_type == 'created_time':
created_time = prop_data.get('created_time', '')
if created_time:
# 날짜 형식을 더 읽기 쉽게 변환
from datetime import datetime
try:
dt = datetime.fromisoformat(created_time.replace('Z', '+00:00'))
yaml_properties[prop_name] = dt.strftime('%Y-%m-%d')
except:
yaml_properties[prop_name] = created_time
elif prop_type == 'checkbox':
yaml_properties[prop_name] = prop_data.get('checkbox', False)
elif prop_type == 'number':
number_value = prop_data.get('number')
if number_value is not None:
yaml_properties[prop_name] = number_value
elif prop_type == 'email':
email_value = prop_data.get('email', '')
if email_value:
yaml_properties[prop_name] = email_value
elif prop_type == 'phone_number':
phone_value = prop_data.get('phone_number', '')
if phone_value:
yaml_properties[prop_name] = phone_value
# YAML front matter 생성
if yaml_properties:
import yaml
yaml_content = yaml.dump(yaml_properties, default_flow_style=False, allow_unicode=True, sort_keys=False)
markdown_content = f"---\n{yaml_content}---\n\n{markdown_content}"
# 블록들에서 마크다운 추출
for block in all_blocks:
block_type = block.get('type', '')
block_content = block.get(block_type, {})
if block_type == 'code':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
language = block_content.get('language', '')
# 마크다운 코드 블록인지 확인
if (language.lower() in ['markdown', 'md'] or
'markdown' in text_content.lower()[:50] or
text_content.strip().startswith('#') or
text_content.strip().startswith('##') or
text_content.strip().startswith('###')):
markdown_content += f"{text_content}\n\n"
else:
# 일반 코드 블록 처리
markdown_content += f"```{language}\n{text_content}\n```\n\n"
elif block_type == 'paragraph':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
# 마크다운 문법이 포함된 문단인지 확인
if any(md_syntax in text_content for md_syntax in ['**', '*', '`', '[', ']', '#', '- ', '1. ']):
markdown_content += f"{text_content}\n\n"
elif block_type == 'heading_1':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
markdown_content += f"# {text_content}\n\n"
elif block_type == 'heading_2':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
markdown_content += f"## {text_content}\n\n"
elif block_type == 'heading_3':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
markdown_content += f"### {text_content}\n\n"
elif block_type == 'bulleted_list_item':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
markdown_content += f"- {text_content}\n"
elif block_type == 'numbered_list_item':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
markdown_content += f"1. {text_content}\n"
elif block_type == 'quote':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
markdown_content += f"> {text_content}\n\n"
elif block_type == 'divider':
markdown_content += "---\n\n"
elif block_type == 'to_do':
text_content = convert_rich_text_to_markdown(block_content.get('rich_text', []))
checked = block_content.get('checked', False)
checkbox = "- [x]" if checked else "- [ ]"
markdown_content += f"{checkbox} {text_content}\n"
# 결과 저장
data_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'data'))
os.makedirs(data_dir, exist_ok=True)
# 파일명을 페이지 제목으로 설정 (안전한 파일명으로 변환)
safe_title = page_title.replace(' ', '_').replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_')
if not safe_title or safe_title == '_':
safe_title = f"페이지_{page_id[:8]}"
result_path = os.path.join(data_dir, f'{safe_title}.md')
with open(result_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
return {"content": [{"type": "text", "text": f"마크다운 내용 추출 완료! (페이지: {page_title})\n결과는 {result_path}에 저장됩니다.\n\n추출된 마크다운:\n```markdown\n{markdown_content[:500]}{'...' if len(markdown_content) > 500 else ''}\n```"}]}
except Exception as e:
app.logger.info(f"[NOTION_MD_ERR] {str(e)}")
return {"content": [{"type": "text", "text": f"마크다운 추출 중 오류 발생: {str(e)}"}]}