SourceSync.ai MCP Server

The server lives in a single entry point under `src`, which registers every SourceSync tool on a stdio MCP server:

```typescript
#!/usr/bin/env node
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import wretch from 'wretch'

// Import SourceSync client
import { sourcesync } from './sourcesync.js'

// Import schemas
import {
  validateApiKeySchema,
  createNamespaceSchema,
  listNamespacesSchema,
  getNamespaceSchema,
  updateNamespaceSchema,
  deleteNamespaceSchema,
  ingestTextSchema,
  IngestFileSchema,
  IngestUrlsSchema,
  IngestSitemapSchema,
  IngestWebsiteSchema,
  IngestConnectorSchema,
  IngestJobRunStatusSchema,
  FetchDocumentsSchema,
  UpdateDocumentsSchema,
  DeleteDocumentsSchema,
  ResyncDocumentsSchema,
  SemanticSearchSchema,
  HybridSearchSchema,
  CreateConnectionSchema,
  ListConnectionsSchema,
  GetConnectionSchema,
  UpdateConnectionSchema,
  RevokeConnectionSchema,
  FetchUrlContentSchema,
  ValidateApiKeyParams,
  CreateNamespaceParams,
  ListNamespacesParams,
  GetNamespaceParams,
  UpdateNamespaceParams,
  DeleteNamespaceParams,
  IngestTextParams,
  FetchUrlContentParams,
} from './schemas.js'

// Import types
import {
  SourceSyncIngestionSource,
  SourceSyncChunkConfig,
  SourceSyncConnector,
  SourceSyncSearchType,
  SourceSyncDocumentType,
  SourceSyncIngestionStatus,
} from './sourcesync.types.js'

// Initialize the MCP server
const server = new McpServer({
  name: 'SourceSyncAI',
  version: '1.0.0',
})

/**
 * Helper function to create a SourceSync client with the provided parameters
 */
function createClient({
  apiKey,
  namespaceId,
  tenantId,
}: {
  apiKey?: string
  namespaceId?: string
  tenantId?: string
}) {
  return sourcesync({
    apiKey: apiKey || process.env.SOURCESYNC_API_KEY || '',
    namespaceId: namespaceId || process.env.SOURCESYNC_NAMESPACE_ID || '',
    tenantId: tenantId || process.env.SOURCESYNC_TENANT_ID || '',
  })
}

/**
 * Helper function to safely handle API responses and errors
 * This ensures the response format matches what the MCP SDK expects
 * while preserving the original error messages from SourceSync
 */
async function safeApiCall<T>(apiCall: () => Promise<T>) {
  try {
    const result = await apiCall()
    return {
      content: [
        {
          type: 'text' as const,
          text: JSON.stringify(result),
        },
      ],
    }
  } catch (error: any) {
    // Preserve the original error structure from SourceSync
    return {
      content: [
        {
          type: 'text' as const,
          text: JSON.stringify(error),
        },
      ],
      isError: true,
    }
  }
}

// Register authentication tool
server.tool(
  'validateApiKey',
  'Validates the API key by attempting to list namespaces. Returns the list of namespaces if successful.',
  validateApiKeySchema.shape,
  async (params: ValidateApiKeyParams) => {
    return safeApiCall(async () => {
      // Create a client with the provided API key
      const client = createClient({})
      // Validate the API key by listing namespaces
      // @ts-ignore - Ignoring type error for now to focus on error handling
      return await client.listNamespaces()
    })
  },
)

// Register namespace tools
server.tool(
  'createNamespace',
  'Creates a new namespace with the provided configuration. Requires a name, file storage configuration, vector storage configuration, and embedding model configuration.',
  createNamespaceSchema.shape,
  async (params: CreateNamespaceParams) => {
    return safeApiCall(async () => {
      const { tenantId, ...createParams } = params
      // Create a client with the provided API key
      const client = createClient({ tenantId })
      return await client.createNamespace(createParams)
    })
  },
)

server.tool(
  'listNamespaces',
  'Lists all namespaces available for the current API key and optional tenant ID.',
  listNamespacesSchema.shape,
  async (params: ListNamespacesParams) => {
    return safeApiCall(async () => {
      const { tenantId } = params
      // Create a client with the provided API key
      const client = createClient({ tenantId })
      return await client.listNamespaces()
    })
  },
)

server.tool(
  'getNamespace',
  'Retrieves a specific namespace by its ID.',
  getNamespaceSchema.shape,
  async (params: GetNamespaceParams) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId } = params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.getNamespace()
    })
  },
)

server.tool(
  'updateNamespace',
  'Updates an existing namespace with the provided configuration parameters.',
  updateNamespaceSchema.shape,
  async (params: UpdateNamespaceParams) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, ...updateParams } = params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.updateNamespace(updateParams)
    })
  },
)

server.tool(
  'deleteNamespace',
  'Permanently deletes a namespace by its ID.',
  deleteNamespaceSchema.shape,
  async (params: DeleteNamespaceParams) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId } = params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.deleteNamespace()
    })
  },
)

// Register ingestion tools
server.tool(
  'ingestText',
  'Ingests raw text content into the namespace. Supports optional metadata and chunk configuration.',
  ingestTextSchema.shape,
  async (params: IngestTextParams) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, ingestConfig } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Direct passthrough to the API
      return await client.ingestText({
        ingestConfig,
      })
    })
  },
)

// Add ingestFile tool
server.tool(
  'ingestFile',
  'Ingests a file into the namespace. Supports various file formats with automatic parsing.',
  IngestFileSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, file, metadata, chunkConfig } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Direct passthrough to the API
      return await client.ingestFile({
        file: file as unknown as File, // Type cast to File as required by the client
        metadata,
        chunkConfig,
      })
    })
  },
)

// Add ingestUrls tool
server.tool(
  'ingestUrls',
  'Ingests content from a list of URLs. Supports scraping options and metadata.',
  IngestUrlsSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, ingestConfig } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Direct passthrough to the API
      return await client.ingestUrls({
        ingestConfig,
      })
    })
  },
)

// Add ingestSitemap tool
server.tool(
  'ingestSitemap',
  'Ingests content from a website using its sitemap.xml. Supports path filtering and link limits.',
  IngestSitemapSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, ingestConfig, tenantId } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Direct passthrough to the API
      return await client.ingestSitemap({
        ingestConfig,
      })
    })
  },
)

// Add ingestWebsite tool
server.tool(
  'ingestWebsite',
  'Crawls and ingests content from a website recursively. Supports depth control and path filtering.',
  IngestWebsiteSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, ingestConfig, tenantId } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Direct passthrough to the API
      return await client.ingestWebsite({
        ingestConfig,
      })
    })
  },
)

// Add ingestConnector tool
server.tool(
  'ingestConnector',
  'Ingests all documents in the connector that are in backlog or failed status. No need to provide the document IDs or file IDs for the ingestion. IDs are already in the backlog when picked through the picker. If not, the user has to go through the authorization flow again, where they will be asked to pick the documents again.',
  IngestConnectorSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, ingestConfig } = params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.ingestConnector({
        ingestConfig: {
          ...ingestConfig,
          source: ingestConfig.source as unknown as SourceSyncIngestionSource,
        },
      })
    })
  },
)

// Add getIngestJobRunStatus tool
server.tool(
  'getIngestJobRunStatus',
  'Checks the status of a previously submitted ingestion job.',
  IngestJobRunStatusSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, ingestJobRunId } = params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.getIngestJobRunStatus({
        ingestJobRunId,
      })
    })
  },
)

// Add document management tools
server.tool(
  'fetchDocuments',
  'Fetches documents from the namespace based on filter criteria. Supports pagination and including specific document properties.',
  FetchDocumentsSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const {
        namespaceId,
        tenantId,
        documentIds,
        pagination,
        filterConfig,
        includeConfig,
      } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Add documentIds to filterConfig if provided
      if (documentIds && documentIds.length > 0 && !filterConfig.documentIds) {
        filterConfig.documentIds = documentIds
      }
      // Call the getDocuments method with properly structured parameters
      return await client.getDocuments({
        filterConfig: {
          ...filterConfig,
          // Convert string enum values to their SourceSync enum equivalents
          documentTypes: filterConfig.documentTypes?.map(
            (type: string) =>
              SourceSyncDocumentType[
                type as keyof typeof SourceSyncDocumentType
              ],
          ),
          documentIngestionSources: filterConfig.documentIngestionSources?.map(
            (source: string) =>
              SourceSyncIngestionSource[
                source as keyof typeof SourceSyncIngestionSource
              ],
          ),
          documentIngestionStatuses:
            filterConfig.documentIngestionStatuses?.map(
              (status: string) =>
                SourceSyncIngestionStatus[
                  status as keyof typeof SourceSyncIngestionStatus
                ],
            ),
        },
        pagination,
        includeConfig: includeConfig || { documents: true },
      })
    })
  },
)

server.tool(
  'updateDocuments',
  'Updates metadata for documents that match the specified filter criteria.',
  UpdateDocumentsSchema.shape,
  async (params: any) => {
    return safeApiCall(async () => {
      const { namespaceId, documents, tenantId, filterConfig, data } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Extract document IDs and add to filter if not already present
      if (documents && documents.length > 0 && !filterConfig.documentIds) {
        filterConfig.documentIds = documents.map((doc: any) => doc.documentId)
      }
      // Prepare metadata for update
      const metadata: Record<string, any> = {}
      if (documents) {
        documents.forEach((doc: any) => {
          if (doc.metadata) {
            // Combine all document metadata
            Object.assign(metadata, doc.metadata)
          }
        })
      }
      // Prepare the data object
      if (Object.keys(metadata).length > 0) {
        data.metadata = metadata
      }
      // Call the updateDocuments method with properly structured parameters
      return await client.updateDocuments({
        filterConfig: {
          ...filterConfig,
          // Convert string enum values to their SourceSync enum equivalents
          documentTypes: filterConfig.documentTypes?.map(
            (type: string) =>
              SourceSyncDocumentType[
                type as keyof typeof SourceSyncDocumentType
              ],
          ),
          documentIngestionSources: filterConfig.documentIngestionSources?.map(
            (source: string) =>
              SourceSyncIngestionSource[
                source as keyof typeof SourceSyncIngestionSource
              ],
          ),
          documentIngestionStatuses:
            filterConfig.documentIngestionStatuses?.map(
              (status: string) =>
                SourceSyncIngestionStatus[
                  status as keyof typeof SourceSyncIngestionStatus
                ],
            ),
        },
        data: {
          metadata: data?.metadata || {},
          $metadata: data?.$metadata || {
            $set: {},
            $append: {},
            $remove: {},
          },
        },
      })
    })
  },
)

server.tool(
  'deleteDocuments',
  'Permanently deletes documents that match the specified filter criteria.',
  DeleteDocumentsSchema.shape,
  async (params: any) => {
    return safeApiCall(async () => {
      const { namespaceId, documentIds, tenantId, filterConfig } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Add documentIds to filter if provided and not already in filter
      if (documentIds && documentIds.length > 0 && !filterConfig.documentIds) {
        filterConfig.documentIds = documentIds
      }
      // Call the deleteDocuments method with properly structured parameters
      return await client.deleteDocuments({
        filterConfig: {
          ...filterConfig,
          // Convert string enum values to their SourceSync enum equivalents
          documentTypes: filterConfig.documentTypes?.map(
            (type: string) =>
              SourceSyncDocumentType[
                type as keyof typeof SourceSyncDocumentType
              ],
          ),
          documentIngestionSources: filterConfig.documentIngestionSources?.map(
            (source: string) =>
              SourceSyncIngestionSource[
                source as keyof typeof SourceSyncIngestionSource
              ],
          ),
          documentIngestionStatuses:
            filterConfig.documentIngestionStatuses?.map(
              (status: string) =>
                SourceSyncIngestionStatus[
                  status as keyof typeof SourceSyncIngestionStatus
                ],
            ),
        },
      })
    })
  },
)

server.tool(
  'resyncDocuments',
  'Reprocesses documents that match the specified filter criteria. Useful for updating after schema changes.',
  ResyncDocumentsSchema.shape,
  async (params: any) => {
    return safeApiCall(async () => {
      const { namespaceId, documentIds, tenantId, filterConfig } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Add documentIds to filter if provided and not already in filter
      if (documentIds && documentIds.length > 0 && !filterConfig.documentIds) {
        filterConfig.documentIds = documentIds
      }
      // Call the resyncDocuments method with properly structured parameters
      return await client.resyncDocuments({
        filterConfig: {
          ...filterConfig,
          // Convert string enum values to their SourceSync enum equivalents
          documentTypes: filterConfig.documentTypes?.map(
            (type: string) =>
              SourceSyncDocumentType[
                type as keyof typeof SourceSyncDocumentType
              ],
          ),
          documentIngestionSources: filterConfig.documentIngestionSources?.map(
            (source: string) =>
              SourceSyncIngestionSource[
                source as keyof typeof SourceSyncIngestionSource
              ],
          ),
          documentIngestionStatuses:
            filterConfig.documentIngestionStatuses?.map(
              (status: string) =>
                SourceSyncIngestionStatus[
                  status as keyof typeof SourceSyncIngestionStatus
                ],
            ),
        },
      })
    })
  },
)

// Add search tools
server.tool(
  'semanticSearch',
  'Performs semantic search across the namespace to find relevant content based on meaning rather than exact keyword matches.',
  SemanticSearchSchema.shape,
  async (params: any) => {
    return safeApiCall(async () => {
      const {
        namespaceId,
        query,
        topK,
        scoreThreshold,
        filter,
        tenantId,
        searchType,
      } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Call the semanticSearch method with the searchType (default to SEMANTIC if not provided)
      return await client.semanticSearch({
        query,
        topK,
        scoreThreshold,
        filter,
        searchType: searchType || SourceSyncSearchType.SEMANTIC,
      })
    })
  },
)

server.tool(
  'hybridSearch',
  'Performs a combined keyword and semantic search, balancing between exact matches and semantic similarity. Requires hybridConfig with weights for both search types.',
  HybridSearchSchema.shape,
  async (params: any) => {
    return safeApiCall(async () => {
      const {
        namespaceId,
        query,
        topK,
        scoreThreshold,
        filter,
        hybridConfig,
        tenantId,
        searchType,
      } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Call the hybridSearch method with the searchType (default to HYBRID if not provided)
      return await client.hybridSearch({
        query,
        topK,
        scoreThreshold,
        filter,
        hybridConfig,
        searchType: searchType || SourceSyncSearchType.HYBRID,
      })
    })
  },
)

// Register connection tools
server.tool(
  'createConnection',
  'Creates a new connection to a specific source. The connector parameter should be a valid SourceSync connector enum value. The clientRedirectUrl parameter is optional and can be used to specify a custom redirect URL for the connection. This will give you an authorization URL which you can redirect the user to. The user will then be asked to pick the documents they want to ingest.',
  CreateConnectionSchema.shape,
  async (params: any) => {
    return safeApiCall(async () => {
      const { namespaceId, name, connector, clientRedirectUrl, tenantId } =
        params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Call the createConnection method with the connector as enum
      return await client.createConnection({
        name,
        connector,
        clientRedirectUrl,
      })
    })
  },
)

server.tool(
  'listConnections',
  'Lists all connections for the current namespace, optionally filtered by connector type.',
  ListConnectionsSchema.shape,
  async (params: any) => {
    return safeApiCall(async () => {
      const { namespaceId, connector, tenantId } = params
      // Create a client with the provided parameters
      const client = createClient({ namespaceId, tenantId })
      // Call the listConnections method with the connector as enum if provided
      return await client.listConnections({
        connector: connector || undefined,
      })
    })
  },
)

server.tool(
  'getConnection',
  'Retrieves details for a specific connection by its ID.',
  GetConnectionSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, connectionId } = params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.getConnection({
        connectionId,
      })
    })
  },
)

server.tool(
  'updateConnection',
  'Updates a connection to a specific source. The connector parameter should be a valid SourceSync connector enum value. The clientRedirectUrl parameter is optional and can be used to specify a custom redirect URL for the connection. This will give you an authorization URL which you can redirect the user to. The user will then be asked to pick the documents they want to ingest. This is useful if you want to point the connection at a different source, update the clientRedirectUrl, or pick a different or new set of documents.',
  UpdateConnectionSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, connectionId, name, clientRedirectUrl } =
        params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.updateConnection({
        connectionId,
        name,
        clientRedirectUrl,
      })
    })
  },
)

server.tool(
  'revokeConnection',
  'Revokes access for a specific connection, removing the integration with the external service.',
  RevokeConnectionSchema.shape,
  async (params) => {
    return safeApiCall(async () => {
      const { namespaceId, tenantId, connectionId } = params
      // Create a client with the provided API key
      const client = createClient({ namespaceId, tenantId })
      return await client.revokeConnection({
        connectionId,
      })
    })
  },
)

// Add a tool to fetch content from a URL, particularly useful for parsed text file URLs
server.tool(
  'fetchUrlContent',
  'Fetches the content of a URL. Particularly useful for fetching parsed text file URLs.',
  FetchUrlContentSchema.shape,
  async (params: FetchUrlContentParams) => {
    return safeApiCall(async () => {
      const { url, apiKey, tenantId } = params
      try {
        // Create a wretch client with authentication if provided
        let client = wretch(url)
        if (apiKey) {
          client = client.auth(`Bearer ${apiKey}`)
        }
        if (tenantId) {
          client = client.headers({
            'X-Tenant-ID': tenantId,
          })
        }
        // Fetch the content from the URL
        const content = await client.get().text()
        return { content }
      } catch (error: any) {
        throw new Error(`Error fetching URL content: ${error.message}`)
      }
    })
  },
)

async function main() {
  console.error('Starting SourceSync.ai MCP Server...')
  const transport = new StdioServerTransport()
  await server.connect(transport)
  console.error('SourceSync.ai MCP Server running on stdio')
}

main().catch((error) => {
  console.error('Fatal error in main():', error)
  process.exit(1)
})
```
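
Since the server only speaks stdio, a quick way to exercise it end to end is to drive it from a small MCP client. The following is a minimal sketch, assuming the stock `@modelcontextprotocol/sdk` client exports (`Client`, `StdioClientTransport`) and a hypothetical build output at `dist/index.js`; the forwarded `SOURCESYNC_*` environment variables are the ones `createClient()` falls back to above.

```typescript
// smoke-test.ts — a minimal client-side sketch, not part of the server itself.
// Assumes the stock @modelcontextprotocol/sdk client API and a built entry
// point at dist/index.js; adjust the path to your actual build output.
import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js'

async function main() {
  // Spawn the server over stdio and forward the current environment so that
  // createClient() can fall back to SOURCESYNC_API_KEY / SOURCESYNC_NAMESPACE_ID.
  const transport = new StdioClientTransport({
    command: 'node',
    args: ['dist/index.js'], // hypothetical build output path
    env: process.env as Record<string, string>,
  })

  const client = new Client({ name: 'sourcesync-smoke-test', version: '0.0.1' })
  await client.connect(transport)

  // List the registered tools, then call validateApiKey with no arguments
  // (the handler reads the API key from the server's environment).
  const { tools } = await client.listTools()
  console.log(tools.map((tool) => tool.name))

  const result = await client.callTool({ name: 'validateApiKey', arguments: {} })
  console.log(result.content)

  await client.close()
}

main().catch((error) => {
  console.error(error)
  process.exit(1)
})
```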