import os
import httpx
import asyncio
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
async def verify_connection():
# 1. Get Credentials
encoded_key = os.getenv("KMA_API_KEY_ENCODED")
decoded_key = os.getenv("KMA_API_KEY_DECODED")
if not decoded_key:
print("❌ Error: KMA_API_KEY_DECODED not found in .env")
return
print(f"🔑 Loaded Key (Decoded): {decoded_key[:5]}...{decoded_key[-5:]}")
# 2. Prepare Request Parameters (Ultra Short Term Live)
# Use '1 hour ago' to ensure data exists
now = datetime.now()
base_date = now.strftime("%Y%m%d")
# Round down to nearest hour and subtract 1 hour for safety
base_time = (now - timedelta(hours=1)).strftime("%H00")
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getUltraSrtNcst"
# KMA API is tricky with keys.
# Option A: Pass DECODED key in params (httpx handles encoding) => Recommended
params = {
"serviceKey": decoded_key,
"pageNo": "1",
"numOfRows": "10",
"dataType": "JSON",
"base_date": base_date,
"base_time": base_time,
"nx": "60", # Seoul
"ny": "127" # Seoul
}
print(f"📡 Sending request to {url}...")
print(f" Date: {base_date}, Time: {base_time}")
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params, timeout=10.0)
print(f"📥 Status Code: {response.status_code}")
if response.status_code == 200:
try:
data = response.json()
# Check for API Header Code '00' (Normal Service)
header_code = data.get("response", {}).get("header", {}).get("resultCode")
if header_code == "00":
print("✅ API Connection Successful!")
print(" Weather Data Sample:")
items = data["response"]["body"]["items"]["item"]
for item in items:
if item["category"] == "T1H": # Temperature
print(f" 🌡️ Temp: {item['obsrValue']}°C")
break
else:
print(f"⚠️ API Error Code: {header_code}")
print(f" Message: {data.get('response', {}).get('header', {}).get('resultMsg')}")
print(f" Full Response: {data}")
except Exception as e:
print(f"⚠️ Failed to parse JSON (might be XML error body): {response.text[:200]}")
else:
print(f"❌ HTTP Error: {response.text[:200]}")
except Exception as e:
print(f"❌ Connection Failed: {e}")
if __name__ == "__main__":
asyncio.run(verify_connection())