coc_anchor
Submits the current chain hash to OpenTimestamps and RFC 3161 TSA for independent proof of existence at a specific time.
Instructions
Submit the current chain hash for external timestamping.
Computes SHA-256 of the full chain file and submits it to:
- OpenTimestamps calendar servers (Bitcoin-anchored proof)
- RFC 3161 TSA server (freeTSA.org — instant certificate)
This creates independently verifiable proof that the chain existed at a
specific point in time. The OTS proof takes 1-12 hours for Bitcoin
confirmation; the TSA certificate is immediate.
Requires network access. No credentials needed.
Returns:
JSON with chain hash, anchor ID, OTS/TSA submission results, and proof file paths
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
Implementation Reference
- The coc_anchor tool handler function. It computes SHA-256 of the full chain file, submits the hash to OpenTimestamps calendar servers for Bitcoin-anchored proof and to an RFC 3161 TSA server (freeTSA.org) for an instant certificate. Saves anchor metadata, OTS/TST proof files, appends an 'anchor' entry to the chain, and returns JSON with results.
@mcp.tool()
def coc_anchor() -> str:
    """Submit the current chain hash for external timestamping.

    Computes SHA-256 of the full chain file and submits it to:
    - OpenTimestamps calendar servers (Bitcoin-anchored proof)
    - RFC 3161 TSA server (freeTSA.org — instant certificate)

    This creates independently verifiable proof that the chain existed at a
    specific point in time. The OTS proof takes 1-12 hours for Bitcoin
    confirmation; the TSA certificate is immediate.

    Requires network access. No credentials needed.

    Returns:
        JSON with chain hash, anchor ID, OTS/TSA submission results, and proof file paths
    """
    if not os.path.exists(CHAIN_FILE):
        return json.dumps({"error": "Chain file not found. Call coc_init first."})

    # Hash the raw bytes of the chain file; this digest is what gets anchored.
    with open(CHAIN_FILE, "rb") as f:
        chain_bytes = f.read()
    chain_hash = hashlib.sha256(chain_bytes).hexdigest()

    chain = _read_chain()
    seq = chain[-1]["seq"] if chain else 0

    anchor_dir = os.path.join(CHAIN_DIR, "anchors")
    os.makedirs(anchor_dir, exist_ok=True)

    # Read the clock once so the anchor ID and the recorded timestamp always
    # describe the same instant (two separate reads can straddle a second).
    now = datetime.now(timezone.utc)
    anchor_id = f"anchor_{now.strftime('%Y%m%d_%H%M%S')}"
    anchor_meta = {
        "id": anchor_id,
        "chain_hash": chain_hash,
        "chain_length": len(chain),
        "latest_seq": seq,
        "latest_entry_hash": chain[-1]["entry_hash"] if chain else None,
        "timestamp": now.isoformat(),
        "status": "pending",
        "ots_proof_file": None,
    }

    # Save anchor metadata up front so a record exists even if a later
    # submission step raises; it is rewritten with final values below.
    anchor_meta_path = os.path.join(anchor_dir, f"{anchor_id}.json")
    with open(anchor_meta_path, "w", encoding="utf-8") as f:
        json.dump(anchor_meta, f, indent=2)

    hash_bytes = bytes.fromhex(chain_hash)
    ots_proof_path = os.path.join(anchor_dir, f"{anchor_id}.ots")
    results = {
        "anchor_id": anchor_id,
        "chain_hash": chain_hash,
        "ots": "skipped",
        "tsa": "skipped",
    }

    # OTS submission
    import urllib.request
    import ssl

    ots_servers = [
        "https://a.pool.opentimestamps.org",
        "https://b.pool.opentimestamps.org",
        "https://a.pool.eternitywall.com",
    ]
    ots_success = False
    try:
        from opentimestamps.core.timestamp import DetachedTimestampFile, Timestamp
        from opentimestamps.core.op import OpSHA256
        from opentimestamps.core.serialize import (
            StreamSerializationContext,
            StreamDeserializationContext,
        )
        import io as _io

        timestamp = Timestamp(hash_bytes)
        detached = DetachedTimestampFile(OpSHA256(), timestamp)
        ssl_ctx = ssl.create_default_context()
        calendars_merged = 0

        for server_url in ots_servers:
            try:
                req = urllib.request.Request(
                    f"{server_url}/digest",
                    data=hash_bytes,
                    headers={
                        "Content-Type": "application/x-www-form-urlencoded",
                        "User-Agent": "agent-trust-stack-mcp/0.1.0",
                        "Accept": "application/vnd.opentimestamps.v1",
                    },
                    method="POST",
                )
                with urllib.request.urlopen(req, timeout=30, context=ssl_ctx) as resp:
                    response_data = resp.read()
                if response_data:
                    # Merge each calendar's attestation into the one timestamp.
                    resp_buf = _io.BytesIO(response_data)
                    resp_ctx = StreamDeserializationContext(resp_buf)
                    cal_ts = Timestamp.deserialize(resp_ctx, hash_bytes)
                    timestamp.merge(cal_ts)
                    calendars_merged += 1
            except Exception:
                # Best effort: one unreachable or misbehaving calendar must
                # not prevent submission to the remaining ones.
                continue

        if calendars_merged > 0:
            buf = _io.BytesIO()
            ser_ctx = StreamSerializationContext(buf)
            detached.serialize(ser_ctx)
            ots_data = buf.getvalue()
            with open(ots_proof_path, "wb") as pf:
                pf.write(ots_data)
            ots_success = True
            anchor_meta["status"] = "calendar_submitted_proper"
            anchor_meta["ots_proof_file"] = f"{anchor_id}.ots"
            anchor_meta["calendars_submitted"] = calendars_merged
            results["ots"] = f"submitted to {calendars_merged} calendar(s)"
    except ImportError:
        # Fallback: raw submission. Without the opentimestamps package the
        # first non-empty calendar response is stored unmodified.
        for server_url in ots_servers:
            try:
                req = urllib.request.Request(
                    f"{server_url}/digest",
                    data=hash_bytes,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    method="POST",
                )
                ctx = ssl.create_default_context()
                with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
                    proof_data = resp.read()
                if proof_data:
                    with open(ots_proof_path, "wb") as pf:
                        pf.write(proof_data)
                    ots_success = True
                    anchor_meta["status"] = "submitted_raw"
                    anchor_meta["ots_proof_file"] = f"{anchor_id}.ots"
                    results["ots"] = f"raw submission to {server_url}"
                    break
            except Exception:
                # Best effort: try the next calendar server.
                continue

    if not ots_success:
        results["ots"] = "all OTS servers failed"

    # TSA submission
    tsa_success = False
    try:
        from agent_trust_stack_mcp.tsa import build_rfc3161_tsq, parse_tsr_status

        tsa_url = "https://freetsa.org/tsr"
        tsq = build_rfc3161_tsq(hash_bytes)
        req = urllib.request.Request(
            tsa_url,
            data=tsq,
            headers={
                "Content-Type": "application/timestamp-query",
                "User-Agent": "agent-trust-stack-mcp/0.1.0",
            },
            method="POST",
        )
        ctx = ssl.create_default_context()
        with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
            tsr_bytes = resp.read()

        tsr_info = parse_tsr_status(tsr_bytes)
        # Always surface the status the TSA returned, so a rejection or a
        # parse error is not reported to the caller as "skipped".
        results["tsa"] = tsr_info["status_text"]
        # 0 = granted, 1 = grantedWithMods: only these carry a usable token,
        # so only then is the response kept as a proof file.
        if tsr_info["status"] in (0, 1):
            tsr_path = os.path.join(anchor_dir, f"{anchor_id}.tsr")
            with open(tsr_path, "wb") as tf:
                tf.write(tsr_bytes)
            tsa_success = True
            anchor_meta["tsa_status"] = tsr_info["status_text"]
            anchor_meta["tsa_proof_file"] = f"{anchor_id}.tsr"
    except Exception as e:
        results["tsa"] = f"failed: {e}"

    # Save final anchor metadata
    with open(anchor_meta_path, "w", encoding="utf-8") as f:
        json.dump(anchor_meta, f, indent=2)

    # Save hash file for manual verification
    hash_file = os.path.join(anchor_dir, f"{anchor_id}.hash")
    with open(hash_file, "w", encoding="utf-8") as f:
        f.write(chain_hash)

    # Add anchor entry to chain
    tiers = []
    if ots_success:
        tiers.append("OTS/Bitcoin")
    if tsa_success:
        tiers.append("RFC3161/TSA")
    tier_str = " + ".join(tiers) if tiers else "local-only"

    if chain:
        anchor_entry = _make_entry(
            sequence=len(chain),
            event_type="anchor",
            data=(
                f"Anchor submitted ({tier_str}). Chain hash: {chain_hash[:16]}... "
                f"(seq 0-{seq}, {len(chain)} entries)."
            ),
            prev_hash=chain[-1]["entry_hash"],
            agent="mcp-server",
        )
        _append_entry(anchor_entry)
        chain.append(anchor_entry)
        _update_meta(chain)
        results["anchor_entry_seq"] = anchor_entry["seq"]
    else:
        # An empty chain has no previous entry hash to link to. Indexing
        # chain[-1] here used to raise IndexError after all proofs had been
        # written; skip the chain entry and still return the anchor results.
        results["anchor_entry_seq"] = None

    results["proof_dir"] = anchor_dir
    return json.dumps(results)
