#!/usr/bin/env node
/**
* OBSBOT Camera Control MCP Server
* PTZ gimbal control, snapshots, and visual analysis integration
*
* Features:
* - Full gimbal control (pan, tilt, zoom)
* - Real-time position feedback
* - Camera snapshots
* - LM Studio integration for visual analysis
* - Area scanning patterns
* - Safety limits and error handling
*
* @license MIT
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from '@modelcontextprotocol/sdk/types.js';
import { exec, execFile } from 'child_process';
import { mkdir, readFile } from 'fs/promises';
import { promisify } from 'util';
import path from 'path';
import os from 'os';
const execAsync = promisify(exec);
const execFileAsync = promisify(execFile);

/**
 * Controller for an OBSBOT PTZ camera exposed as a V4L2 device.
 *
 * Gimbal moves go through `v4l2-ctl`, still captures through `ffmpeg`, and
 * optional image analysis through an OpenAI-compatible chat-completions
 * endpoint (LM Studio by default).
 *
 * External programs are started with an argument vector (no shell), so the
 * device path, output directory and user-supplied prompts can never be
 * interpreted as shell syntax. HTTP calls use the global `fetch`, which
 * requires Node.js 18 or newer.
 */
class OBSBOTController {
  /**
   * @param {object} [config]
   * @param {string} [config.videoDevice] V4L2 device path (env: OBSBOT_DEVICE).
   * @param {string} [config.outputDir] Snapshot directory (env: OBSBOT_OUTPUT_DIR).
   * @param {string} [config.lmStudioUrl] Chat-completions URL (env: LM_STUDIO_URL).
   * @param {string} [config.model] Vision model id (env: OBSBOT_VL_MODEL).
   * @param {number} [config.settleDelayMs=1000] Time to wait after a gimbal
   *   move before reading the position back.
   */
  constructor(config = {}) {
    // Configuration with sensible defaults
    this.videoDevice = config.videoDevice || process.env.OBSBOT_DEVICE || '/dev/video0';
    this.outputDir = config.outputDir || process.env.OBSBOT_OUTPUT_DIR || '/tmp/obsbot_captures';
    this.lmStudioUrl = config.lmStudioUrl || process.env.LM_STUDIO_URL || 'http://localhost:1234/v1/chat/completions';
    this.defaultModel = config.model || process.env.OBSBOT_VL_MODEL || 'qwen2.5-vl-7b-instruct';
    // `??` rather than `||` so that an explicit 0 (no wait) is honoured.
    this.settleDelayMs = config.settleDelayMs ?? 1000;
    // v4l2 control limits for OBSBOT Tiny SE (adjust for other cameras)
    this.limits = {
      pan: { min: -468000, max: 468000, step: 3600 },
      tilt: { min: -324000, max: 324000, step: 3600 },
      zoom: { min: 0, max: 12, step: 1 },
    };
    console.error(`OBSBOT Controller initialized: device=${this.videoDevice}`);
  }

  /**
   * Run a shell command line and capture its output.
   *
   * Retained for backward compatibility; the controller itself no longer
   * uses it. The string is handed to a shell, so it must never contain
   * untrusted input. Prefer {@link OBSBOTController#executeFile}.
   *
   * @param {string} command Shell command line.
   * @returns {Promise<{success: boolean, stdout?: string, stderr?: string, error?: string}>}
   */
  async executeCommand(command) {
    try {
      const { stdout, stderr } = await execAsync(command);
      return { success: true, stdout: stdout.trim(), stderr: stderr.trim() };
    } catch (error) {
      return { success: false, error: error.message, stderr: error.stderr };
    }
  }

  /**
   * Run a program directly (no shell) with an explicit argument list.
   *
   * @param {string} file Executable name or path.
   * @param {string[]} [args] Arguments, passed verbatim.
   * @param {object} [options] Extra `child_process.execFile` options.
   * @returns {Promise<{success: boolean, stdout?: string, stderr?: string, error?: string}>}
   *   Never rejects; failures are reported through `success: false`.
   */
  async executeFile(file, args = [], options = {}) {
    try {
      const { stdout, stderr } = await execFileAsync(file, args, options);
      return { success: true, stdout: stdout.trim(), stderr: stderr.trim() };
    } catch (error) {
      return { success: false, error: error.message, stderr: error.stderr };
    }
  }

