Skip to main content
Glama

MCP Agent Platform

eye_agent.py11.8 kB
""" 视觉智能体 """ import cv2 import base64 import numpy as np from typing import Dict, Any, List, Tuple, Optional import asyncio import time import random import string import os from src.agents.base_agent import BaseAgent from src.utils.mcp_protocol import ImageMessage, TextMessage from src.utils.face_recognition import FaceRecognition from src.utils.person_database import PersonDatabase class EyeAgent(BaseAgent): def __init__(self, agent_id: str, host: str, port: int): super().__init__(agent_id, "vision", host, port) self.camera = None self.is_capturing = False self.last_analysis_time = 0 # 上次分析图像的时间 self.analysis_interval = 10.0 # 分析图像的时间间隔(秒) # 初始化人脸识别模块 self.face_recognition = FaceRecognition() # 初始化人物数据库 self.person_database = PersonDatabase() # 记录上次识别到的人物ID,用于避免重复问候 self.last_recognized_person_id = None self.last_recognition_time = 0 self.recognition_cooldown = 600.0 # 同一人物的问候冷却时间(秒) async def start(self): """启动视觉智能体""" await super().start() self.camera = cv2.VideoCapture(0) if not self.camera.isOpened(): self.logger.error("Failed to open camera") return self.is_capturing = True asyncio.create_task(self._capture_loop()) self.logger.info("视觉智能体已启动") self.logger.info(f"已加载{len(self.person_database.get_all_persons())}个已知人物数据") async def stop(self): """停止视觉智能体""" self.is_capturing = False if self.camera: self.camera.release() await super().stop() async def _process_face_recognition(self, image_base64: str) -> Optional[str]: """处理人脸识别 Args: image_base64: Base64编码的图像数据 Returns: 处理后的图像Base64编码,如果没有检测到人脸则返回None """ try: # self.logger.info(f"开始处理人脸图像识别") # 检测人脸并提取特征 faces, face_encodings, face_images = self.face_recognition.process_image(image_base64) # self.logger.info(f"已经监测到人脸图像") if len(faces) == 0: return None # self.logger.info(f"开始定义识别结果") # 识别结果 recognized_names = [] recognized_ids = [] # self.logger.info(f"开始定义识别到的人物信息") recognized_info = [] # 存储识别到的人物信息,用于显示 # self.logger.info(f"开始定义识别的时间") current_time = time.time() # self.logger.info(f"开始处理每个检测到的人脸") # 处理每个检测到的人脸 for i, (face, encoding, face_image) in enumerate(zip(faces, face_encodings, face_images)): self.logger.info(f"查找相似人物") # 查找相似人物 person_id = self.person_database.find_similar_person(encoding) if person_id is not None: # 已知人物 person = self.person_database.get_person(person_id) name = person["name"] recognized_names.append(name) recognized_ids.append(person_id) # 计算上次见面时间 last_seen = person["last_seen"] time_diff = current_time - last_seen time_info = self._format_time_diff(time_diff) if time_diff > 60 else "刚刚" # 更新人物信息 self.person_database.update_person( person_id, last_seen=current_time, face_image=face_image ) # 添加识别信息 recognized_info.append(f"{name} (见过{person['seen_count']}次,上次见面: {time_info})") self.logger.info(f"识别到已知人物: {name} (ID: {person_id}, 见过{person['seen_count']}次)") else: self.logger.info(f"新人物,生成随机名称") # 新人物,生成随机名称 random_name = self._generate_random_name() # 添加到数据库 new_id = self.person_database.add_person( name=random_name, face_encoding=encoding, face_image=face_image ) recognized_names.append(random_name) recognized_ids.append(new_id) recognized_info.append(f"{random_name} (初次见面)") self.logger.info(f"添加新人物: {random_name} (ID: {new_id})") self.logger.info(f"绘制人脸边界框和名称") # 绘制人脸边界框和名称 processed_image = self.face_recognition.draw_faces(image_base64, faces, recognized_names) # 检查是否需要发送问候消息 await self._send_greeting_if_needed(recognized_ids, recognized_names, recognized_info, current_time) return processed_image except Exception as e: self.logger.error(f"处理人脸识别时出错: {e}") return None def _generate_random_name(self) -> str: """生成随机人物名称""" prefixes = ["赵", "钱", "孙", "李", "王", "刘", "田"] names = ["小明", "小红", "小华", "小天", "小宇", "小花", "小美", "小宁", "小德", "小琳"] prefix = random.choice(prefixes) name = random.choice(names) # 添加随机数字后缀,确保唯一性 suffix = ''.join(random.choices(string.digits, k=2)) return f"{prefix}{name}{suffix}" def _format_time_diff(self, time_diff: float) -> str: """格式化时间差 Args: time_diff: 时间差(秒) Returns: 格式化后的时间差字符串 """ # 转换为整数 seconds = int(time_diff) # 计算不同时间单位 minutes = seconds // 60 hours = minutes // 60 days = hours // 24 # 根据时间长短选择合适的显示格式 if days > 0: return f"{days}天前" elif hours > 0: return f"{hours}小时前" elif minutes > 0: return f"{minutes}分钟前" else: return f"{seconds}秒前" async def _send_greeting_if_needed(self, person_ids: List[int], person_names: List[str], person_info: List[str], current_time: float) -> None: """如果需要,发送问候消息 Args: person_ids: 识别到的人物ID列表 person_names: 识别到的人物名称列表 person_info: 识别到的人物详细信息列表 current_time: 当前时间戳 """ if not person_ids: return # 选择第一个检测到的人物进行问候 person_id = person_ids[0] person_name = person_names[0] # 检查是否需要问候(避免频繁问候同一个人) if (self.last_recognized_person_id != person_id or current_time - self.last_recognition_time >= self.recognition_cooldown): # 更新最后识别记录 self.last_recognized_person_id = person_id self.last_recognition_time = current_time # 获取人物信息 person = self.person_database.get_person(person_id) seen_count = person["seen_count"] # 构建问候消息 if seen_count <= 1: greeting = f"你好,{person_name}!很高兴认识你!" else: greeting = f"你好,{person_name}!很高兴再次见到你!" #这是我们第{seen_count}次见面了。" # 发送问候消息到大脑 greeting_message = TextMessage( sender_id=self.agent_id, receiver_id="brain", text=f"[视觉识别] 我看到了{person_name}。{greeting}" ) self.logger.info(f"发送问候消息: {greeting}") await self.send_message("brain", greeting_message.to_dict()) async def _capture_loop(self): """持续捕获视频的循环""" while self.is_capturing: try: ret, frame = self.camera.read() if ret: # 转换图像为JPEG格式的base64字符串 _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70]) image_base64 = base64.b64encode(buffer).decode('utf-8') # 处理人脸识别 processed_image = await self._process_face_recognition(image_base64) # 发送到Web界面 from src.web.server import broadcast_message success = await broadcast_message({ "type": "vision", "content": processed_image or image_base64 }) # 定期发送图像到大脑进行分析 current_time = time.time() if current_time - self.last_analysis_time >= self.analysis_interval: self.last_analysis_time = current_time # 获取当前识别到的人物名称(如果有) person_name = None if self.last_recognized_person_id is not None: person = self.person_database.get_person(self.last_recognized_person_id) if person: person_name = person["name"] # 创建图像消息,包含人物名称 message = ImageMessage( sender_id=self.agent_id, receiver_id="brain", image_data=image_base64, person_name=person_name ) # 发送到大脑智能体 self.logger.info("发送图像到大脑进行分析") await self.send_message("brain", message.to_dict()) # 控制帧率,设置为1FPS await asyncio.sleep(1.0) # 1FPS except Exception as e: self.logger.error(f"Error in capture loop: {e}") await asyncio.sleep(15.0) # 出错时等待5秒再重试

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rolenet/McpAgentRobot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server