# -*- coding: utf-8 -*-
"""
高校学科评估 MCP 服务器
基于 FastMCP 框架构建的高校学科评估查询系统
"""
import os
import logging
import json
from typing import List, Dict, Any, Optional
from fastmcp import FastMCP
import pandas as pd
from datetime import datetime
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("university_rankings_mcp.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 创建 FastMCP 服务器实例
mcp = FastMCP("university-rankings")
# 全局数据存储
_df = None
_excel_path = None
def _get_excel_path():
"""获取 Excel 文件路径"""
global _excel_path
if _excel_path:
return _excel_path
# 尝试多个可能的路径
possible_paths = [
os.environ.get("UNIVERSITY_EXCEL_PATH", ""),
os.path.join(os.path.dirname(__file__), "第四轮学科评估结果.xlsx"),
os.path.join(os.path.dirname(__file__), "data", "第四轮学科评估结果.xlsx"),
"/code/第四轮学科评估结果.xlsx", # 阿里云函数计算路径
"第四轮学科评估结果.xlsx",
]
for path in possible_paths:
if path and os.path.exists(path):
_excel_path = path
logger.info(f"找到数据文件: {path}")
return path
logger.error("未找到高校学科评估数据文件")
raise FileNotFoundError("未找到第四轮学科评估结果.xlsx文件,请设置UNIVERSITY_EXCEL_PATH环境变量")
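# Usage note (illustrative; the path below is hypothetical): point the server at the data
# file before startup via the environment variable checked first above, e.g.
#   export UNIVERSITY_EXCEL_PATH=/path/to/第四轮学科评估结果.xlsx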
def _load_data():
"""加载 Excel 数据"""
global _df
if _df is not None:
return _df
excel_path = _get_excel_path()
df = pd.read_excel(excel_path)
df = df[df['一级学科'] != '一级学科'].reset_index(drop=True)
# 简化列名
df.columns = ['_', '一级学科', '评估等级', '院校代码', '专业门类', '专业大类',
'院校名称', '所在省', '所在市', '院校类别', '一流大学学科', '院校性质']
# 清理数据
df = df.dropna(subset=['一级学科', '院校名称'])
_df = df
logger.info(f"数据加载成功!共 {len(_df)} 条记录,涵盖 {_df['一级学科'].nunique()} 个学科,{_df['院校名称'].nunique()} 所高校")
return _df
def _to_json(data: Any, status: str = "success") -> str:
"""将数据转换为JSON格式"""
result = {
"status": status,
"timestamp": datetime.now().isoformat(),
"data": data
}
return json.dumps(result, ensure_ascii=False, indent=2)
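# Shape of the envelope produced by _to_json (illustrative values):
#   {"status": "success", "timestamp": "2024-01-01T00:00:00", "data": {...}}
# The tools below that paginate build the same envelope inline via json.dumps with identical keys.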
def _df_to_list(df: pd.DataFrame, max_results: int = 100) -> tuple:
"""将 DataFrame 转换为列表并处理超长数据"""
if len(df) > max_results:
df = df.head(max_results)
truncated = True
else:
truncated = False
records = df.to_dict(orient='records')
    # 处理 NaN 值和数值类型转换,保证结果可被 json 序列化
    for record in records:
        for key, value in record.items():
            if pd.isna(value):
                record[key] = None
            elif isinstance(value, float):
                # 浮点数若为整数值则转 int,其余保持 float
                record[key] = int(value) if value.is_integer() else float(value)
return records, truncated
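# Intended usage sketch for the helper above (it is not exercised elsewhere in this module):
#   records, truncated = _df_to_list(result_df, max_results=100)
#   payload = _to_json({"records": records, "truncated": truncated})
# where result_df is a filtered slice of the evaluation DataFrame.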
# ========== MCP 工具定义 ==========
@mcp.tool()
async def search_university(
university_name: str,
page: int = 1,
per_page: int = 100
) -> str:
"""
功能1: 高校专业搜索
输入高校名,查看其所有学科及评估等级
参数:
university_name: 高校名称(支持模糊匹配),如"清华大学"、"北京大学"
page: 分页页码,从 1 开始,默认 1
per_page: 每页数量,默认 100
返回:
该高校的所有学科及其评估等级(JSON格式),包含分页信息
"""
try:
# 参数验证
if not university_name or not isinstance(university_name, str):
return _to_json({
"error": "university_name 参数不能为空且必须是字符串",
"hint": "请提供有效的高校名称,如'清华'、'北京大学'等"
}, "error")
if page < 1 or per_page < 1 or per_page > 1000:
return _to_json({
"error": f"无效的分页参数: page={page}, per_page={per_page}",
"hint": "page 必须 >= 1,per_page 必须在 1-1000 之间"
}, "error")
logger.info(f"开始高校搜索: university_name={university_name}, page={page}, per_page={per_page}")
df = _load_data()
# 先尝试精确匹配
result_df = df[df['院校名称'] == university_name].copy()
# 如果精确匹配失败,则模糊匹配
if len(result_df) == 0:
# 尝试包含匹配
            result_df = df[df['院校名称'].str.contains(university_name, na=False, regex=False)].copy()
# 如果仍然失败,尝试关键词匹配(支持更灵活的查询)
if len(result_df) == 0 and len(university_name) > 1:
# 分字匹配,例如"清华"能匹配"清华大学"
                keywords = list(university_name)
                mask = pd.Series(True, index=df.index)
                for keyword in keywords:
                    mask = mask & df['院校名称'].str.contains(keyword, na=False, regex=False)
                result_df = df[mask].copy()
if len(result_df) == 0:
return _to_json({
"error": f"未找到包含'{university_name}'的高校",
"hint": "请检查高校名称拼写,或使用更短的关键词重试,如'清华'而不是'清华大学北京'"
}, "not_found")
# 获取第一个匹配的高校的确切名称
actual_university = result_df.iloc[0]['院校名称']
result_df = df[df['院校名称'] == actual_university].copy()
# 计算分页
total_count = len(result_df)
total_pages = (total_count + per_page - 1) // per_page if per_page > 0 else 1
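        # Worked example of the ceiling division above (values are illustrative):
        # total_count=230, per_page=100 -> total_pages=(230+100-1)//100=3; page 3 then holds the last 30 rows.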
# 验证页码
if page > total_pages and total_pages > 0:
page = total_pages
start_idx = (page - 1) * per_page
end_idx = start_idx + per_page
paginated_df = result_df.iloc[start_idx:end_idx]
data = {
"university": actual_university,
"total_disciplines": total_count,
"page": page,
"per_page": per_page,
"total_pages": total_pages,
"disciplines": []
}
for row in paginated_df.to_dict(orient='records'):
data["disciplines"].append({
"discipline": row['一级学科'],
"grade": row['评估等级'],
"category": row['专业门类'],
"province": row['所在省'],
"city": row['所在市'],
"is_first_class": row['一流大学学科'] is not None
})
# 按等级分组统计
        grade_counts = {str(k): int(v) for k, v in result_df['评估等级'].value_counts().items()}
data["grade_summary"] = grade_counts
logger.info(f"高校搜索完成: {actual_university},页码 {page}/{total_pages},共 {total_count} 个学科")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"高校搜索失败: {str(e)}")
return _to_json({"error": f"高校搜索失败: {str(e)}"}, "error")
@mcp.tool()
async def search_discipline(
discipline_name: str,
page: int = 1,
per_page: int = 100
) -> str:
"""
功能2: 学科信息查询
输入学科名,查看开设该学科的所有高校及排名
参数:
discipline_name: 学科名称(支持模糊匹配),如"计算机"、"法学"
page: 分页页码,从 1 开始,默认 1
per_page: 每页数量,默认 100
返回:
开设该学科的高校列表,按等级排序(JSON格式),包含分页信息
"""
try:
# 参数验证
if not discipline_name or not isinstance(discipline_name, str):
return _to_json({
"error": "discipline_name 参数不能为空且必须是字符串",
"hint": "请提供有效的学科名称,如'计算机'、'法学'等"
}, "error")
if page < 1 or per_page < 1 or per_page > 1000:
return _to_json({
"error": f"无效的分页参数: page={page}, per_page={per_page}",
"hint": "page 必须 >= 1,per_page 必须在 1-1000 之间"
}, "error")
logger.info(f"开始学科查询: discipline_name={discipline_name}, page={page}, per_page={per_page}")
df = _load_data()
# 先尝试精确匹配
result_df = df[df['一级学科'] == discipline_name].copy()
# 如果精确匹配失败,则模糊匹配
if len(result_df) == 0:
# 尝试包含匹配
            result_df = df[df['一级学科'].str.contains(discipline_name, na=False, regex=False)].copy()
# 如果仍然失败,尝试关键词匹配(支持更灵活的查询)
if len(result_df) == 0 and len(discipline_name) > 1:
# 分字匹配,例如"计算机"能匹配"0812计算机科学与技术"
                keywords = list(discipline_name)
                mask = pd.Series(True, index=df.index)
                for keyword in keywords:
                    mask = mask & df['一级学科'].str.contains(keyword, na=False, regex=False)
                result_df = df[mask].copy()
if len(result_df) == 0:
# 获取所有可用学科列表提示
available_disciplines = df['一级学科'].unique().tolist()
return _to_json({
"error": f"未找到包含'{discipline_name}'的学科",
"hint": "请检查学科名称拼写,或使用更短的关键词重试",
"available_count": len(available_disciplines),
"sample_disciplines": available_disciplines[:5]
}, "not_found")
# 按等级排序
grade_order = {'A+': 0, 'A': 1, 'A-': 2, 'B+': 3, 'B': 4, 'B-': 5,
'C+': 6, 'C': 7, 'C-': 8}
result_df['grade_rank'] = result_df['评估等级'].map(lambda x: grade_order.get(x, 999))
result_df = result_df.sort_values('grade_rank').reset_index(drop=True)
# 计算分页
total_count = len(result_df)
total_pages = (total_count + per_page - 1) // per_page if per_page > 0 else 1
# 验证页码
if page > total_pages and total_pages > 0:
page = total_pages
start_idx = (page - 1) * per_page
end_idx = start_idx + per_page
paginated_df = result_df.iloc[start_idx:end_idx]
data = {
"discipline": result_df.iloc[0]['一级学科'],
"total_universities": total_count,
"page": page,
"per_page": per_page,
"total_pages": total_pages,
"universities": []
}
for rank, row in enumerate(paginated_df.to_dict(orient='records'), start=(page-1)*per_page+1):
data["universities"].append({
"rank": rank,
"university": row['院校名称'],
"grade": row['评估等级'],
"province": row['所在省'],
"university_category": row['院校类别'],
"is_first_class": row['一流大学学科'] is not None
})
# 等级分布
        grade_dist = {str(k): int(v) for k, v in result_df['评估等级'].value_counts().items()}
data["grade_distribution"] = grade_dist
logger.info(f"学科查询完成: {data['discipline']},页码 {page}/{total_pages},共 {total_count} 所高校")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
error_msg = str(e)
logger.error(f"学科查询失败: {error_msg}")
# 提供更详细的错误信息和调试指引
return _to_json({
"error": f"学科查询失败",
"reason": error_msg,
"hint": "请检查学科名称是否正确,或使用更短的关键词重试"
}, "error")
# @mcp.tool()
# async def filter_by_grade(
# grades: List[str],
# page: int = 1,
# per_page: int = 100
# ) -> str:
# """
# 功能3: 等级筛选查询
# 按评估等级(A+/A/A-/B+等)筛选查看学科点
# 参数:
# grades: 等级列表,可选值:A+, A, A-, B+, B, B-, C+, C, C-
# 例如:["A+", "A"]。支持小写(会自动转换为大写)
# page: 分页页码,从 1 开始,默认 1
# per_page: 每页数量,默认 100
# 返回:
# 符合条件的学科列表(JSON格式),包含分页信息
# """
# try:
# # 验证和规范化等级参数
# valid_grades = {'A+', 'A', 'A-', 'B+', 'B', 'B-', 'C+', 'C', 'C-'}
# normalized_grades = []
# for g in grades:
# normalized_g = str(g).upper().strip()
# if normalized_g in valid_grades:
# normalized_grades.append(normalized_g)
# if not normalized_grades:
# return _to_json({
# "error": f"无效的等级参数",
# "invalid_grades": grades,
# "valid_grades": list(valid_grades),
# "hint": "等级参数不区分大小写,请从以上有效等级中选择"
# }, "error")
# logger.info(f"开始等级筛选: grades={normalized_grades}, page={page}, per_page={per_page}")
# df = _load_data()
# result_df = df[df['评估等级'].isin(normalized_grades)].copy()
# if len(result_df) == 0:
# return _to_json({"message": f"未找到等级为{normalized_grades}的学科点"}, "not_found")
# # 计算分页
# total_count = len(result_df)
# total_pages = (total_count + per_page - 1) // per_page if per_page > 0 else 1
# # 验证页码
# original_page = page
# if page < 1:
# page = 1
# if page > total_pages and total_pages > 0:
# # 当请求的页码超过总页数时,返回最后一页并提示
# logger.warning(f"请求的页码 {original_page} 超过总页数 {total_pages},已调整为 {total_pages}")
# page = total_pages
# start_idx = (page - 1) * per_page
# end_idx = start_idx + per_page
# paginated_df = result_df.iloc[start_idx:end_idx]
# data = {
# "grades": normalized_grades,
# "total_count": total_count,
# "page": page,
# "per_page": per_page,
# "total_pages": total_pages,
# "by_grade": {},
# "disciplines": []
# }
# # 按等级分类统计
# for grade in normalized_grades:
# grade_df = result_df[result_df['评估等级'] == grade]
# data["by_grade"][grade] = len(grade_df)
# # 详细列表
# records = paginated_df.to_dict(orient='records')
# # 处理 NaN 值
# for record in records:
# for key, value in record.items():
# if pd.isna(value):
# record[key] = None
# for row in records:
# data["disciplines"].append({
# "discipline": row['一级学科'],
# "grade": row['评估等级'],
# "university": row['院校名称'],
# "province": row['所在省'],
# "category": row['专业门类']
# })
# data["returned_count"] = len(records)
# logger.info(f"等级筛选完成: 等级 {normalized_grades},页码 {page}/{total_pages},共 {total_count} 条")
# return json.dumps({
# "status": "success",
# "data": data,
# "timestamp": datetime.now().isoformat()
# }, ensure_ascii=False, indent=2)
# except Exception as e:
# logger.exception("等级筛选失败")
# return _to_json({"error": f"等级筛选失败: {str(e)}"}, "error")
@mcp.tool()
async def browse_by_province(
province: str
) -> str:
"""
功能3: 地区浏览
按省份查看该地区高校的学科分布
参数:
province: 省份名称,如"北京"、"上海"、"浙江"
返回:
该地区高校的学科分布统计(JSON格式)
"""
try:
logger.info(f"开始地区浏览: province={province}")
df = _load_data()
result_df = df[df['所在省'] == province].copy()
if len(result_df) == 0:
return _to_json({"message": f"未找到位于'{province}'的数据"}, "not_found")
data = {
"province": province,
"total_disciplines": len(result_df),
"universities_count": result_df['院校名称'].nunique(),
"universities": [],
"grade_distribution": {},
"category_distribution": {}
}
# 高校列表及其学科数
for uni in sorted(result_df['院校名称'].unique()):
uni_df = result_df[result_df['院校名称'] == uni]
a_count = len(uni_df[uni_df['评估等级'].isin(['A+', 'A', 'A-'])])
data["universities"].append({
"name": uni,
"total_disciplines": len(uni_df),
"a_grade_count": a_count
})
# 等级分布
        grade_dist = {str(k): int(v) for k, v in result_df['评估等级'].value_counts().items()}
        data["grade_distribution"] = grade_dist
        # 专业门类分布
        category_dist = {str(k): int(v) for k, v in result_df['专业门类'].value_counts().items()}
data["category_distribution"] = category_dist
logger.info(f"地区浏览完成: {province},共 {data['universities_count']} 所高校")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"地区浏览失败: {str(e)}")
return _to_json({"error": f"地区浏览失败: {str(e)}"}, "error")
@mcp.tool()
async def university_ranking(
rank_by: str = "disciplines",
limit: int = 50
) -> str:
"""
功能4: 高校排行榜
按学科数量、A+数量、一流大学数量等多维度排序
参数:
rank_by: 排序方式,可选值:
- "disciplines" - 按学科总数排序
- "a_plus" - 按A+学科数排序
- "a_grade" - 按A级及以上学科数排序
- "first_class" - 按一流大学学科数排序
limit: 返回前N个排名,默认50
返回:
按指定方式排序的高校排行榜(JSON格式)
"""
try:
# 参数验证
rank_key_map = {
"disciplines": "total_disciplines",
"a_plus": "a_plus_count",
"a_grade": "a_grade_count",
"first_class": "first_class_count"
}
if rank_by not in rank_key_map:
return _to_json({
"error": f"无效的排序方式: {rank_by}",
"valid_options": list(rank_key_map.keys()),
"descriptions": {
"disciplines": "按学科总数排序",
"a_plus": "按A+学科数排序",
"a_grade": "按A级及以上学科数排序",
"first_class": "按一流大学学科数排序"
}
}, "error")
if limit < 1 or limit > 1000:
return _to_json({
"error": f"无效的limit参数: {limit}",
"hint": "limit 必须在 1-1000 之间"
}, "error")
logger.info(f"开始高校排行: rank_by={rank_by}")
df = _load_data()
uni_list = []
for uni in df['院校名称'].unique():
uni_df = df[df['院校名称'] == uni]
a_plus = len(uni_df[uni_df['评估等级'] == 'A+'])
a_grade = len(uni_df[uni_df['评估等级'].isin(['A+', 'A', 'A-'])])
first_class = len(uni_df[uni_df['一流大学学科'].notna()])
uni_list.append({
"university": uni,
"total_disciplines": len(uni_df),
"a_plus_count": a_plus,
"a_grade_count": a_grade,
"first_class_count": first_class,
"province": uni_df.iloc[0]['所在省']
})
# 排序
sort_key = rank_key_map.get(rank_by, "total_disciplines")
uni_list.sort(key=lambda x: x[sort_key], reverse=True)
rank_descriptions = {
"disciplines": "按学科总数排序",
"a_plus": "按A+学科数排序",
"a_grade": "按A级及以上学科数排序",
"first_class": "按一流大学学科数排序"
}
data = {
"rank_by": rank_by,
"rank_description": rank_descriptions.get(rank_by, "未知排序方式"),
"total_universities": len(uni_list),
"rankings": []
}
for rank, uni_info in enumerate(uni_list[:limit], 1):
uni_info['rank'] = rank
data["rankings"].append(uni_info)
logger.info(f"高校排行完成: 共 {len(uni_list)} 所高校")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"高校排行失败: {str(e)}")
return _to_json({"error": f"高校排行失败: {str(e)}"}, "error")
@mcp.tool()
async def discipline_ranking(
rank_by: str = "universities",
limit: int = 50
) -> str:
"""
功能5: 学科排行榜
显示每个学科的开设高校数、A+数量统计
参数:
rank_by: 排序方式,可选值:
- "universities" - 按开设高校数排序
- "a_plus" - 按A+数量排序
- "a_grade" - 按A级及以上数量排序
limit: 返回前N个排名,默认50
返回:
按指定方式排序的学科排行榜(JSON格式)
"""
try:
# 参数验证
rank_key_map = {
"universities": "total_universities",
"a_plus": "a_plus_count",
"a_grade": "a_grade_count"
}
if rank_by not in rank_key_map:
return _to_json({
"error": f"无效的排序方式: {rank_by}",
"valid_options": list(rank_key_map.keys()),
"descriptions": {
"universities": "按开设高校数排序",
"a_plus": "按A+数量排序",
"a_grade": "按A级及以上数量排序"
}
}, "error")
if limit < 1 or limit > 1000:
return _to_json({
"error": f"无效的limit参数: {limit}",
"hint": "limit 必须在 1-1000 之间"
}, "error")
logger.info(f"开始学科排行: rank_by={rank_by}")
df = _load_data()
discipline_list = []
for disc in df['一级学科'].unique():
disc_df = df[df['一级学科'] == disc]
a_plus = len(disc_df[disc_df['评估等级'] == 'A+'])
a_grade = len(disc_df[disc_df['评估等级'].isin(['A+', 'A', 'A-'])])
discipline_list.append({
"discipline": disc,
"total_universities": len(disc_df),
"a_plus_count": a_plus,
"a_grade_count": a_grade,
"top_universities": []
})
# 排序
sort_key = rank_key_map.get(rank_by, "total_universities")
discipline_list.sort(key=lambda x: x[sort_key], reverse=True)
# 获取每个学科的Top高校
grade_order = {'A+': 0, 'A': 1, 'A-': 2, 'B+': 3, 'B': 4, 'B-': 5,
'C+': 6, 'C': 7, 'C-': 8}
for disc_info in discipline_list:
disc_df = df[df['一级学科'] == disc_info['discipline']].copy()
disc_df['grade_rank'] = disc_df['评估等级'].map(lambda x: grade_order.get(x, 999))
disc_df_sorted = disc_df.sort_values('grade_rank')
for _, row in disc_df_sorted.head(3).iterrows():
disc_info["top_universities"].append({
"university": row['院校名称'],
"grade": row['评估等级']
})
rank_descriptions = {
"universities": "按开设高校数排序",
"a_plus": "按A+数量排序",
"a_grade": "按A级及以上数量排序"
}
data = {
"rank_by": rank_by,
"rank_description": rank_descriptions.get(rank_by, "未知排序方式"),
"total_disciplines": len(discipline_list),
"rankings": []
}
for rank, disc_info in enumerate(discipline_list[:limit], 1):
disc_info['rank'] = rank
data["rankings"].append(disc_info)
logger.info(f"学科排行完成: 共 {len(discipline_list)} 个学科")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"学科排行失败: {str(e)}")
return _to_json({"error": f"学科排行失败: {str(e)}"}, "error")
@mcp.tool()
async def strong_disciplines(
university: Optional[str] = None,
province: Optional[str] = None,
    grades: Optional[List[str]] = None
) -> str:
"""
功能6: 强势学科分析
查看某高校或地区的A/A+学科
参数:
university: 高校名称(模糊匹配),如"清华大学"、"北京大学"
province: 省份名称,如"北京"、"上海"
grades: 等级列表,默认["A+", "A", "A-"]
返回:
符合条件的强势学科列表(JSON格式)
"""
try:
if grades is None:
grades = ['A+', 'A', 'A-']
if university:
df = _load_data()
            result_df = df[df['院校名称'].str.contains(university, na=False, regex=False)].copy()
actual_university = result_df.iloc[0]['院校名称'] if len(result_df) > 0 else university
result_df = df[df['院校名称'] == actual_university]
scope = f"高校: {actual_university}"
logger.info(f"开始强势学科分析: university={actual_university}")
elif province:
df = _load_data()
result_df = df[df['所在省'] == province]
scope = f"地区: {province}"
logger.info(f"开始强势学科分析: province={province}")
else:
return _to_json({"message": "请提供高校或地区参数"}, "error")
if len(result_df) == 0:
return _to_json({"message": f"未找到数据: {scope}"}, "not_found")
# 筛选指定等级
result_df = result_df[result_df['评估等级'].isin(grades)]
if len(result_df) == 0:
return _to_json({"message": f"未找到等级为{grades}的学科: {scope}"}, "not_found")
data = {
"scope": scope,
"grades": grades,
"total_strong_disciplines": len(result_df),
"disciplines": []
}
# 按等级分类统计
from collections import Counter
grade_counts = Counter(result_df['评估等级'])
data["grade_counts"] = dict(grade_counts)
# 详细列表(按等级排序)
for grade in sorted(set(grades), key=lambda x: (['A+', 'A', 'A-'].index(x) if x in ['A+', 'A', 'A-'] else 999)):
grade_df = result_df[result_df['评估等级'] == grade]
if len(grade_df) > 0:
data["disciplines"].append({
"grade": grade,
"count": len(grade_df),
"disciplines_list": sorted(grade_df['一级学科'].unique().tolist())
})
logger.info(f"强势学科分析完成: {scope},共 {data['total_strong_disciplines']} 个强势学科")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"强势学科分析失败: {str(e)}")
return _to_json({"error": f"强势学科分析失败: {str(e)}"}, "error")
@mcp.tool()
async def grade_analysis(
detail_level: str = "full"
) -> str:
"""
功能7: 评估等级分析
统计各等级的学科点分布、热门学科
参数:
detail_level: 详细程度,可选值:"full"(完整)、"summary"(摘要),默认"full"
返回:
各等级的学科分布统计信息(JSON格式)
"""
try:
logger.info(f"开始评估等级分析: detail_level={detail_level}")
df = _load_data()
data = {
"total_disciplines": len(df),
"grade_distribution": {},
"grade_details": []
}
# 总体分布
grade_dist = df['评估等级'].value_counts()
for grade, count in grade_dist.items():
if grade != '等级': # 排除标题行
pct = (count / len(df)) * 100
data["grade_distribution"][grade] = {
"count": int(count),
"percentage": round(pct, 2)
}
# 按等级详细分析
for grade in ['A+', 'A', 'A-', 'B+', 'B', 'B-', 'C+', 'C', 'C-']:
grade_df = df[df['评估等级'] == grade]
if len(grade_df) == 0:
continue
# 热门学科(该等级最多的学科)
top_disciplines = grade_df['一级学科'].value_counts().head(5)
detail = {
"grade": grade,
"total_count": int(len(grade_df)),
"unique_disciplines": int(grade_df['一级学科'].nunique()),
"unique_universities": int(grade_df['院校名称'].nunique()),
"top_disciplines": top_disciplines.to_dict(),
"discipline_distribution": grade_df['专业门类'].value_counts().head(3).to_dict()
}
data["grade_details"].append(detail)
logger.info("评估等级分析完成")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"评估等级分析失败: {str(e)}")
return _to_json({"error": f"评估等级分析失败: {str(e)}"}, "error")
@mcp.tool()
async def get_statistics(
group_by: str = "所在省"
) -> str:
"""
功能8: 获取统计信息
参数:
group_by: 分组统计字段,可选值:
- "所在省" - 按省份统计
- "一级学科" - 按学科统计
- "评估等级" - 按等级统计
- "院校类别" - 按高校类别统计
返回:
统计信息(JSON格式)
"""
try:
logger.info(f"开始统计: group_by={group_by}")
df = _load_data()
# 基础统计
total_records = len(df)
total_universities = int(df['院校名称'].nunique())
total_disciplines = int(df['一级学科'].nunique())
# 校验分组字段
if group_by not in df.columns:
return _to_json({"error": f"不支持的分组字段: {group_by}"}, "error")
# 使用命名聚合,避免列名冲突(例如 group_by == '一级学科' 时)
grouped = df.groupby(group_by).agg(
高校数=('院校名称', 'nunique'),
学科数=('一级学科', 'nunique'),
记录数=('一级学科', 'size')
).reset_index()
# 按记录数或学科数排序(优先记录数)
if '记录数' in grouped.columns:
grouped = grouped.sort_values('记录数', ascending=False)
else:
grouped = grouped.sort_values('学科数', ascending=False)
# 将列名规范为可序列化的字段
# 使用统一的 'category' 字段名替代原始列名
grouped = grouped.rename(columns={group_by: 'category'})
stats = grouped.to_dict(orient='records')
data = {
"total_records": int(total_records),
"total_universities": total_universities,
"total_disciplines": total_disciplines,
"group_by": group_by,
"statistics": stats
}
logger.info(f"统计完成: {group_by}")
return json.dumps({
"status": "success",
"data": data,
"timestamp": datetime.now().isoformat()
}, ensure_ascii=False, indent=2)
except Exception as e:
logger.exception("统计失败")
return _to_json({"error": f"统计失败: {str(e)}"}, "error")
# 兼容性处理:将被装饰为 MCP 工具的对象暴露出原始可 await 的函数,
# 以便像以前那样直接从模块导入并 await 调用(例如 test_mcp.py)。
for _name in [
'search_university', 'search_discipline', 'filter_by_grade', 'browse_by_province',
'university_ranking', 'discipline_ranking', 'strong_disciplines', 'grade_analysis',
'get_statistics'
]:
_obj = globals().get(_name)
if _obj is not None and hasattr(_obj, 'fn'):
globals()[_name] = getattr(_obj, 'fn')
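# Minimal sketch of calling the unwrapped tool functions directly, as a test such as
# test_mcp.py might do after the compatibility shim above. The function name and the
# sample query "清华" are illustrative; it is never invoked by the server itself and
# assumes the Excel data file is reachable (run it with asyncio.run(...) in a test).
async def _example_direct_call() -> None:
    """Illustrative only: await a tool function directly and log a snippet of its JSON result."""
    result = await search_university("清华")
    logger.debug("direct-call sample (truncated): %s", result[:200])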
# ========== 运行服务器 ==========
def main():
"""启动 MCP 服务器"""
logger.info("=" * 60)
logger.info("🎓 高校学科评估 MCP 服务器启动")
logger.info("=" * 60)
# 预加载数据
try:
_load_data()
except Exception as e:
logger.warning(f"预加载数据失败: {e},将在首次请求时加载")
# 获取端口配置
port = int(os.environ.get("MCP_PORT", os.environ.get("PORT", "9000")))
logger.info(f"[SSE模式] 启动 MCP 服务器")
logger.info(f"[SSE模式] 监听地址: http://localhost:{port}/sse")
logger.info(f"[SSE模式] 网络地址: http://0.0.0.0:{port}/sse")
logger.info(f"[SSE模式] 使用 Ctrl+C 停止服务器")
logger.info("-" * 60)
# 启动 MCP 服务器
mcp.run(
transport="sse",
host="0.0.0.0",
port=port,
path="/sse",
log_level="info",
)
if __name__ == "__main__":
main()
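# Client-side sketch (assumption: your installed FastMCP version exposes a `Client`
# class with `call_tool`; the function name, URL, and sample arguments below are
# illustrative and not taken from this file):
#
#   import asyncio
#   from fastmcp import Client
#
#   async def demo():
#       async with Client("http://localhost:9000/sse") as client:
#           print(await client.call_tool("search_university", {"university_name": "清华"}))
#
#   asyncio.run(demo())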