# mcp_pb2_grpc.py (17.1 kB)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import mcp_pb2 as mcp__pb2
# Version of grpcio-tools that produced this module, and the grpcio runtime
# version that is actually installed.
GRPC_GENERATED_VERSION = "1.73.1"
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
    from grpc._utilities import first_version_is_lower

    # True when the installed runtime is older than the generator version.
    _version_not_supported = first_version_is_lower(
        GRPC_VERSION, GRPC_GENERATED_VERSION
    )
except ImportError:
    # A runtime that lacks the comparison helper is treated as unsupported.
    _version_not_supported = True

if _version_not_supported:
    # Fail at import time rather than on the first RPC.
    raise RuntimeError(
        f"The grpc package installed is at version {GRPC_VERSION},"
        " but the generated code in mcp_pb2_grpc.py depends on"
        f" grpcio>={GRPC_GENERATED_VERSION}."
        f" Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}"
        f" or downgrade your generated code using grpcio-tools<={GRPC_VERSION}."
    )
class MnemosyneMCPStub(object):
    """Client stub for the core service ``mnemosyne.mcp.v1.MnemosyneMCP``.

    Every RPC of the service is exposed as an instance attribute holding the
    object returned by ``channel.unary_unary`` for that method path.
    """

    def __init__(self, channel):
        """Bind all service RPCs to ``channel``.

        Args:
            channel: A grpc.Channel.
        """

        def bind(method_path, request_cls, response_cls):
            # All RPCs share the same unary-unary wiring; only the method
            # path and the request/response message classes differ.
            return channel.unary_unary(
                method_path,
                request_serializer=request_cls.SerializeToString,
                response_deserializer=response_cls.FromString,
                _registered_method=True,
            )

        self.HealthCheck = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/HealthCheck",
            mcp__pb2.HealthCheckRequest,
            mcp__pb2.HealthCheckResponse,
        )
        self.IngestProject = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/IngestProject",
            mcp__pb2.IngestProjectRequest,
            mcp__pb2.IngestProjectResponse,
        )
        self.GetIngestStatus = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/GetIngestStatus",
            mcp__pb2.GetIngestStatusRequest,
            mcp__pb2.IngestStatus,
        )
        self.Search = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/Search",
            mcp__pb2.SearchRequest,
            mcp__pb2.SearchResponse,
        )
        self.RunImpactAnalysis = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/RunImpactAnalysis",
            mcp__pb2.ImpactAnalysisRequest,
            mcp__pb2.ImpactAnalysisResponse,
        )
        self.ApplyConstraint = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/ApplyConstraint",
            mcp__pb2.ApplyConstraintRequest,
            mcp__pb2.ApplyConstraintResponse,
        )
        self.AcquireLock = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/AcquireLock",
            mcp__pb2.AcquireLockRequest,
            mcp__pb2.AcquireLockResponse,
        )
        self.ReleaseLock = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/ReleaseLock",
            mcp__pb2.ReleaseLockRequest,
            mcp__pb2.ReleaseLockResponse,
        )
        self.GetGraphVisualization = bind(
            "/mnemosyne.mcp.v1.MnemosyneMCP/GetGraphVisualization",
            mcp__pb2.GetGraphVisualizationRequest,
            mcp__pb2.GraphVisualization,
        )
class MnemosyneMCPServicer(object):
    """Server-side base class for the core service definition.

    Every handler here marks the call as UNIMPLEMENTED on the context and
    raises ``NotImplementedError``.
    """

    @staticmethod
    def _raise_unimplemented(context):
        """Flag the call as UNIMPLEMENTED on ``context`` and raise.

        Raises:
            NotImplementedError: Always.
        """
        message = "Method not implemented!"
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)

    # --- System diagnostics ---

    def HealthCheck(self, request, context):
        """System diagnostics: default HealthCheck handler (unimplemented)."""
        self._raise_unimplemented(context)

    # --- Ingestion & management ---

    def IngestProject(self, request, context):
        """Ingestion & management: default IngestProject handler (unimplemented)."""
        self._raise_unimplemented(context)

    def GetIngestStatus(self, request, context):
        """Default GetIngestStatus handler (undocumented in the .proto file)."""
        self._raise_unimplemented(context)

    # --- Query & analysis ---

    def Search(self, request, context):
        """Query & analysis: default Search handler (unimplemented)."""
        self._raise_unimplemented(context)

    def RunImpactAnalysis(self, request, context):
        """Default RunImpactAnalysis handler (undocumented in the .proto file)."""
        self._raise_unimplemented(context)

    # --- Constraints & locking ---

    def ApplyConstraint(self, request, context):
        """Constraints & locking: default ApplyConstraint handler (unimplemented)."""
        self._raise_unimplemented(context)

    def AcquireLock(self, request, context):
        """Default AcquireLock handler (undocumented in the .proto file)."""
        self._raise_unimplemented(context)

    def ReleaseLock(self, request, context):
        """Default ReleaseLock handler (undocumented in the .proto file)."""
        self._raise_unimplemented(context)

    # --- Visualization ---

    def GetGraphVisualization(self, request, context):
        """Visualization: default GetGraphVisualization handler (unimplemented)."""
        self._raise_unimplemented(context)