# - src/agent_trust_stack_mcp/server.py:587-587 (registration) The tool is registered via the @mcp.tool() decorator on line 587, which is how FastMCP exposes it as an MCP tool named 'coc_anchor'.
@mcp.tool() - The docstring serves as the schema definition — it documents that the tool takes no arguments, describes the OTS/TSA submission process, and specifies the JSON return format.
"""Submit the current chain hash for external timestamping. Computes SHA-256 of the full chain file and submits it to: - OpenTimestamps calendar servers (Bitcoin-anchored proof) - RFC 3161 TSA server (freeTSA.org — instant certificate) This creates independently verifiable proof that the chain existed at a specific point in time. The OTS proof takes 1-12 hours for Bitcoin confirmation; the TSA certificate is immediate. Requires network access. No credentials needed. Returns: - The tsa.py module provides helper functions (build_rfc3161_tsq and parse_tsr_status) used by coc_anchor to construct DER-encoded RFC 3161 TimeStampReq messages and parse the TSA response status.
"""RFC 3161 TSA helpers for Chain of Consciousness anchoring.

Builds DER-encoded TimeStampReq messages and parses TimeStampResp status.
No external dependencies — pure Python DER encoding.
"""

import secrets

# DER universal tags used by this module.
_SEQUENCE_TAG = 0x30
_INTEGER_TAG = 0x02

# PKIStatus values that parse_tsr_status knows how to name.
_STATUS_NAMES = {
    0: "granted",
    1: "grantedWithMods",
    2: "rejection",
    3: "waiting",
    4: "revocationWarning",
    5: "revocationNotification",
}


def _der_tag_length(tag: int, content: bytes) -> bytes:
    """Wrap content bytes with a DER tag-length header.

    Lengths below 0x80 use the single-octet short form. Anything longer uses
    the long form with as many length octets as the value needs, so content
    of 64 KiB or more is no longer encoded with a truncated two-octet length.

    Args:
        tag: The DER identifier octet.
        content: The already-encoded value octets.

    Returns:
        The tag, the length octets, and ``content`` concatenated.
    """
    length = len(content)
    if length < 0x80:
        header = bytes([tag, length])
    else:
        length_octets = length.to_bytes((length.bit_length() + 7) // 8, "big")
        header = bytes([tag, 0x80 | len(length_octets)]) + length_octets
    return header + content


def build_rfc3161_tsq(hash_bytes: bytes) -> bytes:
    """Build a DER-encoded RFC 3161 TimeStampReq for a SHA-256 digest.

    Structure per RFC 3161 Section 2.4.1:
        TimeStampReq ::= SEQUENCE {
            version        INTEGER {v1(1)},
            messageImprint MessageImprint,
            nonce          INTEGER OPTIONAL,
            certReq        BOOLEAN DEFAULT FALSE
        }

    Args:
        hash_bytes: The digest to timestamp; it is embedded unmodified as the
            hashedMessage OCTET STRING under the SHA-256 algorithm identifier.

    Returns:
        The DER-encoded request, with a fresh random nonce and certReq TRUE.
    """
    # SHA-256 OID: 2.16.840.1.101.3.4.2.1
    sha256_oid = _der_tag_length(0x06, bytes([
        0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01
    ]))
    alg_id = _der_tag_length(0x30, sha256_oid + bytes([0x05, 0x00]))  # SEQUENCE { OID, NULL }
    msg_imprint = _der_tag_length(0x30, alg_id + _der_tag_length(0x04, hash_bytes))
    version = _der_tag_length(_INTEGER_TAG, bytes([0x01]))  # INTEGER v1

    # Random nonce for replay protection (positive integer). Encoding the
    # value with bit_length // 8 + 1 octets yields the minimal two's-complement
    # form: a leading 0x00 appears only when needed to keep the sign bit clear.
    # Fixed-width random bytes could start with a redundant 0x00, which is not
    # valid DER.
    nonce_value = secrets.randbits(64)
    nonce_raw = nonce_value.to_bytes(nonce_value.bit_length() // 8 + 1, "big")
    nonce = _der_tag_length(_INTEGER_TAG, nonce_raw)

    cert_req = _der_tag_length(0x01, bytes([0xFF]))  # BOOLEAN TRUE
    return _der_tag_length(_SEQUENCE_TAG, version + msg_imprint + nonce + cert_req)


def _read_tag_length(data: bytes, off: int) -> tuple:
    """Read one DER tag-length header starting at ``off``.

    Returns:
        ``(tag, length, content_offset)``.

    Raises:
        IndexError: If the header runs past the end of ``data``.
    """
    tag = data[off]
    first = data[off + 1]
    off += 2
    if first < 0x80:
        return tag, first, off
    count = first & 0x7F
    if off + count > len(data):
        # Slicing would silently shorten the length field; fail loudly instead.
        raise IndexError("truncated DER length")
    return tag, int.from_bytes(data[off:off + count], "big"), off + count


def _parse_error(reason: str, tsr_size: int) -> dict:
    """Build the result dict returned for an unparseable response."""
    return {
        "status": -1,
        "status_text": f"parse_error: {reason}",
        "has_token": False,
        "tsr_size": tsr_size,
    }


def parse_tsr_status(tsr_bytes: bytes) -> dict:
    """Parse an RFC 3161 TimeStampResp to extract status info.

    Only the outer SEQUENCE, the PKIStatusInfo SEQUENCE and its leading
    status INTEGER are inspected. For full cryptographic verification use:
    openssl ts -verify

    Args:
        tsr_bytes: Raw response body received from the TSA.

    Returns:
        Dict with ``status`` (the PKIStatus integer, or -1 when the response
        cannot be parsed), ``status_text``, ``has_token`` (True when bytes
        follow the status info inside the outer SEQUENCE) and ``tsr_size``.
        Malformed input never raises; it yields a ``parse_error`` result.
    """
    size = len(tsr_bytes)
    try:
        outer_tag, outer_len, outer_start = _read_tag_length(tsr_bytes, 0)
        if outer_tag != _SEQUENCE_TAG:
            return _parse_error("expected SEQUENCE", size)
        outer_end = outer_start + outer_len

        si_tag, si_len, si_start = _read_tag_length(tsr_bytes, outer_start)
        if si_tag != _SEQUENCE_TAG:
            return _parse_error("expected SEQUENCE", size)
        si_end = si_start + si_len

        int_tag, int_len, int_start = _read_tag_length(tsr_bytes, si_start)
        if int_tag != _INTEGER_TAG:
            return _parse_error("expected INTEGER", size)

        status_octets = tsr_bytes[int_start:int_start + int_len]
        # int.from_bytes(b"") is 0, which would read as "granted"; refuse an
        # empty or cut-off status value instead of reporting success.
        if int_len == 0 or len(status_octets) != int_len:
            return _parse_error("truncated status INTEGER", size)
        status_val = int.from_bytes(status_octets, "big", signed=True)
    except (IndexError, ValueError) as e:
        return _parse_error(str(e), size)

    return {
        "status": status_val,
        "status_text": _STATUS_NAMES.get(status_val, f"unknown({status_val})"),
        "has_token": si_end < outer_end,
        "tsr_size": size,
    }