import base64
import os
import queue
import sys
import threading
import time
from ctypes import sizeof
from enum import Enum

import matplotlib.pyplot as plt
import mcp.types as types
from desmume.controls import keymask, Keys
from desmume.emulator import DeSmuME, DeSmuME_Savestate, SCREEN_PIXEL_SIZE, SCREEN_WIDTH, SCREEN_HEIGHT
from fastmcp import FastMCP
from mcp.server.fastmcp import Image as ImageRes
from PIL import Image
# --- Runtime configuration -------------------------------------------------
# Throttle target for the emulator loop: the loop sleeps 1/FRAME_RATE seconds
# before each emulated frame.
FRAME_RATE = 120
# Path of the ROM handed to emu.open(); placeholder, must be edited locally.
GAME_URL = "/path/to/your/game.nds"
# Output folders for annotated screenshots, savestates and memory notes.
SCREENSHOT_DIR = "src/status"
SAVE_STATE_DIR = "src/save_state"
MEMORY_DIR = "src/memory"

# Create the output folders up front so later reads/writes do not have to.
for _runtime_dir in (SCREENSHOT_DIR, SAVE_STATE_DIR, MEMORY_DIR):
    os.makedirs(_runtime_dir, exist_ok=True)

# --- Process-wide singletons -----------------------------------------------
# The emulator instance driven by main_loop().
emu = DeSmuME()
# Tool handlers enqueue string commands here; main_loop() drains and runs them.
COMMAND_QUEUE = queue.Queue()
# The MCP server object whose decorators register the tools/resources below.
mcp = FastMCP(name="PokemonMCP")
# Define key enum and mapping
# DSKey lists the buttons the key-press tools accept. Every member's value is
# identical to its name, and the str mixin lets a member be built directly
# from text: main_loop() calls DSKey(key_name) on the suffix of a queued
# "press_key_<NAME>" command.
class DSKey(str, Enum):
    # Face buttons.
    A = "A"
    B = "B"
    X = "X"
    Y = "Y"
    # Shoulder buttons.
    L = "L"
    R = "R"
    # Directional pad.
    UP = "UP"
    DOWN = "DOWN"
    LEFT = "LEFT"
    RIGHT = "RIGHT"
    # System buttons.
    START = "START"
    SELECT = "SELECT"
# MemoryType selects which note file a memory tool reads or appends to:
# write_to_memory() and read_from_memory() both open
# f"{MEMORY_DIR}/{memory_type.value}.txt", so each value doubles as a file stem.
class MemoryType(str, Enum):
    POKEMON_MEMORY = "POKEMON_MEMORY"
    PLAYER_MEMORY = "PLAYER_MEMORY"
    LOCATION_MEMORY = "LOCATION_MEMORY"
    OTHER_MEMORY = "OTHER_MEMORY"
# Map enum values to DeSmuME Keys.
# Every DSKey member NAME has a matching Keys.KEY_<NAME> constant, so the table
# is derived from the enum instead of being spelled out entry by entry.
# Iteration follows DSKey's definition order (A, B, X, Y, L, R, UP, DOWN, LEFT,
# RIGHT, START, SELECT). A DSKey member without a Keys.KEY_<NAME> counterpart
# fails here, at import time, with AttributeError.
KEY_MAPPING = {ds_key: getattr(Keys, f"KEY_{ds_key.name}") for ds_key in DSKey}
def _do_screenshot(emu: DeSmuME):
    """Capture the emulator screen and save it with a coordinate grid overlay.

    The frame returned by ``emu.screenshot()`` is drawn on a matplotlib axes
    whose X/Y ticks are placed every 16 pixels, then written to
    ``{SCREENSHOT_DIR}/status2.png`` (the file ``take_screenshot_of_status``
    reads back).

    Args:
        emu: The emulator to capture from.

    Returns:
        The string ``"Screenshot taken"``.
    """
    frame = emu.screenshot()
    tile_size_x = 16
    tile_size_y = 16

    fig, ax = plt.subplots()
    try:
        ax.imshow(frame)
        ax.set_xlabel("X")
        ax.set_ylabel("Y")
        # "+ 1" so a tick is also drawn on the far edge when the size is an
        # exact multiple of the tile size.
        ax.set_xticks(range(0, frame.width + 1, tile_size_x))
        ax.set_yticks(range(0, frame.height + 1, tile_size_y))
        ax.tick_params(axis="x", labelrotation=90)
        fig.savefig(f"{SCREENSHOT_DIR}/status2.png")
    finally:
        # pyplot keeps every figure alive until it is closed explicitly; this
        # function runs once per screenshot request, so without the close the
        # figures (and their memory) accumulate for the life of the process.
        plt.close(fig)

    # Diagnostics go to stderr: the server is started with the stdio transport,
    # where stdout is reserved for protocol messages.
    print("Status screenshot taken", file=sys.stderr)
    return "Screenshot taken"
