"""Connpass API client."""
import asyncio
import httpx
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from .config import (
CONNPASS_BASE_URL,
CONNPASS_API_KEY,
RATE_LIMIT_DELAY,
REQUEST_TIMEOUT,
MAX_COUNT
)
from .errors import ConnpassError
class ConnpassClient:
"""Connpass API クライアント。"""
# クラス変数: 最後のリクエスト時刻を保持(全インスタンスで共有)
_last_request_time: Optional[datetime] = None
_lock = asyncio.Lock()
def __init__(self):
self.base_url = CONNPASS_BASE_URL
self.api_key = CONNPASS_API_KEY
def _validate_api_key(self) -> Optional[dict]:
"""APIキーの存在確認。"""
if not self.api_key:
return ConnpassError.missing_api_key()
return None
def _normalize_count(self, count: int) -> int:
"""countパラメータを正規化。"""
return min(count, MAX_COUNT)
def _build_headers(self) -> dict:
"""HTTPヘッダーを構築。"""
return {"X-API-Key": self.api_key}
async def _wait_for_rate_limit(self):
"""
レート制限を守るために必要な待機時間を計算して待機。
1秒に1リクエストを保証します。
"""
async with self._lock:
if self._last_request_time is not None:
elapsed = (datetime.now() - self._last_request_time).total_seconds()
wait_time = RATE_LIMIT_DELAY - elapsed
if wait_time > 0:
await asyncio.sleep(wait_time)
# リクエスト実行時刻を記録
ConnpassClient._last_request_time = datetime.now()
async def _make_request(
self,
url: str,
params: Dict[str, Any]
) -> dict:
"""
APIリクエストを実行。
Args:
url: リクエストURL
params: クエリパラメータ
Returns:
APIレスポンス
"""
# レート制限: リクエスト前に待機
await self._wait_for_rate_limit()
try:
async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT) as client:
response = await client.get(
url,
headers=self._build_headers(),
params=params
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
return ConnpassError.handle_http_error(e)
except Exception as e:
return ConnpassError.handle_general_error(e)
async def get_user_attended_events(
self,
nickname: str,
start: int = 1,
count: int = 10
) -> dict:
"""
ユーザーの参加イベントを取得。
Args:
nickname: ユーザーニックネーム
start: 開始位置
count: 取得件数
Returns:
イベント情報
"""
error = self._validate_api_key()
if error:
return error
url = f"{self.base_url}/api/v2/users/{nickname}/attended_events/"
params = {
"start": start,
"count": self._normalize_count(count)
}
return await self._make_request(url, params)
async def get_events(
self,
group_id: Optional[int] = None,
prefecture: Optional[str] = None,
keyword: Optional[str] = None,
keyword_or: Optional[str] = None,
start: int = 1,
count: int = 10
) -> dict:
"""
イベントを検索。
Args:
group_id: グループID
prefecture: 都道府県コード
keyword: 検索キーワード(AND)
keyword_or: 検索キーワード(OR)
start: 開始位置
count: 取得件数
Returns:
イベント情報
"""
error = self._validate_api_key()
if error:
return error
url = f"{self.base_url}/api/v2/events/"
params = {
"start": start,
"count": self._normalize_count(count)
}
if group_id is not None:
params["group_id"] = group_id
if prefecture:
params["prefecture"] = prefecture
if keyword:
params["keyword"] = keyword
if keyword_or:
params["keyword_or"] = keyword_or
return await self._make_request(url, params)