"""
Shared schema defs and dynamic attachment of JSON Schemas to MCP tools.
Extracted from core.server to keep server thinner.
"""
import logging
from typing import Any, Dict

from .schema import (
    enrich_schema_with_shared_defs as _enrich_schema_with_shared_defs,
    build_minimal_schema as _build_minimal_schema,
    get_function_info as _get_function_info,
    complex_defs as _complex_defs,
    apply_param_hints as _apply_param_hints,
)
def _enum_values(shared_enums: Any, key: str) -> list:
    """Return ``shared_enums[key]`` as a list, or ``[]`` if missing or unusable.

    A missing key, a ``None`` value, a non-iterable value, or a
    ``shared_enums`` object without ``.get`` all yield an empty list, so one
    malformed entry cannot prevent the other definitions from being built.
    """
    try:
        raw = shared_enums.get(key)
        return [] if raw is None else list(raw)
    except (AttributeError, TypeError):
        return []


def server_shared_defs(shared_enums: Dict[str, Any]) -> Dict[str, Any]:
    """Build server-level ``$defs`` based on provided enum lists.

    The enum values are passed in (rather than imported here) to avoid
    circular imports.

    Args:
        shared_enums: Mapping of enum-list names to iterables of allowed
            values. Keys read: ``DENOISE_METHODS``, ``SIMPLIFY_MODES``,
            ``SIMPLIFY_METHODS``, ``PIVOT_METHODS``, ``FORECAST_METHODS``,
            ``CATEGORY_CHOICES`` and ``INDICATOR_NAME_CHOICES``.

    Returns:
        A dict mapping definition names to JSON Schema fragments. The static
        definitions are always present. Definitions backed by
        ``shared_enums`` get an empty ``enum`` list when their entry is
        missing or unusable. ``IndicatorCategory`` and ``IndicatorName`` are
        only included when their choice lists are non-empty.
    """
    defs: Dict[str, Any] = {
        "OhlcvChar": {
            "type": "string",
            "enum": ["O", "H", "L", "C", "V"],
            "description": "OHLCV column code",
        },
        "DenoiseMethod": {"type": "string", "enum": _enum_values(shared_enums, "DENOISE_METHODS")},
        "SimplifyMode": {"type": "string", "enum": _enum_values(shared_enums, "SIMPLIFY_MODES")},
        "SimplifyMethod": {"type": "string", "enum": _enum_values(shared_enums, "SIMPLIFY_METHODS")},
        "EncodeSchema": {"type": "string", "enum": ["envelope", "delta"]},
        "SymbolicSchema": {"type": "string", "enum": ["sax"]},
        "PivotMethod": {"type": "string", "enum": _enum_values(shared_enums, "PIVOT_METHODS")},
        "ForecastMethod": {"type": "string", "enum": _enum_values(shared_enums, "FORECAST_METHODS")},
        "QuantitySpec": {"type": "string", "enum": ["price", "return", "volatility"]},
        "VolatilityMethod": {
            "type": "string",
            "enum": [
                "ewma", "parkinson", "gk", "rs", "yang_zhang", "rolling_std",
                "garch", "egarch", "gjr_garch",
                "arima", "sarima", "ets", "theta",
            ],
        },
        "WhenSpec": {"type": "string", "enum": ["pre_ti", "post_ti"]},
        "CausalitySpec": {"type": "string", "enum": ["causal", "zero_phase"]},
        "TargetSpec": {"type": "string", "enum": ["price", "return"]},
    }

    # Optional definitions: only emitted when the caller supplied choices.
    optional_defs = (
        ("IndicatorCategory", "CATEGORY_CHOICES"),
        ("IndicatorName", "INDICATOR_NAME_CHOICES"),
    )
    for def_name, enum_key in optional_defs:
        choices = _enum_values(shared_enums, enum_key)
        if choices:
            defs[def_name] = {"type": "string", "enum": choices}

    return defs
def _find_tool_registry(mcp: Any) -> Any:
    """Return the first non-empty, mapping-like tool registry on *mcp*, else None."""
    for attr in ("tools", "_tools", "registry", "tool_registry", "_tool_registry"):
        candidate = getattr(mcp, attr, None)
        if candidate and hasattr(candidate, "items"):
            return candidate
    return None


def _resolve_tool_callable(obj: Any) -> Any:
    """Return the callable behind a registry entry, or None if there is none."""
    for attr in ("func", "function", "callable", "handler", "wrapped", "_func"):
        try:
            candidate = getattr(obj, attr)
        except Exception:
            # Attribute access on an arbitrary wrapper object may raise more
            # than AttributeError (e.g. from a property); try the next name.
            continue
        if callable(candidate):
            return candidate
    return obj if callable(obj) else None


def _set_ref(params: Dict[str, Any], required_params: set, param_name: str,
             ref: str, allow_null: bool = False) -> None:
    """Replace ``params[param_name]`` with a ``$ref`` to *ref*, if the param exists.

    With ``allow_null=True`` and a parameter that is not required, the
    reference is wrapped in ``anyOf`` together with ``{"type": "null"}``.
    """
    if param_name not in params:
        return
    if allow_null and param_name not in required_params:
        params[param_name] = {"anyOf": [{"$ref": ref}, {"type": "null"}]}
    else:
        params[param_name] = {"$ref": ref}


def _merge_missing_defs(schema: Dict[str, Any], shared_defs: Dict[str, Any]) -> None:
    """Add server-level and complex ``$defs`` to *schema* without overriding existing keys."""
    defs = schema.setdefault("$defs", {})
    # Shared defs are merged first so they take precedence over complex defs
    # that happen to use the same name.
    for key, value in shared_defs.items():
        defs.setdefault(key, value)
    for key, value in _complex_defs().items():
        defs.setdefault(key, value)


