Rini MCP Server

by mori-mmmm
util_ai.py (13.8 kB)
"""Async LLM helper utilities for the Rini MCP server.

Wraps OpenAI and Google Gemini calls for chat completion, embeddings, image
generation, multimodal (image + text) prompts, and audio transcription.

Naming convention used throughout:
    t2t  = text in, text out
    m2t  = message list in, text out
    m2m  = message list in, message list out (assistant reply appended)
    t2i  = text in, image URL out
    it2t = image + text in, text out
"""
import os
from dotenv import load_dotenv
import asyncio
import mimetypes
from pydub import AudioSegment
import uuid
import io

load_dotenv()


def get_tool_prompt(tools):
    prompt = f"""
The tools you can use, together with what each one does and its arguments, are listed below:
{tools}

If you decide to use a tool, output the call **inside a code block** directly in JSON format.
The output format is as follows:

You may add any remarks or explanation you like
```json
{{
    tool_calls: [
        {{
            name: (function name),
            args: {{
                (argument name 1): (argument value 1),
                (argument name 2): (argument value 2),
                ...
            }}
        }},
        ...
    ]
}}
```
"""
    return prompt


### Google Gemini ###
# import google.generativeai as genai
from google import genai
from google.genai import types

gemini_client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))


def _prepare_gemini_contents(messages):
    """Convert OpenAI-style chat messages into Gemini contents, merging consecutive messages of the same role."""
    contents = []
    current_content = None
    for message in messages:
        role = message.get("role", "user")
        content_text = message.get("content")
        if not content_text:
            continue

        gemini_role = "user"
        if role == "assistant":
            gemini_role = "model"
        elif role == "system":
            gemini_role = "user"  # system prompts are mapped to the user role

        text_part = {"text": content_text}
        if current_content and current_content["role"] == gemini_role:
            current_content["parts"].append(text_part)
        else:
            current_content = {
                "role": gemini_role,
                "parts": [text_part]
            }
            contents.append(current_content)
    return contents


async def gai_get_m2t(messages, tools=None, model=None, thinking_budget=None, generation_config_override=None, **kwargs):
    GEMINI_MODEL = "gemini-2.5-pro-preview-05-06"
    if not model:
        model = GEMINI_MODEL

    contents = _prepare_gemini_contents(messages)

    gen_config_dict = generation_config_override if generation_config_override else {}
    if thinking_budget is not None and thinking_budget > 0:
        gen_config_dict['thinking_config'] = {"thinking_budget": thinking_budget}

    response = await gemini_client.aio.models.generate_content(
        model=model,
        contents=contents,
        config=gen_config_dict if gen_config_dict else None,
        **kwargs
        # request_options={'timeout': 600}  # set a timeout here if needed
    )
    return response.text


async def gai_get_t2t(text, tools=None, model=None, thinking_budget=None, generation_config_override=None, **kwargs):
    messages = [
        {"role": "user", "content": text}
    ]
    ret = await get_m2t(messages, tools=tools, model=model, thinking_budget=thinking_budget,
                        generation_config_override=generation_config_override, **kwargs)
    return ret


async def gai_get_m2m(messages, tools=None, model=None, thinking_budget=None, generation_config_override=None, **kwargs):
    ret = await get_m2t(messages, tools=tools, model=model, thinking_budget=thinking_budget,
                        generation_config_override=generation_config_override, **kwargs)
    messages.append(
        {"role": "assistant", "content": ret}
    )
    return messages


### OpenAI ###
from openai import OpenAI
import tiktoken

openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


async def get_embedding(text, model="text-embedding-3-large"):
    response = await asyncio.to_thread(
        openai_client.embeddings.create,
        input=[text],
        model=model
    )
    return response.data[0].embedding


async def get_embedding_batch(texts, model="text-embedding-3-large"):
    response = await asyncio.to_thread(
        openai_client.embeddings.create,
        input=texts,
        model=model
    )
    return [x.embedding for x in response.data]


async def oai_get_m2t(messages, tools=None, model=None):
    OPENAI_MODEL = "gpt-4.1-nano"
    if not model:
        model = OPENAI_MODEL
    print("oai called:", model)

    if tools:
        # Append the tool-use instructions as an extra user message.
        prompt = get_tool_prompt(tools)
        new_messages = [x for x in messages]
        new_messages += [{"role": "user", "content": prompt}]
        completion = await asyncio.to_thread(
            openai_client.chat.completions.create,
            model=model,
            messages=new_messages,
        )
    else:
        completion = await asyncio.to_thread(
            openai_client.chat.completions.create,
            model=model,
            messages=messages,
        )
    return completion.choices[0].message.content


async def oai_get_t2t(prompt, tools=None, model=None):
    messages = [{"role": "user", "content": prompt}]
    ret = await oai_get_m2t(messages, tools, model)
    return ret


async def oai_get_m2m(messages, tools=None, model=None):
    ret = await get_m2t(messages, tools=tools, model=model)
    messages.append(
        {"role": "assistant", "content": ret}
    )
    return messages


async def num_tokens_from_messages(messages, model="gpt-4o-2024-08-06"):
    """Returns the number of tokens used by a list of messages."""
    def _sync_num_tokens_from_messages(messages, model):
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            encoding = tiktoken.get_encoding("cl100k_base")
        if model == "gpt-4o-2024-08-06":
            num_tokens = 0
            for message in messages:
                num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
                for key, value in message.items():
                    num_tokens += len(encoding.encode(value, disallowed_special=()))
                    if key == "name":  # if there's a name, the role is omitted
                        num_tokens += -1  # role is always required and always 1 token
            num_tokens += 2  # every reply is primed with <im_start>assistant
            return num_tokens
        else:
            raise NotImplementedError(f"""num_tokens_from_messages() is not presently implemented for model {model}.
Please check the library docs for more info on message-to-token conversion.""")
    return await asyncio.to_thread(_sync_num_tokens_from_messages, messages, model)


async def _transcribe_chunk_openai(chunk_audio_segment, chunk_index, original_format="mp3"):
    """Helper function to transcribe a single audio chunk with OpenAI's transcription API."""
    temp_file_path = f"audio_chunks/temp_chunk_{uuid.uuid4()}_{chunk_index}.{original_format}"

    if not await asyncio.to_thread(os.path.exists, "audio_chunks"):
        try:
            await asyncio.to_thread(os.makedirs, "audio_chunks")
        except OSError as e:
            print(f"Error creating directory audio_chunks: {e}")

    try:
        await asyncio.to_thread(chunk_audio_segment.export, temp_file_path, format=original_format)
    except Exception as e:
        print(f"Error exporting audio chunk {chunk_index} to {temp_file_path}: {e}")
        return ""

    transcription = ""
    try:
        with open(temp_file_path, 'rb') as audio_file_chunk_sync:
            response = await asyncio.to_thread(
                openai_client.audio.transcriptions.create,
                model="gpt-4o-transcribe",
                file=audio_file_chunk_sync
            )
        transcription = response.text if response else ""
    except Exception as e:
        print(f"Error transcribing chunk {chunk_index} from {temp_file_path}: {e}")
        transcription = ""
    finally:
        if await asyncio.to_thread(os.path.exists, temp_file_path):
            try:
                await asyncio.to_thread(os.remove, temp_file_path)
            except Exception as e:
                print(f"Error deleting temporary file {temp_file_path}: {e}")
    return transcription


async def stt(file_path):
    """Transcribe an audio file, splitting it into 10-minute chunks that are transcribed concurrently."""
    try:
        file_format = file_path.split('.')[-1].lower()
        if file_format not in ["mp3", "wav", "m4a", "ogg", "flac"]:
            print(f"Unsupported file format {file_format}, defaulting to mp3 for export.")
            file_format = "mp3"

        audio = await asyncio.to_thread(AudioSegment.from_file, file_path)

        ten_minutes_ms = 10 * 60 * 1000  # pydub uses milliseconds
        chunks = []

        def _create_chunks_sync(audio_segment, chunk_size_ms):
            _chunks = []
            for i in range(0, len(audio_segment), chunk_size_ms):
                _chunks.append(audio_segment[i:i + chunk_size_ms])
            return _chunks

        chunks = await asyncio.to_thread(_create_chunks_sync, audio, ten_minutes_ms)

        if not chunks:
            print("No audio chunks to process.")
            return ''

        transcription_tasks = []
        for i, chunk_segment in enumerate(chunks):
            transcription_tasks.append(_transcribe_chunk_openai(chunk_segment, i, original_format=file_format))

        transcriptions_list = await asyncio.gather(*transcription_tasks)

        full_transcription = " ".join(filter(None, transcriptions_list))
        return full_transcription
    except FileNotFoundError:
        print(f"Error: Audio file not found at {file_path}")
        return ''
    except Exception as e:
        print(f"An unexpected error occurred in stt function: {e}")
        return ''


### Anthropic ###
# TODO


async def summarize(text, in_msg=None):
    if in_msg is not None:
        ret = await get_t2t(f"""Summarize the following text with the context below in mind:\n{text}\n\nFocus the summary on what is needed to answer this:\n{in_msg}""")
    else:
        ret = await get_t2t(f"""Summarize the following text:\n{text}\n\n""")
    return ret


async def pdf_qa(pdf_text, prompt):
    ret = await get_t2t(f"""Based on the following text, follow the instructions or answer the question given below:\n{pdf_text}\n\nThe request or question is as follows; if it is empty, just summarize the text:\n{prompt}""", model="gpt-4o-2024-08-06")
    return ret


async def get_t2t(text, tools=None, model=None):
    """Route a single-prompt request to Gemini if the model name starts with "gemini", otherwise to OpenAI."""
    if model is None:
        ret = await oai_get_t2t(text, tools, model)
    elif model.startswith("gemini"):
        ret = await gai_get_t2t(text, tools, model)
    else:
        ret = await oai_get_t2t(text, tools, model)
    return ret


async def get_m2t(messages, model=None, thinking_budget=None, generation_config_override=None, tools=None, **kwargs):
    """Route a chat-history request to Gemini if the model name starts with "gemini", otherwise to OpenAI."""
    if model is None:
        ret = await oai_get_m2t(messages, tools, model)
    elif model.startswith("gemini"):
        ret = await gai_get_m2t(messages, tools=tools, model=model, thinking_budget=thinking_budget,
                                generation_config_override=generation_config_override, **kwargs)
    else:
        ret = await oai_get_m2t(messages, tools, model)
    return ret


async def get_m2m(messages, tools=None, model=None):
    """Same routing as get_m2t, but returns the message list with the assistant reply appended."""
    if model is None:
        ret = await oai_get_m2m(messages, tools, model)
    elif model.startswith("gemini"):
        ret = await gai_get_m2m(messages, tools, model)
    else:
        ret = await oai_get_m2m(messages, tools, model)
    return ret


async def get_t2i(text):
    response = await asyncio.to_thread(
        openai_client.images.generate,
        model="dall-e-3",
        prompt=text,
        size="1024x1024",
        quality="standard",
        n=1,
    )
    image_urls = [response.data[i].url for i in range(1)]
    return image_urls[0]


async def get_it2t_url(prompt, img_url):
    response = await asyncio.to_thread(
        openai_client.chat.completions.create,
        model="gpt-4o-2024-08-06",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": img_url,
                    },
                },
            ],
        }],
        max_tokens=300,
    )
    return response.choices[0].message.content


def _read_image_bytes_and_detect_mime(image_path: str):
    """Read the bytes of an image file and detect its MIME type (synchronous)."""
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image file not found: '{image_path}'")

    mime_type, _ = mimetypes.guess_type(image_path)
    # supported_image_types = ['image/png', 'image/jpeg', 'image/webp', 'image/heic', 'image/heif']

    with open(image_path, "rb") as image_file:
        image_bytes = image_file.read()
    return image_bytes, mime_type


async def get_it2t_path(image_path: str, prompt: str):
    """Call a Gemini multimodal model asynchronously with a local image and a text prompt."""
    fixed_model_name = "gemini-2.0-flash"

    image_bytes, detected_mime_type = await asyncio.to_thread(
        _read_image_bytes_and_detect_mime, image_path
    )

    image_part = types.Part.from_bytes(mime_type=detected_mime_type, data=image_bytes)
    text_part = types.Part.from_text(text=prompt)
    contents = [types.Content(role="user", parts=[image_part, text_part])]

    response = await gemini_client.aio.models.generate_content(
        model=fixed_model_name,
        contents=contents,
    )

    response_text = ""
    if hasattr(response, 'text') and response.text:
        response_text = response.text
    elif response.candidates:
        first_candidate = response.candidates[0]
        if first_candidate.content and first_candidate.content.parts and first_candidate.content.parts[0].text:
            response_text = first_candidate.content.parts[0].text
    return response_text
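Usage sketch (not part of the original file): assuming the module is importable as util_ai and that OPENAI_API_KEY and GOOGLE_API_KEY are available via the environment or a .env file, the helpers can be driven from one asyncio event loop. The prompts and the audio file path below are hypothetical examples.

import asyncio

from util_ai import get_t2t, get_m2t, stt


async def main():
    # Plain text-to-text call; with no model given this routes to the OpenAI default ("gpt-4.1-nano").
    answer = await get_t2t("Summarize the difference between HTTP/1.1 and HTTP/2 in two sentences.")
    print(answer)

    # Message-list call routed to Gemini because the model name starts with "gemini".
    messages = [{"role": "user", "content": "What is the capital of Mongolia?"}]
    answer = await get_m2t(messages, model="gemini-2.0-flash", thinking_budget=0)
    print(answer)

    # Transcribe a local audio file (hypothetical path); long files are split into 10-minute chunks.
    transcript = await stt("meeting_recording.mp3")
    print(transcript)


if __name__ == "__main__":
    asyncio.run(main())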
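get_tool_prompt asks the model to return tool calls inside a ```json fenced block, but this file does not include a parser for that reply. A minimal sketch of how such a reply could be parsed follows; extract_tool_calls and the web_search tool are hypothetical names, and since the prompt's example omits quotes around keys, real replies may need a more forgiving parser than strict json.loads.

import json
import re


def extract_tool_calls(reply: str):
    """Pull the first ```json ... ``` block out of a model reply and return its tool_calls list."""
    match = re.search(r"```json\s*(.*?)```", reply, re.DOTALL)
    if not match:
        return []
    try:
        payload = json.loads(match.group(1))
    except json.JSONDecodeError:
        return []
    return payload.get("tool_calls", [])


# Example: a well-formed reply with properly quoted JSON keys.
reply = """Sure, I'll look that up.
```json
{"tool_calls": [{"name": "web_search", "args": {"query": "current weather in Seoul"}}]}
```"""
for call in extract_tool_calls(reply):
    print(call["name"], call["args"])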