  /**
   * Read the current pan/tilt/zoom controls from the camera.
   *
   * A control that is missing from the `v4l2-ctl` output, or whose value
   * cannot be parsed, is reported as 0.
   *
   * @returns {Promise<{pan: number, tilt: number, zoom: number,
   *   panDirection: string, tiltDirection: string,
   *   panDegrees: number, tiltDegrees: number}>}
   * @throws {Error} If `v4l2-ctl` fails.
   */
  async getGimbalPosition() {
    const result = await this.executeFile('v4l2-ctl', [
      '-d',
      this.videoDevice,
      '--get-ctrl=pan_absolute,tilt_absolute,zoom_absolute',
    ]);
    if (!result.success) {
      throw new Error(`Failed to get gimbal position: ${result.error}`);
    }

    const lines = result.stdout.split('\n');
    const readControl = (name) => {
      const line = lines.find((candidate) => candidate.includes(`${name}:`));
      const value = Number.parseInt(line?.split(':')[1]?.trim() ?? '', 10);
      return Number.isNaN(value) ? 0 : value;
    };
    const pan = readControl('pan_absolute');
    const tilt = readControl('tilt_absolute');
    const zoom = readControl('zoom_absolute');

    // Human-readable directions (negative pan is RIGHT, positive is LEFT).
    let panDirection = 'CENTER';
    if (pan < 0) panDirection = 'RIGHT';
    else if (pan > 0) panDirection = 'LEFT';

    let tiltDirection = 'CENTER';
    if (tilt > 0) tiltDirection = 'UP';
    else if (tilt < 0) tiltDirection = 'DOWN';

    return {
      pan,
      tilt,
      zoom,
      panDirection,
      tiltDirection,
      // 3600 control units per degree.
      panDegrees: Math.round(pan / 3600),
      tiltDegrees: Math.round(tilt / 3600),
    };
  }

  /**
   * Coerce a requested axis value to a number, clamp it to the configured
   * limits and snap it to the axis step.
   *
   * @param {'pan'|'tilt'|'zoom'} axis
   * @param {*} value Requested value; numeric strings are accepted.
   * @returns {number}
   * @throws {Error} If the value is not numeric.
   */
  normalizeAxisValue(axis, value) {
    const numeric = Number(value);
    if (Number.isNaN(numeric)) {
      throw new Error(`Invalid ${axis} value: ${value}`);
    }
    const { min, max, step } = this.limits[axis];
    const clamped = Math.max(min, Math.min(max, numeric));
    return Math.round(clamped / step) * step;
  }

  /**
   * Move the gimbal. Axes left as `null`/`undefined` are not changed.
   *
   * @param {?number} [pan] Target pan (negative = right, positive = left).
   * @param {?number} [tilt] Target tilt (positive = up, negative = down).
   * @param {?number} [zoom] Target zoom level.
   * @returns {Promise<{success: boolean, message: string, position?: object}>}
   * @throws {Error} If a value is not numeric or `v4l2-ctl` fails.
   */
  async controlGimbal(pan = null, tilt = null, zoom = null) {
    const requested = { pan, tilt, zoom };
    const assignments = Object.entries(requested)
      .filter(([, value]) => value !== null)
      .map(([axis, value]) => `${axis}_absolute=${this.normalizeAxisValue(axis, value)}`);

    if (assignments.length === 0) {
      return { success: true, message: 'No changes requested' };
    }

    const result = await this.executeFile('v4l2-ctl', [
      '-d',
      this.videoDevice,
      `--set-ctrl=${assignments.join(',')}`,
    ]);
    if (!result.success) {
      throw new Error(`Gimbal control failed: ${result.error}`);
    }

    // Give the motors time to reach the target before reading back.
    await new Promise((resolve) => setTimeout(resolve, this.settleDelayMs));
    const position = await this.getGimbalPosition();
    return {
      success: true,
      message: 'Gimbal movement successful',
      position,
    };
  }

