
Storyden

by Southclaws
reindexer.go (6.1 kB)
package search_indexer

import (
	"context"
	"log/slog"
	"time"

	"entgo.io/ent/dialect/sql"
	"github.com/Southclaws/dt"
	"github.com/Southclaws/fault"
	"github.com/Southclaws/fault/fctx"
	"github.com/rs/xid"

	"github.com/Southclaws/storyden/app/resources/datagraph"
	"github.com/Southclaws/storyden/app/resources/library"
	"github.com/Southclaws/storyden/app/resources/post/reply"
	"github.com/Southclaws/storyden/app/resources/post/thread"
	"github.com/Southclaws/storyden/internal/ent/node"
	ent_node "github.com/Southclaws/storyden/internal/ent/node"
	ent_post "github.com/Southclaws/storyden/internal/ent/post"
)

// ReindexAll runs a full reindex pass: threads, replies and library nodes
// that have never been indexed, or have been updated since they were last
// indexed, are pushed to the search index in chunks.
func (idx *Indexer) ReindexAll(ctx context.Context) error {
	started := time.Now().UTC()

	idx.logger.Info("starting full reindex",
		slog.Int("chunk_size", idx.chunkSize),
	)

	tn, err := idx.reindexThreads(ctx)
	if err != nil {
		return fault.Wrap(err, fctx.With(ctx))
	}

	rn, err := idx.reindexReplies(ctx)
	if err != nil {
		return fault.Wrap(err, fctx.With(ctx))
	}

	nn, err := idx.reindexNodes(ctx)
	if err != nil {
		return fault.Wrap(err, fctx.With(ctx))
	}

	processed := tn + rn + nn

	if processed > 0 {
		idx.logger.Info("reindex completed",
			slog.Duration("duration", time.Since(started)),
			slog.Int("processed", processed),
		)
	} else {
		idx.logger.Info("reindex skipped: nothing to reindex")
	}

	return nil
}

func (idx *Indexer) reindexThreads(ctx context.Context) (int, error) {
	return reindex(ctx, idx, func() ([]datagraph.Item, error) {
		threads, err := idx.db.Post.Query().
			Where(
				ent_post.RootPostIDIsNil(),
				ent_post.VisibilityEQ(ent_post.VisibilityPublished),
				func(s *sql.Selector) {
					s.Where(sql.Or(
						sql.IsNull(ent_post.FieldIndexedAt),
						sql.GT(ent_post.FieldUpdatedAt, sql.Raw(ent_post.FieldIndexedAt)),
					))
				},
			).
			WithTags().
			Order(ent_post.ByUpdatedAt(), ent_post.ByID()).
			Limit(idx.chunkSize).
			All(ctx)
		if err != nil {
			return nil, fault.Wrap(err, fctx.With(ctx))
		}

		return dt.MapErr(threads, thread.ItemRef)
	}, func(ids []xid.ID, t time.Time) (int, error) {
		i, err := idx.db.Post.Update().
			Where(ent_post.IDIn(ids...)).
			SetIndexedAt(t).
			Save(ctx)
		if err != nil {
			return 0, fault.Wrap(err, fctx.With(ctx))
		}

		return i, nil
	})
}

func (idx *Indexer) reindexReplies(ctx context.Context) (int, error) {
	return reindex(ctx, idx, func() ([]datagraph.Item, error) {
		replies, err := idx.db.Post.Query().
			Where(
				ent_post.RootPostIDNotNil(),
				// NOTE: Prior to version v1.25.12, replies were always created
				// with visibility draft, as visibility was never used for
				// replies. Since v1.25.12, replies always get visibility
				// published (semantically correct, and potentially useful in
				// future for draft replies, etc.). However, to ensure older
				// instances are indexed correctly on 1.25.12+, we do not
				// filter by visibility yet. This will change once replies have
				// a use case for visibility, because we would not want
				// non-published replies to end up in the search index. The
				// hope is that nobody jumps straight from a version earlier
				// than v1.25.11 to whichever future version adds draft reply
				// support and re-enables this filter. For now, the filter is
				// left commented out and that bridge will be crossed when the
				// time comes.
				// ent_post.VisibilityEQ(ent_post.VisibilityPublished),
				func(s *sql.Selector) {
					s.Where(sql.Or(
						sql.IsNull(ent_post.FieldIndexedAt),
						sql.GT(ent_post.FieldUpdatedAt, sql.Raw(ent_post.FieldIndexedAt)),
					))
				},
			).
			Order(ent_post.ByUpdatedAt(), ent_post.ByID()).
			Limit(idx.chunkSize).
			All(ctx)
		if err != nil {
			return nil, fault.Wrap(err, fctx.With(ctx))
		}

		return dt.MapErr(replies, reply.ItemRef)
	}, func(ids []xid.ID, t time.Time) (int, error) {
		i, err := idx.db.Post.Update().
			Where(ent_post.IDIn(ids...)).
			SetIndexedAt(t).
			Save(ctx)
		if err != nil {
			return 0, fault.Wrap(err, fctx.With(ctx))
		}

		return i, nil
	})
}

func (idx *Indexer) reindexNodes(ctx context.Context) (int, error) {
	return reindex(ctx, idx, func() ([]datagraph.Item, error) {
		nodes, err := idx.db.Node.Query().
			Where(
				node.VisibilityEQ(node.VisibilityPublished),
				func(s *sql.Selector) {
					s.Where(sql.Or(
						sql.IsNull(node.FieldIndexedAt),
						sql.GT(node.FieldUpdatedAt, sql.Raw(node.FieldIndexedAt)),
					))
				},
			).
			Order(node.ByUpdatedAt(), node.ByID()).
			Limit(idx.chunkSize).
			All(ctx)
		if err != nil {
			return nil, fault.Wrap(err, fctx.With(ctx))
		}

		return dt.MapErr(nodes, library.ItemRef)
	}, func(ids []xid.ID, t time.Time) (int, error) {
		i, err := idx.db.Node.Update().
			Where(ent_node.IDIn(ids...)).
			SetIndexedAt(t).
			Save(ctx)
		if err != nil {
			return 0, fault.Wrap(err, fctx.With(ctx))
		}

		return i, nil
	})
}

// reindex repeatedly calls fetch for a chunk of stale items, submits each one
// to the search index, then calls update to stamp the chunk's indexed-at time,
// stopping once a fetch returns fewer items than the chunk size.
func reindex[T datagraph.Item](
	ctx context.Context,
	idx *Indexer,
	fetch func() ([]T, error),
	update func([]xid.ID, time.Time) (int, error),
) (int, error) {
	ids := make([]xid.ID, 0, idx.chunkSize)
	processed := 0

	var k datagraph.Kind

	for {
		ids = ids[:0]

		v, err := fetch()
		if err != nil {
			return processed, fault.Wrap(err, fctx.With(ctx))
		}

		if len(v) == 0 {
			break
		}

		for i, item := range v {
			err := idx.searchIndexer.Index(ctx, item)
			if err != nil {
				idx.logger.Error("failed to index item",
					slog.String("kind", item.GetKind().String()),
					slog.String("id", item.GetID().String()),
					slog.String("error", err.Error()),
				)
			}

			ids = append(ids, item.GetID())

			if i == 0 {
				k = item.GetKind()
			}
		}

		// Stamp the rows 2 seconds in the future, to ensure databases with
		// 1 second precision do not round the indexed-at time below the
		// updated-at time, which would cause the same rows to be fetched
		// again on the next pass.
		n, err := update(ids, time.Now().Add(2*time.Second))
		if err != nil {
			return processed, fault.Wrap(err, fctx.With(ctx))
		}

		processed += n

		idx.logger.Debug("reindexed chunk",
			slog.String("kind", k.String()),
			slog.Int("processed", n),
		)

		if len(v) < idx.chunkSize {
			break
		}
	}

	return processed, nil
}
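For context when reading the file above, the Indexer receiver is declared elsewhere in the package. Below is a minimal sketch of the shape this file appears to rely on, inferred only from how the receiver is used here; the field types, the ent import path, and the searchIndexer interface are assumptions, not the actual Storyden definitions.

package search_indexer

import (
	"context"
	"log/slog"

	"github.com/Southclaws/storyden/app/resources/datagraph"
	"github.com/Southclaws/storyden/internal/ent"
)

// Hypothetical sketch only: the real Indexer lives elsewhere in the Storyden
// codebase. The fields mirror how reindexer.go uses the receiver.
type Indexer struct {
	logger *slog.Logger // used via Info/Debug/Error with slog attributes
	db     *ent.Client  // used via db.Post.Query()/Update() and db.Node.Query()/Update()

	// Assumed interface: only Index(ctx, item) is called in this file.
	searchIndexer interface {
		Index(ctx context.Context, item datagraph.Item) error
	}

	chunkSize int // page size for each fetch/update cycle
}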
