Skip to main content
Glama
kimi.py21.2 kB
"""Kimi (Moonshot) provider implementation.""" import base64 import json import logging import os import time from typing import Any, Optional from .base import ModelProvider, ModelCapabilities, ModelResponse, ProviderType, create_temperature_constraint from .openai_compatible import OpenAICompatibleProvider logger = logging.getLogger(__name__) class KimiModelProvider(OpenAICompatibleProvider): """Provider implementation for Kimi (Moonshot) models.""" # API configuration DEFAULT_BASE_URL = os.getenv("KIMI_API_URL", "https://api.moonshot.ai/v1") # Simple in-process LRU for Kimi context tokens per session/tool/prefix _cache_tokens: dict[str, tuple[str, float]] = {} _cache_tokens_order: list[str] = [] _cache_tokens_ttl: float = float(os.getenv("KIMI_CACHE_TOKEN_TTL_SECS", "1800")) _cache_tokens_max: int = int(os.getenv("KIMI_CACHE_TOKEN_LRU_MAX", "256")) # Model capabilities - extended thinking disabled by default for compliance SUPPORTED_MODELS: dict[str, ModelCapabilities] = { # Prioritize k2 preview models when allowed "kimi-k2-0905-preview": ModelCapabilities( provider=ProviderType.KIMI, model_name="kimi-k2-0905-preview", friendly_name="Kimi", context_window=128000, max_output_tokens=8192, supports_images=True, max_image_size_mb=20.0, supports_function_calling=True, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Kimi K2 2024-09 preview", aliases=["kimi-k2-0905", "kimi-k2"], ), "kimi-k2-0711-preview": ModelCapabilities( provider=ProviderType.KIMI, model_name="kimi-k2-0711-preview", friendly_name="Kimi", context_window=128000, max_output_tokens=8192, supports_images=True, max_image_size_mb=20.0, supports_function_calling=True, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Kimi K2 2024-07 preview", aliases=["kimi-k2-0711"], ), # Canonical moonshot v1 series "moonshot-v1-8k": ModelCapabilities( provider=ProviderType.KIMI, model_name="moonshot-v1-8k", friendly_name="Kimi", context_window=8192, max_output_tokens=2048, supports_images=False, supports_function_calling=False, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Moonshot v1 8k", ), "moonshot-v1-32k": ModelCapabilities( provider=ProviderType.KIMI, model_name="moonshot-v1-32k", friendly_name="Kimi", context_window=32768, max_output_tokens=4096, supports_images=False, supports_function_calling=False, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Moonshot v1 32k", ), # New models from Moonshot docs "kimi-k2-turbo-preview": ModelCapabilities( provider=ProviderType.KIMI, model_name="kimi-k2-turbo-preview", friendly_name="Kimi", context_window=256000, max_output_tokens=8192, supports_images=True, max_image_size_mb=20.0, supports_function_calling=True, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Kimi K2 Turbo high-speed 256k", aliases=["kimi-k2-turbo"], ), "moonshot-v1-128k": ModelCapabilities( provider=ProviderType.KIMI, model_name="moonshot-v1-128k", friendly_name="Kimi", context_window=128000, max_output_tokens=8192, supports_images=False, supports_function_calling=False, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Moonshot v1 128k", ), "moonshot-v1-8k-vision-preview": ModelCapabilities( provider=ProviderType.KIMI, model_name="moonshot-v1-8k-vision-preview", friendly_name="Kimi", context_window=8192, max_output_tokens=2048, supports_images=True, max_image_size_mb=20.0, supports_function_calling=False, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Moonshot v1 8k vision preview", ), "moonshot-v1-32k-vision-preview": ModelCapabilities( provider=ProviderType.KIMI, model_name="moonshot-v1-32k-vision-preview", friendly_name="Kimi", context_window=32768, max_output_tokens=4096, supports_images=True, max_image_size_mb=20.0, supports_function_calling=False, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Moonshot v1 32k vision preview", ), "moonshot-v1-128k-vision-preview": ModelCapabilities( provider=ProviderType.KIMI, model_name="moonshot-v1-128k-vision-preview", friendly_name="Kimi", context_window=128000, max_output_tokens=8192, supports_images=True, max_image_size_mb=20.0, supports_function_calling=False, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Moonshot v1 128k vision preview", ), "kimi-latest": ModelCapabilities( provider=ProviderType.KIMI, model_name="kimi-latest", friendly_name="Kimi", context_window=128000, max_output_tokens=8192, supports_images=True, max_image_size_mb=20.0, supports_function_calling=True, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, description="Kimi latest vision 128k", ), "kimi-thinking-preview": ModelCapabilities( provider=ProviderType.KIMI, model_name="kimi-thinking-preview", friendly_name="Kimi", context_window=128000, max_output_tokens=8192, supports_images=True, max_image_size_mb=20.0, supports_function_calling=True, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=True, description="Kimi multimodal reasoning 128k", ), } def __init__(self, api_key: str, base_url: Optional[str] = None, **kwargs): self.base_url = base_url or self.DEFAULT_BASE_URL # Provider-specific timeout overrides via env try: rt = os.getenv("KIMI_READ_TIMEOUT_SECS", "").strip() ct = os.getenv("KIMI_CONNECT_TIMEOUT_SECS", "").strip() wt = os.getenv("KIMI_WRITE_TIMEOUT_SECS", "").strip() pt = os.getenv("KIMI_POOL_TIMEOUT_SECS", "").strip() if rt: kwargs["read_timeout"] = float(rt) if ct: kwargs["connect_timeout"] = float(ct) if wt: kwargs["write_timeout"] = float(wt) if pt: kwargs["pool_timeout"] = float(pt) # Provide a Kimi-specific sane default if nothing configured if "read_timeout" not in kwargs and not rt: # Default to 300s to avoid multi-minute hangs on web-enabled prompts kwargs["read_timeout"] = float(os.getenv("KIMI_DEFAULT_READ_TIMEOUT_SECS", "300")) except Exception: pass super().__init__(api_key, base_url=self.base_url, **kwargs) def get_provider_type(self) -> ProviderType: return ProviderType.KIMI def validate_model_name(self, model_name: str) -> bool: # Allow aliases to pass validation resolved = self._resolve_model_name(model_name) return resolved in self.SUPPORTED_MODELS def supports_thinking_mode(self, model_name: str) -> bool: resolved = self._resolve_model_name(model_name) capabilities = self.SUPPORTED_MODELS.get(resolved) return bool(capabilities and capabilities.supports_extended_thinking) def list_models(self, respect_restrictions: bool = True): # Use base implementation with restriction awareness return super().list_models(respect_restrictions=respect_restrictions) def get_model_configurations(self) -> dict[str, ModelCapabilities]: # Use our static SUPPORTED_MODELS return self.SUPPORTED_MODELS def get_all_model_aliases(self) -> dict[str, list[str]]: # Extract aliases from the capabilities result = {} for name, caps in self.SUPPORTED_MODELS.items(): if caps.aliases: result[name] = caps.aliases return result def get_capabilities(self, model_name: str) -> ModelCapabilities: resolved = self._resolve_model_name(model_name) caps = self.SUPPORTED_MODELS.get(resolved) if not caps: # Default capability if unknown model (use safe defaults) return ModelCapabilities( provider=ProviderType.KIMI, model_name=resolved, friendly_name="Kimi", context_window=8192, max_output_tokens=2048, supports_images=False, supports_function_calling=False, supports_streaming=True, supports_system_prompts=True, supports_extended_thinking=False, ) return caps def count_tokens(self, text: str, model_name: str) -> int: # Language-aware heuristic: for CJK-heavy inputs, ~0.6 tokens/char; else ~4 chars/token if not text: return 1 try: total = len(text) cjk = 0 for ch in text: o = ord(ch) if (0x4E00 <= o <= 0x9FFF) or (0x3040 <= o <= 0x30FF) or (0x3400 <= o <= 0x4DBF): cjk += 1 ratio = cjk / max(1, total) if ratio > 0.2: return max(1, int(total * 0.6)) return max(1, int(total / 4)) except Exception: return max(1, len(text) // 4) def _lru_key(self, session_id: str, tool_name: str, prefix_hash: str) -> str: return f"{session_id}:{tool_name}:{prefix_hash}" def save_cache_token(self, session_id: str, tool_name: str, prefix_hash: str, token: str) -> None: try: k = self._lru_key(session_id, tool_name, prefix_hash) self._cache_tokens[k] = (token, time.time()) self._cache_tokens_order.append(k) # Purge expired and over-capacity entries self._purge_cache_tokens() logger.info("Kimi cache token saved key=%s suffix=%s", k[-24:], token[-6:]) except Exception: pass def get_cache_token(self, session_id: str, tool_name: str, prefix_hash: str) -> Optional[str]: try: k = self._lru_key(session_id, tool_name, prefix_hash) v = self._cache_tokens.get(k) if not v: return None token, ts = v if time.time() - ts > self._cache_tokens_ttl: self._cache_tokens.pop(k, None) return None return token except Exception: return None def _purge_cache_tokens(self) -> None: try: # TTL-based cleanup now = time.time() ttl = self._cache_tokens_ttl self._cache_tokens = {k: v for k, v in self._cache_tokens.items() if now - v[1] <= ttl} # LRU size limit if len(self._cache_tokens) > self._cache_tokens_max: # remove oldest keys to_remove = len(self._cache_tokens) - self._cache_tokens_max removed = 0 for k in list(self._cache_tokens_order): if k in self._cache_tokens: self._cache_tokens.pop(k, None) removed += 1 if removed >= to_remove: break except Exception: pass def upload_file(self, file_path: str, purpose: str = "file-extract") -> str: """Upload a local file to Moonshot (Kimi) and return file_id. Args: file_path: Path to a local file purpose: Moonshot purpose tag (e.g., 'file-extract', 'assistants') Returns: The provider-assigned file id string """ from pathlib import Path # Resolve relative paths from project root if not absolute p = Path(file_path) if not p.is_absolute(): try: project_root = Path.cwd() p = (project_root / p).resolve() except Exception: p = p if not p.exists(): # Friendlier message: show CWD and suggest possible path cwd = str(Path.cwd()) raise FileNotFoundError(f"File not found: {file_path} (cwd={cwd})") # Optional client-side size guardrail (helps before provider returns 4xx) try: max_mb_env = os.getenv("KIMI_FILES_MAX_SIZE_MB", "") if max_mb_env: max_bytes = float(max_mb_env) * 1024 * 1024 if p.stat().st_size > max_bytes: raise ValueError(f"Kimi upload exceeds max size {max_mb_env} MB: {p.name}") except Exception: # Never block upload if env is malformed; rely on provider errors pass result = self.client.files.create(file=p, purpose=purpose) file_id = getattr(result, "id", None) or (result.get("id") if isinstance(result, dict) else None) if not file_id: raise RuntimeError("Moonshot upload did not return a file id") return file_id def _prefix_hash(self, messages: list[dict[str, Any]]) -> str: try: import hashlib # Serialize a stable prefix of messages (roles + first 2k chars) parts: list[str] = [] for m in messages[:6]: # limit to first few messages role = str(m.get("role", "")) content = str(m.get("content", ""))[:2048] parts.append(role + "\n" + content + "\n") joined = "\n".join(parts) return hashlib.sha256(joined.encode("utf-8", errors="ignore")).hexdigest() except Exception: return "" def chat_completions_create(self, *, model: str, messages: list[dict[str, Any]], tools: Optional[list[Any]] = None, tool_choice: Optional[Any] = None, temperature: float = 0.6, **kwargs) -> dict: """Wrapper that injects idempotency and Kimi context-cache headers, captures cache token, and returns normalized dict. """ import logging as _log session_id = kwargs.get("_session_id") or kwargs.get("session_id") call_key = kwargs.get("_call_key") or kwargs.get("call_key") tool_name = kwargs.get("_tool_name") or "kimi_chat_with_tools" prefix_hash = self._prefix_hash(messages) # Build extra headers extra_headers = {"Msh-Trace-Mode": "on"} if call_key: extra_headers["Idempotency-Key"] = str(call_key) # Attach cached context token if available cache_token = None if session_id and prefix_hash: cache_token = self.get_cache_token(session_id, tool_name, prefix_hash) if cache_token: extra_headers["Msh-Context-Cache-Token"] = cache_token _log.info("Kimi attach cache token suffix=%s", cache_token[-6:]) # Call with raw response to capture headers when possible content_text = "" raw_payload = None try: api = getattr(self.client.chat.completions, "with_raw_response", None) if api: raw = api.create( model=model, messages=messages, tools=tools, tool_choice=tool_choice, temperature=temperature, stream=False, extra_headers=extra_headers, ) # Parse JSON body try: raw_payload = raw.parse() except Exception: raw_payload = getattr(raw, "http_response", None) # Extract headers try: hdrs = getattr(raw, "http_response", None) hdrs = getattr(hdrs, "headers", None) or {} token_saved = None for k, v in hdrs.items(): lk = (k.lower() if isinstance(k, str) else str(k).lower()) if lk in ("msh-context-cache-token-saved", "msh_context_cache_token_saved"): token_saved = v break if token_saved and session_id and prefix_hash: self.save_cache_token(session_id, tool_name, prefix_hash, token_saved) except Exception: pass # Pull content try: choice0 = (raw_payload.choices if hasattr(raw_payload, "choices") else raw_payload.get("choices"))[0] msg = getattr(choice0, "message", None) or choice0.get("message", {}) content_text = getattr(msg, "content", None) or msg.get("content", "") except Exception: content_text = "" else: # Fallback without raw headers support resp = self.client.chat.completions.create( model=model, messages=messages, tools=tools, tool_choice=tool_choice, temperature=temperature, stream=False, extra_headers=extra_headers, ) raw_payload = getattr(resp, "model_dump", lambda: resp)() try: content_text = resp.choices[0].message.content except Exception: content_text = (raw_payload.get("choices", [{}])[0].get("message", {}) or {}).get("content", "") except Exception as e: _log.error("Kimi chat call error: %s", e) raise # Normalize usage to a plain dict to ensure JSON-serializable output _usage = None try: if hasattr(raw_payload, "usage"): u = getattr(raw_payload, "usage") if hasattr(u, "model_dump"): _usage = u.model_dump() elif isinstance(u, dict): _usage = u else: _usage = { "prompt_tokens": getattr(u, "prompt_tokens", None), "completion_tokens": getattr(u, "completion_tokens", None), "total_tokens": getattr(u, "total_tokens", None), } elif isinstance(raw_payload, dict): _usage = raw_payload.get("usage") except Exception: _usage = None return { "provider": "KIMI", "model": model, "content": content_text or "", "tool_calls": None, "usage": _usage, "raw": getattr(raw_payload, "model_dump", lambda: raw_payload)() if hasattr(raw_payload, "model_dump") else raw_payload, } def generate_content( self, prompt: str, model_name: str, system_prompt: Optional[str] = None, temperature: float = 0.3, max_output_tokens: Optional[int] = None, images: Optional[list[str]] = None, **kwargs, ) -> ModelResponse: # Delegate to OpenAI-compatible base using Moonshot base_url # Ensure non-streaming by default for MCP tools kwargs.setdefault("stream", False) return super().generate_content( prompt=prompt, model_name=self._resolve_model_name(model_name), system_prompt=system_prompt, temperature=temperature, max_output_tokens=max_output_tokens, images=images, **kwargs, )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Zazzles2908/EX_AI-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server