def _save_state(emu: DeSmuME):
    """Save the emulator's current state via ``DeSmuME_Savestate.save(1)``.

    Args:
        emu: The emulator whose state is saved.

    Returns:
        ``"success"`` when the save call returns normally, ``"error"`` when it
        raises (the exception is reported, not propagated).
    """
    state = DeSmuME_Savestate(emu)
    try:
        # NOTE(review): the argument is presumably the savestate slot number;
        # _load_state() reads back with the same value.
        state.save(1)
    except Exception as e:
        # Best-effort by design: callers branch on the returned status string.
        # Diagnostics go to stderr because stdout carries the MCP stream when
        # the server runs over the stdio transport.
        print(f"Error saving game state: {e}", file=sys.stderr)
        return "error"
    print("Game state saved", file=sys.stderr)
    return "success"
def _load_state(emu: DeSmuME):
    """Restore the emulator's state via ``DeSmuME_Savestate.load(1)``.

    Args:
        emu: The emulator whose state is replaced.

    Returns:
        ``"success"`` when the load call returns normally, ``"error"`` when it
        raises (the exception is reported, not propagated). ``main_loop`` uses
        the ``"error"`` result to detect that no savestate exists yet.
    """
    state = DeSmuME_Savestate(emu)
    try:
        # Same argument that _save_state() writes with.
        state.load(1)
    except Exception as e:
        # Best-effort by design: callers branch on the returned status string.
        # Diagnostics go to stderr because stdout carries the MCP stream when
        # the server runs over the stdio transport.
        print(f"Error loading game state: {e}", file=sys.stderr)
        return "error"
    print("Game state loaded", file=sys.stderr)
    return "success"
@mcp.tool
def take_screenshot_of_status() -> ImageRes:
    """Takes the current screenshot for the current status of the game."""
    import io

    # The capture itself happens in main_loop(): enqueue the request, then
    # pause briefly before reading the file it writes.
    # NOTE(review): the 0.2 s pause is a fixed delay, not a completion signal.
    # If other commands are still queued ahead of "status", the file read
    # below may be from an earlier capture (or not exist yet).
    COMMAND_QUEUE.put("status")
    time.sleep(0.2)

    # Re-encode the saved file as PNG in memory and ship it base64-encoded.
    png_buffer = io.BytesIO()
    Image.open(f"{SCREENSHOT_DIR}/status2.png").save(png_buffer, format="PNG")
    encoded_png = base64.b64encode(png_buffer.getvalue()).decode("utf-8")
    return types.ImageContent(
        type="image",
        mimeType="image/png",
        data=encoded_png,
    )
@mcp.tool
def press_key(key: DSKey, n: int = 1):
    """Presses a key on the Nintendo DS, Always analyze the image to determine how many times a key should be pressed.
    Args:
    key: The key to press out of A, B, X, Y, L, R, UP, DOWN, LEFT, RIGHT, START, SELECT.
    n: The number of times to press the key. Defaults to 1. Max 100 times.
    Returns:
    A message indicating the key was pressed n times.
    """
    if n < 1:
        return "Number of times to press the key must be at least 1."
    if n > 100:
        # 100 itself is accepted (see the docstring), so the limit is "at
        # most 100", not "less than 100".
        return "Number of times to press the key must be at most 100."

    # One queue entry per press; main_loop() executes them in FIFO order.
    # The message below is returned as soon as the presses are queued, not
    # after the emulator has performed them.
    command = f"press_key_{key.value}"
    for _ in range(n):
        COMMAND_QUEUE.put(command)
    return f"{key.value} key pressed {n} times"
