"""Oracle Fusion AR MCP Server - Main entry point."""
from typing import Any
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import Tool, TextContent
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.responses import JSONResponse
from starlette.requests import Request
import uvicorn
from . import __version__
from .config import settings
from .tools import get_invoice_details, list_invoices, search_invoices
from .oracle_client import oracle_client
# MCP server instance. The list_tools/call_tool handlers in this module register
# on it via its decorators, and handle_sse runs it over each SSE connection.
mcp_server = Server("oracle-fusion-ar-mcp-server")
# ---------------- Tool definitions ----------------
# Catalogue of the MCP tools this server exposes. list_tools() returns it as-is
# and the root endpoint summarises each entry's name and description.
# Every input schema lists "username" and "password" as required; the details
# tool additionally requires "invoice_id". All other properties are optional.
TOOLS = [
# Tool 1: list invoices, with optional filters and pagination.
Tool(
name="oracle_ar_list_invoices",
description="Retrieve a list of Oracle Fusion AR invoices...",
inputSchema={
"type": "object",
"properties": {
"username": {"type": "string"},
"password": {"type": "string"},
# Pagination: limit is declared as 1..100 (default 20), offset as >= 0 (default 0).
"limit": {"type": "number", "default": 20, "minimum": 1, "maximum": 100},
"offset": {"type": "number", "default": 0, "minimum": 0},
"customer_name": {"type": "string"},
"invoice_number": {"type": "string"},
# Dates must match 4 digits, 2 digits, 2 digits separated by hyphens
# (presumably YYYY-MM-DD -- confirm against the Oracle client).
"date_from": {"type": "string", "pattern": "^\\d{4}-\\d{2}-\\d{2}$"},
"date_to": {"type": "string", "pattern": "^\\d{4}-\\d{2}-\\d{2}$"},
"status": {"type": "string", "enum": ["Open", "Closed", "Disputed", "Pending", "Cancelled"]},
},
"required": ["username", "password"],
},
),
# Tool 2: fetch a single invoice, identified by "invoice_id".
Tool(
name="oracle_ar_get_invoice_details",
description="Retrieve detailed information about a specific Oracle Fusion AR invoice...",
inputSchema={
"type": "object",
"properties": {
"username": {"type": "string"},
"password": {"type": "string"},
"invoice_id": {"type": "string"},
},
"required": ["username", "password", "invoice_id"],
},
),
# Tool 3: search invoices using a query string and/or structured filters.
Tool(
name="oracle_ar_search_invoices",
description="Advanced search for Oracle Fusion AR invoices...",
inputSchema={
"type": "object",
"properties": {
"username": {"type": "string"},
"password": {"type": "string"},
"q": {"type": "string"},
# Optional structured filters; none of the sub-fields is marked required.
"filters": {
"type": "object",
"properties": {
"amountGreaterThan": {"type": "number"},
"amountLessThan": {"type": "number"},
"overdueDays": {"type": "number", "minimum": 0},
"customerId": {"type": "string"},
},
},
# Same pagination declaration as the list tool.
"limit": {"type": "number", "default": 20, "minimum": 1, "maximum": 100},
"offset": {"type": "number", "default": 0, "minimum": 0},
},
"required": ["username", "password"],
},
),
]
@mcp_server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the module-level ``TOOLS`` catalogue to the MCP server."""
    return TOOLS
@mcp_server.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Dispatch an MCP tool invocation to the matching invoice helper.

    Args:
        name: Tool name; one of the three names declared in ``TOOLS``.
        arguments: Tool arguments, handed to the helper unchanged.

    Returns:
        One ``TextContent`` per item the helper returned, built from each
        item's ``"text"`` entry.

    Raises:
        ValueError: If ``name`` does not match a known tool.
    """
    # Tool name -> coroutine function implementing it.
    handlers = {
        "oracle_ar_list_invoices": list_invoices,
        "oracle_ar_get_invoice_details": get_invoice_details,
        "oracle_ar_search_invoices": search_invoices,
    }
    handler = handlers.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")

    items = await handler(arguments)
    return [TextContent(type="text", text=entry["text"]) for entry in items]
# ---------------- SSE transport wiring ----------------
# IMPORTANT: keep the trailing slash in "/messages/". It is the same path on
# which the Starlette app mounts sse.handle_post_message.
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request):
    """Establish the SSE stream and run the MCP server over it.

    The connection is opened with ``sse.connect_sse(...)`` and the MCP server
    runs on the resulting read/write streams until the session ends.

    Args:
        request: Incoming Starlette request for the ``/sse`` route.

    Returns:
        An empty ``Response``. The SSE traffic itself is written by the
        transport; the return value only satisfies Starlette's endpoint
        contract once the session is over.
    """
    # Function-scope import so this handler does not depend on a change to the
    # module's import block.
    from starlette.responses import Response

    # request._send is Starlette's underlying ASGI send callable; the transport
    # needs the raw scope/receive/send triple to drive the stream itself.
    async with sse.connect_sse(request.scope, request.receive, request._send) as (
        read_stream,
        write_stream,
    ):
        await mcp_server.run(
            read_stream,
            write_stream,
            mcp_server.create_initialization_options(),
        )

    # Starlette's Route wrapper invokes whatever the endpoint returns as an ASGI
    # response. Returning None raises "TypeError: 'NoneType' object is not
    # callable" when the client disconnects, so hand back an empty response.
    return Response()
