IACR MCP Server
- src/digitalfate/server/level_utilized
import inspect
import traceback
import types
from itertools import chain
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel, OpenAIAgentModel
from pydantic_ai.models.anthropic import AnthropicModel
from openai import AsyncOpenAI, NOT_GIVEN
from openai import AsyncAzureOpenAI
from pydantic import BaseModel
from pydantic_ai.result import ResultData
from fastapi import HTTPException, status
from functools import wraps
from typing import Any, Callable, Optional
from pydantic_ai import RunContext, Tool
from anthropic import AsyncAnthropicBedrock
from dataclasses import dataclass
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from openai.types import chat
from collections.abc import AsyncIterator
from typing import Literal
from openai import AsyncStream
from ...storage.configuration import Configuration
from ...tools_server.function_client import FunctionToolManager
class CustomOpenAIAgentModel(OpenAIAgentModel):
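    # Custom agent model that mirrors the stock OpenAI agent model but disables
    # parallel tool calls whenever tools are registered.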
    async def _completions_create(
        self, messages: list[Any], stream: bool, model_settings: Any | None
    ) -> ChatCompletion | AsyncStream[ChatCompletionChunk]:
        if not self.tools:
            tool_choice: Literal['none', 'required', 'auto'] | None = None
        elif not self.allow_text_result:
            tool_choice = 'required'
        else:
            tool_choice = 'auto'
        openai_messages = list(chain(*(self._map_message(m) for m in messages)))
        model_settings = model_settings or {}
        return await self.client.chat.completions.create(
            model=self.model_name,
            messages=openai_messages,
            n=1,
            parallel_tool_calls=False if self.tools else NOT_GIVEN,  # Force parallel_tool_calls to False
            tools=self.tools or NOT_GIVEN,
            tool_choice=tool_choice or NOT_GIVEN,
            stream=stream,
            stream_options={'include_usage': True} if stream else NOT_GIVEN,
            max_tokens=model_settings.get('max_tokens', NOT_GIVEN),
            temperature=model_settings.get('temperature', NOT_GIVEN),
            top_p=model_settings.get('top_p', NOT_GIVEN),
            timeout=model_settings.get('timeout', NOT_GIVEN),
        )
class CustomOpenAIModel(OpenAIModel):
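    # Builds the tool definitions and returns the CustomOpenAIAgentModel above instead of the default agent model.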
    async def agent_model(
        self,
        *,
        function_tools: list[Any],
        allow_text_result: bool,
        result_tools: list[Any],
    ) -> Any:
        tools = [self._map_tool_definition(r) for r in function_tools]
        if result_tools:
            tools += [self._map_tool_definition(r) for r in result_tools]
        return CustomOpenAIAgentModel(
            self.client,
            self.model_name,
            allow_text_result,
            tools,
        )
def tool_wrapper(func: Callable) -> Callable:
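    """Wraps a tool so any exception is caught and returned as an error dict instead of raising."""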
    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Resolve the tool name so failures can be reported against the right tool
        tool_name = getattr(func, "__name__", str(func))
        try:
            # Call the original function
            result = func(*args, **kwargs)
            return result
        except Exception as e:
            print(f"Tool call failed ({tool_name}):", e)
            return {"status_code": 500, "detail": f"Tool call '{tool_name}' failed: {e}"}
    return wrapper
def summarize_text(text: str, llm_model: Any, chunk_size: int = 100000, max_size: int = 300000) -> str:
"""Base function to summarize any text by splitting into chunks and summarizing each."""
# Return early if text is None or empty
if text is None:
return ""
if not isinstance(text, str):
try:
text = str(text)
except:
return ""
if not text:
return ""
# If text is already under max_size, return it
if len(text) <= max_size:
return text
# Adjust chunk size based on model
if "gpt" in str(llm_model).lower():
# OpenAI has a 1M character limit, we'll use a much smaller chunk size to be safe
chunk_size = min(chunk_size, 100000) # 100K per chunk for OpenAI
elif "claude" in str(llm_model).lower():
chunk_size = min(chunk_size, 200000) # 200K per chunk for Claude
try:
print(f"Original text length: {len(text)}")
# If text is extremely long, do an initial aggressive truncation
if len(text) > 2000000: # If over 2M characters
text = text[:2000000] # Take first 2M characters
print("Text was extremely long, truncated to 2M characters")
chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
print(f"Number of chunks: {len(chunks)}")
model = agent_creator(response_format=str, tools=[], context=None, llm_model=llm_model, system_prompt=None)
if isinstance(model, dict) and "status_code" in model:
print(f"Error creating model: {model}")
return text[:max_size]
# Process chunks in smaller batches if there are too many
batch_size = 5
summarized_chunks = []
for batch_start in range(0, len(chunks), batch_size):
batch_end = min(batch_start + batch_size, len(chunks))
batch = chunks[batch_start:batch_end]
for i, chunk in enumerate(batch):
chunk_num = batch_start + i + 1
try:
print(f"Processing chunk {chunk_num}/{len(chunks)}, length: {len(chunk)}")
# Create a more focused prompt for better summarization
prompt = (
"Please provide an extremely concise summary of the following text. "
"Focus only on the most important points and key information. "
"Be as brief as possible while retaining critical meaning:\n\n"
)
message = [{"type": "text", "text": prompt + chunk}]
result = model.run_sync(message)
if result and hasattr(result, 'data') and result.data:
# Ensure the summary isn't too long
summary = result.data[:max_size//len(chunks)]
summarized_chunks.append(summary)
else:
print(f"Warning: Empty or invalid result for chunk {chunk_num}")
# Include a shorter truncated version as fallback
summarized_chunks.append(chunk[:500] + "...")
except Exception as e:
print(f"Error summarizing chunk {chunk_num}: {str(e)}")
# Include a shorter truncated version as fallback
summarized_chunks.append(chunk[:500] + "...")
# Combine all summarized chunks
combined_summary = "\n\n".join(summarized_chunks)
# If still too long, recursively summarize with smaller chunks
if len(combined_summary) > max_size:
print(f"Combined summary still too long ({len(combined_summary)} chars), recursively summarizing...")
return summarize_text(
combined_summary,
llm_model,
chunk_size=max(5000, chunk_size//4), # Reduce chunk size more aggressively
max_size=max_size
)
print(f"Final summary length: {len(combined_summary)}")
return combined_summary
except Exception as e:
traceback.print_exc()
print(f"Error in summarize_text: {str(e)}")
# If all else fails, return a truncated version
return text[:max_size]
def summarize_message_prompt(message_prompt: str, llm_model: Any) -> str:
"""Summarizes the message prompt to reduce its length while preserving key information."""
print("\n\n\n****************Summarizing message prompt****************\n\n\n")
if message_prompt is None:
return ""
try:
# Use a smaller max size for message prompts
max_size = 50000 # 100K for messages
summarized_message_prompt = summarize_text(message_prompt, llm_model, max_size=max_size)
if summarized_message_prompt is None:
return ""
print(f"Summarized message prompt length: {len(summarized_message_prompt)}")
return summarized_message_prompt
except Exception as e:
print(f"Error in summarize_message_prompt: {str(e)}")
try:
return str(message_prompt)[:50000] if message_prompt else ""
except:
return ""
def summarize_system_prompt(system_prompt: str, llm_model: Any) -> str:
"""Summarizes the system prompt to reduce its length while preserving key information."""
print("\n\n\n****************Summarizing system prompt****************\n\n\n")
if system_prompt is None:
return ""
try:
# Use a smaller max size for system prompts
max_size = 50000 # 100K for system prompts
summarized_system_prompt = summarize_text(system_prompt, llm_model, max_size=max_size)
if summarized_system_prompt is None:
return ""
print(f"Summarized system prompt length: {len(summarized_system_prompt)}")
return summarized_system_prompt
except Exception as e:
print(f"Error in summarize_system_prompt: {str(e)}")
try:
return str(system_prompt)[:50000] if system_prompt else ""
except:
return ""
def summarize_context_string(context_string: str, llm_model: Any) -> str:
"""Summarizes the context string to reduce its length while preserving key information."""
print("\n\n\n****************Summarizing context string****************\n\n\n")
if context_string is None or context_string == "":
return ""
try:
# Use a smaller max size for context strings
max_size = 50000 # 50K for context strings
summarized_context = summarize_text(context_string, llm_model, max_size=max_size)
if summarized_context is None:
return ""
print(f"Summarized context string length: {len(summarized_context)}")
return summarized_context
except Exception as e:
print(f"Error in summarize_context_string: {str(e)}")
try:
return str(context_string)[:50000] if context_string else ""
except:
return ""
def agent_creator(
    response_format: BaseModel = str,
    tools: list[str] = [],
    context: Any = None,
    llm_model: str = "openai/gpt-4o",
    system_prompt: Optional[Any] = None,
    context_compress: bool = False
) -> Any:
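    """Creates a pydantic_ai Agent for the requested llm_model, building the context string, registering the named tools, and returning an error dict when required configuration is missing."""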
if llm_model == "openai/gpt-4o" or llm_model == "gpt-4o":
openai_api_key = Configuration.get("OPENAI_API_KEY")
if not openai_api_key:
return {"status_code": 401, "detail": "No API key provided. Please set OPENAI_API_KEY in your configuration."}
client = AsyncOpenAI(
api_key=openai_api_key, # This is the default and can be omitted
)
model = CustomOpenAIModel('gpt-4o', openai_client=client)
if llm_model == "deepseek/deepseek-chat":
deepseek_api_key = Configuration.get("DEEPSEEK_API_KEY")
if not deepseek_api_key:
return {"status_code": 401, "detail": "No API key provided. Please set DEEPSEEK_API_KEY in your configuration."}
model = OpenAIModel(
'deepseek-chat',
base_url='https://api.deepseek.com',
api_key=deepseek_api_key,
)
elif llm_model == "claude/claude-3-5-sonnet" or llm_model == "claude-3-5-sonnet":
anthropic_api_key = Configuration.get("ANTHROPIC_API_KEY")
if not anthropic_api_key:
return {"status_code": 401, "detail": "No API key provided. Please set ANTHROPIC_API_KEY in your configuration."}
model = AnthropicModel("claude-3-5-sonnet-latest", api_key=anthropic_api_key)
elif llm_model == "bedrock/claude-3-5-sonnet" or llm_model == "claude-3-5-sonnet-aws":
aws_access_key_id = Configuration.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = Configuration.get("AWS_SECRET_ACCESS_KEY")
aws_region = Configuration.get("AWS_REGION")
if not aws_access_key_id or not aws_secret_access_key or not aws_region:
return {"status_code": 401, "detail": "No AWS credentials provided. Please set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION in your configuration."}
model = AsyncAnthropicBedrock(
aws_access_key=aws_access_key_id,
aws_secret_key=aws_secret_access_key,
aws_region=aws_region
)
model = AnthropicModel("us.anthropic.claude-3-5-sonnet-20241022-v2:0", anthropic_client=model)
elif llm_model == "azure/gpt-4o" or llm_model == "gpt-4o-azure":
azure_endpoint = Configuration.get("AZURE_OPENAI_ENDPOINT")
azure_api_version = Configuration.get("AZURE_OPENAI_API_VERSION")
azure_api_key = Configuration.get("AZURE_OPENAI_API_KEY")
missing_keys = []
if not azure_endpoint:
missing_keys.append("AZURE_OPENAI_ENDPOINT")
if not azure_api_version:
missing_keys.append("AZURE_OPENAI_API_VERSION")
if not azure_api_key:
missing_keys.append("AZURE_OPENAI_API_KEY")
if missing_keys:
return {
"status_code": 401,
"detail": f"No API key provided. Please set {', '.join(missing_keys)} in your configuration."
}
model = AsyncAzureOpenAI(api_version=azure_api_version, azure_endpoint=azure_endpoint, api_key=azure_api_key)
model = CustomOpenAIModel('gpt-4o', openai_client=model)
else:
return {"status_code": 400, "detail": f"Unsupported LLM model: {llm_model}"}
context_string = ""
if context is not None:
if not isinstance(context, list):
context = [context]
if isinstance(context, list):
for each in context:
from ...client.level_two.agent import Characterization
from ...client.level_two.agent import OtherTask
from ...client.tasks.tasks import Task
from ...client.knowledge_base.knowledge_base import KnowledgeBase
type_string = type(each).__name__
if type_string == Characterization.__name__:
context_string += f"\n\nThis is your character ```character {each.model_dump()}```"
elif type_string == OtherTask.__name__:
context_string += f"\n\nContexts from old tasks: ```old_task {each.task} {each.result}```"
elif type_string == Task.__name__:
response = None
description = each.description
try:
response = each.response.dict()
except:
try:
response = each.response.model_dump()
except:
response = each.response
context_string += f"\n\nContexts from old tasks: ```old_task {description} {response}``` "
else:
context_string += f"\n\nContexts ```context {each}```"
# Compress context string if enabled
if context_compress and context_string:
context_string = summarize_context_string(context_string, llm_model)
system_prompt_ = ()
if system_prompt is not None:
system_prompt_ = system_prompt + f"The context is: {context_string}"
elif context_string != "":
system_prompt_ = f"You are a helpful assistant. User want to add an old task context to the task. The context is: {context_string}"
roulette_agent = Agent(
model,
result_type=response_format,
retries=5,
system_prompt=system_prompt_
)
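    # Wrap the requested tools so failures are reported as error dicts, then register them on the agent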
    the_wrapped_tools = []
    with FunctionToolManager() as function_client:
        the_list_of_tools = function_client.get_tools_by_name(tools)
        for each in the_list_of_tools:
            # Wrap the tool with our wrapper
            wrapped_tool = tool_wrapper(each)
            the_wrapped_tools.append(wrapped_tool)
    for each in the_wrapped_tools:
        # Inspect the signature of the tool
        signature = inspect.signature(each)
        roulette_agent.tool_plain(each, retries=5)
    # Computer use
    if "claude/claude-3-5-sonnet" in llm_model:
        print("Tools", tools)
        if "ComputerUse.*" in tools:
            try:
                from .cu import ComputerUse_tools
                for each in ComputerUse_tools:
                    roulette_agent.tool_plain(each, retries=5)
            except Exception as e:
                print("Error", e)
    return roulette_agent