Skip to main content
Glama
client_v2.py21.1 kB
# kik_mcp_module/client_v2.py import httpx import logging import uuid import ssl import os from typing import Optional from datetime import datetime # Cryptography imports for AES-256-CBC encryption of document IDs try: from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend HAS_CRYPTOGRAPHY = True except ImportError: HAS_CRYPTOGRAPHY = False from .models_v2 import ( KikV2DecisionType, KikV2SearchPayload, KikV2SearchPayloadDk, KikV2SearchPayloadMk, KikV2RequestData, KikV2QueryRequest, KikV2KeyValuePair, KikV2SearchResponse, KikV2SearchResponseDk, KikV2SearchResponseMk, KikV2SearchResult, KikV2CompactDecision, KikV2DocumentMarkdown ) logger = logging.getLogger(__name__) class KikV2ApiClient: """ New KIK v2 API Client for https://ekapv2.kik.gov.tr This client uses the modern JSON-based API endpoint that provides better structured data compared to the legacy form-based API. """ BASE_URL = "https://ekapv2.kik.gov.tr" # Endpoint mappings for different decision types ENDPOINTS = { KikV2DecisionType.UYUSMAZLIK: "/b_ihalearaclari/api/KurulKararlari/GetKurulKararlari", KikV2DecisionType.DUZENLEYICI: "/b_ihalearaclari/api/KurulKararlari/GetKurulKararlariDk", KikV2DecisionType.MAHKEME: "/b_ihalearaclari/api/KurulKararlari/GetKurulKararlariMk" } # AES-256-CBC encryption key for document ID encryption (reverse engineered from ekapv2.kik.gov.tr Angular app) # This key is used to encrypt numeric gundemMaddesiId values to 64-character hex hashes for document URLs DOCUMENT_ID_ENCRYPTION_KEY = bytes([ 236, 193, 164, 43, 12, 135, 121, 170, 4, 244, 123, 219, 82, 158, 124, 174, 174, 228, 219, 174, 208, 104, 174, 120, 32, 76, 250, 4, 143, 159, 211, 176 ]) @staticmethod def encrypt_document_id(numeric_id: str) -> str: """ Encrypt a numeric KİK gundemMaddesiId to the 64-character hex hash used in document URLs. Algorithm: AES-256-CBC with PKCS7 padding Output format: IV (16 bytes hex) + Ciphertext (16 bytes hex) = 64 chars Args: numeric_id: The numeric document ID from search results (e.g., "177280") Returns: 64-character hex string for use in document URL KararId parameter """ if not HAS_CRYPTOGRAPHY: raise ImportError("cryptography library required for document ID encryption") # Generate random IV (16 bytes) iv = os.urandom(16) # Create AES-CBC cipher with the encryption key cipher = Cipher( algorithms.AES(KikV2ApiClient.DOCUMENT_ID_ENCRYPTION_KEY), modes.CBC(iv), backend=default_backend() ) encryptor = cipher.encryptor() # Encode plaintext and apply PKCS7 padding plaintext = numeric_id.encode('utf-8') block_size = 16 padding_len = block_size - (len(plaintext) % block_size) padded_plaintext = plaintext + bytes([padding_len] * padding_len) # Encrypt ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize() # Return IV + ciphertext as lowercase hex (64 characters total) return iv.hex() + ciphertext.hex() def __init__(self, request_timeout: float = 60.0): # Create SSL context with legacy server support ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE # Enable legacy server connect option for older SSL implementations (Python 3.12+) if hasattr(ssl, 'OP_LEGACY_SERVER_CONNECT'): ssl_context.options |= ssl.OP_LEGACY_SERVER_CONNECT # Set broader cipher suite support including legacy ciphers ssl_context.set_ciphers('ALL:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA') self.http_client = httpx.AsyncClient( base_url=self.BASE_URL, verify=ssl_context, headers={ "Accept": "application/json", "Accept-Language": "tr", "Content-Type": "application/json", "Origin": self.BASE_URL, "Referer": f"{self.BASE_URL}/sorgulamalar/kurul-kararlari", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36", "api-version": "v1", "sec-ch-ua": '"Not;A=Brand";v="99", "Google Chrome";v="139", "Chromium";v="139"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"' }, timeout=request_timeout ) # Generate security headers (these might need to be updated based on API requirements) self.security_headers = self._generate_security_headers() def _generate_security_headers(self) -> dict: """ Generate the custom security headers required by KIK v2 API. These headers appear to be for request validation/encryption. """ # Generate a random GUID for each session request_guid = str(uuid.uuid4()) # These are example values - in a real implementation, these might need # to be calculated based on the request content or session return { "X-Custom-Request-Guid": request_guid, "X-Custom-Request-R8id": "hwnOjsN8qdgtDw70x3sKkxab0rj2bQ8Uph4+C+oU+9AMmQqRN3eMOEEeet748DOf", "X-Custom-Request-Siv": "p2IQRTitF8z7I39nBjdAqA==", "X-Custom-Request-Ts": "1vB3Wwrt8YQ5U6t3XAzZ+Q==" } def _build_search_payload(self, decision_type: KikV2DecisionType, karar_metni: str = "", karar_no: str = "", basvuran: str = "", idare_adi: str = "", baslangic_tarihi: str = "", bitis_tarihi: str = ""): """Build the search payload for KIK v2 API.""" key_value_pairs = [] # Add non-empty search criteria if karar_metni: key_value_pairs.append(KikV2KeyValuePair(key="KararMetni", value=karar_metni)) if karar_no: key_value_pairs.append(KikV2KeyValuePair(key="KararNo", value=karar_no)) if basvuran: key_value_pairs.append(KikV2KeyValuePair(key="BasvuranAdi", value=basvuran)) if idare_adi: key_value_pairs.append(KikV2KeyValuePair(key="IdareAdi", value=idare_adi)) if baslangic_tarihi: key_value_pairs.append(KikV2KeyValuePair(key="BaslangicTarihi", value=baslangic_tarihi)) if bitis_tarihi: key_value_pairs.append(KikV2KeyValuePair(key="BitisTarihi", value=bitis_tarihi)) # If no search criteria provided, use a generic search if not key_value_pairs: key_value_pairs.append(KikV2KeyValuePair(key="KararMetni", value="")) query_request = KikV2QueryRequest(keyValueOfstringanyType=key_value_pairs) request_data = KikV2RequestData(keyValuePairs=query_request) # Return appropriate payload based on decision type if decision_type == KikV2DecisionType.UYUSMAZLIK: return KikV2SearchPayload(sorgulaKurulKararlari=request_data) elif decision_type == KikV2DecisionType.DUZENLEYICI: return KikV2SearchPayloadDk(sorgulaKurulKararlariDk=request_data) elif decision_type == KikV2DecisionType.MAHKEME: return KikV2SearchPayloadMk(sorgulaKurulKararlariMk=request_data) else: raise ValueError(f"Unsupported decision type: {decision_type}") async def search_decisions(self, decision_type: KikV2DecisionType = KikV2DecisionType.UYUSMAZLIK, karar_metni: str = "", karar_no: str = "", basvuran: str = "", idare_adi: str = "", baslangic_tarihi: str = "", bitis_tarihi: str = "") -> KikV2SearchResult: """ Search KIK decisions using the v2 API. Args: decision_type: Type of decision to search (uyusmazlik/duzenleyici/mahkeme) karar_metni: Decision text search karar_no: Decision number (e.g., "2025/UH.II-1801") basvuran: Applicant name idare_adi: Administration name baslangic_tarihi: Start date (YYYY-MM-DD format) bitis_tarihi: End date (YYYY-MM-DD format) Returns: KikV2SearchResult with compact decision list """ logger.info(f"KikV2ApiClient: Searching {decision_type.value} decisions with criteria - karar_metni: '{karar_metni}', karar_no: '{karar_no}', basvuran: '{basvuran}'") try: # Build request payload payload = self._build_search_payload( decision_type=decision_type, karar_metni=karar_metni, karar_no=karar_no, basvuran=basvuran, idare_adi=idare_adi, baslangic_tarihi=baslangic_tarihi, bitis_tarihi=bitis_tarihi ) # Update security headers for this request headers = {**self.http_client.headers, **self._generate_security_headers()} # Get the appropriate endpoint for this decision type endpoint = self.ENDPOINTS[decision_type] # Make API request response = await self.http_client.post( endpoint, json=payload.model_dump(), headers=headers ) response.raise_for_status() response_data = response.json() logger.debug(f"KikV2ApiClient: Raw API response structure: {type(response_data)}") # Parse the API response based on decision type if decision_type == KikV2DecisionType.UYUSMAZLIK: api_response = KikV2SearchResponse(**response_data) result_data = api_response.SorgulaKurulKararlariResponse.SorgulaKurulKararlariResult elif decision_type == KikV2DecisionType.DUZENLEYICI: api_response = KikV2SearchResponseDk(**response_data) result_data = api_response.SorgulaKurulKararlariDkResponse.SorgulaKurulKararlariDkResult elif decision_type == KikV2DecisionType.MAHKEME: api_response = KikV2SearchResponseMk(**response_data) result_data = api_response.SorgulaKurulKararlariMkResponse.SorgulaKurulKararlariMkResult else: raise ValueError(f"Unsupported decision type: {decision_type}") # Check for API errors if result_data.hataKodu and result_data.hataKodu != "0": logger.warning(f"KikV2ApiClient: API returned error - Code: {result_data.hataKodu}, Message: {result_data.hataMesaji}") return KikV2SearchResult( decisions=[], total_records=0, page=1, error_code=result_data.hataKodu, error_message=result_data.hataMesaji ) # Convert to compact format compact_decisions = [] total_count = 0 for decision_group in result_data.KurulKararTutanakDetayListesi: for decision_detail in decision_group.KurulKararTutanakDetayi: compact_decision = KikV2CompactDecision( kararNo=decision_detail.kararNo, kararTarihi=decision_detail.kararTarihi, basvuran=decision_detail.basvuran, idareAdi=decision_detail.idareAdi, basvuruKonusu=decision_detail.basvuruKonusu, gundemMaddesiId=decision_detail.gundemMaddesiId, decision_type=decision_type.value ) compact_decisions.append(compact_decision) total_count += 1 logger.info(f"KikV2ApiClient: Found {total_count} decisions") return KikV2SearchResult( decisions=compact_decisions, total_records=total_count, page=1, error_code="0", error_message="" ) except httpx.HTTPStatusError as e: logger.error(f"KikV2ApiClient: HTTP error during search: {e.response.status_code} - {e.response.text}") return KikV2SearchResult( decisions=[], total_records=0, page=1, error_code="HTTP_ERROR", error_message=f"HTTP {e.response.status_code}: {e.response.text}" ) except Exception as e: logger.error(f"KikV2ApiClient: Unexpected error during search: {str(e)}") return KikV2SearchResult( decisions=[], total_records=0, page=1, error_code="UNEXPECTED_ERROR", error_message=str(e) ) async def get_document_markdown(self, document_id: str) -> KikV2DocumentMarkdown: """ Get KİK decision document content in Markdown format. This method uses a two-step process: 1. Call GetSorgulamaUrl endpoint to get the actual document URL 2. Use httpx to fetch the document content Args: document_id: The gundemMaddesiId from search results Returns: KikV2DocumentMarkdown with document content converted to Markdown """ logger.info(f"KikV2ApiClient: Getting document for ID: {document_id}") if not document_id or not document_id.strip(): return KikV2DocumentMarkdown( document_id=document_id, kararNo="", markdown_content="", source_url="", error_message="Document ID is required" ) try: # Step 1: Get the actual document URL using GetSorgulamaUrl endpoint logger.info(f"KikV2ApiClient: Step 1 - Getting document URL for ID: {document_id}") # Update security headers for this request headers = {**self.http_client.headers, **self._generate_security_headers()} # Call GetSorgulamaUrl to get the real document URL url_payload = {"sorguSayfaTipi": 2} # As shown in curl example url_response = await self.http_client.post( "/b_ihalearaclari/api/KurulKararlari/GetSorgulamaUrl", json=url_payload, headers=headers ) url_response.raise_for_status() url_data = url_response.json() # Get the base document URL from API response base_document_url = url_data.get("sorgulamaUrl", "") if not base_document_url: return KikV2DocumentMarkdown( document_id=document_id, kararNo="", markdown_content="", source_url="", error_message="Could not get document URL from GetSorgulamaUrl API" ) # If document_id is numeric, encrypt it to get the KararId hash # The web interface uses AES-256-CBC encrypted hashes for document URLs karar_id = document_id if document_id.isdigit(): try: karar_id = self.encrypt_document_id(document_id) logger.info(f"KikV2ApiClient: Encrypted numeric ID {document_id} to hash: {karar_id}") except Exception as enc_error: logger.warning(f"KikV2ApiClient: Could not encrypt document ID, using as-is: {enc_error}") # Construct full document URL with the encrypted KararId document_url = f"{base_document_url}?KararId={karar_id}" logger.info(f"KikV2ApiClient: Step 2 - Retrieved document URL: {document_url}") except Exception as e: logger.error(f"KikV2ApiClient: Error getting document URL for ID {document_id}: {str(e)}") # Fallback to old method if GetSorgulamaUrl fails # Also encrypt numeric IDs in fallback path karar_id = document_id if document_id.isdigit(): try: karar_id = self.encrypt_document_id(document_id) logger.info(f"KikV2ApiClient: Encrypted numeric ID in fallback: {karar_id}") except Exception as enc_error: logger.warning(f"KikV2ApiClient: Could not encrypt in fallback: {enc_error}") document_url = f"https://ekap.kik.gov.tr/EKAP/Vatandas/KurulKararGoster.aspx?KararId={karar_id}" logger.info(f"KikV2ApiClient: Falling back to direct URL: {document_url}") try: # Step 2: Use httpx to get the document content logger.info(f"KikV2ApiClient: Step 2 - Using httpx to retrieve document from: {document_url}") # Create a separate httpx client for document retrieval with HTML headers doc_ssl_context = ssl.create_default_context() doc_ssl_context.check_hostname = False doc_ssl_context.verify_mode = ssl.CERT_NONE if hasattr(ssl, 'OP_LEGACY_SERVER_CONNECT'): doc_ssl_context.options |= ssl.OP_LEGACY_SERVER_CONNECT doc_ssl_context.set_ciphers('ALL:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!SRP:!CAMELLIA') async with httpx.AsyncClient( verify=doc_ssl_context, headers={ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "tr,en-US;q=0.5", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36" }, timeout=60.0, follow_redirects=True ) as doc_client: response = await doc_client.get(document_url) response.raise_for_status() html_content = response.text logger.info(f"KikV2ApiClient: Retrieved content via httpx, length: {len(html_content)}") # Convert HTML to Markdown using MarkItDown with BytesIO try: from markitdown import MarkItDown from io import BytesIO md = MarkItDown() html_bytes = html_content.encode('utf-8') html_stream = BytesIO(html_bytes) result = md.convert_stream(html_stream, file_extension=".html") markdown_content = result.text_content return KikV2DocumentMarkdown( document_id=document_id, kararNo="", markdown_content=markdown_content, source_url=document_url, error_message="" ) except ImportError: return KikV2DocumentMarkdown( document_id=document_id, kararNo="", markdown_content="MarkItDown library not available", source_url=document_url, error_message="MarkItDown library not installed" ) except Exception as e: logger.error(f"KikV2ApiClient: Error retrieving document {document_id}: {str(e)}") return KikV2DocumentMarkdown( document_id=document_id, kararNo="", markdown_content="", source_url=document_url, error_message=str(e) ) async def close_client_session(self): """Close HTTP client session.""" await self.http_client.aclose() logger.info("KikV2ApiClient: HTTP client session closed.")

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/saidsurucu/yargi-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server