yeepay_yop_download_cert
Download and save CFCA certificates using key algorithm, serial number, auth code, asymmetric key pairs, and password. Returns paths for private (.pfx) and public (.cer) certificates.
Instructions
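Given the key algorithm, the CFCA certificate's serial number, its authorization code, an asymmetric key pair (public and private keys) and a password, download the certificate and save it to a local path.

Args:
- algorithm: key algorithm, either "RSA" or "SM2"; defaults to "RSA"
- serial_no: CFCA certificate serial number
- auth_code: CFCA certificate authorization code
- private_key: Base64-encoded private key string
- public_key: Base64-encoded public key string
- pwd: password, length 12~16 characters

Returns: a Dict containing:
- message: response message
- pfxCert: path of the private-key certificate (.pfx)
- pubCert: path of the public-key certificate (.cer)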
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| algorithm | No | | RSA |
| auth_code | No | | |
| private_key | No | | |
| public_key | No | | |
| pwd | No | | |
| serial_no | No | | |
Implementation Reference
- yop_mcp/main.py:215-249 (handler) Handler function for the 'yeepay_yop_download_cert' tool. Decorated with @mcp.tool() for registration. Defines input/output schema via type hints, defaults, and docstring. Delegates to core download_cert helper.

  ```python
  @mcp.tool()
  def yeepay_yop_download_cert(  # pylint: disable=too-many-arguments,too-many-positional-arguments
      algorithm: str = "RSA",
      serial_no: str = "",
      auth_code: str = "",
      private_key: str = "",
      public_key: str = "",
      pwd: str = "",
  ) -> Dict[str, Any]:
      """
      Given the key algorithm, the CFCA certificate's serial number, its authorization code,
      an asymmetric key pair (public and private keys) and a password, download the
      certificate and save it to a local path.

      Args:
          algorithm: key algorithm, either "RSA" or "SM2"; defaults to "RSA"
          serial_no: CFCA certificate serial number
          auth_code: CFCA certificate authorization code
          private_key: Base64-encoded private key string
          public_key: Base64-encoded public key string
          pwd: password, length 12~16 characters

      Returns:
          Dict containing:
          - message: response message
          - pfxCert: path of the private-key certificate (.pfx)
          - pubCert: path of the public-key certificate (.cer)
      """
      return download_cert(
          algorithm=algorithm,
          serial_no=serial_no,
          auth_code=auth_code,
          private_key=private_key,
          public_key=public_key,
          pwd=pwd,
      )
  ```
- yop_mcp/main.py:215-215 (registration) The @mcp.tool() decorator registers the yeepay_yop_download_cert function as an MCP tool.

  ```python
  @mcp.tool()
  ```
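- yop_mcp/main.py:224-240 (schema) Input/output schema defined in the docstring of the handler, matching the function parameters.

  ```python
  """
  Given the key algorithm, the CFCA certificate's serial number, its authorization code,
  an asymmetric key pair (public and private keys) and a password, download the
  certificate and save it to a local path.

  Args:
      algorithm: key algorithm, either "RSA" or "SM2"; defaults to "RSA"
      serial_no: CFCA certificate serial number
      auth_code: CFCA certificate authorization code
      private_key: Base64-encoded private key string
      public_key: Base64-encoded public key string
      pwd: password, length 12~16 characters

  Returns:
      Dict containing:
      - message: response message
      - pfxCert: path of the private-key certificate (.pfx)
      - pubCert: path of the public-key certificate (.cer)
  """
  ```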
- tools/cert_utils.py:460-548 (helper) Core helper function implementing the certificate download, validation, P10 request generation, and file saving logic.

  ```python
  def download_cert(
      algorithm: str = "RSA",
      serial_no: str = "",
      auth_code: str = "",
      private_key: str = "",
      public_key: str = "",
      pwd: str = "",
  ) -> Dict[str, Any]:
      # Determine the key type
      key_type = KeyType.SM2 if algorithm.upper() == "SM2" else KeyType.RSA2048

      # Validate the input parameters
      check_result = CertUtils.check_input(
          serial_no, auth_code, key_type, private_key, public_key, pwd
      )
      if not check_result.result:
          return {"message": check_result.msg}

      # Check that the public and private keys match
      p10_generated = False  # Flags whether the P10 request has already been generated
      try:
          if not p10_generated and not CertUtils.check_key(
              private_key, public_key, key_type
          ):
              return {"message": "商户公私钥不匹配,请重新输入"}
      except Exception as e:
          return {"message": f"密钥解析异常: {str(e)}"}

      # Generate the certificate request
      if p10_generated:
          cert_req = private_key
      else:
          try:
              cert_req = CertUtils.gen_p10(private_key, public_key, key_type)
          except Exception as e:
              return {"message": f"生成证书请求失败: {str(e)}"}

      # Determine where the certificates are saved
      cert_path = (
          Config.SM2_CERT_SAVE_PATH
          if key_type == KeyType.SM2
          else Config.RSA_CERT_SAVE_PATH
      )
      pri_cert_path = os.path.join(cert_path, f"{serial_no}.pfx")
      pub_cert_path = os.path.join(cert_path, f"{serial_no}.cer")

      # Check whether the certificates already exist
      if SupportUtil.is_file_exists(pri_cert_path) and SupportUtil.is_file_exists(
          pub_cert_path
      ):
          return {
              "message": "本地证书已存在",
              "pfxCert": pri_cert_path,
              "pubCert": pub_cert_path,
          }

      try:
          # Obtain the certificate
          cert: Optional[str] = None
          if SupportUtil.is_file_exists(pub_cert_path):
              cert = SupportUtil.read_file_as_string(pub_cert_path)
          else:
              cert_download_result = CertUtils.download_cert_from_cfca(
                  serial_no, auth_code, cert_req
              )
              if cert_download_result.error_msg:
                  return {"message": cert_download_result.error_msg}
              cert = cert_download_result.cert

          # Check that the certificate matches the private key
          if cert and not CertUtils.check_cert(private_key, cert, key_type):
              return {"message": "证书已下载过,且证书与输入的私钥不匹配,请核对"}

          # Save the certificates
          if cert:
              pub_cert_path = CertUtils.make_pub_cert(cert, serial_no, cert_path)

          if not p10_generated and cert:
              pri_cert_path = CertUtils.make_pfx_cert(
                  private_key, cert, key_type, pwd, serial_no, cert_path
              )

          return {
              "message": "CFCA证书激活并下载成功",
              "pfxCert": pri_cert_path,
              "pubCert": pub_cert_path,
          }
      except Exception as e:
          return {"message": f"系统异常,请稍后重试: {str(e)}"}
  ```
- tools/cert_utils.py:376-411 (helper) Low-level helper for downloading certificate from CFCA API using HTTP request.

  ```python
  def download_cert_from_cfca(
      serial_no: str, auth_code: str, cert_req: str
  ) -> CertDownloadResult:
      try:
          # Prepare the request data
          param = {
              "serialNo": serial_no,
              "authCode": auth_code,
              "certReq": cert_req,
              "toolsVersion": Config.TOOLS_VERSION,
          }

          # Send the request to the CFCA API
          headers = {}
          headers["Authorization"] = "Basic " + base64.b64encode(
              Config.BASIC.encode("utf-8")
          ).decode("utf-8")

          response = HttpUtils.get_response(
              Config.CFCA_CERT_DOWNLOAD_URL, param, headers
          )
          map_data = JsonUtils.json_to_pojo(response, dict)

          if map_data.get("code") == "000000":
              data_map = map_data.get("data")
              return CertDownloadResult().with_cert(
                  "-----BEGIN CERTIFICATE-----\n"
                  + data_map.get("cert")
                  + "\n-----END CERTIFICATE-----"
              )
          else:
              return CertDownloadResult().with_error_msg(map_data.get("message"))
      except Exception as e:
          return CertDownloadResult(error_msg=f"下载证书失败: {str(e)}")
  ```
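
The download_cert helper above builds the `.pfx` and `.cer` paths directly from `serial_no`, and `CertUtils.check_input` is not shown on this page, so what it enforces is unknown. The following is an added example, not the project's code, of checks a caller or wrapper could apply before invoking the tool; `safe_cert_paths`, `check_pwd`, `write_private` and the serial-number pattern are hypothetical names and assumptions.

```python
import os
import re
from pathlib import Path
from typing import Tuple

# Assumption: serial numbers are plain alphanumerics; adjust to the real CFCA format.
SERIAL_RE = re.compile(r"[A-Za-z0-9]{1,64}")


def check_pwd(pwd: str) -> None:
    """Enforce the documented 12~16 character password length."""
    if not 12 <= len(pwd) <= 16:
        raise ValueError("pwd must be 12 to 16 characters long")


def safe_cert_paths(cert_dir: str, serial_no: str) -> Tuple[Path, Path]:
    """Return (.pfx, .cer) paths for serial_no, refusing anything outside cert_dir."""
    if not SERIAL_RE.fullmatch(serial_no):
        raise ValueError("serial_no contains unexpected characters")
    base = Path(cert_dir).resolve()
    # resolve() follows symlinks, so a link planted under cert_dir is caught too.
    paths = tuple((base / f"{serial_no}{ext}").resolve() for ext in (".pfx", ".cer"))
    if any(p.parent != base for p in paths):
        raise ValueError("certificate path escapes the certificate directory")
    return paths[0], paths[1]


def write_private(path: Path, data: bytes) -> None:
    """Create the key file owner-only (0600) and fail if it already exists."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "wb") as fh:
        fh.write(data)
```
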