  /**
   * Return the gimbal to pan=0, tilt=0, zoom=0.
   *
   * @returns {Promise<object>} Same shape as {@link OBSBOTController#controlGimbal}.
   */
  async centerCamera() {
    return this.controlGimbal(0, 0, 0);
  }

  /**
   * Capture one 1920x1080 MJPEG frame to the output directory.
   *
   * @param {boolean} [analyzeWithLM=false] Also send the image to the vision model.
   * @param {?string} [customPrompt] Prompt override for the analysis.
   * @returns {Promise<{success: boolean, filename: string, path: string,
   *   timestamp: string, analysis?: object, analysisError?: string}>}
   *   An analysis failure is reported in `analysisError`; it does not reject.
   * @throws {Error} If the output directory cannot be created or capture fails.
   */
  async takeSnapshot(analyzeWithLM = false, customPrompt = null) {
    // e.g. 2024-01-01T12:00:00.000Z -> 2024-01-01_12-00-00Z (filesystem-safe).
    const isoSeconds = new Date().toISOString().slice(0, 19);
    const timestamp = `${isoSeconds.replace('T', '_').replace(/:/g, '-')}Z`;
    const filename = `obsbot_snapshot_${timestamp}.jpg`;
    const outputPath = path.join(this.outputDir, filename);

    // Ensure output directory exists
    try {
      await mkdir(this.outputDir, { recursive: true });
    } catch (error) {
      throw new Error(`Failed to create output directory ${this.outputDir}: ${error.message}`, { cause: error });
    }

    // Capture using ffmpeg; only real errors are written to stderr.
    const captureResult = await this.executeFile('ffmpeg', [
      '-hide_banner',
      '-loglevel', 'error',
      '-f', 'v4l2',
      '-input_format', 'mjpeg',
      '-video_size', '1920x1080',
      '-i', this.videoDevice,
      '-frames:v', '1',
      '-update', '1',
      '-y',
      outputPath,
    ]);
    if (!captureResult.success) {
      throw new Error(`Capture failed: ${captureResult.error}`);
    }

    const result = {
      success: true,
      filename,
      path: outputPath,
      timestamp,
    };
    if (analyzeWithLM) {
      try {
        result.analysis = await this.analyzeImageWithLM(outputPath, customPrompt);
      } catch (error) {
        // Best effort: the snapshot itself succeeded.
        result.analysisError = error.message;
      }
    }
    return result;
  }

