MCP Terminal Server

// Copyright 2024 Google LLC // SPDX-License-Identifier: Apache-2.0 // Package genkit provides Genkit functionality for application developers. package genkit import ( "context" "fmt" "log/slog" "net/http" "os" "os/signal" "strings" "sync" "syscall" "github.com/firebase/genkit/go/ai" "github.com/firebase/genkit/go/internal/registry" "github.com/invopop/jsonschema" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) // Genkit encapsulates a Genkit instance including the registry and configuration. type Genkit struct { // The registry for this instance. reg *registry.Registry // Options to configure the instance. Opts *Options } type Options struct { // The default model to use if no model is specified. DefaultModel string // Directory where dotprompts are stored. PromptDir string } // StartOptions are options to [Start]. type StartOptions struct { // If "-", do not start a FlowServer. // Otherwise, start a FlowServer on the given address, or the // default of ":3400" if empty. FlowAddr string // The names of flows to serve. // If empty, all registered flows are served. Flows []string } // New creates a new Genkit instance. func New(opts *Options) (*Genkit, error) { r, err := registry.New() if err != nil { return nil, err } if opts == nil { opts = &Options{} } if opts.DefaultModel != "" { parts := strings.Split(opts.DefaultModel, "/") if len(parts) != 2 { return nil, fmt.Errorf("invalid default model format %q, expected provider/name", opts.DefaultModel) } } return &Genkit{ reg: r, Opts: opts, }, nil } // Start initializes Genkit. // After it is called, no further actions can be defined. // // Start starts servers depending on the value of the GENKIT_ENV // environment variable and the provided options. // // If GENKIT_ENV = "dev", a development server is started // in a separate goroutine at the address in opts.DevAddr, or the default // of ":3100" if empty. // // If opts.FlowAddr is a value other than "-", a flow server is started // and the call to Start waits for the server to shut down. // If opts.FlowAddr == "-", no flow server is started and Start returns immediately. // // Thus Start(nil) will start a dev server in the "dev" environment, will always start // a flow server, and will pause execution until the flow server terminates. func (g *Genkit) Start(ctx context.Context, opts *StartOptions) error { ai.DefineGenerateAction(ctx, g.reg) if opts == nil { opts = &StartOptions{} } g.reg.Freeze() var mu sync.Mutex var servers []*http.Server var wg sync.WaitGroup errCh := make(chan error, 2) if registry.CurrentEnvironment() == registry.EnvironmentDev { wg.Add(1) go func() { defer wg.Done() s := startReflectionServer(ctx, g.reg, errCh) mu.Lock() servers = append(servers, s) mu.Unlock() }() } if opts.FlowAddr != "-" { wg.Add(1) go func() { defer wg.Done() s := startFlowServer(g, opts.FlowAddr, opts.Flows, errCh) mu.Lock() servers = append(servers, s) mu.Unlock() }() } serverStartCh := make(chan struct{}) go func() { wg.Wait() close(serverStartCh) }() // It will block here until either all servers start up or there is an error in starting one. select { case <-serverStartCh: slog.Info("all servers started successfully") case err := <-errCh: return fmt.Errorf("failed to start servers: %w", err) case <-ctx.Done(): return ctx.Err() } sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) // It will block here (i.e. servers will run) until we get an interrupt signal. select { case sig := <-sigCh: slog.Info("received signal, initiating shutdown", "signal", sig) case err := <-errCh: slog.Error("server error", "err", err) return err case <-ctx.Done(): slog.Info("context cancelled, initiating shutdown") } return shutdownServers(servers) } // DefineModel registers the given generate function as an action, and returns a // [Model] that runs it. func DefineModel( g *Genkit, provider, name string, metadata *ai.ModelInfo, generate func(context.Context, *ai.ModelRequest, ai.ModelStreamingCallback) (*ai.ModelResponse, error), ) ai.Model { return ai.DefineModel(g.reg, provider, name, metadata, generate) } // IsDefinedModel reports whether a model is defined. func IsDefinedModel(g *Genkit, provider, name string) bool { return ai.IsDefinedModel(g.reg, provider, name) } // LookupModel looks up a [Model] registered by [DefineModel]. // It returns nil if the model was not defined. func LookupModel(g *Genkit, provider, name string) ai.Model { return ai.LookupModel(g.reg, provider, name) } // DefineTool defines a tool to be passed to a model generate call. func DefineTool[In, Out any](g *Genkit, name, description string, fn func(ctx *ai.ToolContext, input In) (Out, error)) *ai.ToolDef[In, Out] { return ai.DefineTool(g.reg, name, description, fn) } // LookupTool looks up the tool in the registry by provided name and returns it. func LookupTool(g *Genkit, name string) ai.Tool { return ai.LookupTool(g.reg, name) } // DefinePrompt takes a function that renders a prompt template // into a [GenerateRequest] that may be passed to a [Model]. // The prompt expects some input described by inputSchema. // DefinePrompt registers the function as an action, // and returns a [Prompt] that runs it. func DefinePrompt( g *Genkit, provider, name string, metadata map[string]any, inputSchema *jsonschema.Schema, render func(context.Context, any) (*ai.ModelRequest, error), ) *ai.Prompt { return ai.DefinePrompt(g.reg, provider, name, metadata, inputSchema, render) } // IsDefinedPrompt reports whether a [Prompt] is defined. func IsDefinedPrompt(g *Genkit, provider, name string) bool { return ai.IsDefinedPrompt(g.reg, provider, name) } // LookupPrompt looks up a [Prompt] registered by [DefinePrompt]. // It returns nil if the prompt was not defined. func LookupPrompt(g *Genkit, provider, name string) *ai.Prompt { return ai.LookupPrompt(g.reg, provider, name) } // Generate run generate request for this model. Returns ModelResponse struct. func Generate(ctx context.Context, g *Genkit, opts ...ai.GenerateOption) (*ai.ModelResponse, error) { opts, err := optsWithDefaults(g, opts) if err != nil { return nil, err } return ai.Generate(ctx, g.reg, opts...) } // GenerateText run generate request for this model. Returns generated text only. func GenerateText(ctx context.Context, g *Genkit, opts ...ai.GenerateOption) (string, error) { opts, err := optsWithDefaults(g, opts) if err != nil { return "", err } return ai.GenerateText(ctx, g.reg, opts...) } // GenerateData run generate request for this model. Returns ModelResponse struct and fills value with structured output. func GenerateData(ctx context.Context, g *Genkit, value any, opts ...ai.GenerateOption) (*ai.ModelResponse, error) { opts, err := optsWithDefaults(g, opts) if err != nil { return nil, err } return ai.GenerateData(ctx, g.reg, value, opts...) } // GenerateWithRequest runs the model with the given request and streaming callback. func GenerateWithRequest(ctx context.Context, g *Genkit, m ai.Model, req *ai.ModelRequest, toolCfg *ai.ToolConfig, cb ai.ModelStreamingCallback) (*ai.ModelResponse, error) { return m.Generate(ctx, g.reg, req, toolCfg, cb) } // DefineIndexer registers the given index function as an action, and returns an // [Indexer] that runs it. func DefineIndexer(g *Genkit, provider, name string, index func(context.Context, *ai.IndexerRequest) error) ai.Indexer { return ai.DefineIndexer(g.reg, provider, name, index) } // IsDefinedIndexer reports whether an [Indexer] is defined. func IsDefinedIndexer(g *Genkit, provider, name string) bool { return ai.IsDefinedIndexer(g.reg, provider, name) } // LookupIndexer looks up an [Indexer] registered by [DefineIndexer]. // It returns nil if the model was not defined. func LookupIndexer(g *Genkit, provider, name string) ai.Indexer { return ai.LookupIndexer(g.reg, provider, name) } // DefineRetriever registers the given retrieve function as an action, and returns a // [Retriever] that runs it. func DefineRetriever(g *Genkit, provider, name string, ret func(context.Context, *ai.RetrieverRequest) (*ai.RetrieverResponse, error)) ai.Retriever { return ai.DefineRetriever(g.reg, provider, name, ret) } // IsDefinedRetriever reports whether a [Retriever] is defined. func IsDefinedRetriever(g *Genkit, provider, name string) bool { return ai.IsDefinedRetriever(g.reg, provider, name) } // LookupRetriever looks up a [Retriever] registered by [DefineRetriever]. // It returns nil if the model was not defined. func LookupRetriever(g *Genkit, provider, name string) ai.Retriever { return ai.LookupRetriever(g.reg, provider, name) } // DefineEmbedder registers the given embed function as an action, and returns an // [Embedder] that runs it. func DefineEmbedder(g *Genkit, provider, name string, embed func(context.Context, *ai.EmbedRequest) (*ai.EmbedResponse, error)) ai.Embedder { return ai.DefineEmbedder(g.reg, provider, name, embed) } // IsDefinedEmbedder reports whether an embedder is defined. func IsDefinedEmbedder(g *Genkit, provider, name string) bool { return ai.IsDefinedEmbedder(g.reg, provider, name) } // LookupEmbedder looks up an [Embedder] registered by [DefineEmbedder]. // It returns nil if the embedder was not defined. func LookupEmbedder(g *Genkit, provider, name string) ai.Embedder { return ai.LookupEmbedder(g.reg, provider, name) } // RegisterSpanProcessor registers an OpenTelemetry SpanProcessor for tracing. func RegisterSpanProcessor(g *Genkit, sp sdktrace.SpanProcessor) { g.reg.RegisterSpanProcessor(sp) } // optsWithDefaults prepends defaults to the options so that they can be overridden by the caller. func optsWithDefaults(g *Genkit, opts []ai.GenerateOption) ([]ai.GenerateOption, error) { if g.Opts.DefaultModel != "" { parts := strings.Split(g.Opts.DefaultModel, "/") if len(parts) != 2 { return nil, fmt.Errorf("invalid default model format %q, expected provider/name", g.Opts.DefaultModel) } model := LookupModel(g, parts[0], parts[1]) if model == nil { return nil, fmt.Errorf("default model %q not found", g.Opts.DefaultModel) } opts = append([]ai.GenerateOption{ai.WithModel(model)}, opts...) } return opts, nil }