"""
FARNS GPU-Signed Model Attestation
=====================================
Hardware-attested model provenance — the first chain of custody for AI models
where the GPU hardware itself signs the model identity.
When a model loads on a GPU:
attestation = BLAKE3(gpu_fingerprint || model_weight_hash || timestamp || node_identity)
This proves:
- WHAT model is running (weight hash)
- WHERE it's running (GPU fingerprint = physical hardware)
- WHEN it was loaded (timestamp)
- WHO loaded it (node identity)
Use cases:
- Verify remote nodes are running the model they claim
- Detect model swaps or weight poisoning
- Build trust scores based on attestation history
- PRO nodes prove their compute is legitimate
Chain of custody: Every model load creates an attestation, forming an
immutable chain. If a model is swapped, the chain breaks.
"""
import os
import time
import json
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from pathlib import Path
from loguru import logger
from .farns_auth import blake3
from .farns_protocol import PacketType, FARNSPacket
from .farns_config import FARNS_DATA_DIR, ensure_dirs
ATTESTATION_FILE = FARNS_DATA_DIR / "model_attestations.json"
@dataclass
class ModelAttestation:
"""A GPU-signed attestation that a specific model is running on specific hardware."""
attestation_id: str
node_name: str
model_name: str
model_hash: bytes # BLAKE3(model_name) — in production: hash of actual weights
gpu_fingerprint: bytes # 32-byte hardware fingerprint
gpu_model: str # Human-readable GPU name
node_identity: bytes # Node's identity token
timestamp: float
seal: bytes # BLAKE3(gpu_fp || model_hash || ts || identity)
chain_prev: bytes # Previous attestation seal (chain link)
chain_seq: int # Position in chain
def to_dict(self) -> Dict:
return {
"id": self.attestation_id,
"node": self.node_name,
"model": self.model_name,
"model_hash": self.model_hash.hex(),
"gpu_fp": self.gpu_fingerprint.hex(),
"gpu_model": self.gpu_model,
"ts": self.timestamp,
"seal": self.seal.hex(),
"chain_prev": self.chain_prev.hex(),
"chain_seq": self.chain_seq,
}
@classmethod
def from_dict(cls, d: Dict) -> "ModelAttestation":
return cls(
attestation_id=d.get("id", ""),
node_name=d.get("node", ""),
model_name=d.get("model", ""),
model_hash=bytes.fromhex(d.get("model_hash", "00" * 32)),
gpu_fingerprint=bytes.fromhex(d.get("gpu_fp", "00" * 32)),
gpu_model=d.get("gpu_model", ""),
node_identity=b"",
timestamp=d.get("ts", 0.0),
seal=bytes.fromhex(d.get("seal", "00" * 16)),
chain_prev=bytes.fromhex(d.get("chain_prev", "00" * 16)),
chain_seq=d.get("chain_seq", 0),
)
def compute_model_hash(model_name: str) -> bytes:
"""
Compute a model's identity hash.
In production, this would hash actual model weights.
For now, we hash model name + probe Ollama for model metadata.
"""
return blake3(f"model:{model_name}".encode()).digest()
def compute_model_hash_with_weights(model_name: str) -> bytes:
"""
Compute model hash including actual weight digest from Ollama.
Queries Ollama's /api/show endpoint for the model's digest,
which is the SHA256 of the model weights. We wrap that in BLAKE3.
"""
try:
import httpx
resp = httpx.get(
"http://localhost:11434/api/show",
params={"name": model_name},
timeout=10.0,
)
if resp.status_code == 200:
data = resp.json()
digest = data.get("digest", "")
size = data.get("size", 0)
params = data.get("details", {}).get("parameter_size", "")
payload = f"model:{model_name}:digest:{digest}:size:{size}:params:{params}"
return blake3(payload.encode()).digest()
except Exception:
pass
return compute_model_hash(model_name)
class ModelAttestor:
"""
GPU-Signed Model Attestation engine.
Creates and verifies hardware-bound attestations forming a chain of custody.
"""
def __init__(self, node_name: str, node_identity: bytes,
gpu_fingerprint: bytes, gpu_model: str = ""):
self.node_name = node_name
self._identity = node_identity
self._gpu_fp = gpu_fingerprint
self._gpu_model = gpu_model
self._attestations: List[ModelAttestation] = []
self._chain_seq = 0
self._last_seal = b"\x00" * 16 # Genesis seal
self._remote_attestations: Dict[str, List[ModelAttestation]] = {}
self._load_state()
def _load_state(self):
"""Load persisted attestation chain."""
try:
if ATTESTATION_FILE.exists():
data = json.loads(ATTESTATION_FILE.read_text())
for entry in data.get("local", []):
att = ModelAttestation.from_dict(entry)
self._attestations.append(att)
self._chain_seq = data.get("chain_seq", 0)
last_seal = data.get("last_seal", "")
if last_seal:
self._last_seal = bytes.fromhex(last_seal)
except Exception as e:
logger.debug(f"Could not load attestation state: {e}")
def _save_state(self):
"""Persist attestation chain."""
try:
ensure_dirs()
data = {
"node": self.node_name,
"chain_seq": self._chain_seq,
"last_seal": self._last_seal.hex(),
"local": [a.to_dict() for a in self._attestations[-100:]],
}
ATTESTATION_FILE.write_text(json.dumps(data, indent=2))
except Exception as e:
logger.debug(f"Could not save attestation state: {e}")
def attest_model(self, model_name: str,
use_weights: bool = True) -> ModelAttestation:
"""
Create a hardware-bound attestation for a loaded model.
attestation_seal = BLAKE3(gpu_fp || model_hash || timestamp || identity || chain_prev)
"""
if use_weights:
model_hash = compute_model_hash_with_weights(model_name)
else:
model_hash = compute_model_hash(model_name)
ts = time.time()
att_id = blake3(
f"{self.node_name}:{model_name}:{ts}".encode()
).hexdigest()[:12]
# Compute seal binding all fields + chain link
seal_payload = (
self._gpu_fp
+ model_hash
+ str(ts).encode()
+ self._identity
+ self._last_seal # Chain link to previous attestation
)
seal = blake3(seal_payload).digest()[:16]
att = ModelAttestation(
attestation_id=att_id,
node_name=self.node_name,
model_name=model_name,
model_hash=model_hash,
gpu_fingerprint=self._gpu_fp,
gpu_model=self._gpu_model,
node_identity=self._identity,
timestamp=ts,
seal=seal,
chain_prev=self._last_seal,
chain_seq=self._chain_seq,
)
self._attestations.append(att)
self._last_seal = seal
self._chain_seq += 1
self._save_state()
logger.info(
f"Model attested: {model_name} on {self._gpu_model}, "
f"seal={seal.hex()}, chain_seq={att.chain_seq}"
)
return att
def verify_attestation(self, attestation: ModelAttestation,
node_identity: bytes, gpu_fingerprint: bytes) -> bool:
"""
Verify a remote attestation.
Recomputes the seal from the claimed fields + known identity/gpu.
"""
seal_payload = (
gpu_fingerprint
+ attestation.model_hash
+ str(attestation.timestamp).encode()
+ node_identity
+ attestation.chain_prev
)
expected_seal = blake3(seal_payload).digest()[:16]
if expected_seal != attestation.seal:
logger.warning(
f"Attestation verification FAILED for {attestation.model_name} "
f"from {attestation.node_name}"
)
return False
return True
def verify_chain(self, attestations: List[ModelAttestation]) -> bool:
"""Verify a chain of attestations is unbroken."""
if not attestations:
return True
for i in range(1, len(attestations)):
if attestations[i].chain_prev != attestations[i - 1].seal:
logger.warning(
f"Attestation chain broken at seq {attestations[i].chain_seq}: "
f"prev_seal mismatch"
)
return False
if attestations[i].chain_seq != attestations[i - 1].chain_seq + 1:
logger.warning(
f"Attestation chain gap: {attestations[i-1].chain_seq} → "
f"{attestations[i].chain_seq}"
)
return False
return True
def add_remote_attestation(self, attestation: ModelAttestation):
"""Store a remote node's attestation for trust tracking."""
node = attestation.node_name
if node not in self._remote_attestations:
self._remote_attestations[node] = []
self._remote_attestations[node].append(attestation)
# Keep bounded
if len(self._remote_attestations[node]) > 100:
self._remote_attestations[node] = self._remote_attestations[node][-50:]
def get_trust_score(self, node_name: str) -> float:
"""
Compute trust score for a remote node based on attestation history.
Score = (verified_attestations / total_attestations) * chain_integrity_bonus
"""
atts = self._remote_attestations.get(node_name, [])
if not atts:
return 0.5 # Unknown node — neutral trust
# Chain integrity bonus
chain_ok = self.verify_chain(atts)
chain_bonus = 1.0 if chain_ok else 0.5
# Consistency: are they always running the same models?
model_changes = 0
for i in range(1, len(atts)):
if atts[i].model_name != atts[i - 1].model_name:
model_changes += 1
consistency = max(0.5, 1.0 - (model_changes / max(len(atts), 1)))
return min(1.0, chain_bonus * consistency)
def get_all_attestations(self) -> List[Dict]:
"""Get all local attestations."""
return [a.to_dict() for a in self._attestations]
def get_chain_summary(self) -> Dict:
"""Get summary of attestation chain."""
return {
"node": self.node_name,
"gpu_model": self._gpu_model,
"chain_length": self._chain_seq,
"last_seal": self._last_seal.hex(),
"models_attested": list(set(a.model_name for a in self._attestations)),
"remote_nodes_tracked": list(self._remote_attestations.keys()),
"trust_scores": {
node: round(self.get_trust_score(node), 3)
for node in self._remote_attestations
},
}
def make_model_attest(sender: str, attestation: ModelAttestation) -> FARNSPacket:
"""Create a MODEL_ATTEST packet — broadcast model attestation to mesh."""
return FARNSPacket(
packet_type=PacketType.MODEL_ATTEST,
sender=sender,
target="*",
data=attestation.to_dict(),
)