"""Futu OpenAPI provider for HK/US/CN market data."""
import threading
import time
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from futu import (
OpenQuoteContext,
RET_OK,
RET_ERROR,
SysConfig,
KLType,
AuType,
SubType,
QotRight,
SimpleFilter,
CurKlineHandlerBase,
StockQuoteHandlerBase
)
import pandas as pd
from ..util import get_env, get_env_int, now_iso, ProviderError
from ..schemas import (
Quote,
HistoryRecord,
SymbolSearchResult
)
class FutuProvider:
    """
    Futu OpenAPI provider.

    Requires a local FutuOpenD gateway to be running. Public methods accept
    either Yahoo-style symbols (``0700.HK``, ``AAPL``, ``600519.SS``) or
    Futu-style codes (``HK.00700``, ``US.AAPL``, ``SH.600519``).
    """

    # Not referenced anywhere in this class; kept for backward compatibility.
    _instance = None
    # Class-level lock: shared by every FutuProvider instance.
    _lock = threading.Lock()
    # Class-level default. _ensure_context() assigns on the instance, so each
    # instance ends up holding its own quote context.
    _context: Optional["OpenQuoteContext"] = None
    # Market prefixes that identify a symbol as already being a Futu code.
    _FUTU_PREFIXES = ("HK.", "US.", "SH.", "SZ.")

    def __init__(self, host: str = "127.0.0.1", port: int = 11111):
        """
        Initialize the provider and open the quote context.

        Args:
            host: FutuOpenD host, used as the default passed to
                ``get_env("FUTU_OPEND_HOST", ...)``.
            port: FutuOpenD port, used as the default passed to
                ``get_env_int("FUTU_OPEND_PORT", ...)``.

        Raises:
            ProviderError: ``PROVIDER_CONNECT_FAILED`` if the context cannot
                be created.
        """
        self.host = get_env("FUTU_OPEND_HOST", host)
        self.port = get_env_int("FUTU_OPEND_PORT", port)
        self._ensure_context()

    def _ensure_context(self):
        """
        Create the OpenQuoteContext if this instance does not have one yet.

        Raises:
            ProviderError: ``PROVIDER_CONNECT_FAILED`` wrapping any exception
                raised while creating the context.
        """
        with self._lock:
            if self._context is not None:
                return
            try:
                SysConfig.set_all_thread_daemon(True)
                self._context = OpenQuoteContext(host=self.host, port=self.port)
                # Small delay to give the connection time to come up.
                time.sleep(0.5)
            except Exception as e:
                # Chain the cause so the original traceback is not lost.
                raise ProviderError(
                    code="PROVIDER_CONNECT_FAILED",
                    message=f"Failed to connect to FutuOpenD at {self.host}:{self.port}. Ensure OpenD is running and logged in.",
                    details={"error": str(e)},
                    provider="futu"
                ) from e

    def close(self):
        """Close the quote context, if any, and forget it."""
        with self._lock:
            if self._context:
                try:
                    self._context.close()
                finally:
                    # Drop the handle even if close() raised, so the next
                    # call to _ensure_context() builds a fresh context.
                    self._context = None

    def yahoo_to_futu_code(self, symbol: str) -> str:
        """
        Convert a Yahoo symbol to a Futu code.

        Examples:
            0700.HK   -> HK.00700
            AAPL      -> US.AAPL
            600519.SS -> SH.600519
            000001.SZ -> SZ.000001
            US.AAPL   -> US.AAPL (Futu codes pass through unchanged)

        Args:
            symbol: Yahoo symbol or Futu code; case-insensitive, surrounding
                whitespace is ignored.

        Returns:
            The upper-cased Futu code.

        Raises:
            ProviderError: ``INVALID_SYMBOL_FORMAT`` if the symbol is empty
                or matches none of the known formats.
        """
        symbol = (symbol or "").strip().upper()
        if not symbol:
            raise ProviderError(
                code="INVALID_SYMBOL_FORMAT",
                message=f"Could not convert symbol {symbol} to Futu format.",
                provider="futu"
            )

        # Already a Futu code. Checked first so that a code whose ticker part
        # looks like a Yahoo suffix (e.g. "US.SH") is not rewritten.
        if symbol.startswith(self._FUTU_PREFIXES):
            return symbol

        # HK format: Futu uses 5-digit, zero-padded codes.
        if symbol.endswith(".HK"):
            return f"HK.{symbol[:-3].zfill(5)}"

        # CN formats: Shanghai (.SS / .SH) and Shenzhen (.SZ).
        if symbol.endswith((".SS", ".SH")):
            return f"SH.{symbol.split('.')[0]}"
        if symbol.endswith(".SZ"):
            return f"SZ.{symbol.split('.')[0]}"

        # No suffix at all: assume a US ticker.
        if "." not in symbol:
            return f"US.{symbol}"

        raise ProviderError(
            code="INVALID_SYMBOL_FORMAT",
            message=f"Could not convert symbol {symbol} to Futu format.",
            provider="futu"
        )

    def futu_to_yahoo_code(self, code: str) -> str:
        """
        Convert a Futu code to a Yahoo symbol.

        Examples:
            HK.00700  -> 0700.HK
            US.AAPL   -> AAPL
            SH.600519 -> 600519.SS
            SZ.000001 -> 000001.SZ

        Codes with an unrecognised prefix are returned upper-cased but
        otherwise unchanged.
        """
        code = code.upper()
        if code.startswith("HK."):
            # Drop Futu's 5-digit padding, then re-pad to the 4-digit form
            # used by the Yahoo examples above (0700.HK, not 700.HK).
            return f"{code[3:].lstrip('0').zfill(4)}.HK"
        if code.startswith("US."):
            return code[3:]
        if code.startswith("SH."):
            return f"{code[3:]}.SS"
        if code.startswith("SZ."):
            return f"{code[3:]}.SZ"
        return code

    def resolve_symbol(self, symbol: str) -> "SymbolSearchResult":
        """
        Resolve a symbol into its market, Yahoo symbol and Futu code.

        No API call is made, so ``name`` is simply the input symbol.

        Raises:
            ProviderError: ``INVALID_SYMBOL_FORMAT`` if the symbol cannot be
                converted.
        """
        try:
            futu_code = self.yahoo_to_futu_code(symbol)
            yahoo_code = self.futu_to_yahoo_code(futu_code)
            market = futu_code.split(".")[0]
            return SymbolSearchResult(
                symbol=symbol,
                name=symbol,  # Name fetching requires an API call; skipped here.
                market=market,
                source="futu",
                yahoo_symbol=yahoo_code,
                futu_code=futu_code
            )
        except ProviderError:
            # Already carries the right code and message; do not re-wrap.
            raise
        except Exception as e:
            raise ProviderError(
                code="INVALID_SYMBOL_FORMAT",
                message=str(e),
                provider="futu"
            ) from e

    def get_quote(self, symbol: str) -> "Quote":
        """
        Get a snapshot quote for one symbol.

        Raises:
            ProviderError: ``PROVIDER_ERROR`` if the snapshot call fails,
                ``NO_DATA`` if it returns no rows, or
                ``INVALID_SYMBOL_FORMAT`` for an unconvertible symbol.
        """
        self._ensure_context()
        futu_code = self.yahoo_to_futu_code(symbol)
        ret, data = self._context.get_market_snapshot([futu_code])
        if ret != RET_OK:
            raise ProviderError(
                code="PROVIDER_ERROR",
                message=f"Futu get_market_snapshot failed: {data}",
                details={"ret_code": ret},
                provider="futu"
            )
        if data.empty:
            raise ProviderError(
                code="NO_DATA",
                message=f"No quote data for {symbol}",
                provider="futu"
            )

        row = data.iloc[0]
        market = futu_code.split(".")[0]

        # update_time is expected to be a "YYYY-MM-DD HH:MM:SS" string. Fall
        # back to the current time when it is missing, NaN or blank; a plain
        # truthiness test would let NaN through as the string "nan".
        ts = row.get('update_time')
        if ts is None or pd.isna(ts) or not str(ts).strip():
            ts = now_iso()

        return Quote(
            symbol=symbol,
            name=row.get('name'),
            market=market,
            exchange=market,  # Futu uses market as exchange usually
            currency=None,  # Snapshot doesn't strictly provide currency, implied by market
            price=row.get('last_price'),
            # NOTE(review): row.get() yields None if the snapshot has no
            # change_val / change_rate column, and the unit of change_rate
            # (percent vs. fraction) is unverified -- confirm against the SDK.
            change=row.get('change_val'),
            change_pct=row.get('change_rate'),
            open=row.get('open_price'),
            high=row.get('high_price'),
            low=row.get('low_price'),
            prev_close=row.get('prev_close_price'),
            volume=row.get('volume'),
            timestamp=str(ts),
            source="futu",
            source_detail={"futu_code": futu_code}
        )

    def get_history(self, symbol: str, start_date: str, end_date: str, interval: str = "1d", adjust: str = "qfq") -> List["HistoryRecord"]:
        """
        Get historical K-line data.

        Args:
            symbol: Yahoo symbol or Futu code.
            start_date: Passed through as ``start`` to the Futu request.
            end_date: Passed through as ``end`` to the Futu request.
            interval: "1d", "1wk" or "1mo"; any other value falls back to
                daily bars.
            adjust: "none", "qfq" or "hfq"; any other value falls back to
                "qfq".

        Returns:
            One HistoryRecord per bar, or an empty list when no data came
            back. Paging stops once more than 5000 rows have been collected,
            so very long ranges may be truncated.

        Raises:
            ProviderError: ``HISTORY_NOT_AVAILABLE`` when the error text
                mentions "permission", otherwise ``PROVIDER_ERROR``.
        """
        self._ensure_context()
        futu_code = self.yahoo_to_futu_code(symbol)

        # Map interval
        ktype_map = {
            "1d": KLType.K_DAY,
            "1wk": KLType.K_WEEK,
            "1mo": KLType.K_MON
        }
        ktype = ktype_map.get(interval, KLType.K_DAY)

        # Map adjust
        autype_map = {
            "none": AuType.NONE,
            "qfq": AuType.QFQ,
            "hfq": AuType.HFQ
        }
        autype = autype_map.get(adjust, AuType.QFQ)

        # Collect pages and concatenate once at the end; concatenating inside
        # the loop re-copies every previously fetched row on each page.
        frames = []
        fetched = 0
        page_req_key = None
        while True:
            ret, data, page_req_key = self._context.request_history_kline(
                futu_code,
                start=start_date,
                end=end_date,
                ktype=ktype,
                autype=autype,
                max_count=1000,
                page_req_key=page_req_key
            )
            if ret != RET_OK:
                # On failure `data` carries the error text instead of a frame.
                if "permission" in str(data).lower():
                    raise ProviderError(
                        code="HISTORY_NOT_AVAILABLE",
                        message=f"Futu history permission denied: {data}",
                        provider="futu"
                    )
                raise ProviderError(
                    code="PROVIDER_ERROR",
                    message=f"Futu request_history_kline failed: {data}",
                    provider="futu"
                )
            if not data.empty:
                frames.append(data)
                fetched += len(data)
            if page_req_key is None:
                break
            if fetched > 5000:  # Hard limit safety
                break

        if not frames:
            return []

        all_df = pd.concat(frames, ignore_index=True)
        return [
            HistoryRecord(
                # time_key looks like 'YYYY-MM-DD HH:MM:SS'; keep the date part.
                date=str(row['time_key'])[:10],
                open=row['open'],
                high=row['high'],
                low=row['low'],
                close=row['close'],
                volume=int(row['volume'])
            )
            for row in all_df.to_dict("records")
        ]

    def subscribe(self, symbols: List[str], sub_types: List[str], enable: bool = True) -> Dict[str, Any]:
        """
        Subscribe to, or unsubscribe from, real-time data.

        Args:
            symbols: Yahoo symbols or Futu codes.
            sub_types: Any of "QUOTE", "KLINE", "ORDER_BOOK", "TICKER"
                (case-insensitive). Unknown names are ignored.
            enable: True to subscribe, False to unsubscribe.

        Returns:
            ``{"status": "success", "ret_code": ret}`` on success.

        Raises:
            ProviderError: ``INVALID_ARGUMENT`` if no known type was given,
                ``SUBSCRIPTION_LIMIT`` if the error text mentions
                "subscription limit", otherwise ``PROVIDER_ERROR``.
        """
        self._ensure_context()
        futu_codes = [self.yahoo_to_futu_code(s) for s in symbols]

        # Map sub types
        st_map = {
            "QUOTE": SubType.QUOTE,
            "KLINE": SubType.K_DAY,  # Generic KLINE means the daily K-line.
            "ORDER_BOOK": SubType.ORDER_BOOK,
            "TICKER": SubType.TICKER
        }
        # Futu accepts a list of SubTypes for one call.
        requested = [str(st).upper() for st in sub_types]
        futu_sub_types = [st_map[st] for st in requested if st in st_map]
        if not futu_sub_types:
            raise ProviderError(
                code="INVALID_ARGUMENT",
                message="No valid subscription types provided",
                provider="futu"
            )

        if enable:
            ret, err = self._context.subscribe(futu_codes, futu_sub_types, is_first_push=True)
        else:
            ret, err = self._context.unsubscribe(futu_codes, futu_sub_types)

        if ret != RET_OK:
            if "subscription limit" in str(err).lower():
                raise ProviderError(
                    code="SUBSCRIPTION_LIMIT",
                    message=f"Subscription limit reached: {err}",
                    provider="futu"
                )
            raise ProviderError(
                code="PROVIDER_ERROR",
                message=f"Subscribe failed: {err}",
                provider="futu"
            )
        return {"status": "success", "ret_code": ret}

    def get_order_book(self, symbol: str, num: int = 10) -> Dict[str, Any]:
        """
        Get the order book for one symbol.

        Args:
            symbol: Yahoo symbol or Futu code.
            num: Passed through as ``num`` to the Futu call.

        Returns:
            Dict with "symbol", "futu_code", "bid" and "ask".

        Raises:
            ProviderError: ``PROVIDER_ERROR`` if the Futu call fails.
        """
        self._ensure_context()
        futu_code = self.yahoo_to_futu_code(symbol)
        ret, data = self._context.get_order_book(futu_code, num=num)
        if ret != RET_OK:
            raise ProviderError(
                code="PROVIDER_ERROR",
                message=f"Get order book failed: {data}",
                provider="futu"
            )
        return {
            "symbol": symbol,
            "futu_code": futu_code,
            "bid": data['Bid'],  # List of (price, volume, count)
            "ask": data['Ask']
        }

    def get_ticker(self, symbol: str, num: int = 100) -> Dict[str, Any]:
        """
        Get recent trade ticks for one symbol.

        Args:
            symbol: Yahoo symbol or Futu code.
            num: Passed through as ``num`` to the Futu call.

        Returns:
            Dict with "symbol", "futu_code" and "ticks" (one dict per row of
            the returned frame).

        Raises:
            ProviderError: ``PROVIDER_ERROR`` if the Futu call fails.
        """
        self._ensure_context()
        futu_code = self.yahoo_to_futu_code(symbol)
        ret, data = self._context.get_rt_ticker(futu_code, num=num)
        if ret != RET_OK:
            raise ProviderError(
                code="PROVIDER_ERROR",
                message=f"Get ticker failed: {data}",
                provider="futu"
            )
        return {
            "symbol": symbol,
            "futu_code": futu_code,
            # Convert DataFrame to list of dicts
            "ticks": data.to_dict('records')
        }