Skip to main content
Glama

Rongcloud Native

Official
by rongcloud
engine.py (27.1 kB)
"""
IM SDK Python Wrapper Module

Used to load and wrap Rust Universal IM SDK dynamic library
"""
import os
import ctypes
import sys
import threading
import time
import traceback
from typing import Dict, Any, List, Optional

from src.imsdk import LIB_DIR
from src.imsdk.util import dict_to_ctypes, ctypes_to_dict
from src.lib import rcim_client
from src.lib.rcim_utils import string_cast, char_pointer_cast
from src.lib.rcim_client import (
    RcimConversationType_Group,
    RcimConversationType_Private,
    RcimDisconnectMode_NoPush,
    RcimEngineBuilder,
    RcimEngineBuilderParam,
    RcimEngineSync,
    RcimLogLevel_Debug,
    RcimMessageBox,
    RcimSendMessageOption
)

# Map the running interpreter's platform onto the SDK's platform enum.
PLATFORM = rcim_client.RcimPlatform_Unknown
if sys.platform == 'darwin':
    PLATFORM = rcim_client.RcimPlatform_MacOS
elif sys.platform == 'win32':
    PLATFORM = rcim_client.RcimPlatform_Windows
elif sys.platform == 'linux':
    PLATFORM = rcim_client.RcimPlatform_Linux

# ID of the currently connected user; the empty string means "not connected".
USER_ID = ""

# Configure logging
from src.utils.mcp_utils import logger


