run_ingest.py
# ingest/run_ingest.py
import os, argparse, uuid, csv, fnmatch
from pathlib import Path
from dotenv import load_dotenv
import yaml
from collections import defaultdict

from qdrant_client.http.models import PointStruct

from vectorstore.qdrant_init import connect_qdrant, ensure_collection
from ingest.loaders import load_file
from ingest.splitters import split_text
from ingest.embed import Embedder

DEFAULT_DOMAINS = ["engineering", "finance", "hr", "marketing", "general"]

FOLDER_ALLOW = {
    "engineering": ["ENGINEERING", "C_LEVEL"],
    "finance": ["FINANCE", "C_LEVEL"],
    "hr": ["HR", "C_LEVEL"],
    "marketing": ["MARKETING", "C_LEVEL"],
    "general": ["EMPLOYEE", "ENGINEERING", "FINANCE", "HR", "MARKETING", "C_LEVEL"],
}

# ---------- metadata helpers ----------

def load_metadata(meta_dir: Path) -> dict:
    data = {"defaults": {"allow": ["EMPLOYEE"]}, "rules": []}

    yml = meta_dir / "metadata.yaml"
    if yml.exists():
        y = yaml.safe_load(yml.read_text(encoding="utf-8")) or {}
        data["defaults"] = y.get("defaults", data["defaults"])
        for r in (y.get("files") or []):
            data["rules"].append(r)

    csvp = meta_dir / "metadata.csv"
    if csvp.exists():
        with csvp.open() as f:
            rdr = csv.DictReader(f)
            for r in rdr:
                rule = {
                    "path": r.get("path") or None,
                    "glob": r.get("glob") or None,
                    "allow": [a.strip() for a in (r.get("allow") or "").split(";") if a.strip()] or None,
                    "tags": [t.strip() for t in (r.get("tags") or "").split(";") if t.strip()] or None,
                    "doc_type": r.get("doc_type") or None,
                    "title": r.get("title") or None,
                    "boost": float(r.get("boost") or 1.0),
                    "skip": (r.get("skip", "").lower() == "true"),
                }
                data["rules"].append(rule)

    return data

def resolve_meta(meta: dict, rel_path_from_data: str, folder_domain: str) -> dict:
    out = {
        "allow": FOLDER_ALLOW.get(folder_domain, ["EMPLOYEE"]),
        "tags": [],
        "doc_type": None,
        "title": None,
        "boost": 1.0,
        "skip": False,
    }
    for r in meta.get("rules", []):
        hit = False
        if r.get("path") and r["path"] == rel_path_from_data:
            hit = True
        elif r.get("glob") and fnmatch.fnmatch(rel_path_from_data, r["glob"]):
            hit = True
        if not hit:
            continue
        if r.get("allow"):
            out["allow"] = r["allow"]
        for k in ["tags", "doc_type", "title", "boost", "skip"]:
            if r.get(k) not in (None, [], "", False):
                out[k] = r[k]
    return out

def iter_docs(root: Path):
    for path in root.rglob("*"):
        if path.is_file() and not path.name.startswith("."):
            yield path

# ---------- main ----------

def main():
    load_dotenv("env/.env")

    ap = argparse.ArgumentParser()
    ap.add_argument("--data-root", default="./data")
    ap.add_argument("--collections", nargs="*", default=DEFAULT_DOMAINS)
    args = ap.parse_args()

    data_root = Path(args.data_root).resolve()
    prefix = os.getenv("QDRANT_COLLECTION_PREFIX", "finrag")
    host = os.getenv("QDRANT_HOST", "localhost")
    port = int(os.getenv("QDRANT_PORT", "6333"))
    chunk = int(os.getenv("CHUNK_SIZE", "1200"))
    overlap = int(os.getenv("CHUNK_OVERLAP", "150"))
    embed_model = os.getenv("EMBED_MODEL", "text-embedding-3-large")

    client = connect_qdrant(host, port)
    embedder = Embedder(model=embed_model)
    meta = load_metadata(data_root / "_metadata")

    # ensure all base collections exist
    for d in DEFAULT_DOMAINS:
        ensure_collection(client, f"{prefix}_{d}", dim=3072)

    for domain in args.collections:
        domain_dir = data_root / domain
        if not domain_dir.exists():
            print(f"[skip] {domain_dir} not found")
            continue

        print(f"\n[ingest] {domain} -> {prefix}_{domain}")

        # Bucket rows per target collection so we can upsert once per collection
        buckets: dict[str, list[tuple[str, dict]]] = defaultdict(list)

        for path in iter_docs(domain_dir):
            rel_from_data = str(path.relative_to(data_root))
            try:
                raw = load_file(path)
                chunks = split_text(raw, chunk, overlap)
                m = resolve_meta(meta, rel_from_data, domain)
                if m["skip"]:
                    continue

                # Build a payload for each chunk
                for idx, chunk_text in enumerate(chunks):
                    payload = {
                        "source": str(path),
                        "domain": domain,
                        "chunk_id": idx,
                        "role_allow": m["allow"],
                        "text": chunk_text,
                        "tags": m.get("tags") or [],
                        "doc_type": m.get("doc_type"),
                        "title": m.get("title"),
                        "boost": float(m.get("boost", 1.0)),
                    }

                    # Always index in its own domain
                    buckets[f"{prefix}_{domain}"].append((chunk_text, payload))

                    # If metadata says EMPLOYEE can see it, also publish to GENERAL
                    if "EMPLOYEE" in payload["role_allow"] and domain != "general":
                        buckets[f"{prefix}_general"].append((chunk_text, payload))

            except Exception as e:
                print(f"[warn] failed to process {rel_from_data}: {e}")

        # Upsert per collection
        for collection, rows in buckets.items():
            texts = [t for t, _ in rows]
            ploads = [p for _, p in rows]
            vectors = embedder.embed(texts)
            n = min(len(vectors), len(ploads))
            points = [
                PointStruct(id=str(uuid.uuid4()), vector=vectors[i], payload=ploads[i])
                for i in range(n)
            ]
            ensure_collection(client, collection, dim=3072)
            client.upsert(collection_name=collection, points=points)
            print(f"[ok] upserted {len(points)} chunks into {collection}")

if __name__ == "__main__":
    main()
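
The script imports connect_qdrant and ensure_collection from vectorstore.qdrant_init, a module not shown on this page. Below is a minimal sketch of what those helpers would need to do, assuming the stock qdrant-client API and cosine distance; the real module may differ.

# vectorstore/qdrant_init.py (hypothetical sketch; the actual module is not shown here)
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams

def connect_qdrant(host: str, port: int) -> QdrantClient:
    # Plain connection; auth/TLS would be added here if the deployment needs it.
    return QdrantClient(host=host, port=port)

def ensure_collection(client: QdrantClient, name: str, dim: int) -> None:
    # Create the collection only if it does not exist yet, so repeated
    # ingest runs stay idempotent. Cosine distance is an assumption.
    if not client.collection_exists(name):
        client.create_collection(
            collection_name=name,
            vectors_config=VectorParams(size=dim, distance=Distance.COSINE),
        )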
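Likewise, ingest.embed.Embedder only appears as an import. Since the default EMBED_MODEL is text-embedding-3-large, whose vectors are 3072-dimensional (matching dim=3072 above), one plausible sketch wraps the OpenAI embeddings endpoint; the class internals and the batch size of 128 are assumptions, not the project's actual code.

# ingest/embed.py (hypothetical sketch, assuming the OpenAI Python client)
from openai import OpenAI

class Embedder:
    def __init__(self, model: str = "text-embedding-3-large"):
        self.model = model
        self.client = OpenAI()  # reads OPENAI_API_KEY from the environment

    def embed(self, texts: list[str]) -> list[list[float]]:
        # Batch requests to stay under request-size limits; 128 inputs per
        # call is an arbitrary choice, not a documented constant.
        vectors: list[list[float]] = []
        for i in range(0, len(texts), 128):
            resp = self.client.embeddings.create(model=self.model, input=texts[i:i + 128])
            vectors.extend(d.embedding for d in resp.data)
        return vectors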
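For reference, load_metadata reads an optional data/_metadata/metadata.yaml (a defaults mapping plus a files list) and/or a metadata.csv with the columns parsed above, where allow and tags are semicolon-separated in the CSV. An illustrative metadata.yaml, with made-up paths, using the rule keys that resolve_meta consumes:

# data/_metadata/metadata.yaml (illustrative example; paths are invented)
defaults:
  allow: [EMPLOYEE]
files:
  - glob: "finance/reports/*.pdf"
    allow: [FINANCE, C_LEVEL]
    tags: [quarterly]
    doc_type: report
    boost: 1.5
  - path: "hr/drafts/policy_draft.docx"
    skip: true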
