get_key_type
Retrieves the type and TTL (time to live) of a specified Redis key for monitoring key properties.
Instructions
获取键的类型和TTL
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| key | Yes | Redis键名 |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes |
Implementation Reference
- src/redis_mcp_server/db.py:120-130 (handler)Core implementation of get_key_type in RedisConnection class. Calls Redis TYPE and TTL commands, checks key access permissions, returns dict with key name, type, and TTL info.
def get_key_type(self, key: str) -> dict[str, Any]: """获取键的类型""" if not self._is_key_allowed(key): raise PermissionError(f"键 '{key}' 不允许访问") key_type = self._client.type(key) ttl = self._client.ttl(key) return { "key": key, "type": key_type, "ttl": ttl if ttl >= 0 else ("永不过期" if ttl == -1 else "键不存在"), } - src/redis_mcp_server/server.py:85-98 (handler)MCP tool handler for get_key_type. Decorated with @mcp.tool(), calls db.get_key_type(), handles PermissionError and general exceptions, returns JSON string.
@mcp.tool() def get_key_type(key: str) -> str: """获取键的类型和TTL Args: key: Redis键名 """ try: result = db.get_key_type(key) return json.dumps(result, ensure_ascii=False, indent=2, default=str) except PermissionError as e: return json.dumps({"error": str(e)}, ensure_ascii=False) except Exception as e: return json.dumps({"error": str(e)}, ensure_ascii=False) - src/redis_mcp_server/server.py:85-86 (registration)Registration of get_key_type via @mcp.tool() decorator on the handler function.
@mcp.tool() def get_key_type(key: str) -> str: - src/redis_mcp_server/server.py:87-91 (schema)Input parameter schema for get_key_type tool: takes a single 'key' (str) argument.
"""获取键的类型和TTL Args: key: Redis键名 """ - src/redis_mcp_server/db.py:39-130 (helper)Access control helpers _is_key_blocked and _is_key_allowed used by get_key_type to validate key access before querying Redis.
def _is_key_blocked(self, key: str) -> bool: """检查键是否在禁止访问列表中""" if self.config.blocked_key_patterns: for pattern in self.config.blocked_key_patterns: if fnmatch.fnmatch(key, pattern): return True return False def _is_key_allowed(self, key: str) -> bool: """检查键是否允许访问(前缀过滤+黑名单)""" if self._is_key_blocked(key): return False if self.config.key_prefix and not key.startswith(self.config.key_prefix): return False return True def _check_write_permission(self) -> None: """检查是否有写权限""" if self.config.read_only: raise PermissionError( "当前为只读模式,写操作被禁止。" "如需开启写操作,请设置 REDIS_MCP_READ_ONLY=false" ) def _truncate_value(self, value: str) -> str: """截断过长的值""" if len(value) > self.config.max_value_length: return value[: self.config.max_value_length] + "\n... (已截断)" return value # ============================================================ # 只读操作 # ============================================================ def ping(self) -> dict[str, Any]: """测试Redis连接""" result = self._client.ping() return {"connected": result, "host": self.config.host, "port": self.config.port} def info(self, section: Optional[str] = None) -> dict[str, Any]: """获取Redis服务器信息""" raw = self._client.info(section) # 转换非序列化类型 result = {} for k, v in raw.items(): if isinstance(v, (int, float, str, bool)): result[k] = v elif isinstance(v, bytes): result[k] = v.decode("utf-8", errors="replace") elif isinstance(v, dict): result[k] = {sk: sv for sk, sv in v.items() if isinstance(sv, (int, float, str, bool))} else: result[k] = str(v) return result def dbsize(self) -> dict[str, Any]: """获取当前数据库键数量""" return {"db": self.config.db, "key_count": self._client.dbsize()} def scan_keys( self, pattern: str = "*", count: Optional[int] = None ) -> dict[str, Any]: """扫描匹配的键""" max_count = count or self.config.max_key_count keys = [] cursor = 0 while True: cursor, batch = self._client.scan(cursor=cursor, match=pattern, count=min(max_count, 100)) keys.extend(batch) if cursor == 0 or len(keys) >= max_count: break keys = keys[:max_count] # 过滤不允许访问的键 keys = [k for k in keys if self._is_key_allowed(k)] return { "pattern": pattern, "keys": keys, "count": len(keys), "truncated": len(keys) >= max_count, } def get_key_type(self, key: str) -> dict[str, Any]: """获取键的类型""" if not self._is_key_allowed(key): raise PermissionError(f"键 '{key}' 不允许访问") key_type = self._client.type(key) ttl = self._client.ttl(key) return { "key": key, "type": key_type, "ttl": ttl if ttl >= 0 else ("永不过期" if ttl == -1 else "键不存在"), }