def _apply_tool_overrides(name: Any, schema: Dict[str, Any]) -> None:
    """Apply the per-tool parameter schema overrides keyed on the tool *name*."""
    parameters = schema.get("parameters", {})
    params = parameters.get("properties", {})
    required_params = set(parameters.get("required", []))
    defs = schema.get("$defs", {})

    if name == "forecast_generate":
        _set_ref(params, required_params, "method", "#/$defs/ForecastMethod")
        _set_ref(params, required_params, "target", "#/$defs/TargetSpec")
        _set_ref(params, required_params, "quantity", "#/$defs/QuantitySpec")
        _set_ref(params, required_params, "denoise", "#/$defs/DenoiseSpec", allow_null=True)
        if "params" in params:
            params["params"] = {"type": "object", "additionalProperties": True}
    elif name == "indicators_list":
        # Only reference the enum when it was actually generated.
        if "category" in params and "IndicatorCategory" in defs:
            _set_ref(params, required_params, "category", "#/$defs/IndicatorCategory")
    elif name == "indicators_describe":
        if "name" in params and "IndicatorName" in defs:
            _set_ref(params, required_params, "name", "#/$defs/IndicatorName")
    elif name == "data_fetch_candles":
        if "indicators" in params:
            params["indicators"] = {"type": "array", "items": {"$ref": "#/$defs/IndicatorSpec"}}
        _set_ref(params, required_params, "denoise", "#/$defs/DenoiseSpec", allow_null=True)
        _set_ref(params, required_params, "simplify", "#/$defs/SimplifySpec", allow_null=True)
    elif name == "data_fetch_ticks":
        _set_ref(params, required_params, "simplify", "#/$defs/SimplifySpec", allow_null=True)
    elif name == "trading_pending_place":
        # Trading schemas: add enums and param docs where helpful.
        # Clarify acceptable order type values for pending orders.
        if "type" in params:
            params["type"] = {
                "type": "string",
                "enum": [
                    "BUY", "SELL",
                    "BUYLIMIT", "BUYSTOP",
                    "SELLLIMIT", "SELLSTOP"
                ],
                "description": "Direction or explicit pending type. BUY/SELL will auto-select LIMIT/STOP based on price relative to Bid/Ask."
            }
        # Document expiration flexibility.
        if "expiration" in params:
            params["expiration"] = {
                "anyOf": [
                    {"type": "string"},
                    {"type": "number"},
                    {"type": "null"}
                ],
                "description": "GTC tokens (e.g., 'GTC'), ISO datetime, epoch seconds, or natural language (e.g., 'tomorrow 14:00')."
            }


def _attach_schema_to_tool(name: Any, obj: Any, shared_defs: Dict[str, Any],
                           log: logging.Logger) -> None:
    """Build the enriched schema for one registry entry and store it as ``.schema``.

    Entries without a resolvable callable are skipped. The ``$defs`` merge
    and the per-tool overrides are best-effort: a failure in either is logged
    and the remaining steps still run. Errors from the schema builders
    themselves propagate to the caller.
    """
    func = _resolve_tool_callable(obj)
    if func is None:
        return

    info = _get_function_info(func)
    schema = _build_minimal_schema(info)
    schema = _enrich_schema_with_shared_defs(schema, info)

    try:
        _merge_missing_defs(schema, shared_defs)
    except Exception:
        log.debug("Could not merge shared $defs for tool %r", name, exc_info=True)

    try:
        _apply_tool_overrides(name, schema)
    except Exception:
        log.debug("Could not apply schema overrides for tool %r", name, exc_info=True)

    _apply_param_hints(schema)

    # Store the schema on both the registry entry and the underlying callable.
    # Either may reject new attributes (e.g. bound methods, __slots__), so
    # each assignment is attempted independently.
    for target in (obj, func):
        try:
            target.schema = schema
        except Exception:
            log.debug("Could not set .schema on %r for tool %r", type(target), name, exc_info=True)


def attach_schemas_to_tools(mcp: Any, shared_enums: Dict[str, Any]) -> None:
    """Attach enriched JSON Schemas to registered MCP tools on the given server.

    The tool registry is looked up on *mcp* under several candidate attribute
    names; the first non-empty object exposing ``items()`` is used. For every
    entry with a resolvable callable, a schema is built, enriched with shared
    and complex ``$defs``, adjusted with tool-specific overrides, and stored
    as a ``schema`` attribute on both the entry and its callable.

    This function is best-effort and never raises. Failures are logged at
    debug level, and a failure while processing one tool does not prevent the
    remaining tools from being processed.

    Args:
        mcp: Server object holding the tool registry.
        shared_enums: Enum lists forwarded to ``server_shared_defs``.
    """
    log = logging.getLogger(__name__)

    try:
        registry = _find_tool_registry(mcp)
        if not registry:
            return
        shared_defs = server_shared_defs(shared_enums)
        # Snapshot the entries so the registry is not iterated while tool
        # objects are being modified.
        entries = list(registry.items())
    except Exception:
        log.debug("Skipping tool schema attachment: setup failed", exc_info=True)
        return

    for entry in entries:
        try:
            name, obj = entry
            _attach_schema_to_tool(name, obj, shared_defs, log)
        except Exception:
            # Isolate failures per tool so one bad entry cannot leave every
            # subsequent tool without a schema.
            log.debug("Could not attach schema for registry entry %r", entry, exc_info=True)