@mcp.tool
def press_multiple_keys(keys: list[DSKey]) -> str:
    """Presses multiple keys on the Nintendo DS, Always analyze the image to determine how many times a key should be pressed.
    Args:
    keys: The list of keys to press out of A, B, X, Y, L, R, UP, DOWN, LEFT, RIGHT, START, SELECT. MAX 25 keys.
    Returns:
    A message indicating the keys were pressed.
    """
    max_keys = 25

    # Guard clauses: reject empty and over-long sequences before queueing.
    if not keys:
        return "No keys to press"
    if len(keys) > max_keys:
        return f"Too many keys to press. Max {max_keys} keys."

    # Queue one press command per key, preserving the caller's order;
    # main_loop() performs them one after another.
    for requested_key in keys:
        COMMAND_QUEUE.put(f"press_key_{requested_key.value}")
    return f"{keys} keys pressed"
@mcp.tool
def save_state() -> str:
    """ONLY USE IT FOR SAVING THE GAME STATE. Saves the current state of the game."""
    # The save is performed by main_loop(); this handler only enqueues the
    # request and pauses briefly.
    # NOTE(review): the reply is returned unconditionally - the outcome of
    # _save_state() is not reported back to the caller.
    settle_seconds = 0.2
    COMMAND_QUEUE.put("save_state")
    time.sleep(settle_seconds)
    return "Game state saved"
@mcp.tool
def load_state() -> str:
    """ONLY USE IT FOR LOADING THE GAME STATE. Loads the saved state of the game."""
    # The load is performed by main_loop(); this handler only enqueues the
    # request and pauses briefly.
    # NOTE(review): the reply is returned unconditionally - the outcome of
    # _load_state() is not reported back to the caller.
    settle_seconds = 0.2
    COMMAND_QUEUE.put("load_state")
    time.sleep(settle_seconds)
    return "Game state loaded"
@mcp.tool
def write_to_memory(memory_type: MemoryType, memory: str, keywords: list[str] = None) -> str:
    """Writes the important information related to the game in a separate memory which can be used later to analyze the game.
    Use this tool in a particular way, if you enter a town add "entered <town name>" to the memory.
    if you leave a town add "left <town name>" to the memory.
    if you catch a pokemon add "caught <pokemon name>" to the memory.
    if you battle to a trainer add "battled <trainer name>" to the memory.
    if you battle to a gym leader add "battled <gym leader name>" to the memory.
    Args:
    memory_type: The type of memory to write to. Out of POKEMON_MEMORY, PLAYER_MEMORY, LOCATION_MEMORY, OTHER_MEMORY.
    memory: The memory to write to.
    keywords: The keywords to add to the memory, leave it empty if you don't want to add any keywords.
    Returns:
    A message indicating the memory was written.
    """
    # Import the class, not the module: with a bare "import datetime" the call
    # datetime.now() raises AttributeError, which made every invocation fail.
    from datetime import datetime

    # Build the entry before touching the file so that a formatting error
    # cannot leave behind a freshly created, empty memory file.
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    keyword_text = ', '.join(keywords) if keywords else ''
    entry = f"{timestamp}: {memory}, {keyword_text}\n"

    # One line per entry, appended to the file named after the memory type.
    with open(f"{MEMORY_DIR}/{memory_type.value}.txt", "a") as f:
        f.write(entry)
    return "Memory written"
@mcp.tool
def read_from_memory(memory_type: MemoryType, keywords: list[str] = None) -> str:
    """Reads the important information related to the game from the memory which can be used later to analyze the game.
    Args:
    memory_type: The type of memory to read from. Out of POKEMON_MEMORY, PLAYER_MEMORY, LOCATION_MEMORY, OTHER_MEMORY.
    keywords: The keywords to read from the memory, leave it empty if you need the whole memory.
    Returns:
    The memory read from the file.
    """
    # Only the file access sits inside the try, so the handlers below cannot
    # mask an unrelated error from the filtering code.
    try:
        with open(f"{MEMORY_DIR}/{memory_type.value}.txt", "r") as f:
            text = f.read()
    except FileNotFoundError:
        return "Memory not found"
    except Exception as e:
        # Diagnostics go to stderr because stdout carries the MCP stream when
        # the server runs over the stdio transport.
        print(f"Error reading from memory: {e}", file=sys.stderr)
        return "Error reading from memory"

    # No keywords means "give me everything", as the docstring promises;
    # previously this case fell through the filter and returned "".
    if not keywords:
        return text

    # Keep each line that contains at least one keyword (case-sensitive
    # substring match), newline-terminated, in file order.
    return "".join(
        line + "\n"
        for line in text.splitlines()
        if any(keyword in line for keyword in keywords)
    )