  /**
   * Send an image to the configured chat-completions endpoint for analysis.
   *
   * The image is read and base64-encoded in-process and posted in the request
   * body, so neither the image size nor quotes in the prompt can break the call.
   *
   * @param {string} imagePath Path to a JPEG file.
   * @param {?string} [customPrompt] Prompt override.
   * @returns {Promise<{success: boolean, analysis: string, model: string, timestamp: string}>}
   * @throws {Error} If the file cannot be read, the request fails, or the
   *   response has no `choices[0].message.content`.
   */
  async analyzeImageWithLM(imagePath, customPrompt = null) {
    const prompt = customPrompt || 'Analyze this camera image. Describe what you see in detail, including any people, objects, lighting, and the overall scene.';

    // Encode image to base64
    let base64Image;
    try {
      base64Image = (await readFile(imagePath)).toString('base64');
    } catch (error) {
      throw new Error(`Failed to encode image: ${error.message}`, { cause: error });
    }

    const payload = {
      model: this.defaultModel,
      messages: [{
        role: 'user',
        content: [
          { type: 'text', text: prompt },
          { type: 'image_url', image_url: { url: `data:image/jpeg;base64,${base64Image}` } },
        ],
      }],
      max_tokens: 500,
    };

    let body;
    try {
      const response = await fetch(this.lmStudioUrl, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload),
      });
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      body = await response.json();
    } catch (error) {
      // fetch wraps network failures; the useful detail lives in `cause`.
      const detail = error.cause?.message || error.message;
      throw new Error(`LM Studio API call failed: ${detail}`, { cause: error });
    }

    const content = body?.choices?.[0]?.message?.content;
    if (content === undefined || content === null) {
      throw new Error('LM Studio API call failed: response contained no message content');
    }
    return {
      success: true,
      analysis: content,
      model: this.defaultModel,
      timestamp: new Date().toISOString(),
    };
  }

  /**
   * Probe the camera/gimbal and the LM Studio endpoint.
   *
   * @returns {Promise<object>} Status with `camera`, `gimbal` and `lmStudio`
   *   sections; each has `available` and, on failure, `error`. Never rejects.
   */
  async checkHealth() {
    const status = {
      timestamp: new Date().toISOString(),
      camera: { available: false },
      gimbal: { available: false },
      lmStudio: { available: false },
    };

    try {
      const position = await this.getGimbalPosition();
      status.camera.available = true;
      status.gimbal.available = true;
      status.gimbal.currentPosition = position;
    } catch (error) {
      status.camera.error = error.message;
      status.gimbal.error = error.message;
    }

    try {
      const modelsUrl = this.lmStudioUrl.replace('/v1/chat/completions', '/v1/models');
      const response = await fetch(modelsUrl);
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      const models = await response.json();
      status.lmStudio.available = true;
      status.lmStudio.models = models.data?.map((model) => model.id) || [];
    } catch (error) {
      status.lmStudio.error = error.cause?.message || error.message;
    }
    return status;
  }
}
/**
 * MCP server exposing OBSBOT camera control as tools over stdio.
 *
 * Tool calls are dispatched to an {@link OBSBOTController}; results are
 * returned as pretty-printed JSON text content.
 */
class OBSBOTMCPServer {
  /**
   * @param {object} [config] Forwarded to the OBSBOTController constructor.
   * @param {number} [config.scanSettleDelayMs=2000] Wait after each move
   *   during an area scan before capturing.
   * @param {number} [config.lookSettleDelayMs=1500] Wait after the move in
   *   `look_and_analyze` before capturing.
   */
  constructor(config = {}) {
    this.server = new Server(
      { name: 'obsbot-camera-control', version: '1.0.0' },
      { capabilities: { tools: {} } }
    );
    this.obsbot = new OBSBOTController(config);
    this.scanSettleDelayMs = config.scanSettleDelayMs ?? 2000;
    this.lookSettleDelayMs = config.lookSettleDelayMs ?? 1500;
    this.setupHandlers();
  }

