import asyncio
import json
import websockets
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import Server
import mcp.server.stdio
server = Server("live2d-mcp")
class Live2DClient:
def __init__(self, uri="ws://localhost:22033"):
self.uri = uri
self.websocket = None
self.token = None
self.request_id = 0
async def connect(self):
self.websocket = await websockets.connect(self.uri)
async def send(self, method, data={}):
if not self.websocket:
await self.connect()
self.request_id += 1
request = {
"Version": "1.0.0",
"Type": "Request",
"Method": method,
"RequestId": str(self.request_id),
"Data": data,
}
if self.token:
request["Token"] = self.token
await self.websocket.send(json.dumps(request))
response = await self.websocket.recv()
return json.loads(response)
async def register_plugin(self, name, token=None):
data = {"Name": name}
if token:
data["Token"] = token
response = await self.send("RegisterPlugin", data)
if response.get("Type") == "Response":
self.token = response["Data"]["Token"]
return self.token
else:
raise Exception(f"Failed to register plugin: {response}")
async def get_is_approval(self):
response = await self.send("GetIsApproval")
if response.get("Type") == "Response":
return response["Data"]["Result"]
else:
raise Exception(f"Failed to get approval status: {response}")
async def get_parameter_values(self, model_uid, ids=None):
data = {"ModelUID": model_uid}
if ids:
data["Ids"] = ids
response = await self.send("GetParameterValues", data)
if response.get("Type") == "Response":
return response["Data"]["Parameters"]
else:
raise Exception(f"Failed to get parameter values: {response}")
async def set_parameter_values(self, model_uid, parameters):
data = {"ModelUID": model_uid, "Parameters": parameters}
response = await self.send("SetParameterValues", data)
if response.get("Type") != "Response":
raise Exception(f"Failed to set parameter values: {response}")
async def get_parameters(self, model_uid=None, document_uid=None):
data = {}
if model_uid:
data["ModelUID"] = model_uid
if document_uid:
data["DocumentUID"] = document_uid
response = await self.send("GetParameters", data)
if response.get("Type") == "Response":
return response["Data"]["Parameters"]
else:
raise Exception(f"Failed to get parameters: {response}")
async def get_parameter_groups(self, model_uid=None, document_uid=None):
data = {}
if model_uid:
data["ModelUID"] = model_uid
if document_uid:
data["DocumentUID"] = document_uid
response = await self.send("GetParameterGroups", data)
if response.get("Type") == "Response":
return response["Data"]["Groups"]
else:
raise Exception(f"Failed to get parameter groups: {response}")
async def get_documents(self):
response = await self.send("GetDocuments")
if response.get("Type") == "Response":
return response["Data"]
else:
raise Exception(f"Failed to get documents: {response}")
async def get_current_document_uid(self):
response = await self.send("GetCurrentDocumentUID")
if response.get("Type") == "Response":
return response["Data"]["DocumentUID"]
else:
raise Exception(f"Failed to get current document UID: {response}")
async def get_current_model_uid(self):
response = await self.send("GetCurrentModelUID")
if response.get("Type") == "Response":
return response["Data"]["ModelUID"]
else:
raise Exception(f"Failed to get current model UID: {response}")
async def get_current_edit_mode(self):
response = await self.send("GetCurrentEditMode")
if response.get("Type") == "Response":
return response["Data"]["EditMode"]
else:
raise Exception(f"Failed to get current edit mode: {response}")
live2d_client = Live2DClient()
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools.
"""
return [
types.Tool(
name="register_plugin",
description="Register this MCP server as a plugin with the Live2D Editor.",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Name of the plugin"},
"token": {"type": "string", "description": "Optional token from a previous session"},
},
"required": ["name"],
},
),
types.Tool(
name="get_is_approval",
description="Check if the user has approved this plugin.",
inputSchema={"type": "object", "properties": {}},
),
types.Tool(
name="get_parameter_values",
description="Get the values of parameters for a model.",
inputSchema={
"type": "object",
"properties": {
"model_uid": {"type": "string"},
"ids": {"type": "array", "items": {"type": "string"}},
},
"required": ["model_uid"],
},
),
types.Tool(
name="set_parameter_values",
description="Set the values of parameters for a model.",
inputSchema={
"type": "object",
"properties": {
"model_uid": {"type": "string"},
"parameters": {
"type": "array",
"items": {
"type": "object",
"properties": {
"Id": {"type": "string"},
"Value": {"type": "number"},
},
"required": ["Id", "Value"],
},
},
},
"required": ["model_uid", "parameters"],
},
),
types.Tool(
name="get_parameters",
description="Get the list of parameters for a model.",
inputSchema={
"type": "object",
"properties": {
"model_uid": {"type": "string"},
"document_uid": {"type": "string"},
},
},
),
types.Tool(
name="get_parameter_groups",
description="Get the list of parameter groups for a model.",
inputSchema={
"type": "object",
"properties": {
"model_uid": {"type": "string"},
"document_uid": {"type": "string"},
},
},
),
types.Tool(
name="get_documents",
description="Get the list of open documents.",
inputSchema={"type": "object", "properties": {}},
),
types.Tool(
name="get_current_document_uid",
description="Get the UID of the current document.",
inputSchema={"type": "object", "properties": {}},
),
types.Tool(
name="get_current_model_uid",
description="Get the UID of the current model.",
inputSchema={"type": "object", "properties": {}},
),
types.Tool(
name="get_current_edit_mode",
description="Get the current edit mode.",
inputSchema={"type": "object", "properties": {}},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
Handle tool execution requests.
"""
if not arguments:
raise ValueError("Missing arguments")
if name == "register_plugin":
plugin_name = arguments.get("name")
token = arguments.get("token")
if not plugin_name:
raise ValueError("Missing name")
new_token = await live2d_client.register_plugin(plugin_name, token)
return [types.TextContent(type="text", text=f"Registered plugin '{plugin_name}' with token: {new_token}")]
elif name == "get_is_approval":
is_approved = await live2d_client.get_is_approval()
return [types.TextContent(type="text", text=f"Plugin approved: {is_approved}")]
elif name == "get_parameter_values":
model_uid = arguments.get("model_uid")
ids = arguments.get("ids")
if not model_uid:
raise ValueError("Missing model_uid")
parameters = await live2d_client.get_parameter_values(model_uid, ids)
return [types.TextContent(type="text", text=json.dumps(parameters))]
elif name == "set_parameter_values":
model_uid = arguments.get("model_uid")
parameters = arguments.get("parameters")
if not model_uid or not parameters:
raise ValueError("Missing model_uid or parameters")
await live2d_client.set_parameter_values(model_uid, parameters)
return [types.TextContent(type="text", text="Parameters set successfully")]
elif name == "get_parameters":
model_uid = arguments.get("model_uid")
document_uid = arguments.get("document_uid")
parameters = await live2d_client.get_parameters(model_uid, document_uid)
return [types.TextContent(type="text", text=json.dumps(parameters))]
elif name == "get_parameter_groups":
model_uid = arguments.get("model_uid")
document_uid = arguments.get("document_uid")
groups = await live2d_client.get_parameter_groups(model_uid, document_uid)
return [types.TextContent(type="text", text=json.dumps(groups))]
elif name == "get_documents":
documents = await live2d_client.get_documents()
return [types.TextContent(type="text", text=json.dumps(documents))]
elif name == "get_current_document_uid":
document_uid = await live2d_client.get_current_document_uid()
return [types.TextContent(type="text", text=document_uid)]
elif name == "get_current_model_uid":
model_uid = await live2d_client.get_current_model_uid()
return [types.TextContent(type="text", text=model_uid)]
elif name == "get_current_edit_mode":
edit_mode = await live2d_client.get_current_edit_mode()
return [types.TextContent(type="text", text=edit_mode)]
else:
raise ValueError(f"Unknown tool: {name}")
async def main():
# Run the server using stdin/stdout streams
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="live2d-mcp",
server_version="0.1.0",
capabilities=server.get_capabilities(),
),
)