mcp_server.py•19.4 kB
# -*- coding: utf-8 -*-
"""
2025年国考小助手 MCP 服务器
基于 FastMCP 框架构建的智能国考岗位查询与匹配系统
"""
import os
import logging
import json
from typing import List, Dict, Any, Optional
from fastmcp import FastMCP
import pandas as pd
from datetime import datetime
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("guokao_mcp.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 创建 FastMCP 服务器实例
mcp = FastMCP("guokao-helper")
# 全局数据存储
_df = None
_excel_path = None
def _get_excel_path():
"""获取 Excel 文件路径"""
global _excel_path
if _excel_path:
return _excel_path
# 尝试多个可能的路径
possible_paths = [
os.environ.get("GUOKAO_EXCEL_PATH", ""),
os.path.join(os.path.dirname(__file__), "2025年国考岗位表.xls"),
os.path.join(os.path.dirname(__file__), "data", "2025年国考岗位表.xls"),
"/code/2025年国考岗位表.xls", # 阿里云函数计算路径
"2025年国考岗位表.xls",
]
for path in possible_paths:
if path and os.path.exists(path):
_excel_path = path
logger.info(f"找到数据文件: {path}")
return path
logger.error("未找到国考岗位表数据文件")
raise FileNotFoundError("未找到 2025年国考岗位表.xls 文件,请设置 GUOKAO_EXCEL_PATH 环境变量")
def _load_data():
"""加载 Excel 数据"""
global _df
if _df is not None:
return _df
excel_path = _get_excel_path()
all_dfs = []
xls = pd.ExcelFile(excel_path)
for sheet_name in xls.sheet_names:
df = pd.read_excel(xls, sheet_name=sheet_name)
df['机关类别'] = sheet_name
all_dfs.append(df)
_df = pd.concat(all_dfs, ignore_index=True)
logger.info(f"数据加载成功!共 {len(_df)} 个岗位,招考 {int(_df['招考人数'].sum())} 人")
return _df
def _df_to_json(df: pd.DataFrame, max_results: int = 50) -> Dict[str, Any]:
"""将 DataFrame 转换为 JSON 格式"""
# 限制返回数量
if len(df) > max_results:
df = df.head(max_results)
truncated = True
else:
truncated = False
records = df.to_dict(orient='records')
# 处理 NaN 值
for record in records:
for key, value in record.items():
if pd.isna(value):
record[key] = None
return {
"total_positions": len(records),
"total_recruitment": int(df['招考人数'].sum()) if len(df) > 0 else 0,
"truncated": truncated,
"positions": records
}
# ========== MCP 工具定义 ==========
@mcp.tool()
async def filter_positions(
organ_type: Optional[str] = None,
department: Optional[str] = None,
location: Optional[str] = None,
education: Optional[str] = None,
political_status: Optional[str] = None,
work_years: Optional[str] = None,
major: Optional[str] = None,
exam_type: Optional[str] = None,
position_type: Optional[str] = None,
min_recruitment: Optional[int] = None,
max_recruitment: Optional[int] = None,
max_results: int = 30
) -> str:
"""
多条件智能筛选国考岗位
参数:
organ_type: 机关类别(精确匹配),可选值:
- "中央党群机关"
- "中央国家行政机关(本级)"
- "中央国家行政机关省级以下直属机构"
- "中央国家行政机关参照公务员法管理事业单位"
department: 部门名称关键词(模糊匹配),如"税务"、"海关"
location: 工作地点(模糊匹配),如"北京"、"广东"
education: 学历要求(模糊匹配),如"本科"、"硕士研究生"
political_status: 政治面貌,如"不限"、"中共党员"
work_years: 基层工作最低年限,如"无限制"、"二年"
major: 专业关键词(模糊匹配),如"计算机"、"法学"
exam_type: 考试类别(模糊匹配)
position_type: 职位属性(模糊匹配)
min_recruitment: 最小招考人数
max_recruitment: 最大招考人数
max_results: 最大返回结果数,默认30
返回:
符合条件的岗位列表(JSON格式)
"""
try:
logger.info(f"开始筛选岗位: organ_type={organ_type}, department={department}, location={location}")
df = _load_data()
result = df.copy()
# 机关类别 - 精确匹配
if organ_type:
result = result[result['机关类别'] == organ_type]
# 部门名称 - 模糊匹配
if department:
result = result[result['部门名称'].str.contains(department, na=False)]
# 工作地点 - 模糊匹配
if location:
result = result[result['工作地点'].str.contains(location, na=False)]
# 学历 - 模糊匹配
if education:
result = result[result['学历'].str.contains(education, na=False)]
# 政治面貌 - 精确匹配或不限
if political_status:
if political_status == '不限':
result = result[result['政治面貌'] == '不限']
else:
result = result[
(result['政治面貌'] == political_status) |
(result['政治面貌'] == '不限') |
(result['政治面貌'].str.contains(political_status, na=False))
]
# 基层工作最低年限
if work_years:
if work_years == '无限制':
result = result[result['基层工作最低年限'] == '无限制']
else:
result = result[result['基层工作最低年限'].str.contains(work_years, na=False)]
# 专业 - 模糊匹配
if major:
result = result[result['专业'].str.contains(major, na=False, regex=False)]
# 考试类别 - 模糊匹配
if exam_type:
result = result[result['考试类别'].str.contains(exam_type, na=False)]
# 职位属性 - 模糊匹配
if position_type:
result = result[result['职位属性'].str.contains(position_type, na=False)]
# 招考人数范围
if min_recruitment:
result = result[result['招考人数'] >= min_recruitment]
if max_recruitment:
result = result[result['招考人数'] <= max_recruitment]
output = _df_to_json(result, max_results)
output["filter_conditions"] = {
"organ_type": organ_type,
"department": department,
"location": location,
"education": education,
"political_status": political_status,
"work_years": work_years,
"major": major
}
logger.info(f"筛选完成,返回 {output['total_positions']} 个结果")
return json.dumps(output, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"筛选失败: {str(e)}")
return json.dumps({"error": f"筛选失败: {str(e)}"}, ensure_ascii=False)
@mcp.tool()
async def match_by_major(
major_keyword: str,
max_results: int = 30
) -> str:
"""
根据专业关键词匹配可报考的岗位
参数:
major_keyword: 专业关键词,如"计算机"、"软件工程"、"法学"、"会计"、"金融"
max_results: 最大返回结果数,默认30
返回:
匹配的岗位列表(JSON格式)
"""
try:
logger.info(f"开始专业匹配: major_keyword={major_keyword}")
df = _load_data()
result = df[df['专业'].str.contains(major_keyword, na=False, regex=False)]
output = _df_to_json(result, max_results)
output["major_keyword"] = major_keyword
logger.info(f"专业匹配完成,返回 {output['total_positions']} 个结果")
return json.dumps(output, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"专业匹配失败: {str(e)}")
return json.dumps({"error": f"专业匹配失败: {str(e)}"}, ensure_ascii=False)
@mcp.tool()
async def personal_match(
education: Optional[str] = None,
major: Optional[str] = None,
political_status: Optional[str] = None,
work_years: Optional[int] = None,
location: Optional[str] = None,
has_base_project: Optional[bool] = None,
max_results: int = 30
) -> str:
"""
根据个人条件一键匹配可报考的国考岗位
参数:
education: 学历,可选值:"大专"、"本科"、"硕士研究生"、"博士研究生"
major: 专业关键词,如"计算机"、"法学"
political_status: 政治面貌,可选值:"中共党员"、"共青团员"、"群众"
work_years: 基层工作年限(数字),如 0、1、2、3、5
location: 期望工作地点,如"北京"、"上海"
has_base_project: 是否有服务基层项目经历,True/False
max_results: 最大返回结果数,默认30
返回:
符合个人条件的可报考岗位列表(JSON格式)
"""
try:
logger.info(f"开始个人条件匹配: education={education}, major={major}, political_status={political_status}")
df = _load_data()
result = df.copy()
# 学历匹配
if education:
education_map = {
'大专': ['大专', '大专及以上', '大专或本科'],
'本科': ['本科', '本科及以上', '本科或硕士研究生', '大专或本科', '大专及以上', '仅限本科'],
'硕士研究生': ['硕士研究生', '硕士研究生及以上', '本科及以上', '本科或硕士研究生', '仅限硕士研究生'],
'博士研究生': ['博士研究生', '硕士研究生及以上', '本科及以上', '仅限博士研究生']
}
if education in education_map:
allowed_edu = education_map[education]
result = result[result['学历'].isin(allowed_edu)]
# 专业匹配
if major:
result = result[result['专业'].str.contains(major, na=False, regex=False)]
# 政治面貌匹配
if political_status:
if political_status == '中共党员':
# 党员可以报考所有岗位
pass
elif political_status == '共青团员':
# 团员可以报考"不限"和"中共党员或共青团员"的岗位
result = result[
(result['政治面貌'] == '不限') |
(result['政治面貌'].str.contains('共青团员', na=False))
]
else: # 群众
result = result[result['政治面貌'] == '不限']
# 基层工作年限匹配
if work_years is not None:
year_map = {
0: ['无限制'],
1: ['无限制', '一年'],
2: ['无限制', '一年', '二年'],
3: ['无限制', '一年', '二年', '三年'],
5: ['无限制', '一年', '二年', '三年', '五年以上']
}
if work_years in year_map:
allowed_years = year_map[work_years]
result = result[result['基层工作最低年限'].isin(allowed_years)]
# 工作地点匹配
if location:
result = result[result['工作地点'].str.contains(location, na=False)]
# 服务基层项目经历
if has_base_project is False:
# 没有基层项目经历,只能报考"无限制"的岗位
result = result[result['服务基层项目工作经历'] == '无限制']
output = _df_to_json(result, max_results)
output["personal_conditions"] = {
"education": education,
"major": major,
"political_status": political_status,
"work_years": work_years,
"location": location,
"has_base_project": has_base_project
}
logger.info(f"个人条件匹配完成,返回 {output['total_positions']} 个结果")
return json.dumps(output, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"个人条件匹配失败: {str(e)}")
return json.dumps({"error": f"个人条件匹配失败: {str(e)}"}, ensure_ascii=False)
@mcp.tool()
async def get_position_detail(
department: str,
position_name: str
) -> str:
"""
获取指定岗位的详细信息
参数:
department: 部门名称(精确或模糊匹配)
position_name: 招考职位名称(精确或模糊匹配)
返回:
岗位详细信息(JSON格式)
"""
try:
logger.info(f"获取岗位详情: department={department}, position_name={position_name}")
df = _load_data()
# 先尝试精确匹配
result = df[(df['部门名称'] == department) & (df['招考职位'] == position_name)]
# 如果没有结果,尝试模糊匹配
if len(result) == 0:
result = df[
(df['部门名称'].str.contains(department, na=False)) &
(df['招考职位'].str.contains(position_name, na=False))
]
if len(result) == 0:
return json.dumps({"error": "未找到符合条件的岗位"}, ensure_ascii=False)
# 取第一个结果
position = result.iloc[0].to_dict()
# 处理 NaN 值
for key, value in position.items():
if pd.isna(value):
position[key] = None
logger.info(f"获取岗位详情成功: {department} - {position_name}")
return json.dumps(position, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"获取岗位详情失败: {str(e)}")
return json.dumps({"error": f"获取岗位详情失败: {str(e)}"}, ensure_ascii=False)
@mcp.tool()
async def get_statistics(
group_by: str = "机关类别"
) -> str:
"""
获取国考岗位统计信息
参数:
group_by: 分组统计字段,可选值:
- "机关类别"(默认)
- "学历"
- "政治面貌"
- "基层工作最低年限"
返回:
统计信息(JSON格式)
"""
try:
logger.info(f"获取统计信息: group_by={group_by}")
df = _load_data()
# 基础统计
total_positions = len(df)
total_recruitment = int(df['招考人数'].sum())
# 分组统计
if group_by not in df.columns:
return json.dumps({"error": f"不支持的分组字段: {group_by}"}, ensure_ascii=False)
grouped = df.groupby(group_by).agg({
'招考人数': ['count', 'sum']
}).reset_index()
grouped.columns = [group_by, '岗位数', '招考人数']
grouped = grouped.sort_values('招考人数', ascending=False)
stats = grouped.to_dict(orient='records')
output = {
"total_positions": total_positions,
"total_recruitment": total_recruitment,
"group_by": group_by,
"statistics": stats
}
logger.info(f"统计信息获取成功")
return json.dumps(output, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"获取统计信息失败: {str(e)}")
return json.dumps({"error": f"获取统计信息失败: {str(e)}"}, ensure_ascii=False)
# @mcp.tool()
# async def compare_positions(
# positions: List[Dict[str, str]]
# ) -> str:
# """
# 对比多个岗位的信息
# 参数:
# positions: 岗位列表,每个岗位包含 department(部门名称)和 position_name(职位名称)
# 例如:[{"department": "税务局", "position_name": "科员"}, ...]
# 返回:
# 岗位对比信息(JSON格式)
# """
# try:
# logger.info(f"开始对比岗位: {len(positions)} 个岗位")
# df = _load_data()
# comparison_data = []
# for pos in positions:
# department = pos.get("department", "")
# position_name = pos.get("position_name", "")
# result = df[
# (df['部门名称'].str.contains(department, na=False)) &
# (df['招考职位'].str.contains(position_name, na=False))
# ]
# if len(result) > 0:
# position = result.iloc[0]
# comparison_data.append({
# "部门名称": position.get('部门名称'),
# "招考职位": position.get('招考职位'),
# "工作地点": position.get('工作地点'),
# "学历": position.get('学历'),
# "政治面貌": position.get('政治面貌'),
# "基层工作最低年限": position.get('基层工作最低年限'),
# "招考人数": int(position.get('招考人数', 0)),
# "专业": str(position.get('专业', ''))[:100] + "..." if len(str(position.get('专业', ''))) > 100 else position.get('专业')
# })
# output = {
# "comparison_count": len(comparison_data),
# "positions": comparison_data
# }
# logger.info(f"岗位对比完成,共对比 {len(comparison_data)} 个岗位")
# return json.dumps(output, ensure_ascii=False, indent=2)
# except Exception as e:
# logger.error(f"岗位对比失败: {str(e)}")
# return json.dumps({"error": f"岗位对比失败: {str(e)}"}, ensure_ascii=False)
# ========== 运行服务器 ==========
def main():
"""启动 MCP 服务器"""
logger.info("=" * 60)
logger.info("🎯 2025年国考小助手 MCP 服务器启动")
logger.info("=" * 60)
# 预加载数据
try:
_load_data()
except Exception as e:
logger.warning(f"预加载数据失败: {e},将在首次请求时加载")
# 获取端口配置
port = int(os.environ.get("MCP_PORT", os.environ.get("PORT", "9000")))
logger.info(f"[SSE模式] 启动 MCP 服务器")
logger.info(f"[SSE模式] 监听地址: http://localhost:{port}/sse")
logger.info(f"[SSE模式] 网络地址: http://0.0.0.0:{port}/sse")
logger.info(f"[SSE模式] 使用 Ctrl+C 停止服务器")
logger.info("-" * 60)
# 启动 MCP 服务器
mcp.run(
transport="sse",
host="0.0.0.0",
port=port,
path="/sse",
log_level="info",
)
if __name__ == "__main__":
main()