Skip to main content
Glama
MCP-Mirror

Beeper MCP Server

by MCP-Mirror
beeper.py (17.8 kB)
import json
import logging
import os
import pkgutil
import time
from typing import (
    Optional,
)

import yaml
from web3 import Web3
from web3.constants import ADDRESS_ZERO
from web3.contract.contract import ContractFunction
from web3.types import (TxParams, Wei)

from .utils import (
    _load_contract_erc20,
    get_address_from_private_key)

logger = logging.getLogger(__name__)


def load_config():
    """Load and return the parsed ``config.yaml`` located next to this module."""
    config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


class Beeper:
    """Client for the Beeper contracts and the PancakeSwap V3 router/factory.

    Addresses, the chain id and the RPC URL are read from the ``chain``
    section of ``config.yaml`` for the selected ``chain_type``. All
    transactions are signed locally with the private key given at
    construction time.
    """

    # Fee tiers probed, in this order, when searching for a token/WBNB pool.
    _POOL_FEE_TIERS = (10000, 2500, 500, 100)

    def __init__(self,
                 chain_type: str = 'bsc-testnet',
                 private_key: str = None
                 ):
        """Connect to the chain and bind the contract objects.

        Args:
            chain_type: Key under ``config['chain']`` selecting the network.
            private_key: Hex private key, 64 characters or 66 with a prefix.

        Raises:
            ValueError: If the key is missing or has an unexpected length, or
                if the RPC endpoint cannot be reached.
            KeyError: If ``chain_type`` or a required setting is not present
                in the configuration.
        """
        # ``private_key`` defaults to None, so test for that before len().
        if not private_key or len(private_key) not in (64, 66):
            raise ValueError("Private key is required")

        config = load_config()
        chain_config = config['chain'][chain_type]

        self.private_key = private_key
        self.chain_id = chain_config['chain_id']
        self.web3 = Web3(Web3.HTTPProvider(chain_config['rpc_url']))
        if not self.web3.is_connected():
            raise ValueError("Failed to connect to the chain")

        # Amount requested when approving, and the threshold above which an
        # existing allowance is treated as already approved.
        self.max_approval_int = int(f"0x{64 * 'f'}", 16)
        self.max_approval_check_int = int(f"0x{15 * '0'}{49 * 'f'}", 16)

        router_address = Web3.to_checksum_address(chain_config['pancake_v3_swap_router_addr'])
        router_abi = self._read_resource('solc/pancake_swaprouter_v3.abi')
        self.router = self.web3.eth.contract(address=router_address, abi=router_abi)

        factory_address = Web3.to_checksum_address(chain_config['pancake_v3_factory_addr'])
        factory_abi = self._read_resource('solc/pancake_factory_v3.abi')
        self.factory = self.web3.eth.contract(address=factory_address, abi=factory_abi)

        beeper_util_address = Web3.to_checksum_address(chain_config['beeper_util_addr'])
        beeper_util_abi = json.loads(self._read_resource('solc/Util.sol/Util.json'))
        self.beeper_util = self.web3.eth.contract(address=beeper_util_address, abi=beeper_util_abi['abi'])

        beeper_address = Web3.to_checksum_address(chain_config['beeper_addr'])
        beeper_abi = json.loads(self._read_resource('solc/Beeper.sol/Beeper.json'))
        self.beeper = self.web3.eth.contract(address=beeper_address, abi=beeper_abi['abi'])

    @staticmethod
    def _read_resource(resource: str) -> str:
        """Return the decoded text of ``resource`` from the ``chain`` package.

        Raises:
            FileNotFoundError: If ``pkgutil.get_data`` returns ``None``.
        """
        data = pkgutil.get_data('chain', resource)
        if data is None:
            raise FileNotFoundError(f"Package resource not found: {resource}")
        return data.decode()

    def get_balance(self,
                    wallet_address: str,
                    token_address: str
                    ) -> int:
        """Return the balance of ``wallet_address``.

        An empty ``token_address`` selects the chain's native coin; any other
        value is treated as an ERC-20 token address.
        """
        wallet_address = Web3.to_checksum_address(wallet_address)
        if token_address == "":
            return self.web3.eth.get_balance(wallet_address)
        return self._get_erc20_balance(wallet_address, token_address)

    def _get_erc20_balance(self,
                           wallet_address: str,
                           token_address: str
                           ) -> int:
        """Return the ``balanceOf`` result for a token (native if address is empty)."""
        wallet_address = Web3.to_checksum_address(wallet_address)
        if token_address == "":
            return self.web3.eth.get_balance(wallet_address)
        token_address = Web3.to_checksum_address(token_address)
        token = _load_contract_erc20(self.web3, token_address)
        return token.functions.balanceOf(wallet_address).call()

    def deploy_token(self,
                     image: str = 'https://twitter.com/',
                     token_name: str = 'Beeper',
                     token_symbol: str = 'Power by Beeper',
                     token_supply: int = 10000000000 * 10**18,
                     initial_tick: int = -207400,
                     fee: int = 2500,  # original note: "1%; 500:0.05%; 2500:0.25%"
                     buy_fee: int = 2500,
                     ) -> tuple[str, str, int]:
        """Deploy a token through the Beeper contract.

        A salt and the resulting token address are first obtained from the
        util contract's ``generateSalt``; the salt is then passed to
        ``deployToken``.

        Returns:
            ``(tx_hash, token_address, token_supply)``.

        Raises:
            Exception: If no private key is set.
        """
        if self.private_key == "":
            raise Exception("No admin private key")

        user_addr = self.get_address_from_private_key()
        wbnb = self.router.functions.WETH9().call()

        # NOTE(review): the literal 0 and "" arguments are passed through
        # unchanged from the original call; their meaning is defined by the
        # contract ABI, which is not visible here.
        generated_salt, token_address = self.beeper_util.functions.generateSalt(
            user_addr, 0, token_name, token_symbol, image, "", token_supply, wbnb
        ).call()
        logger.info(f"Salt: {generated_salt.hex()} {token_address}")

        pool_config = [initial_tick, wbnb, buy_fee]
        tx_hash = self._build_and_send_tx(
            self.private_key,
            self.beeper.functions.deployToken(
                token_name, token_symbol, token_supply, fee, generated_salt,
                user_addr, 0, image, "", pool_config
            ),
            self._get_tx_params(address=user_addr, gas=9_000_000),
        )
        return tx_hash, token_address, token_supply

    def make_trade(self,
                   input_token: str = None,
                   output_token: str = None,
                   amount: int = 1000000,
                   fee: int = 10000  # deploy same setting
                   ):
        """Swap ``amount`` of ``input_token`` for ``output_token``.

        ``input_token=None`` means paying with the native coin and
        ``output_token=None`` means receiving the native coin.

        Raises:
            ValueError: If both tokens are ``None``.
        """
        if input_token is None and output_token is None:
            raise ValueError("At least one of input_token or output_token is required")

        wallet = self.get_address_from_private_key()
        if input_token is None:
            return self._native_to_token(wallet, self.private_key, output_token, amount, fee)
        if output_token is None:
            return self._token_to_native(wallet, self.private_key, input_token, amount, fee)
        return self._token_to_token(wallet, self.private_key, input_token, output_token, amount, fee)

    def _find_pool_fee(self, token_address: str, default: Optional[int] = None) -> Optional[int]:
        """Return the first fee tier with an existing token/WBNB pool, else ``default``."""
        for pool_fee in self._POOL_FEE_TIERS:
            pool_addr = self.get_token_pool(token_address, pool_fee)
            if pool_addr != ADDRESS_ZERO:
                logger.debug(f"pool addr at: {pool_addr} {pool_fee}")
                return pool_fee
        return default

    def _native_to_token(self,
                         wallet_address: str,
                         private_key: str,
                         token_address: str,
                         amount: int = 1000000,
                         fee: int = 10000  # deploy same setting
                         ):
        """Buy ``token_address`` with ``amount`` of native coin sent as tx value.

        ``fee`` is used only when no pool is found at any probed fee tier.
        """
        wallet_address = Web3.to_checksum_address(wallet_address)
        token_address = Web3.to_checksum_address(token_address)
        fee = self._find_pool_fee(token_address, fee)

        return self._build_and_send_tx(
            private_key,
            self.router.functions.exactInputSingle(
                {
                    "tokenIn": self.router.functions.WETH9().call(),
                    "tokenOut": token_address,
                    "fee": fee,
                    "recipient": wallet_address,
                    "deadline": int(time.time()) + 3600,
                    "amountIn": amount,
                    "amountOutMinimum": 0,
                    "sqrtPriceLimitX96": 0,
                }
            ),
            self._get_tx_params(address=wallet_address, value=amount),
        )

    def _token_to_native(self,
                         wallet_address: str,
                         private_key: str,
                         token_address: str,
                         amount: int = 1000000,
                         fee: int = 10000  # deploy same setting
                         ):
        """Sell ``amount`` of ``token_address`` and unwrap the proceeds to the wallet.

        The swap and the ``unwrapWETH9`` call are bundled in one ``multicall``.
        """
        wallet_address = Web3.to_checksum_address(wallet_address)
        token_address = Web3.to_checksum_address(token_address)
        fee = self._find_pool_fee(token_address, fee)

        self._check_approval(wallet_address, token_address, private_key)
        logger.info(f"Seller: {amount} {token_address} to bnb...")

        # The swap recipient is the zero address, then unwrapWETH9 pays the
        # wallet. NOTE(review): presumably the router treats a zero recipient
        # as "keep the output in the router" — confirm against the router ABI.
        swap_data = self.router.encode_abi(
            "exactInputSingle",
            args=[
                (
                    token_address,
                    self.router.functions.WETH9().call(),
                    fee,
                    ADDRESS_ZERO,
                    int(time.time()) + 3600,
                    amount,
                    0,
                    0,
                )
            ],
        )
        unwrap_data = self.router.encode_abi(
            "unwrapWETH9", args=[0, wallet_address]
        )

        return self._build_and_send_tx(
            private_key,
            self.router.functions.multicall([swap_data, unwrap_data]),
            self._get_tx_params(address=wallet_address),
        )

    def _token_to_token(self,
                        wallet_address: str,
                        private_key: str,
                        input_token: str,
                        output_token: str,
                        amount: int = 1000000,
                        fee: int = 10000
                        ):
        """Swap one ERC-20 token for another.

        If neither side is the router's ``WETH9`` token the swap is routed
        through it via ``_token_to_token_via_hop``; otherwise a single-pool
        ``exactInputSingle`` is sent with the given ``fee``.
        """
        wallet_address = Web3.to_checksum_address(wallet_address)
        input_token = Web3.to_checksum_address(input_token)
        output_token = Web3.to_checksum_address(output_token)
        wbnb_address = Web3.to_checksum_address(self.router.functions.WETH9().call())

        if input_token != wbnb_address and output_token != wbnb_address:
            # The hop helper does not take private_key positionally; pass it
            # by keyword so the arguments line up with its signature.
            return self._token_to_token_via_hop(
                wallet_address, input_token, output_token, amount, fee,
                private_key=private_key,
            )

        self._check_approval(wallet_address, input_token, private_key)

        return self._build_and_send_tx(
            private_key,
            self.router.functions.exactInputSingle(
                {
                    "tokenIn": input_token,
                    "tokenOut": output_token,
                    "fee": fee,
                    "recipient": wallet_address,
                    "deadline": int(time.time()) + 3600,
                    "amountIn": amount,
                    "amountOutMinimum": 0,
                    "sqrtPriceLimitX96": 0,
                }
            ),
            self._get_tx_params(address=wallet_address),
        )

    def _token_to_token_via_hop(self,
                                wallet_address: str,
                                input_token: str,
                                output_token: str,
                                amount: int = 1000000,
                                fee: int = 10000,
                                private_key: Optional[str] = None
                                ):
        """Swap ``input_token`` -> WBNB -> ``output_token`` with ``exactInput``.

        The fee of each leg is discovered by probing the factory; the ``fee``
        argument is accepted for signature compatibility but not used.

        Args:
            private_key: Signing key; defaults to the instance's key.

        Raises:
            Exception: If either token has no pool against WBNB.
        """
        if private_key is None:
            private_key = self.private_key

        wallet_address = Web3.to_checksum_address(wallet_address)
        input_token = Web3.to_checksum_address(input_token)
        output_token = Web3.to_checksum_address(output_token)

        self._check_approval(wallet_address, input_token, private_key)

        wbnb_address = self.router.functions.WETH9().call()
        tokens = [input_token, wbnb_address, output_token]

        # pancake fee: 100, 500, 2500, 10000
        input_fee = self._find_pool_fee(input_token)
        if input_fee is None:
            raise Exception("No pair for input token or not paired with wbnb")
        output_fee = self._find_pool_fee(output_token)
        if output_fee is None:
            raise Exception("No pair for output token or not paired with wbnb")

        path = self._encode_path(tokens, [input_fee, output_fee])

        return self._build_and_send_tx(
            private_key,
            self.router.functions.exactInput(
                {
                    "path": path,
                    "recipient": wallet_address,
                    "deadline": int(time.time()) + 3600,
                    "amountIn": amount,
                    "amountOutMinimum": 0,
                }
            ),
            self._get_tx_params(address=wallet_address),
        )

    @staticmethod
    def _encode_path(tokens: list, fees: list) -> bytes:
        """Pack a multi-hop route as ``token | fee | token | fee | token``.

        Each address contributes 20 bytes and each fee 3 big-endian bytes.
        NOTE(review): this follows the packed path layout used by Uniswap-V3
        style routers; confirm it matches the deployed router.

        Raises:
            ValueError: If ``len(tokens) != len(fees) + 1``.
        """
        if len(tokens) != len(fees) + 1:
            raise ValueError("Path needs exactly one more token than fees")
        parts = []
        for token, hop_fee in zip(tokens, fees):
            parts.append(bytes.fromhex(token[2:]))
            parts.append(int(hop_fee).to_bytes(3, "big"))
        parts.append(bytes.fromhex(tokens[-1][2:]))
        return b"".join(parts)

    def _check_approval(self,
                        wallet_address: str,
                        token_address: str,
                        private_key: Optional[str] = None):
        """Approve the router for ``token_address`` unless already approved.

        Args:
            private_key: Signing key; defaults to the instance's key.
        """
        if private_key is None:
            private_key = self.private_key
        token_address = Web3.to_checksum_address(token_address)
        is_approved = self._is_approved(wallet_address, token_address)
        if not is_approved:
            self.approve(wallet_address, private_key, token_address)
        # Logs the state found *before* any approval was sent.
        logger.warning(f"Approved {token_address}: {is_approved}")

    def _is_approved(self, wallet_address: str, token_address: str) -> bool:
        """Return True if the router's allowance reaches ``max_approval_check_int``.

        NOTE(review): assumes the contract returned by ``_load_contract_erc20``
        exposes the standard ERC-20 ``allowance`` function.
        """
        wallet_address = Web3.to_checksum_address(wallet_address)
        token_address = Web3.to_checksum_address(token_address)
        token = _load_contract_erc20(self.web3, token_address)
        allowance = token.functions.allowance(wallet_address, self.router.address).call()
        return allowance >= self.max_approval_check_int

    def approve(self,
                wallet_address: str,
                private_key: str,
                token_address: str,
                max_approval: Optional[int] = None):
        """Send an ERC-20 ``approve`` letting the router spend ``token_address``.

        Args:
            max_approval: Allowance to grant; defaults to ``max_approval_int``.

        Returns:
            The approval transaction hash as a hex string.

        NOTE(review): assumes the contract returned by ``_load_contract_erc20``
        exposes the standard ERC-20 ``approve`` function.
        """
        wallet_address = Web3.to_checksum_address(wallet_address)
        token_address = Web3.to_checksum_address(token_address)
        allowance = self.max_approval_int if max_approval is None else max_approval
        token = _load_contract_erc20(self.web3, token_address)
        return self._build_and_send_tx(
            private_key,
            token.functions.approve(self.router.address, allowance),
            self._get_tx_params(address=wallet_address),
        )

    def _sign_and_wait(self, transaction, private_key: str):
        """Sign ``transaction``, broadcast it and return ``(tx_hash, receipt)``."""
        signed_txn = self.web3.eth.account.sign_transaction(transaction, private_key)
        tx_hash = self.web3.eth.send_raw_transaction(signed_txn.raw_transaction)
        tx_receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
        return tx_hash, tx_receipt

    def _build_and_send_tx(
        self, private_key: str, function: ContractFunction, tx_params: TxParams
    ):
        """Build and send a transaction.

        Returns the ``0x``-prefixed transaction hash. When the receipt reports
        failure the transaction is replayed by ``_display_cause``, which
        raises if the replay reproduces the error.
        """
        transaction = function.build_transaction(tx_params)
        tx_hash, tx_receipt = self._sign_and_wait(transaction, private_key)
        if tx_receipt['status'] == 0:
            logger.error("Transaction failed")
            self._display_cause(tx_hash)
        else:
            logger.debug(f"Transaction succeeded:: {tx_hash.hex()}\nnonce: {tx_params['nonce']}")
        # to_hex always yields exactly one "0x" prefix, whatever .hex() returns.
        return Web3.to_hex(tx_hash)

    def transfer_asset(self,
                       received_address: str,
                       token_address: str,
                       amount: int = 1000000
                       ):
        """Send ``amount`` to ``received_address``.

        An empty ``token_address`` transfers the native coin; otherwise the
        ERC-20 token at that address is transferred.
        """
        sender = self.get_address_from_private_key()
        if token_address == "":
            return self._transfer(sender, self.private_key, received_address, amount)
        return self._transfer_token(sender, self.private_key, received_address, token_address, amount)

    def _transfer_token(self,
                        wallet_address: str,
                        private_key: str,
                        received_address: str,
                        token_address: str,
                        amount: int = 1000000
                        ):
        """Call the token's ``transfer(received_address, amount)`` from the wallet."""
        wallet_address = Web3.to_checksum_address(wallet_address)
        received_address = Web3.to_checksum_address(received_address)
        token_address = Web3.to_checksum_address(token_address)

        token = _load_contract_erc20(self.web3, token_address)
        return self._build_and_send_tx(
            private_key,
            token.functions.transfer(received_address, amount),
            self._get_tx_params(address=wallet_address),
        )

    def _transfer(self,
                  wallet_address: str,
                  private_key: str,
                  received_address: str,
                  amount: int = 1000000
                  ):
        """Send ``amount`` of native coin with a plain value transaction."""
        wallet_address = Web3.to_checksum_address(wallet_address)
        received_address = Web3.to_checksum_address(received_address)

        transaction = {
            'chainId': self.chain_id,
            "to": received_address,
            "nonce": self.web3.eth.get_transaction_count(wallet_address),
            "gas": 100_000,
            "gasPrice": self.web3.eth.gas_price,
            "value": amount,
        }

        tx_hash, tx_receipt = self._sign_and_wait(transaction, private_key)
        if tx_receipt['status'] == 0:
            logger.error("Transfer transaction failed")
            self._display_cause(tx_hash)
        else:
            logger.debug(f'Transfer transaction succeeded: {tx_hash.hex()}')
        return Web3.to_hex(tx_hash)

    def _display_cause(self, tx_hash: bytes):
        """Replay a failed transaction with ``eth_call`` to surface its error.

        The call is made against the block before the one that included the
        transaction. Any exception raised by the replay is logged and
        re-raised; if the replay succeeds nothing is raised.
        """
        logger.error(f"check: {tx_hash.hex()}")
        tx = self.web3.eth.get_transaction(tx_hash)

        replay_tx = {
            'to': tx['to'],
            'from': tx['from'],
            'value': tx['value'],
            'data': tx['input'],
        }

        try:
            self.web3.eth.call(replay_tx, tx.blockNumber - 1)
        except Exception as e:
            logger.error(e)
            raise

    def _get_tx_params(
        self, address: str, value: Wei = Wei(0), gas: Optional[Wei] = None
    ) -> TxParams:
        """Get generic transaction parameters.

        The nonce and gas price are read from the node at call time; ``gas``
        defaults to 300_000 unless a truthy override is given.
        """
        params: TxParams = {
            "from": address,
            "value": value,
            "nonce": self.web3.eth.get_transaction_count(address),
            "gasPrice": self.web3.eth.gas_price,
            "gas": 300_000,
        }
        if gas:
            params["gas"] = gas
        return params

    def get_address_from_private_key(self) -> str:
        """Return the address derived from this instance's private key."""
        return get_address_from_private_key(self.private_key)

    def get_token_pool(self,
                       token_address: str,
                       fee: int = 10000,
                       ) -> str:
        """Return the factory's pool address for ``token_address``/WBNB at ``fee``."""
        token_address = Web3.to_checksum_address(token_address)
        wbnb_address = self.router.functions.WETH9().call()
        return self.factory.functions.getPool(token_address, wbnb_address, fee).call()

    def get_token_symbol(self, token_address: str) -> str:
        """Return the token's ``symbol()``, or ``"BNB"`` for non-address input.

        Any string whose length is not 42 is treated as the native coin.

        Raises:
            ValueError: If the lookup fails for any reason.
        """
        try:
            if len(token_address) != 42:
                return "BNB"
            token_address = Web3.to_checksum_address(token_address)
            token_contract = _load_contract_erc20(self.web3, token_address)
            return token_contract.functions.symbol().call()
        except Exception as e:
            raise ValueError(f"fail to get symbol: {str(e)}") from e

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MCP-Mirror/stopWarByWar_beeper-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.