from dataclasses import dataclass
import math
import os
from typing import Any, Callable, Dict, List, Optional, Tuple
from dotenv import load_dotenv
# Read settings from the config.env file that sits one directory above this
# module, so the os.getenv() lookups used by the tool handlers can see them.
_config_path = os.path.join(
    os.path.dirname(__file__),
    "..",
    "config.env",
)
load_dotenv(dotenv_path=_config_path)
class ToolNotFoundError(Exception):
    """Raised when a requested tool name has no entry in the tool registry."""
@dataclass(frozen=True)
class ToolSchema:
    """Immutable description of the shape of a tool's input or output payload."""

    # Schema type tag; every registry entry in this file passes "object".
    tool_type: str
    # Property name -> metadata dict (registry entries supply "type" and
    # "description" keys).
    properties: Dict[str, Dict[str, Any]]
    # Names of the properties listed as required.
    required: List[str]
@dataclass(frozen=True)
class ToolDefinition:
    """Immutable registry record tying a tool name to its schemas and handler."""

    # Identifier of the tool; registry entries use the same value as their key.
    name: str
    # Human-readable summary of what the tool does.
    description: str
    # Shape of the payload dict passed to the handler.
    input_schema: ToolSchema
    # Shape of the dict returned by the handler.
    output_schema: ToolSchema
    # Callable that receives the payload dict and returns the result dict.
    handler: Callable[[Dict[str, Any]], Dict[str, Any]]