  /**
   * Tool definitions advertised to MCP clients.
   *
   * @returns {object[]} One entry per tool with `name`, `description`, `inputSchema`.
   */
  getToolDefinitions() {
    const noArguments = { type: 'object', properties: {} };
    return [
      {
        name: 'get_gimbal_position',
        description: 'Get current camera gimbal position (pan, tilt, zoom) with human-readable directions',
        inputSchema: noArguments,
      },
      {
        name: 'control_gimbal',
        description: 'Control camera gimbal. Pan: NEGATIVE=RIGHT, POSITIVE=LEFT. Tilt: POSITIVE=UP, NEGATIVE=DOWN.',
        inputSchema: {
          type: 'object',
          properties: {
            pan: { type: 'integer', description: 'Pan position (-468000 to 468000)', minimum: -468000, maximum: 468000 },
            tilt: { type: 'integer', description: 'Tilt position (-324000 to 324000)', minimum: -324000, maximum: 324000 },
            zoom: { type: 'integer', description: 'Zoom level (0-12)', minimum: 0, maximum: 12 },
          },
        },
      },
      {
        name: 'center_camera',
        description: 'Return camera to center position (pan=0, tilt=0, zoom=0)',
        inputSchema: noArguments,
      },
      {
        name: 'take_snapshot',
        description: 'Capture camera snapshot with optional LM Studio visual analysis',
        inputSchema: {
          type: 'object',
          properties: {
            analyzeWithLM: { type: 'boolean', description: 'Analyze with vision model', default: false },
            customPrompt: { type: 'string', description: 'Custom analysis prompt' },
          },
        },
      },
      {
        name: 'look_and_analyze',
        description: 'Move camera to position and take analyzed snapshot',
        inputSchema: {
          type: 'object',
          properties: {
            pan: { type: 'integer', minimum: -468000, maximum: 468000 },
            tilt: { type: 'integer', minimum: -324000, maximum: 324000 },
            zoom: { type: 'integer', minimum: 0, maximum: 12 },
            analysisPrompt: { type: 'string', default: 'Analyze this camera view' },
          },
        },
      },
      {
        name: 'scan_area',
        description: 'Systematically scan area with multiple snapshots',
        inputSchema: {
          type: 'object',
          properties: {
            pattern: { type: 'string', enum: ['horizontal', 'vertical', 'grid', 'panoramic'], default: 'horizontal' },
            steps: { type: 'integer', minimum: 3, maximum: 9, default: 5 },
          },
        },
      },
      {
        name: 'check_system_status',
        description: 'Check camera system health and LM Studio availability',
        inputSchema: noArguments,
      },
    ];
  }