# ---------------- Other HTTP endpoints ----------------
async def health_check(request: Request):
    """Report service status based on ``oracle_client.health_check()``.

    The JSON body carries ``status`` ("healthy" when the client check is
    truthy, otherwise "degraded"), the service name and the package version.
    """
    status = "healthy" if await oracle_client.health_check() else "degraded"
    payload = {
        "status": status,
        "service": "oracle-fusion-ar-mcp-server",
        "version": __version__,
    }
    return JSONResponse(payload)
async def root(request: Request):
    """Describe the service: name, version, endpoint paths and tool summaries."""
    api_paths = {
        "list_invoices": "/api/list-invoices",
        "get_invoice_details": "/api/invoice-details",
        "search_invoices": "/api/search-invoices",
    }
    endpoint_paths = {
        "sse": "/sse",
        "messages": "/messages/",
        "health": "/health",
        "api": api_paths,
    }

    # Only the name and description of each tool are exposed here.
    tool_summaries = []
    for tool in TOOLS:
        tool_summaries.append({"name": tool.name, "description": tool.description})

    body = {
        "service": "Oracle Fusion AR MCP Server",
        "version": __version__,
        "endpoints": endpoint_paths,
        "tools": tool_summaries,
    }
    return JSONResponse(body)
async def api_list_invoices(request: Request):
    """REST wrapper around ``list_invoices``.

    Parses the request body as JSON, passes it to ``list_invoices`` and returns
    ``{"success": True, "data": [{"text": ...}, ...]}``. Any exception raised
    along the way is reported as ``{"success": False, "error": ...}`` with
    HTTP status 400.
    """
    try:
        payload = await request.json()
        data = []
        for entry in await list_invoices(payload):
            data.append({"text": entry["text"]})
        return JSONResponse({"success": True, "data": data})
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return JSONResponse(failure, status_code=400)
async def api_get_invoice_details(request: Request):
    """REST wrapper around ``get_invoice_details``.

    Parses the request body as JSON, passes it to ``get_invoice_details`` and
    returns ``{"success": True, "data": [{"text": ...}, ...]}``. Any exception
    raised along the way is reported as ``{"success": False, "error": ...}``
    with HTTP status 400.
    """
    try:
        payload = await request.json()
        data = []
        for entry in await get_invoice_details(payload):
            data.append({"text": entry["text"]})
        return JSONResponse({"success": True, "data": data})
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return JSONResponse(failure, status_code=400)
async def api_search_invoices(request: Request):
    """REST wrapper around ``search_invoices``.

    Parses the request body as JSON, passes it to ``search_invoices`` and
    returns ``{"success": True, "data": [{"text": ...}, ...]}``. Any exception
    raised along the way is reported as ``{"success": False, "error": ...}``
    with HTTP status 400.
    """
    try:
        payload = await request.json()
        data = []
        for entry in await search_invoices(payload):
            data.append({"text": entry["text"]})
        return JSONResponse({"success": True, "data": data})
    except Exception as exc:
        failure = {"success": False, "error": str(exc)}
        return JSONResponse(failure, status_code=400)
# Starlette application: service metadata, health, the MCP SSE transport
# (GET /sse plus the mounted /messages/ handler) and the POST-only REST API.
app = Starlette(
routes=[
# Service description and health status.
Route("/", root),
Route("/health", health_check),
# SSE endpoint: GET only; handle_sse runs the MCP server on the stream.
Route("/sse", handle_sse, methods=["GET"]),
# MCP POST messages endpoint MUST be mounted to the transport handler,
# on the same "/messages/" path the transport was constructed with.
Mount("/messages/", app=sse.handle_post_message),
# REST API endpoints: JSON-in/JSON-out wrappers around the same three tools.
Route("/api/list-invoices", api_list_invoices, methods=["POST"]),
Route("/api/invoice-details", api_get_invoice_details, methods=["POST"]),
Route("/api/search-invoices", api_search_invoices, methods=["POST"]),
]
)
def main(host: str = "0.0.0.0", log_level: str = "info") -> None:
    """Serve ``app`` with uvicorn on ``settings.port``.

    Args:
        host: Interface to bind. Defaults to ``"0.0.0.0"``, the value that was
            previously hard-coded, so calling ``main()`` behaves as before.
        log_level: Log level passed to uvicorn. Defaults to ``"info"``, also
            the previously hard-coded value.
    """
    uvicorn.run(app, host=host, port=settings.port, log_level=log_level)
# Start the server only when this module is executed as a script.
if __name__ == "__main__":
    main()