Reexpress MCP Server

Official
by ReexpressAI
mcp_utils_controller.py (21.5 kB)
# Copyright Reexpress AI, Inc. All rights reserved.

# Primary controller for MCP server

import os
from pathlib import Path

import torch
import asyncio
import uuid
from datetime import datetime

import constants
import utils_model
import mcp_utils_data_format
import mcp_utils_test
import mcp_utils_llm_api
import mcp_utils_file_access_manager
import mcp_utils_tool_limits_manager
import data_validator
import utils_visualization
import mcp_utils_sqlite_document_db_controller


class AdaptationError(Exception):
    def __init__(self, message="An adaptation error occurred", error_code=None):
        self.message = message
        self.error_code = error_code
        super().__init__(self.message)

    def __str__(self):
        if self.error_code is not None:
            return f"{self.error_code}: {self.message}"
        return self.message


class MCPServerStateController:
    """
    Primary controller for the Model-Context-Protocol (MCP) server.

    ExternalFileController() handles external files sent to the verification LLMs
    ToolCallLimitController() handles soft and hard tool-call limits
    mcp_utils_llm_api.py handles LLM API calls and transforms of the output

    The environment variables REEXPRESS_MCP_SERVER_REPO_DIR and REEXPRESS_MCP_MODEL_DIR, along with the
    LLM api variables, must be present. See the documentation for details.
    """
    def __init__(self):
        REEXPRESS_MCP_SERVER_REPO_DIR = os.getenv("REEXPRESS_MCP_SERVER_REPO_DIR")
        self.MODEL_DIR = os.getenv("REEXPRESS_MCP_MODEL_DIR")
        try:
            self.CREATE_HTML_VISUALIZATION = int(os.getenv("REEXPRESS_MCP_SAVE_OUTPUT")) == 1
            # Note the file must exist (content can be empty)
            self.HTML_VISUALIZATION_FILE = str(
                Path(self.MODEL_DIR, "visualize", "current_reexpression.html").as_posix())
        except:
            self.CREATE_HTML_VISUALIZATION = False
            self.HTML_VISUALIZATION_FILE = None
        # For provenance of any added instances (the file must exist---content can be empty):
        self.DATA_UPDATE_FILE = str(Path(self.MODEL_DIR, "adaptation", "running_updates.jsonl").as_posix())
        # load model
        self.main_device = torch.device("cpu")
        self.model = utils_model.load_model_torch(self.MODEL_DIR, torch.device("cpu"), load_for_inference=True)
        # self.global_uncertainty_statistics = utils_model.load_global_uncertainty_statistics_from_disk(self.MODEL_DIR)
        # Data Access Manager: Controls file access and content sent directly to the verification LLMs
        self.mcp_file_access_manager_object = \
            mcp_utils_file_access_manager.ExternalFileController(mcp_server_dir=REEXPRESS_MCP_SERVER_REPO_DIR)
        self.mcp_utils_tool_limits_manager_object = \
            mcp_utils_tool_limits_manager.ToolCallLimitController(mcp_server_dir=REEXPRESS_MCP_SERVER_REPO_DIR)
        self.current_reexpression = None
        try:
            self.reexpress_mcp_server_support_documents_file = str(
                Path(self.MODEL_DIR, "reexpress_mcp_server_db",
                     "reexpress_mcp_server_support_documents.db").as_posix())
            self.support_db = \
                mcp_utils_sqlite_document_db_controller.DocumentDatabase(
                    self.reexpress_mcp_server_support_documents_file)
        except:
            self.support_db = None

    def controller_directory_set(self, directory: str) -> str:
        return self.mcp_file_access_manager_object.add_environment_parent_directory(parent_directory=directory)

    def controller_file_set(self, filename: str) -> str:
        return self.mcp_file_access_manager_object.add_file(filename_with_path=filename)

    def controller_file_clear(self) -> str:
        return self.mcp_file_access_manager_object.remove_all_file_access()

    def controller_get_tool_availability(self) -> (bool, str):
        return self.mcp_utils_tool_limits_manager_object.get_tool_availability()

    def controller_update_counters(self):
        self.mcp_utils_tool_limits_manager_object.update_counters()

    def controller_reset_sequential_limit_counter(self) -> str:
        return self.mcp_utils_tool_limits_manager_object.reset_sequential_limit_counter()

    async def _run_tasks_with_taskgroup(self, task_configs):
        tasks = []
        async with asyncio.TaskGroup() as tg:
            for func, args in task_configs:
                task = tg.create_task(asyncio.to_thread(func, args))
                tasks.append(task)
        return [task.result() for task in tasks]

    async def get_reexpressed_verification(self, user_question: str, ai_response: str) -> str:
        content_xml, available_file_names = self.mcp_file_access_manager_object.get_current_external_file_content()
        if len(content_xml) > 0:
            attached_documents = f" {content_xml} "
        else:
            attached_documents = ""
            available_file_names = []
        attached_document_note = ""
        if len(attached_documents) > 0:
            attached_document_note = "(Note: I have included additional documents relevant to this discussion within the <attached_file></attached_file> XML tags.) "
        previous_query_and_response_to_verify_string = \
            f"<question> {attached_documents}{user_question} {attached_document_note}</question> <ai_response> {ai_response} </ai_response>"
        # currently identical:
        previous_query_and_response_to_verify_string_gemini = \
            f"<question> {attached_documents}{user_question} {attached_document_note}</question> <ai_response> {ai_response} </ai_response>"
        task_configs = [(mcp_utils_llm_api.get_document_attributes_from_gpt5,
                         previous_query_and_response_to_verify_string),
                        (mcp_utils_llm_api.get_document_attributes_from_gemini_reasoning,
                         previous_query_and_response_to_verify_string_gemini)
                        ]
        try:
            results = await self._run_tasks_with_taskgroup(task_configs)
            gpt5_model_verification_dict, \
                gemini_model_verification_dict = results
            llm_api_error = gpt5_model_verification_dict[constants.LLM_API_ERROR_KEY] or \
                gemini_model_verification_dict[constants.LLM_API_ERROR_KEY]
        except:
            llm_api_error = True
        formatted_output_string = ""
        partial_reexpression = {}
        if not llm_api_error:
            # get embedding over model explanations
            gpt5_model_summary, \
                gpt5_model_classification, gpt5_model_explanation, \
                gemini_model_classification, gemini_model_explanation = \
                mcp_utils_llm_api.get_model_explanations(gpt5_model_verification_dict,
                                                         gemini_model_verification_dict)
            agreement_model_embedding, agreement_model_classification = \
                mcp_utils_llm_api.llm_api_controller(gpt5_model_summary=gpt5_model_summary,
                                                     gpt5_model_explanation=gpt5_model_explanation,
                                                     gemini_model_explanation=gemini_model_explanation)
            llm_api_error = agreement_model_embedding is None or agreement_model_classification is None
            if not llm_api_error:
                reexpression_input = mcp_utils_data_format.construct_document_attributes_and_embedding(
                    gpt5_model_verification_dict, gemini_model_verification_dict, agreement_model_embedding)
                prediction_meta_data = mcp_utils_test.test(self.main_device, self.model, reexpression_input)
                if prediction_meta_data is not None:
                    formatted_output_string = mcp_utils_test.format_sdm_estimator_output_for_mcp_tool(
                        prediction_meta_data, gpt5_model_explanation, gemini_model_explanation,
                        agreement_model_classification)
                    partial_reexpression["reexpression_input"] = reexpression_input
                    partial_reexpression["prediction_meta_data"] = prediction_meta_data
                    partial_reexpression[constants.REEXPRESS_MODEL1_CLASSIFICATION] = gpt5_model_classification
                    partial_reexpression[constants.REEXPRESS_MODEL2_CLASSIFICATION] = gemini_model_classification
                    partial_reexpression[constants.REEXPRESS_MODEL1_EXPLANATION] = gpt5_model_explanation
                    partial_reexpression[constants.REEXPRESS_MODEL2_EXPLANATION] = gemini_model_explanation
                    partial_reexpression[constants.REEXPRESS_AGREEMENT_MODEL_CLASSIFICATION] = \
                        agreement_model_classification
                    partial_reexpression[constants.REEXPRESS_MODEL1_TOPIC_SUMMARY] = gpt5_model_summary
                    now = datetime.now()
                    submitted_time = now.strftime("%Y-%m-%d %H:%M:%S")
                    partial_reexpression[constants.REEXPRESS_SUBMITTED_TIME_KEY] = submitted_time
        if formatted_output_string == "":
            formatted_output_string = (
                mcp_utils_test.get_formatted_sdm_estimator_output_string(
                    False,
                    constants.CALIBRATION_RELIABILITY_LABEL_OOD,
                    constants.SHORT_EXPLANATION_FOR_CLASSIFICATION_CONFIDENCE__DEFAULT_ERROR,
                    constants.SHORT_EXPLANATION_FOR_CLASSIFICATION_CONFIDENCE__DEFAULT_ERROR,
                    agreement_model_classification=False,
                    hr_class_conditional_accuracy=self.model.hr_class_conditional_accuracy))
            self.current_reexpression = None
        else:
            partial_reexpression["formatted_output_string"] = formatted_output_string
            # Currently we do not store attached documents (if any) for archive with data additions. However, we do
            # save the file names.
            partial_reexpression[constants.REEXPRESS_QUESTION_KEY] = user_question
            partial_reexpression[constants.REEXPRESS_AI_RESPONSE_KEY] = ai_response
            partial_reexpression[constants.REEXPRESS_ATTACHED_FILE_NAMES] = available_file_names
            self.current_reexpression = partial_reexpression
            self.save_html_visualization()
        return formatted_output_string

    def get_nearest_match_meta_data(self):
        try:
            nearest_support_idx = self.current_reexpression["prediction_meta_data"]["nearest_training_idx"]
            nearest_support_document_id = self.model.train_uuids[nearest_support_idx]
            nearest_match_meta_data = self.support_db.get_document(nearest_support_document_id)
            # also add true labels and predictions
            nearest_match_meta_data["model_train_label"] = self.model.train_labels[nearest_support_idx]
            nearest_match_meta_data["model_train_predicted_label"] = \
                self.model.train_predicted_labels[nearest_support_idx]
            # label_int is also stored in the db for convenience, but the source of truth is model_train_label
            return nearest_match_meta_data
        except:
            return None

    def save_html_visualization(self):
        if self.current_reexpression is not None and self.CREATE_HTML_VISUALIZATION and \
                self.HTML_VISUALIZATION_FILE is not None:
            try:
                nearest_match_meta_data = self.get_nearest_match_meta_data()
                html_content = utils_visualization.create_html_page(self.current_reexpression,
                                                                    nearest_match_meta_data=nearest_match_meta_data,
                                                                    nearest_match_meta_data_is_html_escaped=True)
                html_file_path = Path(self.HTML_VISUALIZATION_FILE)
                if not html_file_path.exists() or not html_file_path.is_file() \
                        or html_file_path.is_symlink():
                    return
                else:
                    with open(str(html_file_path.as_posix()), 'w', encoding='utf-8') as f:
                        f.write(html_content)
            except:
                return

    def update_model_support(self, label: int) -> str:
        message = "There is no existing reexpression in the cache."
        if self.model is None or self.current_reexpression is None:
            return message
        try:
            if label not in [0, 1, data_validator.oodLabel]:
                raise AdaptationError(f"The provided label is not in [0, 1, {data_validator.oodLabel}].",
                                      "LABEL_ERROR")
            running_updates_file_path = Path(self.DATA_UPDATE_FILE)
            if not running_updates_file_path.exists() or not running_updates_file_path.is_file() \
                    or running_updates_file_path.is_symlink():
                raise AdaptationError(f"The running_updates.jsonl file does not exist.", "MISSING_UPDATES_FILE")
            if constants.ADMIN_LABEL_MODE:
                document_id = f"added_{str(uuid.uuid4())}"
            else:
                document_id = f"user_added_{str(uuid.uuid4())}"
            prediction_meta_data = self.current_reexpression["prediction_meta_data"]
            exemplar_vector = prediction_meta_data["exemplar_vector"]
            json_for_archive = {}
            reexpression_input = self.current_reexpression["reexpression_input"]
            embedding = [float(x) for x in reexpression_input.squeeze().detach().cpu().tolist()]
            json_for_archive[constants.REEXPRESS_ID_KEY] = document_id
            json_for_archive[constants.REEXPRESS_DOCUMENT_KEY] = ""
            json_for_archive[constants.REEXPRESS_LABEL_KEY] = label
            json_for_archive[constants.REEXPRESS_EMBEDDING_KEY] = embedding
            json_for_archive[constants.REEXPRESS_QUESTION_KEY] = \
                self.current_reexpression[constants.REEXPRESS_QUESTION_KEY]
            json_for_archive[constants.REEXPRESS_AI_RESPONSE_KEY] = \
                self.current_reexpression[constants.REEXPRESS_AI_RESPONSE_KEY]
            json_for_archive[constants.REEXPRESS_MODEL1_CLASSIFICATION] = \
                self.current_reexpression[constants.REEXPRESS_MODEL1_CLASSIFICATION]
            json_for_archive[constants.REEXPRESS_MODEL2_CLASSIFICATION] = \
                self.current_reexpression[constants.REEXPRESS_MODEL2_CLASSIFICATION]
            json_for_archive[constants.REEXPRESS_MODEL1_EXPLANATION] = \
                self.current_reexpression[constants.REEXPRESS_MODEL1_EXPLANATION]
            json_for_archive[constants.REEXPRESS_MODEL2_EXPLANATION] = \
                self.current_reexpression[constants.REEXPRESS_MODEL2_EXPLANATION]
            json_for_archive[constants.REEXPRESS_AGREEMENT_MODEL_CLASSIFICATION] = \
                self.current_reexpression[constants.REEXPRESS_AGREEMENT_MODEL_CLASSIFICATION]
            json_for_archive[constants.REEXPRESS_MODEL1_TOPIC_SUMMARY] = \
                self.current_reexpression[constants.REEXPRESS_MODEL1_TOPIC_SUMMARY]
            json_for_archive[constants.REEXPRESS_ATTACHED_FILE_NAMES] = \
                self.current_reexpression[constants.REEXPRESS_ATTACHED_FILE_NAMES]
            json_for_archive[constants.REEXPRESS_INFO_KEY] = \
                self.current_reexpression[constants.REEXPRESS_SUBMITTED_TIME_KEY]
            utils_model.save_by_appending_json_lines(str(running_updates_file_path.as_posix()), [json_for_archive])
            # Note: Currently there is no notion of database rollback if subsequent saving of the index fails (a la
            # standard sqlite operations with macOS Core Data). However,
            # in such failures, or when a change to an adaptation is needed, it is always possible to reset to the original
            # database in the repo and then batch add (after any needed changes to the JSON) the file DATA_UPDATE_FILE,
            # and then restart the server. See the documentation.
            print("entering add to support")
            self.model.add_to_support(label=label, predicted_label=prediction_meta_data["prediction"],
                                      document_id=document_id, exemplar_vector=exemplar_vector)
            message = "ERROR: Unable to save changes to the training set database. The cache has been cleared."
            utils_model.save_support_set_updates(self.model, self.MODEL_DIR)
            print("did save support")
            # add to document database
            add_to_support_db_message = ""
            try:
                success = self.support_db.add_document(
                    document_id=document_id,
                    model1_summary=self.current_reexpression[constants.REEXPRESS_MODEL1_TOPIC_SUMMARY],
                    model1_explanation=self.current_reexpression[constants.REEXPRESS_MODEL1_EXPLANATION],
                    model2_explanation=self.current_reexpression[constants.REEXPRESS_MODEL2_EXPLANATION],
                    model3_explanation='',
                    model4_explanation='',
                    model1_classification_int=int(self.current_reexpression[constants.REEXPRESS_MODEL1_CLASSIFICATION]),
                    model2_classification_int=int(self.current_reexpression[constants.REEXPRESS_MODEL2_CLASSIFICATION]),
                    model3_classification_int=0,
                    model4_classification_int=0,
                    agreement_model_classification_int=
                    int(self.current_reexpression[constants.REEXPRESS_AGREEMENT_MODEL_CLASSIFICATION]),
                    label_int=label,
                    label_was_updated_int=0,
                    document_source="user_added",
                    info=constants.REEXPRESS_MCP_SERVER_VERSION,
                    user_question=json_for_archive[constants.REEXPRESS_QUESTION_KEY],
                    ai_response=self.current_reexpression[constants.REEXPRESS_AI_RESPONSE_KEY]
                )
                assert success
            except:
                add_to_support_db_message = f" (However, we were unable to update the support database, so the text will not be available for introspection via the HTML visualization. Before adding additional documents, check that the database file exists at {self.reexpress_mcp_server_support_documents_file}.)"
            self.current_reexpression = None
            if label == 0:
                string_label = f"{constants.MCP_SERVER_NOT_VERIFIED_CLASS_LABEL} (Class 0)"
            elif label == 1:
                string_label = f"{constants.MCP_SERVER_VERIFIED_CLASS_LABEL} (Class 1)"
            elif label == data_validator.oodLabel:
                string_label = f"{data_validator.getDefaultLabelName(label=label, abbreviated=False)}"
            else:
                raise AdaptationError(f"The provided label is not in [0, 1, {data_validator.oodLabel}].",
                                      "LABEL_ERROR")
            message = f"Successfully added document id {document_id} to the training set database with label: {string_label}. The training set database now contains {self.model.support_index.ntotal} labeled examples.{add_to_support_db_message}"
            return message
        except AdaptationError as e:
            self.current_reexpression = None  # clear the cache to avoid inconsistencies
            return f"ERROR: Unable to save changes to the training set database. {e} The cache has been cleared."
        except:
            self.current_reexpression = None  # clear the cache to avoid inconsistencies
            return message

    def get_reexpress_view(self) -> str:
        if self.current_reexpression is None:
            return "There is no existing reexpression in the cache."
        # also return file access information
        prediction_meta_data = self.current_reexpression["prediction_meta_data"]
        # also show files in consideration, if any
        files_in_consideration_message = \
            mcp_utils_test.get_files_in_consideration_message(self.current_reexpression[constants.REEXPRESS_ATTACHED_FILE_NAMES])
        formatted_output_string = f"""
{constants.predictedFull}: {constants.MCP_SERVER_VERIFIED_CLASS_LABEL if prediction_meta_data["prediction"] == 1 else constants.MCP_SERVER_NOT_VERIFIED_CLASS_LABEL}\n
Out-of-distribution: {prediction_meta_data["is_ood"]}\n
{constants.qFull}: {int(prediction_meta_data["q"])}\n
{constants.dFull} Quantile: {prediction_meta_data["d"]}\n
{constants.fFull}: {prediction_meta_data["f"].detach().cpu().tolist()}\n
{constants.CALIBRATION_HIGH_RELIABILITY_REGION_LABEL_FULL} (at alpha={prediction_meta_data["hr_class_conditional_accuracy"]}, q'_min={prediction_meta_data["min_rescaled_similarity_to_determine_high_reliability_region"]}, class-wise output thresholds={prediction_meta_data["hr_output_thresholds"]}): {prediction_meta_data["is_high_reliability_region"]}\n
p(y | x): {prediction_meta_data["sdm_output"].detach().cpu().tolist()}\n
q': {prediction_meta_data["rescaled_similarity"]}\n
Effective sample size (by class): {prediction_meta_data["cumulative_effective_sample_sizes"].detach().cpu().tolist()}\n
{files_in_consideration_message}
---------------\n
{self.current_reexpression["formatted_output_string"]}
"""
        return formatted_output_string
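For orientation, here is a minimal, hypothetical usage sketch (not part of the repository) of how an MCP tool handler might drive the controller. It assumes the REEXPRESS_MCP_SERVER_REPO_DIR, REEXPRESS_MCP_MODEL_DIR, and LLM API environment variables are set and that the model directory contents exist; the question and response strings are placeholders.

# Hypothetical usage sketch (illustrative only; not part of the repository).
# Assumes the environment variables described in the MCPServerStateController
# docstring are set and that the model directory and its files exist on disk.
import asyncio

import mcp_utils_controller


async def main():
    controller = mcp_utils_controller.MCPServerStateController()
    # Run the two-LLM verification over a (question, response) pair and print
    # the formatted SDM estimator output that the MCP tool would return.
    result = await controller.get_reexpressed_verification(
        user_question="What is the capital of France?",
        ai_response="The capital of France is Paris.")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())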

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ReexpressAI/reexpress_mcp_server'
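The same request can be issued from Python; below is a minimal sketch using only the standard library. It assumes network access and simply prints the raw response body, since the response schema is not documented on this page.

# Minimal sketch of the curl request above using the Python standard library.
# Assumes network access; prints the raw response body as-is.
import urllib.request

URL = "https://glama.ai/api/mcp/v1/servers/ReexpressAI/reexpress_mcp_server"

with urllib.request.urlopen(URL) as response:
    print(response.read().decode("utf-8"))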

If you have feedback or need assistance with the MCP directory API, please join our Discord server.