#!/usr/bin/env python3
"""
Advanced arXiv Query Tool
This script provides advanced searching capabilities for arXiv papers with:
1. Complex boolean query parsing
2. Date filtering
3. Citation count filtering (when available)
4. CCF ranking filtering
5. Result saving to files
6. Multiple query intersection
"""
import asyncio
import json
import argparse
import re
from pathlib import Path
import sys
from typing import Dict, Any, List, Optional
# Add project path
project_root = Path(__file__).parent / "src"
sys.path.insert(0, str(project_root))
from arxiv_mcp_server.tools.search import handle_search
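# Note (inferred from its use in search_with_intersection below): handle_search
# is awaited and returns a list whose first item exposes a .text attribute
# containing a JSON string with a "papers" list.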
# Load configuration
def load_config():
"""Load configuration from JSON file."""
config_path = Path(__file__).parent / "query_config.json"
try:
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"Warning: Configuration file {config_path} not found. Using default settings.")
return {"ccf_a_venues": {}, "search_patterns": {}}
config = load_config()
# CCF venues (loaded from config)
CCF_VENUES = {"A": set(), "B": set(), "C": set()}
ccf_venues_config = config.get("ccf_venues", {})
for level in ["A", "B", "C"]:
level_venues = ccf_venues_config.get(level, {})
for category, venues in level_venues.items():
CCF_VENUES[level].update(venues)
# Backward compatibility: also load ccf_a_venues
CCF_A_VENUES = set()
for category, venues in config.get("ccf_a_venues", {}).items():
CCF_A_VENUES.update(venues)
# Merge with new format
CCF_A_VENUES.update(CCF_VENUES["A"])
class QueryParser:
"""Parser for complex boolean queries."""
def __init__(self):
self.or_groups = []
self.and_groups = []
def parse_complex_query(self, query_str: str) -> List[List[str]]:
"""
Parse a complex boolean query into components for intersection.
Example:
("A" OR "B") AND ("C" OR "D")
Returns: [["A", "B"], ["C", "D"]]
"""
# Remove outer parentheses and split by AND
query_str = query_str.strip()
# Split by AND (case insensitive)
and_parts = re.split(r'\s+AND\s+', query_str, flags=re.IGNORECASE)
result = []
for part in and_parts:
# Remove parentheses
part = part.strip('()')
# Split by OR (case insensitive)
or_terms = re.split(r'\s+OR\s+', part, flags=re.IGNORECASE)
# Clean up terms (remove quotes)
clean_terms = [term.strip().strip('"\'') for term in or_terms]
result.append(clean_terms)
return result
def build_arxiv_queries(self, query_components: List[List[str]]) -> List[str]:
"""Build individual arXiv queries for each OR group."""
queries = []
for or_group in query_components:
# Create a query for each term in the OR group
for term in or_group:
# Use all: field for better content search
arxiv_query = f'all:"{term}"'
queries.append(arxiv_query)
return queries
class AdvancedQueryTool:
"""Advanced query tool with intersection and filtering capabilities."""
def __init__(self):
self.parser = QueryParser()
self.predefined_queries = self._load_predefined_queries()
def _load_predefined_queries(self) -> Dict[str, str]:
"""Load predefined query templates."""
return {
"program_repair_llm": self._build_query_from_config("program_repair", "llm_terms"),
"software_testing_ai": '("software testing" OR "test generation" OR "test case") AND ("AI" OR "machine learning" OR "neural")',
"code_generation_llm": '("code generation" OR "program synthesis" OR "code completion") AND ("LLM" OR "transformer" OR "neural")',
"vulnerability_detection": '("vulnerability detection" OR "security analysis" OR "bug detection") AND ("deep learning" OR "machine learning")',
}
def _build_query_from_config(self, pattern1: str, pattern2: str) -> str:
"""Build query from configuration patterns."""
patterns = config.get("search_patterns", {})
group1 = patterns.get(pattern1, [])
group2 = patterns.get(pattern2, [])
if not group1 or not group2:
return ""
group1_str = " OR ".join(f'"{term}"' for term in group1)
group2_str = " OR ".join(f'"{term}"' for term in group2)
return f"({group1_str}) AND ({group2_str})"
def list_predefined_queries(self) -> None:
"""List available predefined queries."""
print("\n可用的预定义查询:")
for i, (key, query) in enumerate(self.predefined_queries.items(), 1):
print(f"{i}. {key}")
print(f" 查询: {query[:100]}{'...' if len(query) > 100 else ''}")
def get_predefined_query(self, key: str) -> Optional[str]:
"""Get a predefined query by key."""
return self.predefined_queries.get(key)
async def search_with_intersection(
self,
query_str: str,
max_results_per_query: int = 100,
date_from: Optional[str] = None,
min_citations: Optional[int] = None,
ccf_a_only: bool = False,
ccf_level: Optional[str] = None,
save_file: Optional[str] = None
) -> Dict[str, Any]:
"""
Perform search with query intersection.
Args:
query_str: Complex boolean query string
max_results_per_query: Max results per individual query
date_from: Date filter (YYYY-MM-DD)
min_citations: Minimum citation count
ccf_a_only: Filter for CCF A-class venues only (deprecated, use ccf_level)
ccf_level: Filter for specific CCF level ("A", "B", or "C")
save_file: File path to save results
"""
print(f"Parsing query: {query_str}")
# Parse the complex query
query_components = self.parser.parse_complex_query(query_str)
print(f"Query components: {query_components}")
# Build individual arXiv queries (only for the first AND group)
first_group_queries = len(query_components[0]) if query_components else 0
print(f"Will execute {first_group_queries} search queries for first AND group, then filter for remaining {len(query_components) - 1} groups")
# Execute queries using cascading search strategy
# First search the first AND group, then filter results for subsequent groups
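        # Illustrative trace for '("A" OR "B") AND ("C" OR "D")':
        #   Step 1 runs the arXiv queries all:"A" and all:"B" and unions the hits;
        #   Step 2 keeps only hits whose title/abstract mentions "C" or "D".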
all_results = {} # paper_id -> paper_data
candidate_papers = set() # Papers that satisfy the first AND group
# Process the first AND group to get initial candidate set
if not query_components:
return {"query": query_str, "papers": [], "error": "No query components"}
first_group = query_components[0]
print(f"\nStep 1: Searching first AND group: {first_group}")
# Get a larger set from the first group to have enough candidates
first_group_max_results = max(max_results_per_query * 2, 100)
for term_idx, term in enumerate(first_group):
query = f'all:"{term}"'
print(f" Executing OR query {term_idx+1}/{len(first_group)}: {query}")
search_args = {
"query": query,
"max_results": first_group_max_results
}
if date_from:
search_args["date_from"] = date_from
try:
results = await handle_search(search_args)
result_data = json.loads(results[0].text)
for paper in result_data.get("papers", []):
paper_id = paper["id"]
all_results[paper_id] = paper
candidate_papers.add(paper_id)
print(f" Found {len(result_data.get('papers', []))} papers")
except Exception as e:
print(f" Error in query: {e}")
continue
print(f" Total candidate papers from first AND group: {len(candidate_papers)}")
# For subsequent AND groups, filter the candidates instead of searching independently
for group_idx in range(1, len(query_components)):
or_group = query_components[group_idx]
print(f"\nStep {group_idx + 1}: Filtering candidates with AND group {group_idx + 1}: {or_group}")
# Check which candidates contain any of the OR terms in their title/abstract
filtered_candidates = set()
for paper_id in candidate_papers:
paper = all_results[paper_id]
title = paper.get("title", "").lower()
abstract = paper.get("abstract", "").lower()
# Check if paper contains any term from the current OR group
for term in or_group:
term_lower = term.lower()
if term_lower in title or term_lower in abstract:
filtered_candidates.add(paper_id)
break # Found one match, no need to check other terms
print(f" Papers matching AND group {group_idx + 1}: {len(filtered_candidates)}")
candidate_papers = filtered_candidates
if not candidate_papers:
print(f" No papers remain after filtering with AND group {group_idx + 1}")
break
# Final results are the remaining candidates
intersected_ids = candidate_papers
print(f"\nFinal intersection: {len(intersected_ids)} papers")
# Filter results
filtered_papers = []
for paper_id in intersected_ids:
paper = all_results[paper_id]
# Apply filters
if self._passes_filters(paper, min_citations, ccf_a_only, ccf_level):
filtered_papers.append(paper)
# Sort by publication date (newest first)
filtered_papers.sort(key=lambda x: x["published"], reverse=True)
# Prepare final results
final_results = {
"query": query_str,
"search_queries_executed": first_group_queries,
"filtering_groups": len(query_components) - 1,
"total_unique_papers": len(all_results),
"intersected_papers": len(intersected_ids),
"filtered_papers": len(filtered_papers),
"papers": filtered_papers,
"query_components": query_components,
"filters": {
"date_from": date_from,
"min_citations": min_citations,
"ccf_a_only": ccf_a_only,
"ccf_level": ccf_level
}
}
# Save to file if requested
if save_file:
success = self._save_results(final_results, save_file)
final_results["saved_to"] = save_file if success else None
if success:
print(f"Results saved to: {save_file}")
return final_results
def _passes_filters(
self,
paper: Dict[str, Any],
min_citations: Optional[int],
ccf_a_only: bool,
ccf_level: Optional[str] = None
) -> bool:
"""Check if paper passes all filters."""
# Citation filter
if min_citations is not None:
citations = paper.get("citation_count", 0)
if citations < min_citations:
return False
# CCF filter (priority: ccf_level > ccf_a_only)
if ccf_level:
if not hasattr(self, '_ccf_warning_shown'):
print(f"\n⚠️ WARNING: CCF {ccf_level} 筛选基于启发式方法,存在以下局限性:")
print(" - arXiv主要包含预印本,大多数论文尚未在会议发表")
print(" - 筛选基于文本匹配,可能存在误判")
print(" - 真正的CCF筛选需要外部发表场所数据库")
print(" - 建议将此功能作为粗略筛选工具使用\n")
self._ccf_warning_shown = True
if not self._is_ccf_level_paper(paper, ccf_level):
return False
elif ccf_a_only:
if not hasattr(self, '_ccf_a_warning_shown'):
print(f"\n⚠️ WARNING: CCF A类筛选存在局限性,详见上述说明\n")
self._ccf_a_warning_shown = True
if not self._is_ccf_a_paper(paper):
return False
return True
def _is_ccf_level_paper(self, paper: Dict[str, Any], level: str) -> bool:
"""
Check if a paper is published in a venue of specified CCF level.
Note: This is a heuristic approach since arXiv doesn't provide venue information.
We look for venue mentions in title/abstract, but this has limitations:
1. Most arXiv papers are preprints, not published papers
2. Venue information is rarely in title/abstract
3. This may have false positives/negatives
For accurate CCF filtering, you would need an external database
mapping paper IDs to publication venues.
"""
if level not in CCF_VENUES:
return False
target_venues = CCF_VENUES[level]
return self._paper_mentions_venues(paper, target_venues)
def _paper_mentions_venues(self, paper: Dict[str, Any], venues: set) -> bool:
"""
Check if a paper mentions any venue in the given set.
WARNING: This is a heuristic and may not accurately identify
papers actually published in these venues.
"""
title = paper.get("title", "").upper()
abstract = paper.get("abstract", "").upper()
for venue in venues:
venue_upper = venue.upper()
# Look for venue name in title
if venue_upper in title:
print(f" DEBUG: Found venue '{venue}' in title: {paper.get('title', '')[:100]}...")
return True
# Look for venue name in abstract with strict context checking
if venue_upper in abstract:
# Only accept if mentioned in clear publication context
strict_keywords = [
f"PUBLISHED IN {venue_upper}",
f"ACCEPTED TO {venue_upper}",
f"ACCEPTED BY {venue_upper}",
f"PROCEEDINGS OF {venue_upper}",
f"APPEARS IN {venue_upper}",
f"PRESENTED AT {venue_upper}",
]
# Check for strict publication context
for keyword in strict_keywords:
if keyword in abstract:
print(f" DEBUG: Found venue '{venue}' with context '{keyword}' in abstract")
return True
# Additional check: avoid false positives from citations
# If venue is mentioned but with citation context, exclude it
                citation_indicators = [
                    f"[{venue_upper}",  # [ICML 2023]
                    f"({venue_upper}",  # (ICML, 2023)
                    "CITE",
                    "REFERENCE",
                    "RELATED WORK",
                    "PRIOR WORK",
                    "PREVIOUS",
                ]
# Check surrounding context to avoid citation mentions
venue_pos = abstract.find(venue_upper)
if venue_pos >= 0:
# Check 50 characters before and after
start_pos = max(0, venue_pos - 50)
end_pos = min(len(abstract), venue_pos + len(venue_upper) + 50)
context = abstract[start_pos:end_pos]
# If it looks like a citation, skip it
is_citation = any(indicator in context for indicator in citation_indicators)
if not is_citation:
print(f" DEBUG: Found venue '{venue}' in abstract context (may be false positive)")
return True
return False
def _is_ccf_a_paper(self, paper: Dict[str, Any]) -> bool:
"""
Check if a paper is published in a CCF A-class venue (backward compatibility).
WARNING: This is a heuristic approach with significant limitations.
See _is_ccf_level_paper for detailed explanation.
"""
return self._paper_mentions_venues(paper, CCF_A_VENUES)
def _save_results(self, results: Dict[str, Any], file_path: str) -> bool:
"""Save results to JSON file."""
try:
output_path = Path(file_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
print(f"Error saving results: {e}")
return False
async def main():
"""Main function for command line interface."""
parser = argparse.ArgumentParser(
description="Advanced arXiv Query Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Simple search
python myquery.py "machine learning" --max-results 20
# Complex boolean search
python myquery.py '("program repair" OR "bug fix") AND ("LLM" OR "neural")' --save results.json
# Search with date filter
python myquery.py "software testing" --date-from 2023-01-01 --ccf-a
# Full example with all filters
python myquery.py '("program repair" OR "software repair") AND ("LLM" OR "neural")' \\
--date-from 2022-01-01 --min-citations 10 --ccf-a --save repair_llm_papers.json
"""
)
parser.add_argument("query", nargs='?', help="Search query (supports boolean operations) or template name")
parser.add_argument("--template", help="Use predefined query template")
parser.add_argument("--list-templates", action="store_true", help="List available query templates")
parser.add_argument("--max-results", type=int, default=50,
help="Maximum results per individual query (default: 50)")
parser.add_argument("--date-from", help="Search papers published after this date (YYYY-MM-DD)")
parser.add_argument("--min-citations", type=int, help="Minimum citation count")
parser.add_argument("--ccf-a", action="store_true", help="Filter for CCF A-class venues only (heuristic)")
parser.add_argument("--ccf-level", choices=["A", "B", "C"], help="Filter for specific CCF level (heuristic)")
parser.add_argument("--ccf-info", action="store_true", help="Show information about CCF filtering limitations")
parser.add_argument("--save", help="Save results to JSON file")
parser.add_argument("--interactive", action="store_true", help="Run in interactive mode")
args = parser.parse_args()
tool = AdvancedQueryTool()
if args.list_templates:
tool.list_predefined_queries()
return
if args.ccf_info:
print_ccf_info()
return
if args.interactive:
await interactive_mode(tool)
else:
# Determine query to use
query = None
if args.template:
query = tool.get_predefined_query(args.template)
if not query:
print(f"Error: Template '{args.template}' not found.")
tool.list_predefined_queries()
return
print(f"Using template '{args.template}': {query}")
elif args.query:
query = args.query
else:
print("Error: Please provide a query or use --template option.")
parser.print_help()
return
# Execute single query
results = await tool.search_with_intersection(
query_str=query,
max_results_per_query=args.max_results,
date_from=args.date_from,
min_citations=args.min_citations,
ccf_a_only=args.ccf_a,
ccf_level=args.ccf_level,
save_file=args.save
)
print("\n" + "="*80)
print("SEARCH RESULTS SUMMARY")
print("="*80)
print(f"Query: {results['query']}")
print(f"Search queries executed: {results['search_queries_executed']}")
print(f"Filtering groups applied: {results['filtering_groups']}")
print(f"Total unique papers found: {results['total_unique_papers']}")
print(f"Papers after filtering: {results['intersected_papers']}")
print(f"Papers after additional filters: {results['filtered_papers']}")
if results['papers']:
print(f"\nTop {min(5, len(results['papers']))} papers:")
for i, paper in enumerate(results['papers'][:5], 1):
print(f"\n{i}. {paper['title']}")
print(f" Authors: {', '.join(paper['authors'][:3])}{'...' if len(paper['authors']) > 3 else ''}")
print(f" Published: {paper['published'][:10]}")
print(f" arXiv ID: {paper['id']}")
print(f" URL: {paper['url']}")
async def interactive_mode(tool: AdvancedQueryTool):
"""Interactive mode for the query tool."""
print("\n=== Advanced arXiv Query Tool - Interactive Mode ===")
print("支持复杂布尔查询,日期过滤,引用数过滤等功能")
print("输入 'help' 查看帮助,'templates' 查看预定义查询,'quit' 退出")
while True:
print("\n" + "-"*60)
# Get query
query_input = input("输入查询 (支持布尔操作) 或选择模板编号: ").strip()
if query_input.lower() in ['quit', 'exit', 'q']:
break
elif query_input.lower() == 'help':
print_help()
continue
elif query_input.lower() in ['templates', 't']:
tool.list_predefined_queries()
continue
elif not query_input:
continue
# Check if input is a template number
query = query_input
if query_input.isdigit():
template_num = int(query_input)
templates = list(tool.predefined_queries.keys())
if 1 <= template_num <= len(templates):
template_key = templates[template_num - 1]
query = tool.get_predefined_query(template_key)
print(f"使用模板: {template_key}")
print(f"查询: {query}")
else:
print("无效的模板编号")
continue
# Get options
try:
max_results = int(input("每个查询的最大结果数 (默认50): ") or "50")
date_from = input("起始日期 (YYYY-MM-DD, 可选): ").strip() or None
min_citations_str = input("最小引用数 (可选): ").strip()
min_citations = int(min_citations_str) if min_citations_str else None
ccf_level = input("CCF级别过滤 (A/B/C, 可选): ").strip().upper() or None
if ccf_level and ccf_level not in ["A", "B", "C"]:
print("无效的CCF级别,忽略此过滤条件")
ccf_level = None
ccf_a = False # Use ccf_level instead
save_file = input("保存文件路径 (可选): ").strip() or None
print("\n正在搜索...")
results = await tool.search_with_intersection(
query_str=query,
max_results_per_query=max_results,
date_from=date_from,
min_citations=min_citations,
ccf_a_only=ccf_a,
ccf_level=ccf_level,
save_file=save_file
)
# Display results
print(f"\n搜索完成!")
print(f"执行了 {results['search_queries_executed']} 个搜索查询")
print(f"应用了 {results['filtering_groups']} 个过滤组")
print(f"找到 {results['total_unique_papers']} 篇候选论文")
print(f"过滤后剩余 {results['intersected_papers']} 篇")
print(f"最终结果 {results['filtered_papers']} 篇")
if results['papers']:
show_detail = input(f"\n显示前5篇论文详情? (y/n): ").strip().lower() == 'y'
if show_detail:
for i, paper in enumerate(results['papers'][:5], 1):
print(f"\n{i}. {paper['title']}")
print(f" 作者: {', '.join(paper['authors'][:3])}{'...' if len(paper['authors']) > 3 else ''}")
print(f" 发表: {paper['published'][:10]}")
print(f" ID: {paper['id']}")
print(f" 摘要: {paper['abstract'][:200]}...")
else:
print("没有找到符合条件的论文。")
except ValueError as e:
print(f"输入错误: {e}")
except Exception as e:
print(f"搜索出错: {e}")
def print_ccf_info():
"""Print information about CCF filtering limitations."""
info_text = """
=== About CCF Filtering ===

⚠️  Important limitations:

1. Nature of arXiv:
   - arXiv is primarily a preprint server; most papers are uploaded before submission
   - A paper may not yet be published at any venue, or may still be under peer review
   - Even after acceptance, publication details are rarely updated in the arXiv record

2. Filtering method:
   - The current approach matches text in titles and abstracts
   - It looks for mentions of CCF venue names, which is not the same as publication there
   - Misclassification is possible, e.g. a venue named while citing another paper

3. Accuracy concerns:
   - False positives: papers that mention a venue without being published there
   - False negatives: papers published at a venue without mentioning its name
   - Most genuinely CCF A-class papers will likely not be identified

4. Recommended usage:
   ✓ As a coarse relevance-screening tool
   ✓ To find papers plausibly related to high-quality venues
   ✗ Not as strict verification of publication venue

5. Alternatives:
   - Use dedicated academic databases (e.g. DBLP, Google Scholar)
   - Check each paper's latest version and publication status
   - Combine with other quality signals such as citation counts

For precise CCF classification, use a dedicated academic paper database.
"""
print(info_text)
def print_help():
"""Print help information."""
help_text = """
=== Advanced arXiv Query Tool Help ===

1. Boolean query support:
   - Use OR to join synonyms: "program repair" OR "bug fix"
   - Use AND to join distinct concepts: ("repair" OR "fix") AND ("LLM" OR "neural")
   - Wrap phrases in quotes: "machine learning"

2. Example query:
   ("program repair" OR "software repair" OR "bug fix") AND ("LLM" OR "neural" OR "transformer")

3. Filter options:
   - Date filter: only search papers published after a given date
   - Citation filter: require a minimum citation count
   - CCF A-class filter: only show papers matched to CCF A-class venues (heuristic)

4. Saving results:
   - Search results can be saved to a JSON file
   - Output includes full paper details and search statistics

5. How queries are executed:
   - The complex query is split into AND groups of OR terms
   - Each term in the first AND group is searched on arXiv
   - The remaining AND groups filter the candidate set by title/abstract
   - Additional filters (date, citations, CCF) are then applied
"""
print(help_text)
if __name__ == "__main__":
asyncio.run(main())