company_factor_op.py
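"""Build a company-level financial factor logic graph.

CompanyFactorOp assembles a valuation factor graph for a whole company by:
1. fetching the company's business segments via CompanyOperationOp and keeping
   the most important ones (by revenue / profit share);
2. running CompanySegmentFactorOp on each retained segment;
3. merging the per-segment meta info lists with an LLM;
4. fusing the per-segment mermaid factor graphs with an LLM;
5. sanitizing bracket characters inside the factor node labels.
"""
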
import asyncio
import json
import re
from typing import List, Dict

from loguru import logger

from flowllm.context.service_context import C
from flowllm.enumeration.role import Role
from flowllm.op.base_async_tool_op import BaseAsyncToolOp
from flowllm.schema.message import Message
from flowllm.schema.tool_call import ToolCall
from flowllm.utils.common_utils import extract_content


@C.register_op(register_app="FlowLLM")
class CompanyFactorOp(BaseAsyncToolOp):
    file_path: str = __file__

    def __init__(
        self,
        llm: str = "qwen3_max_instruct",
        # llm: str = "qwen3_30b_instruct",
        # llm: str = "qwen3_80b_instruct",
        # llm: str = "qwen25_max_instruct",
        revenue_threshold: float = 0.05,
        profit_threshold: float = 0.05,
        max_segments: int = 3,
        save_answer: bool = True,
        **kwargs,
    ):
        super().__init__(llm=llm, save_answer=save_answer, **kwargs)
        self.revenue_threshold: float = revenue_threshold
        self.profit_threshold: float = profit_threshold
        self.max_segments: int = max_segments

    def build_tool_call(self) -> ToolCall:
        return ToolCall(
            **{
                "description": "构建公司整体估值的金融因子逻辑图,综合所有业务板块的因子分析",
                "input_schema": {
                    "name": {
                        "type": "string",
                        "description": "公司名称",
                        "required": True,
                    },
                    "code": {
                        "type": "string",
                        "description": "股票代码",
                        "required": True,
                    },
                },
            },
        )

    async def async_execute(self):
        name = self.input_dict["name"]
        code = self.input_dict["code"]
        logger.info(f"开始为公司 {name}({code}) 构建金融因子逻辑图")

        # Step 1: fetch the business segment info
        segments = await self._get_company_segments(name, code)
        logger.info(f"获取到 {len(segments)} 个业务板块")

        # Step 2: run CompanySegmentFactorOp on each business segment
        segment_results = await self._analyze_segments(name, segments)
        logger.info(f"完成 {len(segment_results)} 个业务板块的因子分析")

        # Step 3: merge all meta info lists
        merged_meta_list = await self._merge_meta_lists(segment_results)
        logger.info(f"合并后的Meta信息数量: {len(merged_meta_list)}")

        # Step 4: fuse all factor logic graphs
        merged_graph = await self._merge_factor_graphs(name, merged_meta_list, segment_results)
        logger.info("完成因子逻辑图融合")

        # Step 5: clean special characters out of the factor nodes
        cleaned_graph = self._clean_factor_nodes(merged_graph)
        logger.info("完成因子节点清理")

        # Step 6: assemble the final result
        result = {
            "name": name,
            "code": code,
            "segments": [s["name"] for s in segments],
            "meta_list": merged_meta_list,
            "factor_graph": cleaned_graph,
        }
        self.set_result(json.dumps(result, ensure_ascii=False, indent=2))

    async def _get_company_segments(self, name: str, code: str) -> List[Dict]:
        """Fetch and filter the company's business segments."""
        from flowllm.op.fin_search.company_operation_op import CompanyOperationOp

        # Call CompanyOperationOp to fetch the business segments
        operation_op = CompanyOperationOp()
        await operation_op.async_call(name=name, code=code)

        # Parse the returned result
        operations = json.loads(operation_op.output)
        logger.info(f"原始业务板块: {operations}")

        # Drop segments whose revenue and profit shares are both below the thresholds
        filtered_segments = []
        for op in operations:
            revenue = op.get("revenue")
            profit = op.get("profit")

            # Keep the segment if at least one metric clears its threshold
            keep = False
            if revenue is not None and revenue >= self.revenue_threshold:
                keep = True
            if profit is not None and profit >= self.profit_threshold:
                keep = True
            # Also keep it if revenue or profit is null but the other is reported
            if (revenue is None and profit is not None) or (profit is None and revenue is not None):
                keep = True

            if keep:
                filtered_segments.append(op)

        # Sort by importance (sum of revenue and profit shares, descending)
        def sort_key(seg):
            r = seg.get("revenue") or 0
            p = seg.get("profit") or 0
            return -(r + p)  # descending

        filtered_segments.sort(key=sort_key)

        # Keep at most max_segments segments
        result = filtered_segments[: self.max_segments]
        logger.info(f"过滤后保留 {len(result)} 个业务板块: {[s['name'] for s in result]}")
        return result
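
    # Illustrative example of the filter above, assuming the default 0.05
    # thresholds: {"name": "A", "revenue": 0.42, "profit": 0.60} is kept (both
    # metrics clear the threshold); {"name": "B", "revenue": 0.01, "profit": 0.02}
    # is dropped (both below); {"name": "C", "revenue": None, "profit": 0.01}
    # is kept (one metric missing, the other reported).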
"""串行调用CompanySegmentFactorOp分析每个业务板块""" from flowllm.op.fin_search.company_segment_factor_op import CompanySegmentFactorOp from flowllm.op.search.mcp_search_op import TongyiMcpSearchOp logger.info(f"开始串行分析 {len(segments)} 个业务板块") results = [] for i, segment in enumerate(segments): segment_name = segment["name"] logger.info(f"开始分析业务板块 {i + 1}/{len(segments)}: {segment_name}") try: # 创建搜索Op和因子分析Op search_op = TongyiMcpSearchOp() factor_op = CompanySegmentFactorOp() << search_op # 串行执行任务 await factor_op.async_call(name=name, segment=segment_name) # 解析返回结果(包含meta_list和mermaid_graph) factor_result = json.loads(factor_op.output) result = { "segment": segment["name"], "revenue": segment.get("revenue"), "profit": segment.get("profit"), "mermaid_graph": factor_result["mermaid_graph"], "meta_list": factor_result["meta_list"], } results.append(result) logger.info(f"板块 {segment_name} 分析成功") except Exception as e: logger.exception(f"板块 {segment_name} 分析失败: {e}") continue logger.info(f"完成 {len(results)}/{len(segments)} 个板块的分析") return results async def _merge_meta_lists(self, segment_results: List[Dict]) -> List[str]: """合并并去重所有业务板块的Meta信息列表""" # 收集所有meta信息 all_meta = [] for result in segment_results: all_meta.extend(result.get("meta_list", [])) if not all_meta: logger.warning("没有收集到任何Meta信息") return [] # 使用LLM进行合并去重和冲突解决 prompt = self.prompt_format( prompt_name="merge_meta_prompt", meta_lists=json.dumps(all_meta, ensure_ascii=False, indent=2), ) merged_list = await self.llm.achat( messages=[Message(role=Role.USER, content=prompt)], callback_fn=lambda msg: extract_content(msg.content, "json"), enable_stream_print=True, ) return merged_list async def _merge_factor_graphs( self, name: str, meta_list: List[str], segment_results: List[Dict], ) -> str: """融合所有业务板块的因子逻辑图""" # 收集所有segment的mermaid图 segment_graphs = [] for result in segment_results: segment_graphs.append( { "segment": result["segment"], "graph": result["mermaid_graph"], } ) # 使用LLM融合所有图 prompt = self.prompt_format( prompt_name="merge_graphs_prompt", name=name, meta_list=json.dumps(meta_list, ensure_ascii=False, indent=2), segment_graphs=json.dumps(segment_graphs, ensure_ascii=False, indent=2), ) merged_graph = await self.llm.achat( messages=[Message(role=Role.USER, content=prompt)], callback_fn=lambda msg: extract_content(msg.content, "mermaid"), enable_stream_print=True, ) return merged_graph def _clean_factor_nodes(self, mermaid_graph: str) -> str: """清理mermaid图中因子节点的特殊字符 查找所有形如 Ax[yyyy] 的因子节点,确保 yyyy 中不包含小括号、花括号、方括号 """ def clean_content(match): node_id = match.group(1) # Ax content = match.group(2) # yyyy # 移除所有括号:() {} [] cleaned_content = content.replace("(", "").replace(")", "") cleaned_content = cleaned_content.replace("{", "").replace("}", "") cleaned_content = cleaned_content.replace("[", "").replace("]", "") # 如果内容发生了变化,记录日志 if content != cleaned_content: logger.debug(f"清理因子节点: {node_id}[{content}] -> {node_id}[{cleaned_content}]") return f"{node_id}[{cleaned_content}]" # 匹配 Ax[...] 

        # Match factor nodes of the form Ax[...] (A followed by digits, with the
        # node label inside the square brackets); the negated character class
        # stops at the nearest closing bracket
        pattern = r"(A\d+)\[([^\]]+)\]"
        cleaned_graph = re.sub(pattern, clean_content, mermaid_graph)

        return cleaned_graph


async def main():
    from flowllm.app import FlowLLMApp

    async with FlowLLMApp(args=["config=fin_research"]):
        test_cases = [
            ("紫金矿业", "601899"),
            # ("川投能源", "600674"),
            # ("兴业银锡", "000426"),
            # ("小米集团", "01810"),
            # ("阿里巴巴", "09988"),
        ]

        for name, code in test_cases:
            logger.info(f"\n{'=' * 60}\n测试: {name}({code})\n{'=' * 60}")
            op = CompanyFactorOp()
            await op.async_call(name=name, code=code)
            logger.info(f"\n最终结果:\n{op.output}")


if __name__ == "__main__":
    asyncio.run(main())
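
# Illustrative shape of op.output, based on the result dict assembled in
# async_execute (all values below are placeholders, not real data):
# {
#   "name": "紫金矿业",
#   "code": "601899",
#   "segments": ["segment-1", "segment-2"],
#   "meta_list": ["merged meta fact 1", "merged meta fact 2"],
#   "factor_graph": "graph TD ..."
# }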