  /** Register the list-tools and call-tool request handlers on the server. */
  setupHandlers() {
    this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
      tools: this.getToolDefinitions(),
    }));

    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      try {
        const { name, arguments: args } = request.params;
        return await this.handleToolCall(name, args);
      } catch (error) {
        // Keep protocol errors (e.g. MethodNotFound) intact; wrap the rest.
        if (error instanceof McpError) {
          throw error;
        }
        throw new McpError(ErrorCode.InternalError, `Tool execution failed: ${error.message}`);
      }
    });
  }

  /**
   * Execute one tool by name.
   *
   * @param {string} name Tool name as advertised by `getToolDefinitions`.
   * @param {?object} [args] Tool arguments; may be omitted by the client.
   * @returns {Promise<{content: Array<{type: string, text: string}>}>}
   * @throws {McpError} MethodNotFound for an unknown tool name.
   * @throws {Error} Whatever the underlying controller call throws.
   */
  async handleToolCall(name, args) {
    // `arguments` is optional in a tool call; treat a missing value as empty.
    const toolArgs = args ?? {};
    const asText = (value) => ({
      content: [{ type: 'text', text: JSON.stringify(value, null, 2) }],
    });

    switch (name) {
      case 'get_gimbal_position':
        return asText(await this.obsbot.getGimbalPosition());
      case 'control_gimbal':
        return asText(await this.obsbot.controlGimbal(toolArgs.pan, toolArgs.tilt, toolArgs.zoom));
      case 'center_camera':
        return asText(await this.obsbot.centerCamera());
      case 'take_snapshot':
        return asText(await this.obsbot.takeSnapshot(toolArgs.analyzeWithLM, toolArgs.customPrompt));
      case 'look_and_analyze': {
        const wantsMove = toolArgs.pan !== undefined || toolArgs.tilt !== undefined || toolArgs.zoom !== undefined;
        if (wantsMove) {
          await this.obsbot.controlGimbal(toolArgs.pan, toolArgs.tilt, toolArgs.zoom);
          await new Promise((resolve) => setTimeout(resolve, this.lookSettleDelayMs));
        }
        return asText(await this.obsbot.takeSnapshot(true, toolArgs.analysisPrompt));
      }
      case 'scan_area':
        return asText(await this.performAreaScan(toolArgs.pattern, toolArgs.steps));
      case 'check_system_status':
        return asText(await this.obsbot.checkHealth());
      default:
        throw new McpError(ErrorCode.MethodNotFound, `Unknown tool: ${name}`);
    }
  }

  /**
   * Compute the gimbal targets for a scan pattern.
   *
   * @param {string} pattern 'horizontal', 'vertical', 'grid' or 'panoramic'.
   * @param {number} steps Number of positions (integer >= 2).
   * @returns {Array<{pan: number, tilt: number, zoom: number}>}
   * @throws {Error} For an unknown pattern.
   */
  buildScanPositions(pattern, steps) {
    const indices = Array.from({ length: steps }, (_, i) => i);
    // Spread `count` points evenly over [-halfRange, +halfRange].
    const spread = (halfRange, count, index) =>
      Math.round(-halfRange + ((2 * halfRange) / (count - 1)) * index);

    switch (pattern) {
      case 'horizontal':
        return indices.map((i) => ({ pan: spread(150000, steps, i), tilt: 0, zoom: 0 }));
      case 'vertical':
        return indices.map((i) => ({ pan: 0, tilt: spread(100000, steps, i), zoom: 0 }));
      case 'grid': {
        const gridSize = Math.ceil(Math.sqrt(steps));
        return indices.map((i) => {
          const row = Math.floor(i / gridSize);
          const col = i % gridSize;
          return { pan: spread(100000, gridSize, col), tilt: spread(75000, gridSize, row), zoom: 0 };
        });
      }
      case 'panoramic':
        // NOTE(review): 1300 units per degree here differs from the 3600 used
        // for position read-back; presumably chosen so that +/-180 degrees
        // stays within the pan limits. Confirm before changing.
        return indices.map((i) => ({ pan: Math.round(((360 / steps) * i - 180) * 1300), tilt: 0, zoom: 0 }));
      default:
        throw new Error(`Unknown scan pattern: ${pattern}`);
    }
  }

  /**
   * Visit each position of a scan pattern, taking an analyzed snapshot at
   * each, then return the gimbal to where it started.
   *
   * A failure at one position is recorded in that entry's `error` and the
   * scan continues. A failure while restoring the start position is recorded
   * in `restoreError` so the collected snapshots are still returned.
   *
   * @param {string} [pattern='horizontal']
   * @param {number} [steps=5] Number of positions (integer >= 2).
   * @returns {Promise<{pattern: string, steps: number, snapshots: object[],
   *   startTime: string, endTime: string, restoreError?: string}>}
   * @throws {Error} For invalid `steps`, an unknown pattern, or if the start
   *   position cannot be read.
   */
  async performAreaScan(pattern = 'horizontal', steps = 5) {
    // With fewer than two steps the spacing formula divides by zero.
    if (!Number.isInteger(steps) || steps < 2) {
      throw new Error(`Invalid steps value: ${steps} (expected an integer >= 2)`);
    }
    const positions = this.buildScanPositions(pattern, steps);

    const results = { pattern, steps, snapshots: [], startTime: new Date().toISOString() };
    const startPosition = await this.obsbot.getGimbalPosition();

    for (const [index, target] of positions.entries()) {
      try {
        await this.obsbot.controlGimbal(target.pan, target.tilt, target.zoom);
        await new Promise((resolve) => setTimeout(resolve, this.scanSettleDelayMs));
        const snapshot = await this.obsbot.takeSnapshot(true);
        results.snapshots.push({
          position: index + 1,
          gimbalPosition: target,
          snapshot,
          timestamp: new Date().toISOString(),
        });
      } catch (error) {
        results.snapshots.push({ position: index + 1, error: error.message });
      }
    }

    try {
      await this.obsbot.controlGimbal(startPosition.pan, startPosition.tilt, startPosition.zoom);
    } catch (error) {
      results.restoreError = error.message;
    }
    results.endTime = new Date().toISOString();
    return results;
  }

  /** Connect the server to a stdio transport and start serving. */
  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
    console.error('OBSBOT Camera Control MCP Server running');
  }
}
// Entry point: start the MCP server on stdio. A startup failure is logged to
// stderr (stdout carries the protocol) and reflected in the exit status so
// that supervisors do not see a clean exit.
const server = new OBSBOTMCPServer();
server.run().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});