class IMSDK:
    """Python wrapper class for IM SDK"""

    # Seconds to wait for a native callback before reporting a timeout.
    _DEFAULT_TIMEOUT_SEC = 10
    # Media messages wait longer because the upload may take a while.
    _MEDIA_TIMEOUT_SEC = 60

    _ENGINE_NOT_BUILT = "Engine instance not built yet, please call initialize first"

    def __init__(self) -> None:
        """Initialize IM SDK"""
        # Initialize attributes
        self.engine = None
        self.builder = None
        self.app_key: Optional[str] = None
        self.device_id: Optional[str] = None
        # ctypes callback objects must outlive the native call that may invoke
        # them. After a Python-side timeout the native side can still fire the
        # callback, so the most recent callbacks of every operation are kept
        # referenced here instead of living only in a local variable.
        self._callback_refs: Dict[str, Any] = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _hold_callbacks(self, operation: str, *callbacks: Any) -> None:
        """Keep the ctypes callbacks of `operation` alive on the instance."""
        self._callback_refs[operation] = callbacks

    def _precondition_error(self) -> Optional[Dict[str, Any]]:
        """Return an error dictionary if the engine is not built or not connected, else None."""
        if not self.engine:
            return {"code": -1, "message": self._ENGINE_NOT_BUILT}
        if USER_ID == "":
            return {"code": -1, "message": "Not connected"}
        return None

    @staticmethod
    def _new_message_box(conversation_type: int,
                         target_id: str,
                         object_name: str,
                         content: Dict[str, Any],
                         ext_content: Optional[Dict[str, Any]]):
        """Build the RcimMessageBox struct for an outgoing message."""
        message_box_dic = {
            'conv_type': conversation_type,
            'target_id': target_id,
            'object_name': object_name,
            'content': content,
            'uid': USER_ID,
            'is_ext_supported': bool(ext_content),
            'ext_content': ext_content or {},
        }
        return dict_to_ctypes(RcimMessageBox, message_box_dic)

    # ------------------------------------------------------------------
    # Engine lifecycle
    # ------------------------------------------------------------------
    def engine_build(self,
                     app_key: str,
                     device_id: str,
                     area_code: int = -1,
                     navi_url: str = "",
                     stats_url: str = ""
                     ) -> Dict[str, Any]:
        """
        Initialize IM SDK and return status

        Args:
            app_key: Application's AppKey
            device_id: Device ID
            area_code: Area code; applied only when it is in the range 1..5,
                in which case navi_url and stats_url are ignored
            navi_url: Navi URL (used when area_code is outside 1..5 and navi_url is not empty)
            stats_url: Statistic URL (used when area_code is outside 1..5 and stats_url is not empty)

        Returns:
            Failure: Dictionary containing code and message
            Success: Dictionary containing code, app_key, device_id and message
        """
        if self.engine:
            return {"code": -1, "message": "Engine instance already built, please call destroy first"}

        # Store application info
        self.app_key = app_key
        self.device_id = device_id

        logger.info(f"Initializing IM SDK, AppKey: {app_key}, Device ID: {device_id}, Area Code: {area_code}, Navi URL: {navi_url}, Statistic URL: {stats_url}")

        # Prepare initialization parameters
        engine_builder_param = {
            'app_key': app_key,
            "platform": PLATFORM,
            "device_id": device_id,
            "package_name": "",
            "imlib_version": "0.17.1",
            "device_model": "",
            "device_manufacturer": "",
            "os_version": "",
            "sdk_version_vec": {"name": "rust", "version": "0.17.1"},
            "sdk_version_vec_len": 1,
            "app_version": "1.0.0",
        }

        # Create parameter struct
        param = dict_to_ctypes(RcimEngineBuilderParam, engine_builder_param)

        # Create builder pointer (pointer-to-pointer, filled in by the native call)
        builder = ctypes.pointer(ctypes.pointer(RcimEngineBuilder()))

        # Call creation function
        ret = rcim_client.rcim_create_engine_builder(param, builder)
        if ret != 0:
            logger.info(f"rcim_create_engine_builder failed, error code: {ret}")
            return {"code": -1, "message": f"rcim_create_engine_builder failed, error code: {ret}"}

        # Save builder reference
        self.builder = builder.contents

        # Set storage path, making sure the directory exists first
        db_path = os.path.join(LIB_DIR, "rust_db")
        os.makedirs(db_path, exist_ok=True)
        ret = rcim_client.rcim_engine_builder_set_store_path(self.builder, char_pointer_cast(db_path))
        if ret != 0:
            # Best effort: a failure here is logged but does not abort the build.
            logger.info(f"rcim_engine_builder_set_store_path failed, error code: {ret}")

        if 1 <= area_code <= 5:
            ret = rcim_client.rcim_engine_builder_set_area_code(self.builder, area_code)
            if ret != 0:
                logger.info(f"rcim_engine_builder_set_area_code failed, error code: {ret}")
        else:
            # Set navi server
            if navi_url != "":
                navi_list = [navi_url]
                # Keep the string buffers referenced for the duration of the
                # native call so the char pointers stay valid.
                navi_buffers = []
                char_ptrs = (ctypes.POINTER(ctypes.c_char) * len(navi_list))()
                for i, string in enumerate(navi_list):
                    if string is None:
                        char_ptrs[i] = None
                        continue
                    # Create c_char array pointer for corresponding string
                    char_array = ctypes.create_string_buffer(string.encode('utf-8'))
                    navi_buffers.append(char_array)
                    char_ptrs[i] = ctypes.cast(char_array, ctypes.POINTER(ctypes.c_char))

                double_ptr = ctypes.cast(char_ptrs, ctypes.POINTER(ctypes.POINTER(ctypes.c_char)))
                ret = rcim_client.rcim_engine_builder_set_navi_server(self.builder, double_ptr, len(navi_list))
                if ret != 0:
                    logger.info(f"rcim_engine_builder_set_navi_server failed, error code: {ret}")

            if stats_url != "":
                ret = rcim_client.rcim_engine_builder_set_statistic_server(self.builder, char_pointer_cast(stats_url))
                if ret != 0:
                    logger.info(f"rcim_engine_builder_set_statistic_server failed, error code: {ret}")

        # Create engine pointer (pointer-to-pointer, filled in by the native call)
        engine_ptr = ctypes.pointer(ctypes.pointer(RcimEngineSync()))

        # Build engine
        ret = rcim_client.rcim_engine_builder_build(self.builder, engine_ptr)
        if ret != 0:
            logger.info(f"rcim_engine_builder_build failed, error code: {ret}")
            return {"code": -1, "message": f"rcim_engine_builder_build failed, error code: {ret}"}

        # Save engine reference
        self.engine = engine_ptr.contents

        return {
            "code": 0,
            "app_key": app_key,
            "device_id": device_id,
            "message": "IM SDK initialization successful"
        }

    def engine_connect(self, token: str, timeout_sec: int = 10) -> Dict[str, Any]:
        """
        Connect to Rongcloud service

        Args:
            token: User connection token
            timeout_sec: Connection timeout in seconds

        Returns:
            Failure: Dictionary containing code and message
            Success: Dictionary containing code, user_id and message
        """
        if not self.engine:
            return {"code": -1, "message": self._ENGINE_NOT_BUILT}

        result = {"code": -1, "message": ""}
        # Use event to wait for callback completion
        connect_event = threading.Event()

        def on_connect(user_data, code, user_id):
            nonlocal result
            global USER_ID
            try:
                # Get string from String type
                user_id_str = string_cast(user_id)
                # Only a successful connection marks the SDK as connected.
                USER_ID = user_id_str if code == 0 else ""
                result = {
                    "code": code,
                    "user_id": user_id_str if code == 0 else "",
                    "message": "Connection successful" if code == 0 else "Connection failed"
                }
                logger.info(f"Connection callback: {'successful' if code == 0 else 'failed'}, User ID: {user_id_str}, Error code: {code}")
            except Exception as e:
                logger.info(f"Internal error in callback function: {e}")
                result = {"code": -1, "message": str(e)}
            finally:
                # Always release the waiter, even if the callback body failed.
                connect_event.set()

        # Use RcimConnectCb type defined in rcim_client module directly
        callback_fn = rcim_client.RcimConnectCb(on_connect)
        self._hold_callbacks("connect", callback_fn)

        # Call connect function
        rcim_client.rcim_engine_connect(
            self.engine[0],  # Use self.engine[0] to get pointer object
            char_pointer_cast(token),
            ctypes.c_int(timeout_sec),
            None,  # Set user_data parameter to None
            callback_fn
        )

        # Allow one extra second beyond the SDK's own timeout.
        finished = connect_event.wait(timeout=timeout_sec + 1)
        if not finished:
            logger.info("Connection timeout, no callback received")
            return {"code": -2, "message": "Connection timeout, no callback received"}

        # Return callback result
        return result

    def engine_disconnect(self) -> Dict[str, Any]:
        """
        Disconnect from IM server

        Returns:
            Dictionary containing code and message
        """
        global USER_ID
        if not self.engine:
            USER_ID = ""
            return {"code": -1, "message": self._ENGINE_NOT_BUILT}
        if USER_ID == "":
            return {"code": -1, "message": "Not connected"}

        result = {"code": -1, "message": ""}
        # Create event for waiting callback completion
        disconnect_event = threading.Event()

        def on_disconnect(user_data, code):
            nonlocal result
            global USER_ID
            try:
                result = {
                    "code": code,
                    "message": "Disconnection successful" if code == 0 else "Disconnection failed"
                }
                if code == 0:
                    USER_ID = ""
            finally:
                disconnect_event.set()

        # Use correct callback function type
        callback_fn = rcim_client.RcimEngineErrorCb(on_disconnect)
        self._hold_callbacks("disconnect", callback_fn)

        logger.info("Disconnection preparation")
        rcim_client.rcim_engine_disconnect(self.engine[0], RcimDisconnectMode_NoPush, None, callback_fn)
        logger.info("Disconnection completed")

        # Wait for callback completion
        finished = disconnect_event.wait(timeout=self._DEFAULT_TIMEOUT_SEC)
        if not finished:
            logger.info("Disconnection timeout, no callback received")
            return {"code": -2, "message": "Disconnection timeout, no callback received"}

        return result

    def destroy(self):
        """
        Destroy IM SDK

        Disconnects if an engine exists, then drops the engine and builder
        references so that engine_build can be called again.
        """
        if self.engine:
            self.engine_disconnect()
        self.engine = None
        self.builder = None

    # ------------------------------------------------------------------
    # Messaging
    # ------------------------------------------------------------------
    def send_text_message(self,
                          receiver: str,
                          content: str,
                          conversation_type,
                          ext_content: Optional[dict] = None) -> Dict[str, Any]:
        """
        Send text message

        Args:
            receiver: Recipient ID
            content: Message content
            conversation_type: Conversation type
            ext_content: Optional extension content attached to the message

        Returns:
            Failure: Dictionary containing code and message
            Success: Dictionary containing code and message, where message is
                the sent message converted to a dictionary
        """
        error = self._precondition_error()
        if error is not None:
            return error

        try:
            result = {"code": -1, "message": ""}
            event = threading.Event()

            def on_sent(user_data, code, message_box):
                nonlocal result
                try:
                    # message_box may be NULL (e.g. on failure); fall back to an empty dict
                    message = {}
                    if message_box and message_box.contents:
                        message = ctypes_to_dict(message_box.contents)
                    result = {
                        "code": code,
                        "message": message if code == 0 else "Message sending failed"
                    }
                    logger.info(f"Send message callback: {'successful' if code == 0 else 'failed'}, Message: {message}, Error code: {code}")
                except Exception as e:
                    logger.info(f"Internal error in callback function: {e}")
                    result = {"code": -1, "message": str(e)}
                finally:
                    event.set()

            def empty_message_callback(user_data, message):
                # This is an empty implementation, just to satisfy type requirements
                pass

            callback_fn = rcim_client.RcimCodeMessageCb(on_sent)
            message_callback_fn = rcim_client.RcimMessageCb(empty_message_callback)
            self._hold_callbacks("send_text_message", callback_fn, message_callback_fn)

            # Create RcimMessageBox struct instance
            message_box = self._new_message_box(
                conversation_type, receiver, 'RC:TxtMsg', {'content': content}, ext_content
            )
            # Create send_message_option object
            send_option = dict_to_ctypes(RcimSendMessageOption, {})

            # Call send function
            rcim_client.rcim_engine_send_message(
                self.engine[0],  # Use self.engine[0] to get pointer object
                message_box,
                send_option,
                None,  # Set user_data parameter to None
                callback_fn,
                message_callback_fn
            )

            # Use event to wait for callback completion
            finished = event.wait(timeout=self._DEFAULT_TIMEOUT_SEC)
            if not finished:
                logger.info("Message sending timeout, no callback received")
                return {"code": -2, "message": "Message sending timeout, no callback received"}

            # Return callback result
            return result

        except Exception as e:
            logger.info(f"Message sending failed: {e}")
            logger.info(f"Exception stack: {traceback.format_exc()}")
            return {
                "code": -1,
                "message": str(e)
            }

    def send_image_message(self,
                           target_id: str,
                           thumbnail_base64: str,
                           image_uri: str,
                           conversation_type: int,
                           ext_content: Optional[dict] = None) -> Dict[str, Any]:
        """
        Send image message

        Args:
            target_id: Target ID of the conversation
            thumbnail_base64: Base64 thumbnail; a leading "<prefix>," part
                (as in a data URI) is stripped before sending
            image_uri: URI of the full image
            conversation_type: Conversation type
            ext_content: Optional extension content attached to the message

        Returns:
            Failure: Dictionary containing code and message
            Success: Dictionary containing code and message, where message is
                the sent message converted to a dictionary
        """
        error = self._precondition_error()
        if error is not None:
            return error

        if thumbnail_base64 == "" or image_uri == "":
            return {"code": -1, "message": "Thumbnail base64 or image uri is empty"}

        # If thumbnail_base64 contains a comma, keep only the part after the first comma
        if "," in thumbnail_base64:
            thumbnail_base64 = thumbnail_base64.split(",", 1)[1]

        try:
            result = {"code": -1, "message": ""}
            # Create synchronization event
            event = threading.Event()

            def on_sent(user_data, code, message_box):
                nonlocal result
                try:
                    # message_box may be NULL (e.g. on failure); fall back to an empty dict
                    message = {}
                    if message_box and message_box.contents:
                        message = ctypes_to_dict(message_box.contents)
                    result = {
                        "code": code,
                        "message": message if code == 0 else "Message sending failed"
                    }
                except Exception as e:
                    logger.error(f"Callback wrapper error: {e}")
                    result = {"code": -1, "message": str(e)}
                finally:
                    event.set()

            def empty_message_callback(user_data, message_box):
                # Empty implementation, just to satisfy type requirements
                pass

            def empty_progress_callback(user_data, message_box, progress):
                # Empty implementation, just to satisfy type requirements
                pass

            callback_fn = rcim_client.RcimCodeMessageCb(on_sent)
            message_callback_fn = rcim_client.RcimMessageCb(empty_message_callback)
            progress_callback_fn = rcim_client.RcimSendMessageOnProgressCb(empty_progress_callback)
            # NULL function pointer: no custom media handler is supplied
            media_handler_callback_fn = ctypes.cast(None, rcim_client.RcimMediaMessageHandlerCb)
            self._hold_callbacks(
                "send_image_message", callback_fn, message_callback_fn, progress_callback_fn
            )

            # Create RcimMessageBox struct instance
            message_box = self._new_message_box(
                conversation_type,
                target_id,
                'RC:ImgMsg',
                {'content': thumbnail_base64, 'imageUri': image_uri},
                ext_content,
            )
            # Create send_message_option object
            send_option = dict_to_ctypes(RcimSendMessageOption, {})

            rcim_client.rcim_engine_send_media_message(
                self.engine[0],  # Use self.engine[0] to get pointer object
                message_box,
                send_option,
                None,  # Set user_data parameter to None
                callback_fn,
                message_callback_fn,
                progress_callback_fn,
                media_handler_callback_fn
            )

            # Use event to wait for callback completion; image upload may take longer
            finished = event.wait(timeout=self._MEDIA_TIMEOUT_SEC)
            if not finished:
                logger.info("Image message sending timeout, no callback received")
                return {"code": -2, "message": "Image message sending timeout, no callback received"}

            # Return callback result
            return result

        except Exception as e:
            logger.info(f"Image message sending failed: {e}")
            logger.info(f"Exception stack: {traceback.format_exc()}")
            return {
                "code": -1,
                "message": str(e)
            }

    def recall_message(self, message_dict: Dict[str, Any]) -> Dict[str, Any]:
        """
        Recall message

        Args:
            message_dict: Dictionary describing the message to recall; it is
                converted to an RcimMessageBox struct

        Returns:
            Dictionary containing code and message
        """
        error = self._precondition_error()
        if error is not None:
            return error

        try:
            result = {"code": -1, "message": ""}
            event = threading.Event()

            def on_recalled(user_data, code, recall_notification):
                nonlocal result
                try:
                    result = {
                        "code": code,
                        "message": "Message recall successful" if code == 0 else "Message recall failed",
                    }
                except Exception as e:
                    logger.info(f"Callback function internal error: {e}")
                    result = {"code": -1, "message": str(e)}
                finally:
                    event.set()

            callback_fn = rcim_client.RcimRecallMessageCb(on_recalled)
            self._hold_callbacks("recall_message", callback_fn)

            # Create RcimMessageBox struct instance
            message_box = dict_to_ctypes(RcimMessageBox, message_dict)

            # Call recall message function
            rcim_client.rcim_engine_recall_message(
                self.engine[0],  # Use self.engine[0] to get pointer object
                message_box,
                None,  # Set user_data parameter to None
                callback_fn
            )

            # Use event to wait for callback completion
            finished = event.wait(timeout=self._DEFAULT_TIMEOUT_SEC)
            if not finished:
                logger.info("Message recall timeout, no callback received")
                return {"code": -2, "message": "Message recall timeout, no callback received"}

            # Return callback result
            return result

        except Exception as e:
            logger.info(f"Message recall failed: {e}")
            logger.info(f"Exception stack: {traceback.format_exc()}")
            return {
                "code": -1,
                "message": str(e)
            }

    def get_history_messages(self,
                             target_id: str,
                             conversation_type: int,
                             count: int = 10,
                             timestamp: int = 0,
                             order: int = 0) -> List[Dict[str, Any]]:
        """
        Get remote historical messages

        Args:
            target_id: Target ID (private user ID or group ID)
            conversation_type: Conversation type
            count: Number of messages to retrieve, default is 10
            timestamp: Timestamp in milliseconds, default is 0 (use the current time)
            order: Sort order, 0 for descending, any other value for ascending

        Returns:
            A single-element list in every case.
            Failure: the element is a dictionary containing code and message
            Success: the element is a dictionary containing code and messages
        """
        error = self._precondition_error()
        if error is not None:
            # Wrapped in a list so every path matches the declared return type.
            return [error]

        if timestamp == 0:
            # Get current timestamp (milliseconds)
            timestamp = int(time.time() * 1000)

        state = {"code": -1, "messages": []}
        # Create synchronization event
        done_event = threading.Event()

        def on_messages(user_data, code, messages, messages_len):
            try:
                state["code"] = code
                if code == 0 and messages and messages_len > 0:
                    # `messages` is a C pointer, so it has to be indexed
                    # explicitly up to messages_len rather than iterated.
                    state["messages"] = [
                        ctypes_to_dict(messages[i]) for i in range(messages_len)
                    ]
            except Exception as e:
                logger.info(f"Internal error in callback function: {e}")
                state["code"] = -1
            finally:
                # Callback ended, notify main thread
                done_event.set()

        callback_fn = rcim_client.RcimGetMessageListCb(on_messages)
        self._hold_callbacks("get_history_messages", callback_fn)

        order_enum = rcim_client.RcimOrder_Descending if order == 0 else rcim_client.RcimOrder_Ascending

        # Call remote historical messages function
        rcim_client.rcim_engine_get_remote_history_messages(
            self.engine[0],                # Engine pointer
            conversation_type,             # Conversation type
            char_pointer_cast(target_id),  # Target ID
            None,                          # Channel ID (empty)
            ctypes.c_int64(timestamp),     # Timestamp
            ctypes.c_int(count),           # Message count
            order_enum,                    # Sort order
            True,                          # Include local messages
            None,                          # user_data
            callback_fn                    # Callback function
        )

        # Wait for callback
        finished = done_event.wait(timeout=self._DEFAULT_TIMEOUT_SEC)
        if not finished:
            logger.info("Get historical messages timeout, no callback received")
            return [{"code": -2, "message": "Get historical messages timeout, no callback received"}]

        if state["code"] != 0:
            return [{"code": state["code"], "message": state["messages"]}]

        return [{"code": 0, "messages": state["messages"]}]


# Create default SDK instance using default parameters
default_sdk = IMSDK()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rongcloud/rongcloud-native-mcp-python'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.