We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/noah-vh/mcp-server-clickup'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
import logging
import tarfile
import fsspec
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.compression import compr
from fsspec.utils import infer_compression
# Translation from raw tar header type flags to fsspec entry types.
# tarfile.REGTYPE is b"0" (regular file) and tarfile.DIRTYPE is b"5"
# (directory); the lookups in this module fall back to "file" for any
# other flag.
typemap = {
    tarfile.REGTYPE: "file",
    tarfile.DIRTYPE: "directory",
}

# Module logger, registered under the fixed name "tar".
logger = logging.getLogger("tar")
class TarFileSystem(AbstractArchiveFileSystem):
    """Compressed Tar archives as a file-system (read-only)

    Supports the following formats:
    tar.gz, tar.bz2, tar.xz
    """

    root_marker = ""
    protocol = "tar"
    cachable = False

    def __init__(
        self,
        fo="",
        index_store=None,
        target_options=None,
        target_protocol=None,
        compression=None,
        **kwargs,
    ):
        """Open a tar archive and index its members.

        Parameters
        ----------
        fo:
            Either a string, which is opened with ``fsspec.open`` using
            ``target_protocol`` and ``target_options``, or an already open
            file-like object that is used as-is.
        index_store:
            Stored on the instance; not otherwise used yet (see the TODO
            notes in ``_index``).
        target_options:
            Keyword arguments forwarded to ``fsspec.open`` when ``fo`` is a
            string. ``None`` is treated as an empty dict.
        target_protocol:
            Protocol forwarded to ``fsspec.open`` when ``fo`` is a string.
        compression:
            Key into ``fsspec.compression.compr`` selecting the wrapper used
            to decompress ``fo``. When ``None``, the compression is inferred
            from the file name, if one can be determined.
        **kwargs:
            Passed to the parent class constructor.
        """
        super().__init__(**kwargs)
        target_options = target_options or {}

        if isinstance(fo, str):
            self.of = fsspec.open(fo, protocol=target_protocol, **target_options)
            fo = self.of.open()  # keep the reference

        # Try to infer compression.
        if compression is None:
            name = None

            # Try different ways to get hold of the filename. `fo` might either
            # be a `fsspec.LocalFileOpener`, an `io.BufferedReader` or an
            # `fsspec.AbstractFileSystem` instance.
            try:
                # Amended io.BufferedReader or similar.
                # This uses a "protocol extension" where original filenames are
                # propagated to archive-like filesystems in order to let them
                # infer the right compression appropriately.
                if hasattr(fo, "original"):
                    name = fo.original

                # fsspec.LocalFileOpener
                elif hasattr(fo, "path"):
                    name = fo.path

                # io.BufferedReader
                elif hasattr(fo, "name"):
                    name = fo.name

                # fsspec.AbstractFileSystem
                elif hasattr(fo, "info"):
                    name = fo.info()["name"]

            except Exception as ex:
                logger.warning(
                    f"Unable to determine file name, not inferring compression: {ex}"
                )

            if name is not None:
                # A file object's name is not always path-like: files opened
                # from a file descriptor carry an integer name, for example.
                # Inference is best-effort, so such names must not abort
                # construction; fall through with no compression instead.
                try:
                    compression = infer_compression(name)
                except TypeError as ex:
                    logger.warning(
                        f"Unable to infer compression from file name {name!r}: {ex}"
                    )
                else:
                    logger.info(
                        f"Inferred compression {compression} from file name {name}"
                    )

        if compression is not None:
            # TODO: tarfile already implements compression with modes like "'r:gz'",
            #  but then would seek to offset in the file work?
            fo = compr[compression](fo)

        self._fo_ref = fo
        self.fo = fo  # the whole instance is a context
        self.tar = tarfile.TarFile(fileobj=self.fo)
        self.dir_cache = None

        self.index_store = index_store
        self.index = None
        self._index()

    def _index(self):
        """Build ``self.index``, mapping member name to ``(info, offset_data)``.

        Keys are member names with trailing slashes removed. ``info`` is the
        dict returned by ``TarInfo.get_info()`` with its ``"type"`` entry
        translated through ``typemap`` (unknown types become ``"file"``).
        """
        # TODO: load and set saved index, if exists
        out = {}
        for ti in self.tar:
            # Fetch the info dict once and reuse it for both the key and value.
            info = ti.get_info()
            name = info["name"].rstrip("/")
            info["type"] = typemap.get(info["type"], "file")
            out[name] = (info, ti.offset_data)

        self.index = out
        # TODO: save index to self.index_store here, if set

    def _get_dirs(self):
        """Populate ``self.dir_cache`` on first use; later calls do nothing."""
        if self.dir_cache is not None:
            return

        # This enables ls to get directories as children as well as files
        self.dir_cache = {
            dirname: {"name": dirname, "size": 0, "type": "directory"}
            for dirname in self._all_dirnames(self.tar.getnames())
        }
        # Entries for real archive members replace any directory placeholder
        # created above under the same name.
        for member in self.tar.getmembers():
            info = member.get_info()
            info["name"] = info["name"].rstrip("/")
            info["type"] = typemap.get(info["type"], "file")
            self.dir_cache[info["name"]] = info

    def _open(self, path, mode="rb", **kwargs):
        """Return a readable file object for the archive member ``path``.

        Extra keyword arguments are accepted but ignored.

        Raises
        ------
        ValueError
            If ``mode`` is not ``"rb"``, or if ``path`` is not a regular file.
        KeyError
            If ``path`` is not present in the index.
        """
        if mode != "rb":
            raise ValueError("Read-only filesystem implementation")
        # Only the member details are needed; the stored data offset is unused.
        details, _ = self.index[path]
        if details["type"] != "file":
            raise ValueError("Can only handle regular files")
        return self.tar.extractfile(path)