@mcp.resource("resource://game_controls")
def list_controls() -> str:
    # Serve src/resources/controls.json as a JSON string. The file is parsed
    # and re-serialized, so malformed JSON raises here and the returned text
    # uses json.dumps' default formatting, not the file's own layout.
    import json

    with open("src/resources/controls.json", "r") as controls_file:
        parsed_controls = json.load(controls_file)
    return json.dumps(parsed_controls)
# Import-time setup: hand the ROM path to the emulator, then create the SDL
# window object that main_loop() polls (has_quit / process_input) and draws to.
# NOTE(review): both calls run on import, not only under __main__, so merely
# importing this module needs GAME_URL to point at a real ROM - confirm that
# this is intended.
emu.open(GAME_URL)
window = emu.create_sdl_window(use_opengl_if_possible=True)
def _hold_key(key_name: str) -> None:
    """Hold the named DS key down for 10 emulated frames, then release it."""
    # Only the lookup can legitimately fail with ValueError/KeyError; keeping
    # the emulator calls out of this try stops their errors being misreported
    # as an unknown key.
    try:
        desmume_key = KEY_MAPPING[DSKey(key_name)]
    except (ValueError, KeyError):
        print(f"Unknown key: {key_name}", file=sys.stderr)
        return

    mask = keymask(desmume_key)
    emu.input.keypad_add_key(mask)
    try:
        for _ in range(10):
            time.sleep(1 / FRAME_RATE)
            emu.cycle()
            window.draw()
    finally:
        # Always release, so an emulator error cannot leave the key stuck.
        emu.input.keypad_rm_key(mask)
    # NOTE(review): no frame is emulated between this release and the next
    # queued press, so back-to-back presses of the SAME key may reach the game
    # as one long hold. Left unchanged because adding release frames would
    # alter input timing - confirm the desired behavior.
    print(f"{key_name} key pressed", file=sys.stderr)


def _handle_command(command: str) -> None:
    """Execute one command string taken from COMMAND_QUEUE."""
    if command == "status":
        # _do_screenshot() reports the capture itself.
        _do_screenshot(emu)
    elif command.startswith("press_key_"):
        _hold_key(command.removeprefix("press_key_"))
    elif command == "save_state":
        _save_state(emu)
    elif command == "load_state":
        _load_state(emu)
    else:
        print(f"Unknown command: {command}", file=sys.stderr)


def main_loop():
    """Run the emulator until the window is closed, executing queued commands.

    Tries to restore the savestate first and starts a fresh game when that
    fails. Each iteration then: autosaves if more than 60 seconds have passed
    since the last autosave, sleeps 1/FRAME_RATE seconds, drains
    COMMAND_QUEUE, and finally processes window input, emulates one frame and
    redraws.

    All diagnostics are written to stderr because stdout carries the MCP
    stream when the server runs over the stdio transport.
    """
    # A failed load is not fatal: fall through and run the game from boot.
    # (Previously this branch returned, exiting right after announcing a new
    # game.)
    if _load_state(emu) == "error":
        print("No Saved State Found. Starting new game.", file=sys.stderr)

    last_autosave = time.time()
    while not window.has_quit():
        now = time.time()
        if now - last_autosave > 60:
            last_autosave = now
            _save_state(emu)

        time.sleep(1 / FRAME_RATE)

        # Drain everything queued since the last frame.
        while True:
            try:
                command = COMMAND_QUEUE.get_nowait()
            except queue.Empty:
                break
            try:
                _handle_command(command)
            finally:
                # Exactly one task_done() per get(), on every path - including
                # unknown commands, which used to skip it.
                COMMAND_QUEUE.task_done()

        window.process_input()
        emu.cycle()
        window.draw()
if __name__ == "__main__":
    # The MCP server runs on a daemon thread so the emulator loop can own the
    # main thread; being a daemon, the server thread does not keep the process
    # alive once main_loop() returns.
    #
    # For testing with test.py, serve over HTTP instead of stdio:
    #   kwargs={"transport": "http", "port": 8000}
    # For using with an agent, keep the stdio transport below.
    server_thread = threading.Thread(
        target=mcp.run,
        kwargs={"transport": "stdio"},
        daemon=True,
    )
    server_thread.start()
    main_loop()