Story SDK MCP Server
Official
by piplabs
- story-mcp-hub
- story-sdk-mcp
- services
from web3 import Web3
from story_protocol_python_sdk.story_client import StoryClient
from story_protocol_python_sdk.resources.NFTClient import NFTClient
import requests
import os
from typing import Union
import time
import json
import sys
from pathlib import Path
# Add the parent directory to the Python path so we can import utils
sys.path.append(str(Path(__file__).parent.parent.parent))
from utils.address_resolver import create_address_resolver
from utils.contract_addresses import get_contracts_by_chain_id, CHAIN_IDS
class StoryService:
def __init__(self, rpc_url: str, private_key: str, network: str = None):
"""
Initialize Story Protocol service with RPC URL and private key.
Args:
rpc_url: RPC URL for the blockchain
private_key: Private key for signing transactions
network: Optional network name ('aeneid' or 'mainnet') to override auto-detection
"""
self.web3 = Web3(Web3.HTTPProvider(rpc_url))
if not self.web3.is_connected():
raise Exception("Failed to connect to the Web3 provider")
self.account = self.web3.eth.account.from_key(private_key)
# Detect chain ID
self.chain_id = self.web3.eth.chain_id
# If network is explicitly provided, use it instead of auto-detection
if network:
if network.lower() not in ["aeneid", "mainnet"]:
raise ValueError(
f"Unsupported network: {network}. Must be 'aeneid' or 'mainnet'"
)
self.network = network.lower()
self.chain_id = CHAIN_IDS[self.network]
else:
# Auto-detect network based on chain ID
if self.chain_id == CHAIN_IDS["aeneid"]:
self.network = "aeneid"
elif self.chain_id == CHAIN_IDS["mainnet"]:
self.network = "mainnet"
else:
raise ValueError(
f"Unsupported chain ID: {self.chain_id}. Must be {CHAIN_IDS['aeneid']} (Aeneid) or {CHAIN_IDS['mainnet']} (Mainnet)"
)
# Initialize Story client with detected chain ID
self.client = StoryClient(
web3=self.web3, account=self.account, chain_id=self.chain_id
)
# Manually initialize the NFTClient
self.nft_client = NFTClient(
web3=self.web3, account=self.account, chain_id=self.chain_id
)
# Get contract addresses for the detected network
self.contracts = get_contracts_by_chain_id(self.chain_id)
# Set license template from contracts
self.LICENSE_TEMPLATE = self.contracts["PILicenseTemplate"]
# Initialize Pinata JWT
self.pinata_jwt = os.getenv("PINATA_JWT")
if not self.pinata_jwt:
self.ipfs_enabled = False
print(
"Warning: PINATA_JWT environment variable not found. IPFS functions will be disabled."
)
else:
self.ipfs_enabled = True
# Initialize address resolver
self.address_resolver = create_address_resolver(
self.web3, chain_id=CHAIN_IDS["mainnet"]
) # Story Protocol chain ID for .ip domains
def get_license_terms(self, license_terms_id: int) -> dict:
"""Get the license terms for a specific ID."""
response = self.client.License.getLicenseTerms(license_terms_id)
if not response:
raise ValueError(f"No license terms found for ID {license_terms_id}")
return {
"transferable": response[0],
"royaltyPolicy": response[1],
"defaultMintingFee": response[2],
"expiration": response[3],
"commercialUse": response[4],
"commercialAttribution": response[5],
"commercializerChecker": response[6],
"commercializerCheckerData": response[7].hex()
if isinstance(response[7], bytes)
else response[7],
"commercialRevShare": response[8],
"commercialRevCeiling": response[9],
"derivativesAllowed": response[10],
"derivativesAttribution": response[11],
"derivativesApproval": response[12],
"derivativesReciprocal": response[13],
"derivativeRevCeiling": response[14],
"currency": response[15],
"uri": response[16],
}
def mint_license_tokens(
self,
licensor_ip_id: str,
license_terms_id: int,
receiver: str = None,
amount: int = 1,
max_minting_fee: int = None,
max_revenue_share: int = None,
) -> dict:
"""
Mint license tokens for a specific IP and license terms.
Args:
licensor_ip_id: The IP ID to mint licenses for
license_terms_id: The license terms ID to use
receiver: Address or domain name to receive the license tokens (defaults to caller)
amount: Number of license tokens to mint (defaults to 1)
max_minting_fee: Optional maximum minting fee
max_revenue_share: Optional maximum revenue share percentage (0-100,000,000)
"""
try:
# Resolve receiver address if provided
resolved_receiver = (
self.address_resolver.resolve_address(receiver)
if receiver
else self.account.address
)
# Build kwargs dict with only provided parameters
kwargs = {
"licensor_ip_id": licensor_ip_id,
"license_template": self.LICENSE_TEMPLATE, # Use default template
"license_terms_id": license_terms_id,
"amount": amount,
"receiver": resolved_receiver,
}
if max_minting_fee is not None:
kwargs["max_minting_fee"] = max_minting_fee
if max_revenue_share is not None:
kwargs["max_revenue_share"] = max_revenue_share
response = self.client.License.mintLicenseTokens(**kwargs)
return response
except Exception as e:
print(f"Error minting license tokens: {str(e)}")
raise
def send_ip(self, to_address: str, amount: float) -> dict:
"""
Send IP tokens to a specified address using native token transfer.
:param to_address: Recipient's address or domain name
:param amount: Amount of IP tokens to send (1 IP = 1 Ether)
:return: Transaction details
"""
try:
# Resolve the recipient address
resolved_address = self.address_resolver.resolve_address(to_address)
# Convert amount to Wei (1 IP = 1 Ether)
value_in_wei = self.web3.to_wei(amount, "ether")
print(f"Debug: Account address: {self.account.address}")
print(f"Debug: Network connected: {self.web3.eth.chain_id}")
print(
f"Debug: Account balance: {self.web3.eth.get_balance(self.account.address)}"
)
# Set a default gas price if we can't get it from the network
try:
gas_price = self.web3.eth.gas_price
except Exception:
# Fallback gas price (50 gwei)
gas_price = self.web3.to_wei(50, "gwei")
# Estimate gas limit for this transaction
try:
gas_estimate = self.web3.eth.estimate_gas(
{
"to": resolved_address,
"from": self.account.address,
"value": value_in_wei,
}
)
except Exception:
# Fallback gas limit
gas_estimate = 21000 # Standard transfer gas limit
# Build the transaction with dynamic gas settings
transaction = {
"to": resolved_address,
"value": value_in_wei,
"gas": gas_estimate,
"gasPrice": gas_price,
"nonce": self.web3.eth.get_transaction_count(self.account.address),
"chainId": 1315, # Story Protocol chain ID
}
# Sign and send the transaction
signed_txn = self.account.sign_transaction(transaction)
tx_hash = self.web3.eth.send_raw_transaction(signed_txn.raw_transaction)
# Wait for transaction receipt
tx_receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
return {"txHash": tx_hash.hex(), "txReceipt": tx_receipt}
except Exception as e:
print(f"Error details: {str(e)}")
raise
def upload_image_to_ipfs(self, image_data: Union[bytes, str]) -> str:
"""Upload an image to IPFS using Pinata API"""
if not self.ipfs_enabled:
raise Exception(
"IPFS functions are disabled. Please provide PINATA_JWT environment variable."
)
try:
# If image_data is a URL, download it first
if isinstance(image_data, str) and image_data.startswith("http"):
response = requests.get(image_data)
image_data = response.content
# Upload to Pinata
headers = {"Authorization": f"Bearer {self.pinata_jwt}"}
files = {"file": ("image.png", image_data, "image/png")}
response = requests.post(
"https://api.pinata.cloud/pinning/pinFileToIPFS",
files=files,
headers=headers,
)
if response.status_code != 200:
raise Exception(f"Failed to upload to IPFS: {response.text}")
return f"ipfs://{response.json()['IpfsHash']}"
except Exception as e:
print(f"Error uploading to IPFS: {str(e)}")
raise
def create_ip_metadata(
self, image_uri: str, name: str, description: str, attributes: list = None
) -> dict:
"""
Create both NFT and IP metadata and upload to IPFS
Args:
image_uri: IPFS URI of the uploaded image
name: Name of the NFT/IP
description: Description of the NFT/IP
attributes: Optional list of attribute dictionaries
Returns:
dict: Both metadata URIs and their hashes
"""
if not self.ipfs_enabled:
raise Exception(
"IPFS functions are disabled. Please provide PINATA_JWT environment variable."
)
try:
# Get image hash if it's a URL
if image_uri.startswith("http"):
image_hash = self._get_file_hash(image_uri)
else:
# For IPFS URIs, extract hash from URI
image_hash = image_uri.replace("ipfs://", "")
# Create NFT metadata (standard ERC721 format)
nft_metadata = {
"name": name,
"description": description,
"image": image_uri,
"attributes": attributes or [],
}
# Create IP metadata following Story Protocol standard
ip_metadata = {
"title": name,
"description": description,
"createdAt": int(time.time()),
"image": image_uri,
"imageHash": f"0x{image_hash}", # Add 0x prefix
"mediaUrl": image_uri,
"mediaHash": f"0x{image_hash}", # Same as imageHash since they point to same file
"mediaType": "image/png", # Adjust based on actual image type
}
# Upload NFT metadata to IPFS
nft_response = requests.post(
"https://api.pinata.cloud/pinning/pinJSONToIPFS",
json=nft_metadata,
headers={
"Authorization": f"Bearer {self.pinata_jwt}",
"Content-Type": "application/json",
},
)
if nft_response.status_code != 200:
raise Exception(f"Failed to upload NFT metadata: {nft_response.text}")
nft_metadata_uri = f"ipfs://{nft_response.json()['IpfsHash']}"
# Upload IP metadata to IPFS
ip_response = requests.post(
"https://api.pinata.cloud/pinning/pinJSONToIPFS",
json=ip_metadata,
headers={
"Authorization": f"Bearer {self.pinata_jwt}",
"Content-Type": "application/json",
},
)
if ip_response.status_code != 200:
raise Exception(f"Failed to upload IP metadata: {ip_response.text}")
ip_metadata_uri = f"ipfs://{ip_response.json()['IpfsHash']}"
# Generate hashes of the metadata JSONs
nft_metadata_hash = self.web3.keccak(
text=json.dumps(nft_metadata, sort_keys=True)
)
ip_metadata_hash = self.web3.keccak(
text=json.dumps(ip_metadata, sort_keys=True)
)
# Create metadata structure for registration
registration_metadata = {
"ip_metadata_uri": ip_metadata_uri,
"ip_metadata_hash": ip_metadata_hash.hex(),
"nft_metadata_uri": nft_metadata_uri,
"nft_metadata_hash": nft_metadata_hash.hex(),
}
return {
"nft_metadata": nft_metadata,
"nft_metadata_uri": nft_metadata_uri,
"nft_metadata_hash": nft_metadata_hash.hex(),
"ip_metadata": ip_metadata,
"ip_metadata_uri": ip_metadata_uri,
"ip_metadata_hash": ip_metadata_hash.hex(),
"registration_metadata": registration_metadata,
}
except Exception as e:
print(f"Error creating metadata: {str(e)}")
raise
async def _get_file_hash(self, url: str) -> str:
"""
Get hash of a file from its URL using web3's keccak
Args:
url: URL of the image/media file
Returns:
str: Hash in hex format without 0x prefix
"""
response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed to download file: {response.text}")
# Hash the raw bytes using web3's keccak
file_hash = self.web3.keccak(response.content)
return file_hash.hex()[2:] # Remove 0x prefix
def mint_and_register_ip_with_terms(
self,
commercial_rev_share: int,
derivatives_allowed: bool,
registration_metadata: dict = None,
recipient: str = None,
spg_nft_contract: str = None,
) -> dict:
"""
Mint an NFT, register it as an IP Asset, and attach PIL terms.
Args:
commercial_rev_share: Percentage of revenue share (0-100)
derivatives_allowed: Whether derivatives are allowed
registration_metadata: Optional dict containing full metadata structure
recipient: Optional recipient address or domain name (defaults to sender)
spg_nft_contract: Optional SPG NFT contract address (defaults to network-specific default)
"""
try:
# Resolve recipient address if provided
resolved_recipient = (
self.address_resolver.resolve_address(recipient)
if recipient
else self.account.address
)
# Use default SPG NFT contract if none provided
if spg_nft_contract is None:
spg_nft_contract = self.contracts["SPG_NFT"]
# Use the royalty policy from the contracts dictionary
royalty_policy = self.contracts["RoyaltyPolicyLAP"]
# Create terms matching our working structure
terms = [
{
"terms": {
"transferable": True,
"royalty_policy": royalty_policy,
"default_minting_fee": 0,
"expiration": 0,
"commercial_use": commercial_rev_share > 0,
"commercial_attribution": False,
"commercializer_checker": "0x0000000000000000000000000000000000000000",
"commercializer_checker_data": "0x0000000000000000000000000000000000000000",
"commercial_rev_share": commercial_rev_share,
"commercial_rev_ceiling": 0,
"derivatives_allowed": derivatives_allowed,
"derivatives_attribution": derivatives_allowed,
"derivatives_approval": False,
"derivatives_reciprocal": derivatives_allowed,
"derivative_rev_ceiling": 0,
"currency": "0x1514000000000000000000000000000000000000",
"uri": "",
},
"licensing_config": {
"is_set": True,
"minting_fee": 0,
"hook_data": "",
"licensing_hook": "0x0000000000000000000000000000000000000000",
"commercial_rev_share": commercial_rev_share,
"disabled": False,
"expect_minimum_group_reward_share": 0,
"expect_group_reward_pool": "0x0000000000000000000000000000000000000000",
},
}
]
# Build kwargs for mintAndRegisterIpAssetWithPilTerms
kwargs = {
"spg_nft_contract": spg_nft_contract,
"terms": terms,
"recipient": resolved_recipient,
"allow_duplicates": True,
}
# Only add ip_metadata if registration_metadata is provided
if registration_metadata:
kwargs["ip_metadata"] = registration_metadata
response = self.client.IPAsset.mintAndRegisterIpAssetWithPilTerms(**kwargs)
return {
"txHash": response.get("txHash"),
"ipId": response.get("ipId"),
"tokenId": response.get("tokenId"),
"licenseTermsIds": response.get("licenseTermsIds"),
}
except Exception as e:
print(f"Error in mint_and_register_ip_with_terms: {str(e)}")
raise
def create_spg_nft_collection(
self,
name: str,
symbol: str,
is_public_minting: bool = True,
mint_open: bool = True,
mint_fee_recipient: str = None,
contract_uri: str = "",
base_uri: str = "",
max_supply: int = None,
mint_fee: int = None,
mint_fee_token: str = None,
owner: str = None,
) -> dict:
"""
Create a new SPG NFT collection that can be used for minting and registering IP assets.
Args:
name: (REQUIRED) Name of the NFT collection
symbol: (REQUIRED) Symbol for the NFT collection
is_public_minting: (OPTIONAL, default=True) Whether anyone can mint NFTs from this collection
mint_open: (OPTIONAL, default=True) Whether minting is currently enabled
mint_fee_recipient: (OPTIONAL) Address to receive minting fees (defaults to zero address)
contract_uri: (OPTIONAL) URI for the collection metadata (ERC-7572 standard)
base_uri: (OPTIONAL) Base URI for the collection. If not empty, tokenURI will be either
baseURI + token ID or baseURI + nftMetadataURI
max_supply: (OPTIONAL) Maximum supply of the collection (defaults to unlimited)
mint_fee: (OPTIONAL) Cost to mint a token (defaults to 0)
mint_fee_token: (OPTIONAL) Token address used for minting fees (defaults to native token)
owner: (OPTIONAL) Owner address of the collection (defaults to sender)
Returns:
dict: Information about the created collection including:
- tx_hash: Transaction hash
- spg_nft_contract: Address of the created collection
Note:
The underlying SDK supports additional transaction options (tx_options)
which are intentionally not exposed here as they're too low-level for agent interfaces.
"""
try:
# Default mint_fee_recipient to zero address if not provided
if mint_fee_recipient is None:
mint_fee_recipient = "0x0000000000000000000000000000000000000000"
else:
# Resolve the address if it's a domain name
mint_fee_recipient = self.address_resolver.resolve_address(
mint_fee_recipient
)
# Resolve owner address if provided
if owner:
owner = self.address_resolver.resolve_address(owner)
# Use the manually initialized NFTClient instead of client.NFT
response = self.nft_client.createNFTCollection(
name=name,
symbol=symbol,
is_public_minting=is_public_minting,
mint_open=mint_open,
mint_fee_recipient=mint_fee_recipient,
contract_uri=contract_uri,
base_uri=base_uri,
max_supply=max_supply,
mint_fee=mint_fee,
mint_fee_token=mint_fee_token,
owner=owner,
tx_options=None, # Always use default transaction options
)
return {
"tx_hash": response.get("txHash"),
"spg_nft_contract": response.get("nftContract"),
}
except Exception as e:
print(f"Error creating SPG NFT collection: {str(e)}")
raise
# def register_pil_terms(
# self,
# transferable: bool = False,
# commercial_use: bool = False,
# derivatives_allowed: bool = False,
# default_minting_fee: int = 92
# ) -> dict:
# """Register new PIL terms with customizable parameters."""
# response = self.client.License.registerPILTerms(
# transferable=transferable,
# royalty_policy=self.web3.to_checksum_address("0x0000000000000000000000000000000000000000"),
# default_minting_fee=default_minting_fee,
# expiration=0,
# commercial_use=commercial_use,
# commercial_attribution=False,
# commercializer_checker=self.web3.to_checksum_address("0x0000000000000000000000000000000000000000"),
# commercializer_checker_data="0x",
# commercial_rev_share=0,
# commercial_rev_ceiling=0,
# derivatives_allowed=derivatives_allowed,
# derivatives_attribution=False,
# derivatives_approval=False,
# derivatives_reciprocal=False,
# derivative_rev_ceiling=0,
# currency=self.web3.to_checksum_address("0x0000000000000000000000000000000000000000"),
# uri=""
# )
# return response
# def register_non_commercial_social_remixing_pil(self) -> dict:
# """Register a non-commercial social remixing PIL license."""
# return self.client.License.registerNonComSocialRemixingPIL()
# TODO: don't need this function for now
# def register_ip_asset(self, nft_contract: str, token_id: int, metadata: dict) -> dict:
# """
# Register an NFT as an IP Asset with metadata
# :param nft_contract: NFT contract address
# :param token_id: Token ID of the NFT
# :param metadata: IP Asset metadata following Story Protocol standard
# :return: Transaction details
# """
# try:
# # Using the IPAsset module from the SDK
# response = self.client.IPAsset.registerRootIP(
# nftContract=self.web3.to_checksum_address(nft_contract),
# tokenId=token_id,
# metadata=metadata
# )
# return response
# except Exception as e:
# print(f"Error registering IP asset: {str(e)}")
# raise
# def attach_license_terms(self, ip_id: str, license_terms_id: int) -> dict:
# """
# Attach a licensing policy to an IP Asset
# :param ip_id: IP Asset ID
# :param license_terms_id: License terms ID to attach
# :return: Transaction details
# """
# try:
# # Using the License module from the SDK
# response = self.client.License.addPolicyToIp(
# ipId=ip_id,
# licenseTermsId=license_terms_id
# )
# return response
# except Exception as e:
# print(f"Error attaching license terms: {str(e)}")
# raise
# TODO: keep this function and test for now - pass in spg nft contract. image url -> upload to ipfs -> create metadata -> mint and register nft
# def mint_and_register_nft(self, to_address: str, metadata_uri: str, ip_metadata: dict) -> dict:
# """
# Mint an NFT and register it as IP in one transaction
# :param to_address: Recipient's address
# :param metadata_uri: URI for the NFT metadata
# :param ip_metadata: IP Asset metadata following Story Protocol standard
# :return: Transaction details
# """
# try:
# # Using the IPAsset module's combined mint and register function
# response = self.client.IPAsset.mintAndRegisterRootIP(
# recipient=self.web3.to_checksum_address(to_address),
# tokenURI=metadata_uri,
# metadata=ip_metadata
# )
# return response
# except Exception as e:
# print(f"Error minting and registering NFT: {str(e)}")
# raise