Skip to main content
Glama

Storyden

by Southclaws
Mozilla Public License 2.0
227
bus.go10.4 kB
package pubsub import ( "context" "errors" "fmt" "log/slog" "os" "reflect" "sync" "time" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fctx" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v3/pkg/amqp" "github.com/ThreeDotsLabs/watermill/components/cqrs" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "go.uber.org/fx" "github.com/Southclaws/storyden/internal/config" ) type Bus struct { logger *slog.Logger cfg config.Config pub message.Publisher sub message.Subscriber router *message.Router eventBus *cqrs.EventBus commandBus *cqrs.CommandBus eventProcessor *cqrs.EventProcessor commandProcessor *cqrs.CommandProcessor mu sync.RWMutex subscriptions map[subscriptionKey]*Subscription } type subscriptionKey string func newBus( lc fx.Lifecycle, l *slog.Logger, ctx context.Context, cfg config.Config, pub message.Publisher, sub message.Subscriber, ) (*Bus, error) { logger := watermill.NewSlogLogger(l.With("component", "watermill")) router, err := message.NewRouter(message.RouterConfig{ CloseTimeout: time.Second * 30, }, logger) if err != nil { return nil, fault.Wrap(err) } router.AddMiddleware(middleware.Recoverer) router.AddMiddleware(newSessionContextMiddleware(l)) poisonQueue, err := middleware.PoisonQueue(pub, "poison_queue") if err != nil { return nil, fault.Wrap(err) } router.AddMiddleware(poisonQueue) retryMiddleware := middleware.Retry{ MaxRetries: cfg.QueueMaxRetries, InitialInterval: cfg.QueueRetryInitialInterval, MaxInterval: cfg.QueueRetryMaxInterval, Multiplier: 2.0, OnRetryHook: func(retryNum int, delay time.Duration) { log := fmt.Sprintf("a message consumer returned an error: retrying %d/%d after %s/%s", retryNum, cfg.QueueMaxRetries, delay, cfg.QueueRetryMaxInterval, ) l.Error(log, slog.Int("retry_num", retryNum), slog.String("delay", delay.String()), ) }, } router.AddMiddleware(retryMiddleware.Middleware) marshaler := cqrs.JSONMarshaler{ GenerateName: func(v interface{}) string { return topicFromValue(v) }, } // Wrap publisher with session context middleware contextPub := publisherContextMiddleware(pub) eventBus, err := cqrs.NewEventBusWithConfig(contextPub, cqrs.EventBusConfig{ GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { return params.EventName, nil }, Marshaler: marshaler, }) if err != nil { return nil, fault.Wrap(err) } commandBus, err := cqrs.NewCommandBusWithConfig(contextPub, cqrs.CommandBusConfig{ GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { return params.CommandName, nil }, Marshaler: marshaler, }) if err != nil { return nil, fault.Wrap(err) } eventProcessor, err := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{ GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { return params.EventName, nil }, SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { // NOTE: When we're using AMQP as the broker, because the fanout // logic requires separate subscribers per queue, we need to do // some additional setup to ensure that the subscriber is unique // to each consumer. This is how we get events properly fanned out // to subscribers of the same event. Internally this creates a new // subscriber for each event+service key and AMQP handles delivery. // NOTE 2: This is durable and needs to be optionally durable based // on parameters passed to Subscribe. But that's really awkward... if cfg.QueueType == "amqp" { apsc := amqp.NewDurablePubSubConfig(cfg.AmqpURL, func(topic string) string { return topic + "." + params.HandlerName }) subscriber, err := amqp.NewSubscriber(apsc, logger) if err != nil { return nil, err } return subscriber, nil } return sub, nil }, Marshaler: marshaler, }) if err != nil { return nil, fault.Wrap(err) } commandProcessor, err := cqrs.NewCommandProcessorWithConfig(router, cqrs.CommandProcessorConfig{ GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { return params.CommandName, nil }, SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { return sub, nil }, Marshaler: marshaler, }) if err != nil { return nil, fault.Wrap(err) } router.AddNoPublisherHandler("poison_queue_logger", "poison_queue", sub, func(msg *message.Message) error { l.Error("poisoned message received after all retries failed", slog.String("message_id", msg.UUID), slog.String("message_type", msg.Metadata.Get("name")), slog.String("reason", msg.Metadata.Get("reason_poisoned")), slog.String("original_topic", msg.Metadata.Get("topic_poisoned")), slog.String("handler", msg.Metadata.Get("handler_poisoned")), slog.String("payload", string(msg.Payload)), ) return nil }) lc.Append(fx.StartHook(func() { go func() { err := router.Run(ctx) if err != nil { l.Error("message router stopped unexpectedly", slog.String("error", err.Error()), ) os.Exit(0x12) } }() <-router.Running() })) lc.Append(fx.StopHook(func(ctx context.Context) error { if err := router.Close(); err != nil { return err } l.Info("message router stopped successfully") return nil })) return &Bus{ logger: l, cfg: cfg, pub: pub, sub: sub, router: router, eventBus: eventBus, commandBus: commandBus, eventProcessor: eventProcessor, commandProcessor: commandProcessor, subscriptions: make(map[subscriptionKey]*Subscription), }, nil } // Publish publishes an event and does not provide error handling semantics. // Most simple events can use this where a failure to publish isn't critical. func (b *Bus) Publish(ctx context.Context, event any) { if err := b.eventBus.Publish(ctx, event); err != nil { b.logger.Error("failed to publish event", slog.String("event_type", fmt.Sprintf("%T", event)), slog.String("error", err.Error()), ) } } func (b *Bus) PublishMany(ctx context.Context, events ...any) { for _, e := range events { b.Publish(ctx, e) } } // MustPublish is for when publishing is a critical requirement and errors must // prevent further procedures, for things like sending emails, etc. func (b *Bus) MustPublish(ctx context.Context, event any) error { if err := b.eventBus.Publish(ctx, event); err != nil { b.logger.Error("failed to publish event", slog.String("event_type", fmt.Sprintf("%T", event)), slog.String("error", err.Error()), ) return fault.Wrap(err, fctx.With(ctx)) } return nil } func (b *Bus) MustPublishMany(ctx context.Context, events ...any) error { var errs []error for _, event := range events { if err := b.eventBus.Publish(ctx, event); err != nil { b.logger.Error("failed to publish event", slog.String("event_type", fmt.Sprintf("%T", event)), slog.String("error", err.Error()), ) errs = append(errs, err) } } return errors.Join(errs...) } func (b *Bus) SendCommand(ctx context.Context, command any) error { if err := b.commandBus.Send(ctx, command); err != nil { b.logger.Error("failed to send command", slog.String("command_type", fmt.Sprintf("%T", command)), slog.String("error", err.Error()), ) return fault.Wrap(err, fctx.With(ctx)) } return nil } type Subscription struct { bus *Bus subkey subscriptionKey topic string closed bool mu sync.Mutex messageHandler *message.Handler } func (s *Subscription) Close() { s.mu.Lock() defer s.mu.Unlock() if s.closed { return } if s.messageHandler != nil { s.messageHandler.Stop() } s.closed = true s.bus.mu.Lock() delete(s.bus.subscriptions, s.subkey) s.bus.mu.Unlock() } type ( HandlerFunc[T any] func(ctx context.Context, event *T) error CommandHandlerFunc[T any] func(ctx context.Context, command *T) error ) func Subscribe[T any](ctx context.Context, bus *Bus, handlerName string, handler HandlerFunc[T]) (*Subscription, error) { topic := topicFromT[T]() subkey := subscriptionKey(handlerName) bus.mu.Lock() defer bus.mu.Unlock() if _, exists := bus.subscriptions[subkey]; exists { return nil, fmt.Errorf("subscription already exists: %s", subkey) } cqrsHandler := cqrs.NewEventHandler(handlerName, func(ctx context.Context, event *T) error { return handler(ctx, event) }) messageHandler, err := bus.eventProcessor.AddHandler(cqrsHandler) if err != nil { return nil, fault.Wrap(err, fctx.With(ctx)) } err = bus.router.RunHandlers(ctx) if err != nil { return nil, fault.Wrap(err, fctx.With(ctx)) } sub := &Subscription{ bus: bus, subkey: subkey, topic: topic, messageHandler: messageHandler, } bus.subscriptions[subkey] = sub return sub, nil } func SubscribeCommand[T any](ctx context.Context, bus *Bus, handlerName string, handler CommandHandlerFunc[T]) (*Subscription, error) { var zero T topic := topicFromValue(zero) subkey := subscriptionKey(handlerName) bus.mu.Lock() defer bus.mu.Unlock() if _, exists := bus.subscriptions[subkey]; exists { return nil, fmt.Errorf("subscription already exists: %s", subkey) } cqrsHandler := cqrs.NewCommandHandler(handlerName, func(ctx context.Context, command *T) error { return handler(ctx, command) }) if err := bus.commandProcessor.AddHandlers(cqrsHandler); err != nil { return nil, fault.Wrap(err, fctx.With(ctx)) } if err := bus.router.RunHandlers(ctx); err != nil { return nil, fault.Wrap(err, fctx.With(ctx)) } sub := &Subscription{ bus: bus, subkey: subkey, topic: topic, } bus.subscriptions[subkey] = sub return sub, nil } func topicFromValue(zero any) string { t := reflect.TypeOf(zero) if t.Kind() == reflect.Pointer { t = t.Elem() } to := t.String() return to } func topicFromT[T any]() string { var zero T t := reflect.TypeOf(zero) if t.Kind() == reflect.Pointer { t = t.Elem() } to := t.String() return to }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Southclaws/storyden'

If you have feedback or need assistance with the MCP directory API, please join our Discord server