def run_echo_tool(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return the payload's ``text`` value prefixed with ``"Echo: "``.

    A missing ``text`` key is treated as an empty string.
    """
    message = payload.get("text", "")
    echoed = f"Echo: {message}"
    return {"text": echoed}
def run_uppercase_tool(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return the payload's ``text`` value converted to a string and upper-cased.

    A missing ``text`` key is treated as an empty string.
    """
    return {"text": str(payload.get("text", "")).upper()}
def run_concat_tool(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Join the payload's ``items`` into one string with no separator.

    Each item is converted with ``str()``. A value that is not a list is
    treated as a single item; a missing key yields an empty string.
    """
    raw = payload.get("items", [])
    pieces = raw if isinstance(raw, list) else [raw]
    return {"text": "".join(map(str, pieces))}
def _get_autocad_application() -> Tuple[Optional[Any], Optional[str]]:
    """Obtain the AutoCAD COM application object.

    Returns:
        ``(application, None)`` on success, or ``(None, message)`` when
        pywin32 cannot be imported or no connection can be made.
    """
    try:
        import win32com.client  # type: ignore
    except Exception as exc:
        return None, f"pywin32 import failed: {exc}"

    # First attempt: GetActiveObject.
    try:
        return win32com.client.GetActiveObject("AutoCAD.Application"), None
    except Exception:
        pass

    # Second attempt: Dispatch.
    try:
        return win32com.client.Dispatch("AutoCAD.Application"), None
    except Exception as exc:
        return None, f"AutoCAD connection failed: {exc}"
def _normalize_text(text: str) -> str:
    """Collapse a text value to single-spaced, trimmed form.

    ``None`` becomes ``""``. The literal two-character sequence ``\\P``
    (presumably the MText paragraph-break code -- confirm) and every run of
    whitespace are each reduced to one space.
    """
    if text is None:
        return ""
    without_breaks = text.replace("\\P", " ")
    # split() with no argument already splits on \r, \n and \t and drops
    # leading/trailing whitespace.
    return " ".join(without_breaks.split())
def _env_bool(value: Optional[str], default: bool = False) -> bool:
    """Interpret a textual flag as a boolean.

    Args:
        value: Raw text (for example from ``os.getenv``), or ``None``.
        default: Result when *value* is ``None`` or not a recognised word.

    Returns:
        ``True`` for 1/true/yes/y/on, ``False`` for 0/false/no/n/off
        (case-insensitive, surrounding whitespace ignored), else *default*.
    """
    if value is None:
        return default
    recognised = {
        **dict.fromkeys(("1", "true", "yes", "y", "on"), True),
        **dict.fromkeys(("0", "false", "no", "n", "off"), False),
    }
    return recognised.get(value.strip().lower(), default)
def _get_insertion_point(entity: Any) -> Optional[Tuple[float, float, float]]:
    """Read an entity's position as an ``(x, y, z)`` tuple of floats.

    ``InsertionPoint`` is tried first and ``Location`` second; the first
    attribute that can be read and converted wins. Returns ``None`` when
    neither works.
    """
    for attr_name in ("InsertionPoint", "Location"):
        try:
            coords = getattr(entity, attr_name)
            return float(coords[0]), float(coords[1]), float(coords[2])
        except Exception:
            continue
    return None
def _get_bounding_box(entity: Any) -> Optional[Tuple[float, float, float, float]]:
    """Return the entity's 2D extents as ``(min_x, min_y, max_x, max_y)``.

    The two points returned by ``entity.GetBoundingBox()`` are reduced to
    their first two coordinates. Any failure yields ``None``.
    """
    try:
        low, high = entity.GetBoundingBox()
        return (
            float(low[0]),
            float(low[1]),
            float(high[0]),
            float(high[1]),
        )
    except Exception:
        return None
def _is_fully_inside(
    bbox: Optional[Tuple[float, float, float, float]],
    start_x: float,
    end_x: float,
    start_y: float,
    end_y: float,
) -> bool:
    """Tell whether a bounding box lies entirely within a rectangular window.

    Args:
        bbox: ``(min_x, min_y, max_x, max_y)`` or ``None``.
        start_x: Lower X limit of the window.
        end_x: Upper X limit of the window.
        start_y: Upper Y limit of the window (the box top must not exceed it).
        end_y: Lower Y limit of the window (the box bottom must not go below it).

    Returns:
        ``False`` for a ``None`` box, otherwise whether all four edges are
        within the limits (limits inclusive).
    """
    if bbox is None:
        return False
    left, bottom, right, top = bbox
    within_x = start_x <= left <= end_x and start_x <= right <= end_x
    within_y = top <= start_y and bottom >= end_y
    return within_x and within_y
def run_open_dwg_by_number(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Open the DWG file whose file name contains a drawing number.

    Payload keys:
        drawing_no: Text to look for in ``.dwg`` file names (required).
        folder_path: Folder to search. Falls back to the
            ``OPEN_DWG_FOLDER_PATH`` and then ``DWG_FOLDER`` environment
            variables.
        case_sensitive: Match case-sensitively. Falls back to
            ``OPEN_DWG_CASE_SENSITIVE`` (default off).
        allow_multiple: When several files match, open the first one instead
            of failing. Falls back to ``OPEN_DWG_ALLOW_MULTIPLE`` (default off).
        activate: Show AutoCAD and activate the opened document. Falls back
            to ``OPEN_DWG_ACTIVATE`` (default on).

    Returns:
        On success ``{"status": "ok", "message", "path", "document",
        "visible"}``. On failure ``{"status": "error", "message"}``, plus
        ``"matches"`` when the failure is an ambiguous match.

    The directory listing is sorted, so both the ``matches`` list and the
    file chosen under ``allow_multiple`` are deterministic.
    """
    raw_drawing_no = payload.get("drawing_no")
    # An explicit None must count as "missing", not as the text "None".
    drawing_no = "" if raw_drawing_no is None else str(raw_drawing_no).strip()
    if not drawing_no:
        return {"status": "error", "message": "drawing_no is required"}

    folder_path = (
        payload.get("folder_path")
        or os.getenv("OPEN_DWG_FOLDER_PATH")
        or os.getenv("DWG_FOLDER")
    )
    if not folder_path:
        return {"status": "error", "message": "folder_path or DWG_FOLDER environment variable must be provided"}
    if not os.path.isdir(folder_path):
        return {"status": "error", "message": f"folder not found: {folder_path}"}

    case_sensitive = payload.get("case_sensitive")
    if case_sensitive is None:
        case_sensitive = _env_bool(os.getenv("OPEN_DWG_CASE_SENSITIVE"), False)
    else:
        case_sensitive = bool(case_sensitive)

    try:
        # os.listdir() order is arbitrary; sort for reproducible results.
        file_names = sorted(os.listdir(folder_path))
    except OSError as exc:
        return {"status": "error", "message": f"failed to list folder: {exc}"}

    needle = drawing_no if case_sensitive else drawing_no.lower()
    matches: List[str] = []
    for name in file_names:
        # The extension check is always case-insensitive.
        if not name.lower().endswith(".dwg"):
            continue
        haystack = name if case_sensitive else name.lower()
        if needle in haystack:
            matches.append(os.path.join(folder_path, name))
    if not matches:
        return {"status": "error", "message": f"no .dwg contains drawing number: {drawing_no}"}

    allow_multiple = payload.get("allow_multiple")
    if allow_multiple is None:
        allow_multiple = _env_bool(os.getenv("OPEN_DWG_ALLOW_MULTIPLE"), False)
    else:
        allow_multiple = bool(allow_multiple)
    if len(matches) > 1 and not allow_multiple:
        return {"status": "error", "message": "multiple drawings matched", "matches": matches}

    target_path = matches[0]
    acad, err = _get_autocad_application()
    if err:
        return {"status": "error", "message": err}

    try:
        doc = acad.Documents.Open(target_path)
        activate = payload.get("activate")
        if activate is None:
            activate = _env_bool(os.getenv("OPEN_DWG_ACTIVATE"), True)
        else:
            activate = bool(activate)
        if activate:
            # Both steps are best-effort: the drawing is already open, so a
            # failure to show or focus it must not turn success into an error.
            try:
                acad.Visible = True  # show the AutoCAD application window
            except Exception:
                pass  # ignore when the Visible property cannot be set
            try:
                doc.Activate()  # make this the active document
            except Exception:
                pass  # carry on even if activation fails
        return {
            "status": "ok",
            "message": "drawing opened",
            "path": target_path,
            "document": getattr(doc, "Name", ""),
            "visible": activate,
        }
    except Exception as exc:
        return {"status": "error", "message": f"failed to open drawing: {exc}"}
def _cluster_by_y(items: List[Dict[str, Any]], tolerance: float) -> List[Dict[str, Any]]:
    """Group items into rows by their ``"y"`` value.

    Items are visited from highest to lowest ``y``. Each joins the first
    existing row whose current mean ``y`` is within *tolerance*, otherwise it
    starts a new row.

    Returns:
        Rows in creation order, each ``{"y": mean_y, "items": [...]}``.
    """
    clusters: List[Dict[str, Any]] = []
    ordered = sorted(items, key=lambda entry: entry["y"], reverse=True)
    for entry in ordered:
        for cluster in clusters:
            if abs(entry["y"] - cluster["y"]) <= tolerance:
                members = cluster["items"]
                members.append(entry)
                # Keep the row position at the mean of its members.
                cluster["y"] = sum(m["y"] for m in members) / len(members)
                break
        else:
            clusters.append({"y": entry["y"], "items": [entry]})
    return clusters
def _cluster_by_x(items: List[Dict[str, Any]], tolerance: float) -> List[Dict[str, Any]]:
    """Group items into columns by their ``"x"`` value.

    Items are visited from lowest to highest ``x``. Each joins the first
    existing group whose current mean ``x`` is within *tolerance*, otherwise
    it starts a new group.

    Returns:
        Groups in creation order, each ``{"x": mean_x, "items": [...]}``.
    """
    clusters: List[Dict[str, Any]] = []
    ordered = sorted(items, key=lambda entry: entry["x"])
    for entry in ordered:
        for cluster in clusters:
            if abs(entry["x"] - cluster["x"]) <= tolerance:
                members = cluster["items"]
                members.append(entry)
                # Keep the group position at the mean of its members.
                cluster["x"] = sum(m["x"] for m in members) / len(members)
                break
        else:
            clusters.append({"x": entry["x"], "items": [entry]})
    return clusters
def _material_float_setting(payload_value: Any, env_name: str, default: str) -> float:
    """Resolve a numeric setting: payload value, else env var, else *default*.

    Raises:
        TypeError, ValueError: when the chosen value is not convertible to float.
    """
    if payload_value is None:
        return float(os.getenv(env_name, default))
    return float(payload_value)


def _material_column_bounds(column_positions: Dict[str, float]) -> Dict[str, Tuple[float, float]]:
    """Map each column to its inclusive ``(start_x, end_x)`` range.

    A column ends one unit before the nearest column start to its right; the
    right-most column extends to infinity. Insertion order is preserved.
    """
    starts = sorted(column_positions.values())
    bounds: Dict[str, Tuple[float, float]] = {}
    for col, start_x in column_positions.items():
        following = [x for x in starts if x > start_x]
        end_x = following[0] - 1 if following else float("inf")
        bounds[col] = (start_x, end_x)
    return bounds


def _material_row_index(
    row_bounds: List[Tuple[float, float]], min_y: float, max_y: float
) -> Optional[int]:
    """Return the index of the row band with the largest positive vertical overlap.

    *row_bounds* holds ``(upper, lower)`` pairs. ``None`` means the span
    ``[min_y, max_y]`` overlaps no band.
    """
    best_idx: Optional[int] = None
    best_overlap = 0.0
    for idx, (upper, lower) in enumerate(row_bounds):
        overlap = min(upper, max_y) - max(lower, min_y)
        if overlap > best_overlap:
            best_overlap = overlap
            best_idx = idx
    return best_idx


def _extract_category_rows(
    items_below: List[Dict[str, Any]],
    column_names: List[str],
    column_display_names: List[str],
    exclude_categories: Any,
    row_tolerance: float,
) -> Dict[str, Any]:
    """Parse the table formed by the text items lying under one category header.

    Args:
        items_below: Non-empty list of text items (``norm``, ``x``, ``y``,
            ``bbox`` keys) between this category header and the next one.
        column_names: Header texts to locate, in output order.
        column_display_names: Output keys, parallel to *column_names*.
        exclude_categories: Upper-cased texts that end the table.
        row_tolerance: Vertical clustering tolerance.

    Returns:
        ``{"rows": [...]}`` on success, or ``{"rows": [], "message": ...}``
        explaining why nothing could be extracted.
    """
    rows = _cluster_by_y(items_below, row_tolerance)

    # The header may span two text lines, so accept every row within three
    # tolerances below the top-most item.
    header_start_y = max(item["y"] for item in items_below)
    header_band = row_tolerance * 3.0
    header_rows = [
        row for row in rows
        if header_start_y - header_band <= row["y"] <= header_start_y
    ]
    if not header_rows:
        return {"rows": [], "message": "header row not found"}

    # Locate every configured column by its header text. The left-most
    # matching text marks where the column starts.
    header_items = [item for row in header_rows for item in row["items"]]
    column_positions: Dict[str, float] = {}
    for col_name in column_names:
        wanted = col_name.upper()
        xs = [item["x"] for item in header_items if item["norm"].upper() == wanted]
        if not xs:
            # Stop here: continuing with a partial column set would emit rows
            # with silently empty cells and hide this message.
            return {"rows": [], "message": f"column header not found: {col_name}"}
        column_positions[col_name] = min(xs)

    header_base_y = min(row["y"] for row in header_rows)
    header_cutoff_y = header_base_y - (row_tolerance * 0.5)

    # Data items: below the header, with a usable bounding box whose top is
    # also below the header cutoff.
    data_items = [
        item
        for row in rows
        if row["y"] < header_cutoff_y
        for item in row["items"]
        if item.get("bbox") is not None and item["bbox"][3] < header_cutoff_y
    ]

    column_bounds = _material_column_bounds(column_positions)
    first_col_name = column_names[0]
    first_col_start, first_col_end = column_bounds[first_col_name]

    # Numeric texts in the first column mark the top edge of each table row.
    first_col_items = sorted(
        (
            item for item in data_items
            if item["bbox"][0] <= first_col_end and item["bbox"][2] >= first_col_start
        ),
        key=lambda item: item["bbox"][3],
        reverse=True,
    )
    row_markers: List[float] = []
    for item in first_col_items:
        if not str(item.get("norm", "")).strip().isdigit():
            continue
        row_top_y = item["bbox"][3]
        if not row_markers or abs(row_top_y - row_markers[-1]) > row_tolerance:
            row_markers.append(row_top_y)
    if not row_markers:
        return {"rows": [], "message": "no data rows found"}

    # The highest excluded-section text limits how far the last row extends.
    excluded_ys = [
        item["y"] for item in items_below
        if item["norm"].upper() in exclude_categories
    ]
    exclude_cutoff_y = max(excluded_ys) if excluded_ys else None

    # The typical spacing between row markers sizes the last row, which has
    # no following marker. With one marker there is no gap to measure.
    gaps = sorted(upper - lower for upper, lower in zip(row_markers, row_markers[1:]))
    median_gap = gaps[len(gaps) // 2] if gaps else row_tolerance * 5.0

    row_bounds: List[Tuple[float, float]] = list(zip(row_markers, row_markers[1:]))
    last_upper = row_markers[-1]
    last_lower = last_upper - (median_gap * 1.5)
    if exclude_cutoff_y is not None and exclude_cutoff_y > last_lower:
        last_lower = exclude_cutoff_y
    row_bounds.append((last_upper, last_lower))

    # Assign each data item to a row by vertical overlap and to a column by
    # the X centre of its bounding box.
    row_values_list: List[Dict[str, List[str]]] = [
        {col: [] for col in column_positions} for _ in row_bounds
    ]
    for item in data_items:
        min_x, min_y, max_x, max_y = item["bbox"]
        row_idx = _material_row_index(row_bounds, min_y, max_y)
        if row_idx is None:
            continue
        x_center = (min_x + max_x) / 2
        matched_col = next(
            (
                col for col, (start_x, end_x) in column_bounds.items()
                if start_x <= x_center <= end_x
            ),
            None,
        )
        if matched_col is None:
            continue
        row_values_list[row_idx][matched_col].append(item["norm"])

    output_rows: List[Dict[str, str]] = []
    for row_values in row_values_list:
        merged = {col: " ".join(values).strip() for col, values in row_values.items()}
        # Keep only rows whose first column is a number.
        if merged.get(first_col_name, "").isdigit():
            output_rows.append({
                display_col: merged.get(orig_col, "")
                for orig_col, display_col in zip(column_names, column_display_names)
            })
    return {"rows": output_rows}


def run_extract_material_table(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Extract material tables from text on one layer of the active drawing.

    Payload keys (all optional):
        layer: Layer holding the table text; falls back to ``LAYER_SHOP_FIELD``.
            Compared case-insensitively.
        categories: Category header texts; falls back to
            ``MATERIAL_CATEGORIES`` (default ``SHOP MATERIAL,FIELD MATERIAL``).
        row_tolerance: Vertical clustering tolerance; falls back to
            ``ROW_TOLERANCE`` (default 3.0).
        col_tolerance: Accepted for backward compatibility; the column
            assignment logic does not use it.

    Column header texts come from ``MATERIAL_COLUMNS`` (required), output key
    names from ``MATERIAL_COLUMNS_DISPLAY`` (used only when its length
    matches), and table-terminating texts from ``MATERIAL_CATEGORIES_EXCLUDE``.

    Returns:
        ``{"status": "ok", "categories": {name: {"rows": [...]}}}`` where a
        category that could not be parsed has empty ``rows`` and a
        ``message``; or ``{"status": "error", "message": ...}``.
    """
    # An explicit None must count as "missing", not as the text "None".
    layer_name = str(payload.get("layer") or "").strip()
    if not layer_name:
        layer_name = os.getenv("LAYER_SHOP_FIELD", "")
    if not layer_name:
        return {"status": "error", "message": "layer is required or set LAYER_SHOP_FIELD in config.env"}

    categories = payload.get("categories")
    if not categories:
        categories_str = os.getenv("MATERIAL_CATEGORIES", "SHOP MATERIAL,FIELD MATERIAL")
        categories = [cat.strip() for cat in categories_str.split(",") if cat.strip()]
    else:
        categories = [str(cat).strip() for cat in categories if str(cat).strip()]
    if not categories:
        return {"status": "error", "message": "categories must not be empty or set MATERIAL_CATEGORIES in config.env"}

    try:
        row_tolerance = _material_float_setting(
            payload.get("row_tolerance"), "ROW_TOLERANCE", "3.0"
        )
    except (TypeError, ValueError):
        return {
            "status": "error",
            "message": "row_tolerance must be a number (payload or ROW_TOLERANCE in config.env)",
        }

    column_names = [c.strip() for c in os.getenv("MATERIAL_COLUMNS", "").split(",") if c.strip()]
    if not column_names:
        return {"status": "error", "message": "MATERIAL_COLUMNS must be set in config.env"}

    # Display names for the output; fall back to the raw header names when
    # unset or when the count does not line up.
    column_display_names = [
        c.strip() for c in os.getenv("MATERIAL_COLUMNS_DISPLAY", "").split(",") if c.strip()
    ]
    if len(column_display_names) != len(column_names):
        column_display_names = column_names

    # Texts that mark a section whose content must not be read as table data.
    exclude_categories = {
        c.strip().upper()
        for c in os.getenv("MATERIAL_CATEGORIES_EXCLUDE", "").split(",")
        if c.strip()
    }

    acad, err = _get_autocad_application()
    if err:
        return {"status": "error", "message": err}
    try:
        doc = acad.ActiveDocument
    except Exception as exc:
        return {"status": "error", "message": f"failed to access active document: {exc}"}

    layer_key = layer_name.upper()
    text_items: List[Dict[str, Any]] = []
    try:
        for obj in doc.ModelSpace:
            if obj.EntityName not in ("AcDbText", "AcDbMText"):
                continue
            if str(obj.Layer).upper() != layer_key:
                continue
            point = _get_insertion_point(obj)
            if not point:
                continue
            raw_text = getattr(obj, "TextString", "")
            norm_text = _normalize_text(str(raw_text))
            if not norm_text:
                continue
            text_items.append({
                "text": str(raw_text),
                "norm": norm_text,
                "x": point[0],
                "y": point[1],
                "z": point[2],
                "bbox": _get_bounding_box(obj),
            })
    except Exception as exc:
        # Report COM failures in the same shape as every other failure.
        return {"status": "error", "message": f"failed to read model space: {exc}"}
    if not text_items:
        return {"status": "error", "message": f"no text found on layer: {layer_name}"}

    category_map = {c.upper(): c for c in categories}
    category_items = [item for item in text_items if item["norm"].upper() in category_map]
    if not category_items:
        return {"status": "error", "message": "no category headers found"}

    results: Dict[str, Any] = {}
    sorted_category_items = sorted(category_items, key=lambda x: x["y"], reverse=True)
    for idx, cat_item in enumerate(sorted_category_items):
        category_name = category_map[cat_item["norm"].upper()]
        # Each category owns the band between its header and the next header
        # below it; the last category runs to the bottom.
        if idx + 1 < len(sorted_category_items):
            next_cat_y = sorted_category_items[idx + 1]["y"]
        else:
            next_cat_y = float("-inf")
        items_below = [
            item for item in text_items
            if next_cat_y < item["y"] < cat_item["y"] - row_tolerance
        ]
        if not items_below:
            results[category_name] = {"rows": [], "message": "no items below category"}
            continue
        results[category_name] = _extract_category_rows(
            items_below,
            column_names,
            column_display_names,
            exclude_categories,
            row_tolerance,
        )

    return {
        "status": "ok",
        "categories": results,
    }
def run_find_dimtext_by_bmcs_symbol(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Find the dimension text nearest to each part that carries a BMCS symbol.

    Steps:
        1. Extract the material table and collect the part numbers of rows
           whose symbol column equals ``payload["bmcs_symbol"]``.
        2. Collect text entities on the ``ISOTEXT_LAYER`` (default
           ``ISOTEXT``) and ``DIMTEXT_LAYER`` (default ``DIMTEXT``) layers of
           the active drawing, matching layer names case-insensitively.
        3. For every ISOTEXT item equal to a part number, report the DIMTEXT
           item at the smallest straight-line distance.

    The part-number and symbol columns are identified by header name through
    ``MATERIAL_COLUMNS`` / ``MATERIAL_COLUMNS_DISPLAY``.

    Returns:
        ``{"status": "ok", "bmcs_symbol", "matches": [...]}`` plus
        ``"missing_part_nos"`` for part numbers with no ISOTEXT item, or
        ``{"status": "error", "message": ...}``.
    """
    raw_symbol = payload.get("bmcs_symbol")
    # An explicit None must count as "missing", not as the text "None".
    bmcs_symbol = "" if raw_symbol is None else str(raw_symbol).strip()
    if not bmcs_symbol:
        return {"status": "error", "message": "bmcs_symbol is required"}

    column_names = [c.strip() for c in os.getenv("MATERIAL_COLUMNS", "").split(",") if c.strip()]
    if not column_names:
        return {"status": "error", "message": "MATERIAL_COLUMNS must be set in config.env"}
    column_display_names = [
        c.strip() for c in os.getenv("MATERIAL_COLUMNS_DISPLAY", "").split(",") if c.strip()
    ]
    if len(column_display_names) != len(column_names):
        column_display_names = column_names

    def _find_display_name(targets: Tuple[str, ...]) -> Optional[str]:
        # Return the output key of the first column whose raw or display
        # name is one of the upper-case targets.
        for orig_col, display_col in zip(column_names, column_display_names):
            if orig_col.upper() in targets or display_col.upper() in targets:
                return display_col
        return None

    part_no_key = _find_display_name(("NO", "PART NO", "ITEM NO"))
    if part_no_key is None:
        # Fall back to the first configured column.
        part_no_key = column_display_names[0]
    symbol_key = _find_display_name(("SYM", "SYMBOL", "BMCS SYMBOL"))
    if symbol_key is None:
        return {"status": "error", "message": "symbol column not found in MATERIAL_COLUMNS"}

    table_result = run_extract_material_table({})
    if table_result.get("status") != "ok":
        # Surface the underlying reason so the caller can act on it.
        detail = str(table_result.get("message", "")).strip()
        message = "failed to extract material table"
        if detail:
            message = f"{message}: {detail}"
        return {"status": "error", "message": message}

    part_no_matches: List[Dict[str, str]] = []
    for category_name, category_data in table_result.get("categories", {}).items():
        for row in category_data.get("rows", []):
            if str(row.get(symbol_key, "")).strip() != bmcs_symbol:
                continue
            part_no = str(row.get(part_no_key, "")).strip()
            if part_no:
                part_no_matches.append({"category": category_name, "part_no": part_no})
    if not part_no_matches:
        return {"status": "error", "message": f"no part no found for BMCS symbol: {bmcs_symbol}"}

    acad, err = _get_autocad_application()
    if err:
        return {"status": "error", "message": err}
    try:
        doc = acad.ActiveDocument
    except Exception as exc:
        return {"status": "error", "message": f"failed to access active document: {exc}"}

    isotext_layer = os.getenv("ISOTEXT_LAYER", "ISOTEXT").upper()
    dimtext_layer = os.getenv("DIMTEXT_LAYER", "DIMTEXT").upper()
    iso_items: List[Dict[str, Any]] = []
    dim_items: List[Dict[str, Any]] = []
    try:
        for obj in doc.ModelSpace:
            if obj.EntityName not in ("AcDbText", "AcDbMText"):
                continue
            point = _get_insertion_point(obj)
            if not point:
                continue
            raw_text = getattr(obj, "TextString", "")
            norm_text = _normalize_text(str(raw_text))
            if not norm_text:
                continue
            layer = str(getattr(obj, "Layer", "")).upper()
            item = {
                "text": str(raw_text),
                "norm": norm_text,
                "point": (point[0], point[1]),
            }
            if layer == isotext_layer:
                iso_items.append(item)
            elif layer == dimtext_layer:
                dim_items.append(item)
    except Exception as exc:
        # Report COM failures in the same shape as every other failure.
        return {"status": "error", "message": f"failed to read model space: {exc}"}
    if not iso_items:
        return {"status": "error", "message": "no ISOTEXT items found"}
    if not dim_items:
        return {"status": "error", "message": "no DIMTEXT items found"}

    results: List[Dict[str, Any]] = []
    missing_part_nos: List[str] = []
    for match in part_no_matches:
        part_no = match["part_no"]
        iso_matches = [item for item in iso_items if item["norm"] == part_no]
        if not iso_matches:
            # The same number can come from more than one category; list it once.
            if part_no not in missing_part_nos:
                missing_part_nos.append(part_no)
            continue
        for iso_item in iso_matches:
            iso_x, iso_y = iso_item["point"]
            # dim_items is non-empty here; min() keeps the first item on ties.
            nearest_dist, nearest_dim = min(
                (
                    (math.hypot(dim["point"][0] - iso_x, dim["point"][1] - iso_y), dim)
                    for dim in dim_items
                ),
                key=lambda pair: pair[0],
            )
            results.append({
                "category": match["category"],
                "part_no": part_no,
                "bmcs_symbol": bmcs_symbol,
                "iso_text": iso_item["text"],
                "iso_point": {"x": iso_x, "y": iso_y},
                "dim_text": nearest_dim["text"],
                "dim_point": {"x": nearest_dim["point"][0], "y": nearest_dim["point"][1]},
                "distance": nearest_dist,
            })
    if not results:
        return {"status": "error", "message": "no ISOTEXT match found for part numbers"}

    response: Dict[str, Any] = {
        "status": "ok",
        "bmcs_symbol": bmcs_symbol,
        "matches": results,
    }
    if missing_part_nos:
        response["missing_part_nos"] = missing_part_nos
    return response
def run_close_dwg(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Close an open drawing by name without saving; AutoCAD itself stays open.

    Payload keys:
        drawing_name: Name of the open document (required). Matching is
            case-insensitive. An exact name match is preferred; if there is
            none, the first document whose name contains the text is used.

    Returns:
        ``{"status": "ok", "message", "drawing_name"}`` on success, otherwise
        ``{"status": "error", "message"}``.

    Unsaved changes in the closed drawing are discarded.
    """
    raw_name = payload.get("drawing_name")
    # An explicit None must count as "missing", not as the text "None".
    drawing_name = "" if raw_name is None else str(raw_name).strip()
    if not drawing_name:
        return {"status": "error", "message": "drawing_name is required"}

    # Use the shared connection helper, like the other AutoCAD handlers.
    acad, err = _get_autocad_application()
    if err:
        return {"status": "error", "message": err}

    wanted = drawing_name.lower()
    try:
        exact_doc = None
        partial_doc = None
        for i in range(acad.Documents.Count):
            try:
                doc = acad.Documents.Item(i)
                name = str(doc.Name).lower()
            except Exception:
                continue  # skip documents that cannot be inspected
            if name == wanted:
                exact_doc = doc
                break
            # Remember only the first substring match, and keep scanning: a
            # later document may match exactly and must win, because the
            # close below throws away unsaved work.
            if partial_doc is None and wanted in name:
                partial_doc = doc

        found_doc = exact_doc if exact_doc is not None else partial_doc
        if found_doc is None:
            return {"status": "error", "message": f"drawing not found: {drawing_name}"}

        doc_name = found_doc.Name
        found_doc.Close(False)  # False: do not save changes
        return {
            "status": "ok",
            "message": f"drawing closed: {doc_name}",
            "drawing_name": doc_name,
        }
    except Exception as exc:
        return {"status": "error", "message": f"failed to close drawing: {exc}"}
# Registry of every available tool, keyed by tool name. Each schema lists the
# keys its handler actually reads from the payload or places in its result.
TOOL_REGISTRY: Dict[str, ToolDefinition] = {
    # Add new tools here.
    "echo": ToolDefinition(
        name="echo",
        description="Echo back the input text.",
        input_schema=ToolSchema(
            tool_type="object",
            properties={"text": {"type": "string", "description": "Text to echo"}},
            required=["text"],
        ),
        output_schema=ToolSchema(
            tool_type="object",
            properties={"text": {"type": "string", "description": "Echoed text"}},
            required=["text"],
        ),
        handler=run_echo_tool,
    ),
    "uppercase": ToolDefinition(
        name="uppercase",
        description="Uppercase the input text.",
        input_schema=ToolSchema(
            tool_type="object",
            properties={"text": {"type": "string", "description": "Text to uppercase"}},
            required=["text"],
        ),
        output_schema=ToolSchema(
            tool_type="object",
            properties={"text": {"type": "string", "description": "Uppercased text"}},
            required=["text"],
        ),
        handler=run_uppercase_tool,
    ),
    "concat": ToolDefinition(
        name="concat",
        description="Concatenate a list of items into a string.",
        input_schema=ToolSchema(
            tool_type="object",
            properties={
                "items": {"type": "array", "description": "Items to concatenate"},
            },
            required=["items"],
        ),
        output_schema=ToolSchema(
            tool_type="object",
            properties={"text": {"type": "string", "description": "Concatenated text"}},
            required=["text"],
        ),
        handler=run_concat_tool,
    ),
    "open_dwg_by_number": ToolDefinition(
        name="open_dwg_by_number",
        description="Open a DWG file whose name contains the provided drawing number. Uses config.env defaults if arguments are not provided.",
        input_schema=ToolSchema(
            tool_type="object",
            properties={
                "drawing_no": {"type": "string", "description": "Drawing number to match"},
                "folder_path": {"type": "string", "description": "Folder to search; uses OPEN_DWG_FOLDER_PATH or DWG_FOLDER from config.env if not provided"},
                "case_sensitive": {"type": "boolean", "description": "Match the drawing number case-sensitively; uses OPEN_DWG_CASE_SENSITIVE from config.env if not provided"},
                "allow_multiple": {"type": "boolean", "description": "Open the first match instead of failing when several files match; uses OPEN_DWG_ALLOW_MULTIPLE from config.env if not provided"},
                "activate": {"type": "boolean", "description": "Show AutoCAD and activate the opened drawing; uses OPEN_DWG_ACTIVATE from config.env if not provided"},
            },
            required=["drawing_no"],
        ),
        output_schema=ToolSchema(
            tool_type="object",
            properties={
                "status": {"type": "string", "description": "ok or error"},
                "message": {"type": "string", "description": "Result message"},
                "path": {"type": "string", "description": "Opened DWG path"},
                "document": {"type": "string", "description": "Opened document name"},
                "visible": {"type": "boolean", "description": "Whether activation of the drawing was requested"},
                "matches": {"type": "array", "description": "Matched files when multiple"},
            },
            required=["status"],
        ),
        handler=run_open_dwg_by_number,
    ),
    "extract_material_table": ToolDefinition(
        name="extract_material_table",
        description="Extract SHOP/FIELD material tables from a specific layer. Uses config from config.env if not provided.",
        input_schema=ToolSchema(
            tool_type="object",
            properties={
                "layer": {"type": "string", "description": "Target layer name; uses LAYER_SHOP_FIELD from config.env if not provided"},
                "categories": {"type": "array", "description": "Category headers to search for; uses MATERIAL_CATEGORIES from config.env if not provided"},
                "row_tolerance": {"type": "number", "description": "Row clustering tolerance (default from ROW_TOLERANCE in config.env)"},
                "col_tolerance": {"type": "number", "description": "Column tolerance (default from COL_TOLERANCE in config.env); accepted but not used by the column assignment"},
            },
            required=[],
        ),
        output_schema=ToolSchema(
            tool_type="object",
            properties={
                "status": {"type": "string", "description": "ok or error"},
                "message": {"type": "string", "description": "Error message when status is error"},
                "categories": {"type": "object", "description": "Per-category results: each has 'rows' (list of column-to-value objects) and, when nothing was extracted, a 'message'"},
            },
            required=["status"],
        ),
        handler=run_extract_material_table,
    ),
    "find_dimtext_by_bmcs_symbol": ToolDefinition(
        name="find_dimtext_by_bmcs_symbol",
        description="Find Part No by BMCS Symbol, then locate matching ISOTEXT and nearest DIMTEXT.",
        input_schema=ToolSchema(
            tool_type="object",
            properties={
                "bmcs_symbol": {"type": "string", "description": "BMCS Symbol value"},
            },
            required=["bmcs_symbol"],
        ),
        output_schema=ToolSchema(
            tool_type="object",
            properties={
                "status": {"type": "string", "description": "ok or error"},
                "message": {"type": "string", "description": "Error message when status is error"},
                "bmcs_symbol": {"type": "string", "description": "BMCS Symbol value"},
                "matches": {"type": "array", "description": "Matched ISOTEXT/DIMTEXT results"},
                "missing_part_nos": {"type": "array", "description": "Part numbers not found in ISOTEXT"},
            },
            required=["status"],
        ),
        handler=run_find_dimtext_by_bmcs_symbol,
    ),
    "close_dwg": ToolDefinition(
        name="close_dwg",
        description="Close a drawing without saving. AutoCAD application remains open.",
        input_schema=ToolSchema(
            tool_type="object",
            properties={
                "drawing_name": {"type": "string", "description": "Drawing name to close (e.g., 'filename.dwg')"},
            },
            required=["drawing_name"],
        ),
        output_schema=ToolSchema(
            tool_type="object",
            properties={
                "status": {"type": "string", "description": "ok or error"},
                "message": {"type": "string", "description": "Result message"},
                "drawing_name": {"type": "string", "description": "Closed drawing name"},
            },
            required=["status"],
        ),
        handler=run_close_dwg,
    ),
}
def list_tool_definitions() -> List[ToolDefinition]:
    """Return a new list of every registered tool definition, in registry order."""
    return [*TOOL_REGISTRY.values()]
def execute_tool(tool: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run the registered handler for *tool* with *payload* and return its result.

    Raises:
        ToolNotFoundError: if *tool* is not a key of the tool registry.
    """
    definition = TOOL_REGISTRY.get(tool)
    if definition is None:
        raise ToolNotFoundError(f"tool_not_found: {tool}")
    # Dispatch to the handler registered under this tool name.
    return definition.handler(payload)