Glama

coc_anchor

Submits the current chain hash to OpenTimestamps and RFC 3161 TSA for independent proof of existence at a specific time.

Instructions

Submit the current chain hash for external timestamping.

Computes SHA-256 of the full chain file and submits it to:
- OpenTimestamps calendar servers (Bitcoin-anchored proof)
- RFC 3161 TSA server (freeTSA.org — instant certificate)

This creates independently verifiable proof that the chain existed at a
specific point in time. The OTS proof takes 1-12 hours for Bitcoin
confirmation; the TSA certificate is immediate.

Requires network access. No credentials needed.

Returns:
    JSON with chain hash, anchor ID, OTS/TSA submission results, and proof file paths
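
The hashing step described above can be reproduced independently. The following is a minimal sketch, not the server's own code: `sha256_file` and `matches_recorded_hash` are hypothetical helper names, and the snapshot file stands in for a copy of the chain file taken at anchoring time. The handler listed under Implementation Reference appends an anchor entry after hashing, so the live chain file is not expected to match the recorded hash once the tool has run; the demo imitates that with an append.

```python
import hashlib
import hmac
import os
import tempfile


def sha256_file(path: str, chunk_size: int = 65536) -> str:
    """Stream a file through SHA-256 and return the hex digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_recorded_hash(snapshot_path: str, recorded_hex: str) -> bool:
    """Compare a file's digest with a recorded hex digest (e.g. a .hash file's content)."""
    return hmac.compare_digest(sha256_file(snapshot_path), recorded_hex.strip().lower())


# Demo on a throwaway file; the bytes are placeholders, not a real chain.
with tempfile.TemporaryDirectory() as tmp:
    snapshot = os.path.join(tmp, "chain_snapshot.bin")
    with open(snapshot, "wb") as f:
        f.write(b"example chain bytes")
    recorded = hashlib.sha256(b"example chain bytes").hexdigest()
    unchanged = matches_recorded_hash(snapshot, recorded)

    # Appending anything (as the anchor entry does) changes the digest.
    with open(snapshot, "ab") as f:
        f.write(b"appended anchor entry")
    after_append = matches_recorded_hash(snapshot, recorded)

print(unchanged, after_append)  # prints: True False
```

Keeping a byte-exact copy of the chain as it was when anchored is one way to make the recorded hash checkable later; whether the server keeps such a copy is not shown on this page.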

Input Schema

No arguments

Output Schema

Name    Required  Description  Default
result  Yes
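
A caller usually needs to turn the returned JSON string into a decision. The sketch below shows one way to do that, assuming the keys and status strings that appear in the handler listed under Implementation Reference (`anchor_id`, `chain_hash`, `ots`, `tsa`, `error`). `summarize_anchor_result` is a hypothetical helper, and the sample payload is made up for illustration.

```python
import json


def summarize_anchor_result(raw: str) -> dict:
    """Reduce coc_anchor's JSON string to a few booleans a caller can branch on."""
    result = json.loads(raw)
    if "error" in result:
        return {"ok": False, "error": result["error"]}

    ots = result.get("ots", "skipped")
    tsa = result.get("tsa", "skipped")
    # Success strings as they appear in the handler listed on this page.
    ots_ok = ots.startswith(("submitted to", "raw submission to"))
    tsa_ok = tsa in ("granted", "grantedWithMods")
    return {
        "ok": ots_ok or tsa_ok,
        "ots_ok": ots_ok,
        "tsa_ok": tsa_ok,
        "anchor_id": result.get("anchor_id"),
        "chain_hash": result.get("chain_hash"),
    }


# Illustrative payload only; the values are invented for this example.
sample = json.dumps({
    "anchor_id": "anchor_20250101_000000",
    "chain_hash": "00" * 32,
    "ots": "all OTS servers failed",
    "tsa": "granted",
    "anchor_entry_seq": 5,
    "proof_dir": "anchors",
})
summary = summarize_anchor_result(sample)
print(summary["ok"], summary["ots_ok"], summary["tsa_ok"])  # prints: True False True
```

Matching on status strings is brittle if the server changes its wording, so treat this as a reading aid rather than a stable contract.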

Implementation Reference

  • The coc_anchor tool handler function. It computes the SHA-256 of the full chain file, then submits the hash to OpenTimestamps calendar servers for a Bitcoin-anchored proof and to an RFC 3161 TSA server (freeTSA.org) for an instant certificate. It saves the anchor metadata and the OTS (.ots) and TSA (.tsr) proof files, appends an 'anchor' entry to the chain, and returns JSON with the results.
    @mcp.tool()
    def coc_anchor() -> str:
        """Submit the current chain hash for external timestamping.
    
        Computes SHA-256 of the full chain file and submits it to:
        - OpenTimestamps calendar servers (Bitcoin-anchored proof)
        - RFC 3161 TSA server (freeTSA.org — instant certificate)
    
        This creates independently verifiable proof that the chain existed at a
        specific point in time. The OTS proof takes 1-12 hours for Bitcoin
        confirmation; the TSA certificate is immediate.
    
        Requires network access. No credentials needed.
    
        Returns:
            JSON with chain hash, anchor ID, OTS/TSA submission results, and proof file paths
        """
        if not os.path.exists(CHAIN_FILE):
            return json.dumps({"error": "Chain file not found. Call coc_init first."})
    
        with open(CHAIN_FILE, "rb") as f:
            chain_bytes = f.read()
        chain_hash = hashlib.sha256(chain_bytes).hexdigest()
    
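        chain = _read_chain()
        if not chain:
            # An empty chain has no previous entry for the anchor entry to link to.
            return json.dumps({"error": "Chain is empty; nothing to anchor."})
        seq = chain[-1]["seq"]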
    
        anchor_dir = os.path.join(CHAIN_DIR, "anchors")
        os.makedirs(anchor_dir, exist_ok=True)
    
        anchor_id = f"anchor_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"
        anchor_meta = {
            "id": anchor_id,
            "chain_hash": chain_hash,
            "chain_length": len(chain),
            "latest_seq": seq,
            "latest_entry_hash": chain[-1]["entry_hash"] if chain else None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "pending",
            "ots_proof_file": None,
        }
    
        # Save anchor metadata
        anchor_meta_path = os.path.join(anchor_dir, f"{anchor_id}.json")
        with open(anchor_meta_path, "w", encoding="utf-8") as f:
            json.dump(anchor_meta, f, indent=2)
    
        hash_bytes = bytes.fromhex(chain_hash)
        ots_proof_path = os.path.join(anchor_dir, f"{anchor_id}.ots")
        results = {"anchor_id": anchor_id, "chain_hash": chain_hash, "ots": "skipped", "tsa": "skipped"}
    
        # OTS submission
        import urllib.request
        import ssl
    
        ots_servers = [
            "https://a.pool.opentimestamps.org",
            "https://b.pool.opentimestamps.org",
            "https://a.pool.eternitywall.com",
        ]
        ots_success = False
    
        try:
            from opentimestamps.core.timestamp import DetachedTimestampFile, Timestamp
            from opentimestamps.core.op import OpSHA256
            from opentimestamps.core.serialize import StreamSerializationContext, StreamDeserializationContext
            import io as _io
    
            timestamp = Timestamp(hash_bytes)
            detached = DetachedTimestampFile(OpSHA256(), timestamp)
            ssl_ctx = ssl.create_default_context()
            calendars_merged = 0
    
            for server_url in ots_servers:
                try:
                    req = urllib.request.Request(
                        f"{server_url}/digest",
                        data=hash_bytes,
                        headers={
                            "Content-Type": "application/x-www-form-urlencoded",
                            "User-Agent": "agent-trust-stack-mcp/0.1.0",
                            "Accept": "application/vnd.opentimestamps.v1",
                        },
                        method="POST",
                    )
                    with urllib.request.urlopen(req, timeout=30, context=ssl_ctx) as resp:
                        response_data = resp.read()
                    if len(response_data) > 0:
                        resp_buf = _io.BytesIO(response_data)
                        resp_ctx = StreamDeserializationContext(resp_buf)
                        cal_ts = Timestamp.deserialize(resp_ctx, hash_bytes)
                        timestamp.merge(cal_ts)
                        calendars_merged += 1
                except Exception:
                    continue
    
            if calendars_merged > 0:
                buf = _io.BytesIO()
                ser_ctx = StreamSerializationContext(buf)
                detached.serialize(ser_ctx)
                ots_data = buf.getvalue()
                with open(ots_proof_path, "wb") as pf:
                    pf.write(ots_data)
                ots_success = True
                anchor_meta["status"] = "calendar_submitted_proper"
                anchor_meta["ots_proof_file"] = f"{anchor_id}.ots"
                anchor_meta["calendars_submitted"] = calendars_merged
                results["ots"] = f"submitted to {calendars_merged} calendar(s)"
    
        except ImportError:
            # Fallback: raw submission. The bytes saved here are the calendar's
            # raw response, not a complete detached .ots file.
            for server_url in ots_servers:
                try:
                    req = urllib.request.Request(
                        f"{server_url}/digest",
                        data=hash_bytes,
                        headers={"Content-Type": "application/x-www-form-urlencoded"},
                        method="POST",
                    )
                    ctx = ssl.create_default_context()
                    with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
                        proof_data = resp.read()
                        if len(proof_data) > 0:
                            with open(ots_proof_path, "wb") as pf:
                                pf.write(proof_data)
                            ots_success = True
                            anchor_meta["status"] = "submitted_raw"
                            anchor_meta["ots_proof_file"] = f"{anchor_id}.ots"
                            results["ots"] = f"raw submission to {server_url}"
                            break
                except Exception:
                    continue
    
        if not ots_success:
            results["ots"] = "all OTS servers failed"
    
        # TSA submission
        tsa_success = False
        try:
            from agent_trust_stack_mcp.tsa import build_rfc3161_tsq, parse_tsr_status
    
            tsa_url = "https://freetsa.org/tsr"
            tsq = build_rfc3161_tsq(hash_bytes)
            req = urllib.request.Request(
                tsa_url,
                data=tsq,
                headers={
                    "Content-Type": "application/timestamp-query",
                    "User-Agent": "agent-trust-stack-mcp/0.1.0",
                },
                method="POST",
            )
            ctx = ssl.create_default_context()
            with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
                tsr_bytes = resp.read()
    
            tsr_info = parse_tsr_status(tsr_bytes)
            if tsr_info["status"] in (0, 1):
                tsr_path = os.path.join(anchor_dir, f"{anchor_id}.tsr")
                with open(tsr_path, "wb") as tf:
                    tf.write(tsr_bytes)
                tsa_success = True
                anchor_meta["tsa_status"] = tsr_info["status_text"]
                anchor_meta["tsa_proof_file"] = f"{anchor_id}.tsr"
                results["tsa"] = tsr_info["status_text"]
            else:
                # Report non-granted responses instead of leaving the default "skipped".
                results["tsa"] = f"not granted: {tsr_info['status_text']}"
        except Exception as e:
            results["tsa"] = f"failed: {e}"
    
        # Save final anchor metadata
        with open(anchor_meta_path, "w", encoding="utf-8") as f:
            json.dump(anchor_meta, f, indent=2)
    
        # Save hash file for manual verification
        hash_file = os.path.join(anchor_dir, f"{anchor_id}.hash")
        with open(hash_file, "w", encoding="utf-8") as f:
            f.write(chain_hash)
    
        # Add anchor entry to chain
        tiers = []
        if ots_success:
            tiers.append("OTS/Bitcoin")
        if tsa_success:
            tiers.append("RFC3161/TSA")
        tier_str = " + ".join(tiers) if tiers else "local-only"
    
        anchor_entry = _make_entry(
            sequence=len(chain),
            event_type="anchor",
            data=f"Anchor submitted ({tier_str}). Chain hash: {chain_hash[:16]}... (seq 0-{seq}, {len(chain)} entries).",
            prev_hash=chain[-1]["entry_hash"],
            agent="mcp-server",
        )
        _append_entry(anchor_entry)
        chain.append(anchor_entry)
        _update_meta(chain)
    
        results["anchor_entry_seq"] = anchor_entry["seq"]
        results["proof_dir"] = anchor_dir
        return json.dumps(results)
  • The tool is registered via the @mcp.tool() decorator (line 587 of the source file), which is how FastMCP exposes it as an MCP tool named 'coc_anchor'.
    @mcp.tool()
  • The docstring serves as the tool description, and the parameterless signature is why the input schema has no arguments. It describes the OTS/TSA submission process and specifies the JSON return format.
    """Submit the current chain hash for external timestamping.
    
    Computes SHA-256 of the full chain file and submits it to:
    - OpenTimestamps calendar servers (Bitcoin-anchored proof)
    - RFC 3161 TSA server (freeTSA.org — instant certificate)
    
    This creates independently verifiable proof that the chain existed at a
    specific point in time. The OTS proof takes 1-12 hours for Bitcoin
    confirmation; the TSA certificate is immediate.
    
    Requires network access. No credentials needed.
    
    Returns:
        JSON with chain hash, anchor ID, OTS/TSA submission results, and proof file paths
    """
  • The tsa.py module provides helper functions (build_rfc3161_tsq and parse_tsr_status) used by coc_anchor to construct DER-encoded RFC 3161 TimeStampReq messages and parse the TSA response status.
    """RFC 3161 TSA helpers for Chain of Consciousness anchoring.
    
    Builds DER-encoded TimeStampReq messages and parses TimeStampResp status.
    No external dependencies — pure Python DER encoding.
    """
    
    import secrets
    
    
    def _der_tag_length(tag: int, content: bytes) -> bytes:
        """Wrap content bytes with a DER tag-length header."""
        length = len(content)
        if length < 0x80:
            return bytes([tag, length]) + content
        elif length < 0x100:
            return bytes([tag, 0x81, length]) + content
        else:
            return bytes([tag, 0x82, (length >> 8) & 0xFF, length & 0xFF]) + content
    
    
    def build_rfc3161_tsq(hash_bytes: bytes) -> bytes:
        """Build a DER-encoded RFC 3161 TimeStampReq for a SHA-256 digest.
    
        Structure per RFC 3161 Section 2.4.1 (the OPTIONAL reqPolicy and
        extensions fields are not emitted):
          TimeStampReq ::= SEQUENCE {
              version         INTEGER { v1(1) },
              messageImprint  MessageImprint,
              nonce           INTEGER OPTIONAL,
              certReq         BOOLEAN DEFAULT FALSE }
        """
        # SHA-256 OID: 2.16.840.1.101.3.4.2.1
        sha256_oid = _der_tag_length(0x06, bytes([
            0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01
        ]))
        alg_id = _der_tag_length(0x30, sha256_oid + bytes([0x05, 0x00]))  # SEQUENCE { OID, NULL }
        msg_imprint = _der_tag_length(0x30, alg_id + _der_tag_length(0x04, hash_bytes))
    
        version = _der_tag_length(0x02, bytes([0x01]))  # INTEGER v1
    
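        # Random nonce for replay protection, encoded as a minimal positive
        # DER INTEGER: bit_length() // 8 + 1 bytes keeps the sign bit clear
        # without adding a redundant leading zero byte.
        nonce_int = int.from_bytes(secrets.token_bytes(8), "big")
        nonce_raw = nonce_int.to_bytes(nonce_int.bit_length() // 8 + 1, "big")
        nonce = _der_tag_length(0x02, nonce_raw)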
    
        cert_req = _der_tag_length(0x01, bytes([0xFF]))  # BOOLEAN TRUE
    
        return _der_tag_length(0x30, version + msg_imprint + nonce + cert_req)
    
    
    def parse_tsr_status(tsr_bytes: bytes) -> dict:
        """Parse an RFC 3161 TimeStampResp to extract status info.
    
        Returns dict with status code, text, token presence, and size.
        For full cryptographic verification use: openssl ts -verify
        """
        STATUS_NAMES = {
            0: "granted",
            1: "grantedWithMods",
            2: "rejection",
            3: "waiting",
            4: "revocationWarning",
            5: "revocationNotification",
        }
    
        def read_tl(data: bytes, off: int) -> tuple:
            tag = data[off]
            off += 1
            lb = data[off]
            off += 1
            if lb < 0x80:
                return tag, lb, off
            n = lb & 0x7F
            length = 0
            for _ in range(n):
                length = (length << 8) | data[off]
                off += 1
            return tag, length, off
    
        try:
            _, outer_len, outer_start = read_tl(tsr_bytes, 0)
            outer_end = outer_start + outer_len
    
            _, si_len, si_start = read_tl(tsr_bytes, outer_start)
            si_end = si_start + si_len
    
            tag, int_len, int_start = read_tl(tsr_bytes, si_start)
            if tag != 0x02:
                return {
                    "status": -1,
                    "status_text": "parse_error: expected INTEGER",
                    "has_token": False,
                    "tsr_size": len(tsr_bytes),
                }
            status_val = int.from_bytes(
                tsr_bytes[int_start : int_start + int_len], "big", signed=True
            )
    
            return {
                "status": status_val,
                "status_text": STATUS_NAMES.get(status_val, f"unknown({status_val})"),
                "has_token": si_end < outer_end,
                "tsr_size": len(tsr_bytes),
            }
        except (IndexError, ValueError) as e:
            return {
                "status": -1,
                "status_text": f"parse_error: {e}",
                "has_token": False,
                "tsr_size": len(tsr_bytes),
            }
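
The byte layout that the status parser above relies on can be seen on hand-built responses. This is a small sketch under stated assumptions: `der`, `iter_tlv`, and `tsr_status` are hypothetical names introduced here, the two responses are synthetic (an empty SEQUENCE stands in for a real timeStampToken), and nothing in it verifies a signature. For real responses, the source itself points to `openssl ts -verify`.

```python
def der(tag: int, content: bytes) -> bytes:
    """Encode one short-form DER TLV (content shorter than 128 bytes)."""
    if len(content) >= 0x80:
        raise ValueError("demo encoder only handles short-form lengths")
    return bytes([tag, len(content)]) + content


def iter_tlv(data: bytes):
    """Yield (tag, content) for each DER element laid end to end in data."""
    off = 0
    while off < len(data):
        tag, first = data[off], data[off + 1]
        off += 2
        if first < 0x80:
            length = first  # short form: the byte is the length
        else:
            n = first & 0x7F  # long form: next n bytes hold the length
            length = int.from_bytes(data[off:off + n], "big")
            off += n
        yield tag, data[off:off + length]
        off += length


def tsr_status(tsr: bytes) -> tuple[int, bool]:
    """Return (PKIStatus value, whether a field follows PKIStatusInfo)."""
    [(_, body)] = iter_tlv(tsr)          # outer TimeStampResp SEQUENCE
    fields = list(iter_tlv(body))        # PKIStatusInfo, then optional token
    _, status_info = fields[0]
    tag, status_bytes = next(iter_tlv(status_info))
    if tag != 0x02:
        raise ValueError("expected INTEGER as first PKIStatusInfo field")
    return int.from_bytes(status_bytes, "big", signed=True), len(fields) > 1


# Synthetic responses: status 2 (rejection) with no token, status 0 (granted)
# followed by an empty SEQUENCE used as a placeholder token.
rejected = der(0x30, der(0x30, der(0x02, b"\x02")))
granted = der(0x30, der(0x30, der(0x02, b"\x00")) + der(0x30, b""))
print(tsr_status(rejected))  # prints: (2, False)
print(tsr_status(granted))   # prints: (0, True)
```
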
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

Discloses key behaviors: requires network access, no credentials needed, OTS takes 1-12 hours, TSA immediate, returns JSON with specified fields. Lacks explicit statement on side effects (non-destructive assumed).

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

Concise 7-sentence description, front-loaded with purpose, well-structured with bullet points for services and results.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 5/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Covers purpose, behavioral context, return format, and prerequisites. No output schema provided but description fills gaps adequately for a zero-param tool.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 4/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

No parameters in schema; description adds no param info as none exist. Baseline 4 applies per instructions for 0-param tools.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

Clearly states the tool submits chain hash for external timestamping, listing specific services (OpenTimestamps, RFC 3161 TSA). Differentiates from sibling tools like coc_verify and coc_status by focusing on timestamping.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

Implies usage when timestamping proof is needed, but no explicit guidance on when not to use or alternatives among siblings.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/alexfleetcommander/agent-trust-stack-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.