Skip to main content
Glama
company_operation_op.py4.13 kB
import asyncio import json from loguru import logger from flowllm.context.service_context import C from flowllm.enumeration.role import Role from flowllm.op.base_async_tool_op import BaseAsyncToolOp from flowllm.op.crawl.crawl4ai_op import Crawl4aiOp from flowllm.op.search.mcp_search_op import TongyiMcpSearchOp from flowllm.schema.message import Message from flowllm.schema.tool_call import ToolCall from flowllm.utils.common_utils import extract_content @C.register_op(register_app="FlowLLM") class CompanyOperationOp(BaseAsyncToolOp): file_path: str = __file__ def __init__( self, # llm: str = "qwen3_30b_instruct", llm: str = "qwen3_max_instruct", min_content_length: int = 5000, **kwargs, ): super().__init__(llm=llm, **kwargs) self.min_content_length = min_content_length def build_tool_call(self) -> ToolCall: return ToolCall( **{ "description": "获取公司的主营业务信息", "input_schema": { "name": { "type": "string", "description": "公司名称", "required": True, }, "code": { "type": "string", "description": "股票代码", "required": True, }, }, }, ) async def async_execute(self): name = self.input_dict["name"] code = self.input_dict["code"] # 爬取同花顺经营分析页面 crawl_op = Crawl4aiOp() await crawl_op.async_call(url=f"https://basic.10jqka.com.cn/{code}/operate.html#stockpage") # 判断爬取内容是否充分 content = crawl_op.output if len(content) < self.min_content_length: logger.warning(f"爬取内容不足({len(content)}字符),启用网页搜索补充") # 并行搜索营收和利润信息 ty_op1 = TongyiMcpSearchOp() ty_op2 = TongyiMcpSearchOp() self.submit_async_task(ty_op1.async_call, query=f"{name} {code} 最新财报 营收占比") await asyncio.sleep(1) self.submit_async_task(ty_op2.async_call, query=f"{name} {code} 最新财报 利润占比") await self.join_async_task() content = f"# 网页搜索结果\n\n## 营收信息\n{ty_op1.output}\n\n## 利润信息\n{ty_op2.output}" logger.info(f"最终内容长度: {len(content)}字符") # 提取业务板块信息 prompt = self.prompt_format( prompt_name="extract_operation_prompt", name=name, code=code, content=content, ) def extract_json_callback(message: Message): result = extract_content(message.content, "json") if result is None: logger.warning("JSON解析失败,返回空列表") return [] if not isinstance(result, list): logger.warning(f"返回结果非列表类型: {type(result)}") return [] return result operations = await self.llm.achat( messages=[Message(role=Role.USER, content=prompt)], callback_fn=extract_json_callback, ) logger.info(f"提取业务板块数量: {len(operations)}") self.set_result(json.dumps(operations, ensure_ascii=False, indent=2)) async def main(): from flowllm.app import FlowLLMApp async with FlowLLMApp(args=["config=fin_research"]): # 测试案例 test_cases = [ # ("紫金矿业", "601899"), ("小米集团", "01810"), # ("阿里巴巴", "09988"), ] for name, code in test_cases: logger.info(f"\n{'=' * 50}\n测试: {name}({code})\n{'=' * 50}") op = CompanyOperationOp() await op.async_call(code=code, name=name) logger.info(f"\n结果:\n{op.output}") if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/FlowLLM-AI/finance-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server