# privateGPT MCP Server
# by Fujitsu-AI
import json
from pathlib import Path
from starlette.responses import StreamingResponse
from fastapi import FastAPI, Request, HTTPException
from threading import local
from agents.OpenAI_Compatible_API_Agent.Python.open_ai_helper import ChatInstance, \
ChatCompletionRequest, CompletionRequest, _resp_sync, _resp_async_generator, models, Message, _resp_async_generator_completions, _resp_sync_completions
from .pgpt_api import PrivateGPTAPI
from ...AgentInterface.Python.config import Config, ConfigError
import uvicorn
app = FastAPI(title="OpenAI-compatible API for PrivateGPT")
# Per-thread storage: written by the header middleware below, read by the
# endpoint handlers to recover the caller's Authorization header.
request_context = local()
# Cache of ChatInstance objects so repeated calls with the same API key can
# reuse an already-authenticated PrivateGPT session.
instances = []
# Load configuration from the JSON file one directory above this module;
# abort the process if required fields (base_url) are missing or unreadable.
try:
    config_file = Path.absolute(Path(__file__).parent.parent / "pgpt_openai_api_proxy.json")
    config = Config(config_file=config_file, required_fields=["base_url"])
    # Default access groups used when a request does not specify its own.
    default_groups = config.get("groups", [])
except ConfigError as e:
    print(f"Configuration Error: {e}")
    exit(1)
@app.middleware("http")
async def store_request_headers(request: Request, call_next):
    """Capture the incoming request's headers into module-level storage.

    The endpoint handlers below read ``request_context.headers`` to extract
    the Bearer API key, since their signatures only receive the parsed body.
    """
    # NOTE(review): request_context is a threading.local; with async handlers
    # many concurrent requests can run on one event-loop thread, so headers
    # from one request may be observed by another. Consider replacing it with
    # contextvars.ContextVar — TODO confirm behavior under concurrent load.
    request_context.headers = dict(request.headers)
    response = await call_next(request)
    return response
@app.post("/chat/completions")
async def chat_completions(request: ChatCompletionRequest):
    """OpenAI-compatible ``/chat/completions`` endpoint backed by PrivateGPT.

    Authenticates via the Bearer token captured by the header middleware,
    reuses an existing PrivateGPT session for that API key when possible,
    and returns either a streaming (NDJSON) or a synchronous response.
    """
    headers = getattr(request_context, "headers", {})
    # Bug fix: the original did headers['authorization'].split(" ")[1], which
    # raised KeyError/IndexError (an HTTP 500) when the Authorization header
    # was missing or not of the form "Bearer <key>". Reject cleanly instead.
    auth_parts = str(headers.get("authorization", "")).split(" ")
    if len(auth_parts) < 2 or not auth_parts[1]:
        raise HTTPException(status_code=401,
                            detail="Missing or malformed Authorization header")
    client_api_key = auth_parts[1]

    groups = default_groups
    force_new_session = False
    if request.groups:
        groups = request.groups
    if request.newSession:
        force_new_session = True
    print("Groups: " + str(groups))

    if request.messages:
        # Check if this api-key already has a running instance.
        indices = [i for i, x in enumerate(instances)
                   if x.api_key == client_api_key]
        index = indices[0] if indices else -1
        if index > -1:
            # Reuse the existing connection; only reset the chat when the
            # requested groups changed or a fresh session was forced.
            if instances[index].agent.chosen_groups != groups:
                print("⚠️ New Groups requested, switching to new Chat..")
                config.set_value("groups", groups)
                instances[index].agent.chat_id = None
            elif force_new_session:
                print("⚠️ New Session Requested, switching to new Chat..")
                config.set_value("groups", groups)
                instances[index].agent.chat_id = None
            pgpt = instances[index].agent
        else:
            # Otherwise connect via api-key and remember the new instance.
            config.set_value("groups", groups)
            pgpt = PrivateGPTAPI(config, client_api_key=client_api_key)
            instances.append(ChatInstance(client_api_key, pgpt))

        if pgpt.logged_in:
            response = pgpt.respond_with_context(
                request.messages, request.response_format, request.tools)
            if response is None:
                # Bug fix: the original re-logged-in but still handed None to
                # the response serializers below, which cannot handle it.
                pgpt.login()
                response = {"chatId": "0", "answer": "No Response received"}
            else:
                if "answer" not in response:
                    response["answer"] = "No Response received"
                if response["answer"] == "error":
                    # Session probably expired; re-login for the next call.
                    pgpt.login()
        else:
            response = {
                "chatId": "0",
                "answer": "API Key not valid",
            }
    else:
        response = {
            "chatId": "0",
            "answer": "No Input given",
        }

    if request.stream:
        return StreamingResponse(
            _resp_async_generator(response, request), media_type="application/x-ndjson"
        )
    return _resp_sync(response, request)