def add_MnemosyneMCPServicer_to_server(servicer, server):
    """Register the RPC methods of ``servicer`` on ``server``.

    Builds one unary-unary method handler per RPC, wraps them in a generic
    handler for the service, and registers them through both
    ``server.add_generic_rpc_handlers`` and
    ``server.add_registered_method_handlers``.

    Args:
        servicer: Object providing the nine RPC methods of the service
            (HealthCheck through GetGraphVisualization).
        server: Object exposing ``add_generic_rpc_handlers`` and
            ``add_registered_method_handlers`` (presumably a grpc.Server;
            confirm against the caller).
    """
    service_name = "mnemosyne.mcp.v1.MnemosyneMCP"

    def unary_handler(behavior, request_cls, response_cls):
        # The server side mirrors the stub: it deserializes the request and
        # serializes the response.
        return grpc.unary_unary_rpc_method_handler(
            behavior,
            request_deserializer=request_cls.FromString,
            response_serializer=response_cls.SerializeToString,
        )

    rpc_method_handlers = {
        "HealthCheck": unary_handler(
            servicer.HealthCheck,
            mcp__pb2.HealthCheckRequest,
            mcp__pb2.HealthCheckResponse,
        ),
        "IngestProject": unary_handler(
            servicer.IngestProject,
            mcp__pb2.IngestProjectRequest,
            mcp__pb2.IngestProjectResponse,
        ),
        "GetIngestStatus": unary_handler(
            servicer.GetIngestStatus,
            mcp__pb2.GetIngestStatusRequest,
            mcp__pb2.IngestStatus,
        ),
        "Search": unary_handler(
            servicer.Search,
            mcp__pb2.SearchRequest,
            mcp__pb2.SearchResponse,
        ),
        "RunImpactAnalysis": unary_handler(
            servicer.RunImpactAnalysis,
            mcp__pb2.ImpactAnalysisRequest,
            mcp__pb2.ImpactAnalysisResponse,
        ),
        "ApplyConstraint": unary_handler(
            servicer.ApplyConstraint,
            mcp__pb2.ApplyConstraintRequest,
            mcp__pb2.ApplyConstraintResponse,
        ),
        "AcquireLock": unary_handler(
            servicer.AcquireLock,
            mcp__pb2.AcquireLockRequest,
            mcp__pb2.AcquireLockResponse,
        ),
        "ReleaseLock": unary_handler(
            servicer.ReleaseLock,
            mcp__pb2.ReleaseLockRequest,
            mcp__pb2.ReleaseLockResponse,
        ),
        "GetGraphVisualization": unary_handler(
            servicer.GetGraphVisualization,
            mcp__pb2.GetGraphVisualizationRequest,
            mcp__pb2.GraphVisualization,
        ),
    }

    generic_handler = grpc.method_handlers_generic_handler(
        service_name, rpc_method_handlers
    )
    server.add_generic_rpc_handlers((generic_handler,))
    server.add_registered_method_handlers(service_name, rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class MnemosyneMCP(object):
    """Static per-RPC wrappers for the core service definition.

    Each static method forwards its arguments to
    ``grpc.experimental.unary_unary`` together with the method path and the
    request/response (de)serializers for that RPC, and returns its result.
    """

    @staticmethod
    def HealthCheck(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the HealthCheck RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/HealthCheck",
            request_serializer=mcp__pb2.HealthCheckRequest.SerializeToString,
            response_deserializer=mcp__pb2.HealthCheckResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def IngestProject(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the IngestProject RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/IngestProject",
            request_serializer=mcp__pb2.IngestProjectRequest.SerializeToString,
            response_deserializer=mcp__pb2.IngestProjectResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def GetIngestStatus(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the GetIngestStatus RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/GetIngestStatus",
            request_serializer=mcp__pb2.GetIngestStatusRequest.SerializeToString,
            response_deserializer=mcp__pb2.IngestStatus.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def Search(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the Search RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/Search",
            request_serializer=mcp__pb2.SearchRequest.SerializeToString,
            response_deserializer=mcp__pb2.SearchResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def RunImpactAnalysis(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the RunImpactAnalysis RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/RunImpactAnalysis",
            request_serializer=mcp__pb2.ImpactAnalysisRequest.SerializeToString,
            response_deserializer=mcp__pb2.ImpactAnalysisResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def ApplyConstraint(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the ApplyConstraint RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/ApplyConstraint",
            request_serializer=mcp__pb2.ApplyConstraintRequest.SerializeToString,
            response_deserializer=mcp__pb2.ApplyConstraintResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def AcquireLock(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the AcquireLock RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/AcquireLock",
            request_serializer=mcp__pb2.AcquireLockRequest.SerializeToString,
            response_deserializer=mcp__pb2.AcquireLockResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def ReleaseLock(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the ReleaseLock RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/ReleaseLock",
            request_serializer=mcp__pb2.ReleaseLockRequest.SerializeToString,
            response_deserializer=mcp__pb2.ReleaseLockResponse.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )

    @staticmethod
    def GetGraphVisualization(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke the GetGraphVisualization RPC on ``target``."""
        return grpc.experimental.unary_unary(
            request,
            target,
            "/mnemosyne.mcp.v1.MnemosyneMCP/GetGraphVisualization",
            request_serializer=mcp__pb2.GetGraphVisualizationRequest.SerializeToString,
            response_deserializer=mcp__pb2.GraphVisualization.FromString,
            options=options,
            channel_credentials=channel_credentials,
            insecure=insecure,
            call_credentials=call_credentials,
            compression=compression,
            wait_for_ready=wait_for_ready,
            timeout=timeout,
            metadata=metadata,
            _registered_method=True,
        )