server.js•25 kB
#!/usr/bin/env node
import express from 'express';
import { randomUUID } from "node:crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import PocketBase from 'pocketbase';
import { z } from 'zod';
import { EventSource } from 'eventsource';
import dotenv from 'dotenv';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js';
// Load environment variables from .env file
dotenv.config();
// Assign the polyfill to the global scope for PocketBase SDK to find
// @ts-ignore - Need to assign to global scope
global.EventSource = EventSource;
export class PocketBaseServer {
pb;
_customHeaders = {};
constructor() {
// Initialize PocketBase client
const url = process.env.POCKETBASE_URL;
if (!url) {
throw new Error('POCKETBASE_URL environment variable is required');
}
this.pb = new PocketBase(url);
}
// Create and configure the MCP server
createServer() {
const server = new McpServer({
name: 'pocketbase-server',
version: '0.1.0',
}, {
capabilities: {
resources: {},
tools: {},
prompts: {},
logging: {}
}
});
this.setupTools(server);
this.setupResources(server);
this.setupPrompts(server);
return server;
}
setupPrompts(server) {
// Collection creation prompt
server.prompt("create-collection", "Create a new collection with specified fields", async (extra) => ({
messages: [{
role: "user",
content: {
type: "text",
text: `Create a new collection with specified fields`
}
}]
}));
// Record creation prompt
server.prompt("create-record", "Create a new record in a collection", async (extra) => ({
messages: [{
role: "user",
content: {
type: "text",
text: `Create a new record in a collection`
}
}]
}));
// Query builder prompt
server.prompt("build-query", "Build a query for a collection with filters, sorting, and expansion", async (extra) => ({
messages: [{
role: "user",
content: {
type: "text",
text: `Build a query for a collection with filters, sorting, and expansion`
}
}]
}));
}
setupResources(server) {
// Server info resource
server.resource("server-info", "pocketbase://info", async (uri) => {
try {
return {
contents: [{
uri: uri.href,
text: JSON.stringify({
url: this.pb.baseUrl,
isAuthenticated: this.pb.authStore?.isValid || false
}, null, 2)
}]
};
}
catch (error) {
throw new Error(`Failed to get server info: ${error.message}`);
}
});
// Collection list resource
server.resource("collections", "pocketbase://collections", async (uri) => {
try {
const collectionsResponse = await this.pb.collections.getList(1, 100);
const collections = {
page: collectionsResponse.page,
perPage: collectionsResponse.perPage,
totalItems: collectionsResponse.totalItems,
totalPages: collectionsResponse.totalPages,
items: collectionsResponse.items
};
return {
contents: [{
uri: uri.href,
text: JSON.stringify(collections.items.map(c => ({
id: c.id,
name: c.name,
type: c.type,
system: c.system,
listRule: c.listRule,
viewRule: c.viewRule,
createRule: c.createRule,
updateRule: c.updateRule,
deleteRule: c.deleteRule,
})), null, 2)
}]
};
}
catch (error) {
throw new Error(`Failed to list collections: ${error.message}`);
}
});
// Auth info resource
server.resource("auth-info", "pocketbase://auth", async (uri) => {
try {
return { contents: [{
uri: uri.href,
text: JSON.stringify({
isValid: this.pb.authStore.isValid,
token: this.pb.authStore.token,
record: this.pb.authStore.record
}, null, 2)
}]
};
}
catch (error) {
throw new Error(`Failed to get auth info: ${error.message}`);
}
});
}
setupTools(server) {
console.error('[MCP DEBUG] Setting up tools...');
// Server info tool
server.tool('get_server_info', {}, async () => {
try {
return {
content: [{
type: 'text',
text: JSON.stringify({
url: this.pb.baseUrl,
isAuthenticated: this.pb.authStore?.isValid || false,
version: '0.1.0'
}, null, 2)
}]
};
}
catch (error) {
return {
content: [{ type: 'text', text: `Failed to get server info: ${error.message}` }],
isError: true
};
}
});
// Auth info tool
server.tool('get_auth_info', {}, async () => {
try {
return {
content: [{
type: 'text',
text: JSON.stringify({ isValid: this.pb.authStore.isValid,
token: this.pb.authStore.token,
model: this.pb.authStore.record,
isAdmin: this.pb.authStore.record?.collectionName === '_superusers'
}, null, 2)
}]
};
}
catch (error) {
return {
content: [{ type: 'text', text: `Failed to get auth info: ${error.message}` }],
isError: true
};
}
});
// List collections tool
server.tool('list_collections', {
includeSystem: z.boolean().optional().default(false).describe('Whether to include system collections')
}, async ({ includeSystem }) => {
try {
const collections = await this.pb.collections.getList(1, 100);
const filteredCollections = includeSystem
? collections.items
: collections.items.filter((c) => !c.system);
return {
content: [{
type: 'text',
text: JSON.stringify(filteredCollections.map((c) => ({
id: c.id,
name: c.name,
type: c.type,
system: c.system,
recordCount: c.recordCount || 0
})), null, 2)
}]
};
}
catch (error) {
return {
content: [{ type: 'text', text: `Failed to list collections: ${error.message}` }],
isError: true
};
}
});
// Authentication tool
server.tool('authenticate_user', {
email: z.string().optional().describe('User email (required unless isAdmin=true and env vars are set)'),
password: z.string().optional().describe('User password (required unless isAdmin=true and env vars are set)'),
collection: z.string().optional().default('users').describe('Collection name'),
isAdmin: z.boolean().optional().default(false).describe('Whether to authenticate as an admin')
}, async ({ email, password, collection, isAdmin }) => {
try {
const authCollection = isAdmin ? '_superusers' : collection;
const authEmail = isAdmin && !email ? process.env.POCKETBASE_ADMIN_EMAIL : email;
const authPassword = isAdmin && !password ? process.env.POCKETBASE_ADMIN_PASSWORD : password;
if (!authEmail || !authPassword) {
return {
content: [{ type: 'text', text: 'Email and password are required for authentication' }],
isError: true
};
}
const authData = await this.pb
.collection(authCollection)
.authWithPassword(authEmail, authPassword);
return {
content: [{ type: 'text', text: JSON.stringify(authData, null, 2) }]
};
}
catch (error) {
return {
content: [{ type: 'text', text: `Authentication failed: ${error.message}` }],
isError: true
};
}
});
// Get collection schema tool with latest MCP features
server.tool('get_collection_schema', {
collection: z.string().describe('Collection name or ID')
}, async ({ collection }) => {
try {
console.error('[MCP DEBUG] get_collection_schema called for collection:', collection);
const collectionData = await this.pb.collections.getOne(collection);
console.error('[MCP DEBUG] Collection data retrieved:', JSON.stringify(collectionData, null, 2));
// In newer PocketBase versions, the schema is in the 'fields' property
const schema = collectionData.fields || collectionData.schema || [];
return {
content: [{
type: 'text',
text: JSON.stringify({
name: collection,
id: collectionData.id,
type: collectionData.type,
system: collectionData.system,
schema: schema,
listRule: collectionData.listRule,
viewRule: collectionData.viewRule,
createRule: collectionData.createRule,
updateRule: collectionData.updateRule,
deleteRule: collectionData.deleteRule,
indexes: collectionData.indexes || []
}, null, 2)
}]
};
}
catch (error) {
console.error('[MCP DEBUG] get_collection_schema error:', error);
return {
content: [{
type: 'text',
text: JSON.stringify({
name: collection,
error: "Failed to get collection schema: " + (error.message || "Unknown error")
}, null, 2)
}],
isError: true
};
}
});
// Real-time streaming tool using latest MCP features
server.tool('stream_collection_changes', {
collection: z.string().describe('Collection name to stream changes from'),
recordId: z.string().optional().describe('Specific record ID to watch (optional)'),
filter: z.string().optional().describe('Filter expression for subscription (optional)')
}, async ({ collection, recordId, filter }, { sendNotification }) => {
try {
const subscribePath = recordId ? `${collection}/${recordId}` : collection;
console.error(`[MCP PocketBase] Starting stream for ${subscribePath}...`);
// Use the new streaming capabilities
let eventCount = 0;
const maxEvents = 10; // Limit for demo purposes
// Simulate real-time events (in a real implementation, this would use PocketBase's real-time subscriptions)
const interval = setInterval(async () => {
if (eventCount >= maxEvents) {
clearInterval(interval);
return;
}
eventCount++;
try {
await sendNotification({
method: "notifications/message",
params: {
level: "info",
data: `Collection ${collection} change event #${eventCount} at ${new Date().toISOString()}`
}
});
}
catch (error) {
console.error("Error sending notification:", error);
clearInterval(interval);
}
}, 1000);
return {
content: [{
type: 'text',
text: `Started streaming changes for collection '${collection}'${recordId ? ` record '${recordId}'` : ''}. Events will be sent via notifications.`
}]
};
}
catch (error) {
console.error(`[MCP PocketBase] Stream failed for ${collection}:`, error);
return {
content: [{ type: 'text', text: `Failed to start stream: ${error.message}` }],
isError: true
};
}
});
// Add more essential tools
server.tool('create_record', {
collection: z.string().describe('Collection name'),
data: z.record(z.any()).describe('Record data')
}, async ({ collection, data }) => {
try {
const result = await this.pb.collection(collection).create(data);
return {
content: [{ type: 'text', text: JSON.stringify(result, null, 2) }]
};
}
catch (error) {
return {
content: [{ type: 'text', text: `Failed to create record: ${error.message}` }],
isError: true
};
}
});
server.tool('list_records', {
collection: z.string().describe('Collection name'),
filter: z.string().optional().describe('Filter query'),
sort: z.string().optional().describe('Sort field and direction'),
page: z.number().optional().describe('Page number'),
perPage: z.number().optional().describe('Items per page')
}, async ({ collection, filter, sort, page = 1, perPage = 50 }) => {
try {
const options = {};
if (filter)
options.filter = filter;
if (sort)
options.sort = sort;
const result = await this.pb.collection(collection).getList(page, perPage, options);
return {
content: [{ type: 'text', text: JSON.stringify(result, null, 2) }]
};
}
catch (error) {
return {
content: [{ type: 'text', text: `Failed to list records: ${error.message}` }],
isError: true
};
}
});
}
// Run as stdio server (for CLI usage)
async runStdio() {
console.error('[MCP DEBUG] Starting PocketBase MCP server in stdio mode...');
const server = this.createServer();
const transport = new StdioServerTransport();
try {
await server.connect(transport);
console.error('[MCP DEBUG] PocketBase MCP server running on stdio');
}
catch (error) {
console.error(`[MCP DEBUG] Error connecting server: ${error}`);
}
}
// Run as HTTP server with SSE support
async runHttp(port = 3000) {
console.error(`[MCP DEBUG] Starting PocketBase MCP server in HTTP mode on port ${port}...`);
const app = express();
app.use(express.json());
// Store transports by session ID
const transports = {};
//=============================================================================
// STREAMABLE HTTP TRANSPORT (PROTOCOL VERSION 2025-03-26)
//=============================================================================
app.all('/mcp', async (req, res) => {
console.log(`Received ${req.method} request to /mcp`);
try {
const sessionId = req.headers['mcp-session-id'];
let transport;
if (sessionId && transports[sessionId]) {
const existingTransport = transports[sessionId];
if (existingTransport instanceof StreamableHTTPServerTransport) {
transport = existingTransport;
}
else {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Session exists but uses a different transport protocol',
},
id: null,
});
return;
}
}
else if (!sessionId && req.method === 'POST' && isInitializeRequest(req.body)) {
const eventStore = new InMemoryEventStore();
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore,
onsessioninitialized: (sessionId) => {
console.log(`StreamableHTTP session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});
transport.onclose = () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
console.log(`Transport closed for session ${sid}, removing from transports map`);
delete transports[sid];
}
};
const server = this.createServer();
await server.connect(transport);
}
else {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided',
},
id: null,
});
return;
}
if (transport) {
await transport.handleRequest(req, res, req.body);
}
}
catch (error) {
console.error('Error handling MCP request:', error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
},
id: null,
});
}
}
});
//=============================================================================
// DEPRECATED HTTP+SSE TRANSPORT (PROTOCOL VERSION 2024-11-05)
//=============================================================================
app.get('/sse', async (req, res) => {
console.log('Received GET request to /sse (deprecated SSE transport)');
const transport = new SSEServerTransport('/messages', res);
transports[transport.sessionId] = transport;
res.on("close", () => {
delete transports[transport.sessionId];
});
const server = this.createServer();
await server.connect(transport);
});
app.post("/messages", async (req, res) => {
const sessionId = req.query.sessionId;
const existingTransport = transports[sessionId];
if (existingTransport instanceof SSEServerTransport) {
await existingTransport.handlePostMessage(req, res, req.body);
}
else {
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Session exists but uses a different transport protocol',
},
id: null,
});
}
});
// Health check endpoint
app.get('/health', (req, res) => {
res.json({
status: 'healthy',
server: 'pocketbase-mcp-server',
version: '0.1.0',
pocketbaseUrl: this.pb.baseUrl,
isAuthenticated: this.pb.authStore?.isValid || false
});
});
// Start the server
app.listen(port, () => {
console.log(`PocketBase MCP server listening on port ${port}`);
console.log(`
==============================================
SUPPORTED TRANSPORT OPTIONS:
1. Streamable HTTP (Protocol version: 2025-03-26)
Endpoint: /mcp
Methods: GET, POST, DELETE
Usage:
- Initialize with POST to /mcp
- Establish SSE stream with GET to /mcp
- Send requests with POST to /mcp
- Terminate session with DELETE to /mcp
2. HTTP + SSE (Protocol version: 2024-11-05)
Endpoints: /sse (GET) and /messages (POST)
Usage:
- Establish SSE stream with GET to /sse
- Send requests with POST to /messages?sessionId=<id>
3. Health Check
Endpoint: /health (GET)
Returns server status and PocketBase connection info
==============================================
`);
});
// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
}
catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});
}
}
// Determine how to run based on command line arguments
const args = process.argv.slice(2);
const server = new PocketBaseServer();
if (args.includes('--http') || args.includes('--sse')) {
const portArg = args.find(arg => arg.startsWith('--port='));
const port = portArg ? parseInt(portArg.split('=')[1]) : 3000;
server.runHttp(port).catch(console.error);
}
else {
// Default to stdio mode
server.runStdio().catch(console.error);
}