# legacy completions API
@app.post("/completions")
async def completions(request: CompletionRequest):
    """OpenAI-compatible legacy ``/completions`` endpoint backed by PrivateGPT.

    Wraps the single prompt in a user Message and delegates to PrivateGPT;
    returns either a streaming (NDJSON) or a synchronous response.
    """
    headers = getattr(request_context, "headers", {})
    # Bug fix: the original did headers['authorization'].split(" ")[1], which
    # raised KeyError/IndexError (an HTTP 500) when the Authorization header
    # was missing or not of the form "Bearer <key>". Reject cleanly instead.
    auth_parts = str(headers.get("authorization", "")).split(" ")
    if len(auth_parts) < 2 or not auth_parts[1]:
        raise HTTPException(status_code=401,
                            detail="Missing or malformed Authorization header")
    client_api_key = auth_parts[1]

    groups = request.groups if request.groups else default_groups
    print("Groups: " + str(groups))

    if request.prompt:
        # Connect via api-key (no instance caching on the legacy endpoint).
        config.set_value("groups", groups)
        pgpt = PrivateGPTAPI(config, client_api_key=client_api_key)
        if pgpt.logged_in:
            response = pgpt.respond_with_context(
                [Message(role="user", content=request.prompt)],
                request.response_format, request.tools)
            # Bug fix: respond_with_context may return None (the
            # /chat/completions endpoint already guards for this); without the
            # check, `"answer" not in response` raises TypeError.
            if response is None:
                response = {"chatId": "0", "answer": "No Response received"}
            if "answer" not in response:
                response["answer"] = "No Response received"
            if response["answer"] == "error":
                # Session probably expired; re-login and open a fresh chat.
                if pgpt.login():
                    pgpt.create_chat()
        else:
            response = {
                "chatId": "0",
                "answer": "API Key not valid",
            }
    else:
        response = {
            "chatId": "0",
            "answer": "No Input given",
        }

    if request.stream:
        return StreamingResponse(
            _resp_async_generator_completions(response, request), media_type="application/x-ndjson"
        )
    return _resp_sync_completions(response, request)
@app.get("/models")
def return_models():
    """List every available model in the OpenAI ``/models`` response shape."""
    return {"object": "list", "data": models}
@app.get('/models/{model_id}')
async def get_model(model_id: str):
    """Return the model entry whose ``id`` matches *model_id*.

    Raises:
        HTTPException: 404 if no model with that id exists.
    """
    # next() with a default avoids building an intermediate list just to take
    # its first element; the stray debug print of the entry was removed.
    entry = next((item for item in models if item["id"] == model_id), None)
    if entry is None:
        raise HTTPException(status_code=404, detail="Model not found")
    return entry
if __name__ == "__main__":
    # Standalone launch: bind address/port come from config, with defaults.
    host = config.get("api_ip", "0.0.0.0")
    port = int(config.get("api_port", 8002))
    uvicorn.run(app, host=host, port=port)