IACR MCP Server
- src
- digitalfate
- client
- level_two
import copy
import time
import cloudpickle
cloudpickle.DEFAULT_PROTOCOL = 2
import dill
import base64
import httpx
import hashlib
from typing import Any, List, Dict, Optional, Type, Union
from pydantic import BaseModel
from ..tasks.tasks import Task
from ..printing import agent_end, agent_total_cost, agent_retry
from ..tasks.task_response import ObjectResponse
from ..agent_configuration.agent_configuration import AgentConfiguration
from ..level_utilized.utility import context_serializer
from ..level_utilized.utility import context_serializer, response_format_serializer, tools_serializer, response_format_deserializer, error_handler
from ...storage.caching import save_to_cache_with_expiry, get_from_cache_with_expiry
class SubTask(ObjectResponse):
description: str
sources_can_be_used: List[str]
required_output: str
tools: List[str]
class SubTaskList(ObjectResponse):
sub_tasks: List[SubTask]
class SearchResult(ObjectResponse):
any_customers: bool
products: List[str]
services: List[str]
potential_competitors: List[str]
class CompanyObjective(ObjectResponse):
objective: str
goals: List[str]
state: str
class HumanObjective(ObjectResponse):
job_title: str
job_description: str
job_goals: List[str]
class Characterization(ObjectResponse):
website_content: Union[SearchResult, None]
company_objective: Union[CompanyObjective, None]
human_objective: Union[HumanObjective, None]
name_of_the_human_of_tasks: str = None
contact_of_the_human_of_tasks: str = None
class OtherTask(ObjectResponse):
task: str
result: Any
class Agent:
def agent_(
self,
agent_configuration: AgentConfiguration,
task: Task,
llm_model: str = None,
) -> Any:
start_time = time.time()
results = []
try:
if isinstance(task, list):
for each in task:
the_result = self.send_agent_request(agent_configuration, each, llm_model)
the_result["time"] = time.time() - start_time
results.append(the_result)
agent_end(the_result["result"], the_result["llm_model"], the_result["response_format"], start_time, time.time(), the_result["usage"], the_result["tool_count"], the_result["context_count"], self.debug)
else:
the_result = self.send_agent_request(agent_configuration, task, llm_model)
the_result["time"] = time.time() - start_time
results.append(the_result)
agent_end(the_result["result"], the_result["llm_model"], the_result["response_format"], start_time, time.time(), the_result["usage"], the_result["tool_count"], the_result["context_count"], self.debug)
except Exception as e:
try:
from ...server import stop_dev_server, stop_main_server, is_tools_server_running, is_main_server_running
if is_tools_server_running() or is_main_server_running():
stop_dev_server()
except Exception as e:
pass
raise e
end_time = time.time()
return results
def send_agent_request(
self,
agent_configuration: AgentConfiguration,
task: Task,
llm_model: str = None,
) -> Any:
from ..trace import sentry_sdk
from ..level_utilized.utility import CallErrorException
"""
Call GPT-4 with optional tools and MCP servers.
Args:
prompt: The input prompt for GPT-4
response_format: The expected response format (can be a type or Pydantic model)
tools: Optional list of tool names to use
Returns:
The response in the specified format
"""
if llm_model is None:
llm_model = self.default_llm_model
retry_count = 0
last_error = None
while retry_count <= agent_configuration.retries:
try:
if retry_count > 0:
agent_retry(retry_count, agent_configuration.retries)
tools = tools_serializer(task.tools)
response_format = task.response_format
with sentry_sdk.start_transaction(op="task", name="Agent.send_agent_request") as transaction:
with sentry_sdk.start_span(op="serialize"):
# Serialize the response format if it's a type or BaseModel
response_format_str = response_format_serializer(task.response_format)
context = context_serializer(task.context, self)
with sentry_sdk.start_span(op="prepare_request"):
# Prepare the request data
data = {
"agent_id": agent_configuration.agent_id,
"prompt": task.description,
"response_format": response_format_str,
"tools": tools or [],
"context": context,
"llm_model": llm_model,
"system_prompt": None,
"retries": agent_configuration.retries,
"context_compress": agent_configuration.context_compress,
"memory": agent_configuration.memory
}
with sentry_sdk.start_span(op="send_request"):
result = self.send_request("/level_two/agent", data)
result = result["result"]
error_handler(result)
with sentry_sdk.start_span(op="deserialize"):
deserialized_result = response_format_deserializer(response_format_str, result)
task._response = deserialized_result["result"]
response_format_req = None
if response_format_str == "str":
response_format_req = response_format_str
else:
# Class name
response_format_req = response_format.__name__
if context is None:
context = []
len_of_context = len(task.context) if task.context is not None else 0
return {"result": deserialized_result["result"], "llm_model": llm_model, "response_format": response_format_req, "usage": deserialized_result["usage"], "tool_count": len(tools), "context_count": len_of_context}
except CallErrorException as e:
last_error = e
retry_count += 1
if retry_count > agent_configuration.retries:
raise last_error
continue
def create_characterization(self, agent_configuration: AgentConfiguration, llm_model: str = None):
tools = ["google", "read_website"]
search_task = Task(description=f"Make a search for {agent_configuration.company_url}", tools=tools, response_format=SearchResult)
self.call(search_task, llm_model=llm_model)
company_objective_task = Task(description=f"Generate the company objective for {agent_configuration.company_url}", tools=tools, response_format=CompanyObjective, context=search_task)
self.call(company_objective_task, llm_model=llm_model)
human_objective_task = Task(description=f"Generate the human objective for {agent_configuration.job_title}", tools=tools, response_format=HumanObjective, context=[search_task, company_objective_task])
self.call(human_objective_task, llm_model=llm_model)
total_character = Characterization(website_content=search_task.response, company_objective=company_objective_task.response, human_objective=human_objective_task.response, name_of_the_human_of_tasks=agent_configuration.name, contact_of_the_human_of_tasks=agent_configuration.contact)
return total_character
def agent(self, agent_configuration: AgentConfiguration, task: Task, llm_model: str = None):
original_task = task
copy_agent_configuration = copy.deepcopy(agent_configuration)
copy_agent_configuration_json = copy_agent_configuration.model_dump_json(include={"job_title", "company_url", "company_objective", "name", "contact"})
the_characterization_cache_key = f"characterization_{hashlib.sha256(copy_agent_configuration_json.encode()).hexdigest()}"
if agent_configuration.caching:
the_characterization = get_from_cache_with_expiry(the_characterization_cache_key)
if the_characterization is None:
the_characterization = self.create_characterization(agent_configuration, llm_model)
save_to_cache_with_expiry(the_characterization, the_characterization_cache_key, agent_configuration.cache_expiry)
else:
the_characterization = self.create_characterization(agent_configuration, llm_model)
knowledge_base = None
if agent_configuration.knowledge_base:
knowledge_base = self.knowledge_base(agent_configuration, llm_model)
the_task = task
is_it_sub_task = False
shared_context = []
if agent_configuration.sub_task:
sub_tasks = self.multiple(task, llm_model)
is_it_sub_task = True
the_task = sub_tasks
if not isinstance(the_task, list):
the_task = [the_task]
for each in the_task:
if not isinstance(each.context, list):
each.context = [each.context]
last_task = []
for each in the_task:
if isinstance(each.context, list):
last_task.append(each)
the_task = last_task
for each in the_task:
each.context.append(the_characterization)
# Add knowledge base to the context for each task
if knowledge_base:
if isinstance(the_task, list):
for each in the_task:
if each.context:
each.context.append(knowledge_base)
else:
each.context = [knowledge_base]
if task.context:
for each in the_task:
each.context.append(task.context)
if agent_configuration.tools:
if isinstance(the_task, list):
for each in the_task:
each.tools = agent_configuration.tools
results = []
if isinstance(the_task, list):
for each in the_task:
if is_it_sub_task:
if shared_context:
each.context += shared_context
result = self.agent_(agent_configuration, each, llm_model=llm_model)
results += result
if is_it_sub_task:
shared_context.append(OtherTask(task=each.description, result=each.response))
original_task._response = the_task[-1].response
total_time = 0
for each in results:
total_time += each["time"]
total_input_tokens = 0
total_output_tokens = 0
for each in results:
total_input_tokens += each["usage"]["input_tokens"]
total_output_tokens += each["usage"]["output_tokens"]
the_llm_model = llm_model
if the_llm_model is None:
the_llm_model = self.default_llm_model
agent_total_cost(total_input_tokens, total_output_tokens, total_time, the_llm_model)
def multiple(self, task: Task, llm_model: str = None):
# Generate a list of sub tasks
prompt = "You are a helpful assistant. User have an general task. You need to generate a list of sub tasks. Each sub task should be a Actionable step of main task. You need to return a list of sub tasks. You should say to agent to make this job not making plan again and again. We need actions. If If you have tools that can help you for the task specify them in the task. If there is an context its the user want to see so create tasks to fill them all. Create rich tasks for every user requested field. Only do user requested things. Dont make any assumptions."
sub_tasker = Task(description=prompt, response_format=SubTaskList, context=[task, task.response_format], tools=task.tools)
self.call(sub_tasker, llm_model)
sub_tasks = []
for each in sub_tasker.response.sub_tasks:
new_task = Task(description=each.description+ " " + each.required_output + " " + str(each.sources_can_be_used) + " " + str(each.tools))
new_task.tools = task.tools
sub_tasks.append(new_task)
end_task = Task(description=task.description, response_format=task.response_format)
sub_tasks.append(end_task)
return sub_tasks
def multi_agent(self, agent_configurations: List[AgentConfiguration], tasks: Any, llm_model: str = None):
agent_tasks = []
the_agents = {}
for each in agent_configurations:
agent_key = each.agent_id[:5] + "_" + each.job_title
the_agents[agent_key] = each
the_agents_keys = list(the_agents.keys())
class TheAgents_(ObjectResponse):
agents: List[str]
the_agents_ = TheAgents_(agents=the_agents_keys)
class SelectedAgent(ObjectResponse):
selected_agent: str
if isinstance(tasks, list) != True:
tasks = [tasks]
for each in tasks:
is_end = False
while is_end == False:
selecting_task = Task(description="Select an agent for this task", response_format=SelectedAgent, context=[the_agents_, each])
self.call(selecting_task, llm_model)
if selecting_task.response.selected_agent in the_agents:
is_end = True
agent_tasks.append({
"agent": the_agents[selecting_task.response.selected_agent],
"task": each
})
for each in agent_tasks:
self.agent(each["agent"], each["task"], llm_model)
return the_agents