Bases: base_server.BaseServer
Implements the homogeneous server, based on base_server.BaseServer.
Source code in iflearner/communication/homo/homo_server.py
def __init__(self, strategy: strategy_server.StrategyServer) -> None:
    """Set up the server with the strategy it delegates to.

    Args:
        strategy: Server-side strategy object stored on the instance.
    """
    self._strategy = strategy
    # Starts out empty; nothing in this initializer populates it.
    self._callback_messages: dict = {}
|
callback(request, context)
The channel for proactively pushing messages to clients.
Source code in iflearner/communication/homo/homo_server.py
def callback(
    self, request: base_pb2.BaseRequest, context: Any
) -> base_pb2.BaseResponse:
    """The channel for proactively pushing messages to clients.

    Asks the strategy for the notification pending for
    ``request.party_name`` and wraps it in a response.

    Args:
        request: Incoming request; only ``party_name`` is read here.
        context: Unused by this method.

    Returns:
        A response carrying the notification type and, when the strategy
        supplied a payload, that payload serialized into ``data``.
    """
    start = timeit.default_timer()
    # Named ``msg_type`` rather than ``type`` so the builtin is not shadowed.
    msg_type, resp_data = self._strategy.get_client_notification(
        request.party_name
    )

    if msg_type is not None:
        # Timing is only logged when there is a notification to deliver.
        stop = timeit.default_timer()
        logger.info(
            f"OUT: party: {request.party_name}, message type: {msg_type}, time: {1000 * (stop - start)}ms"
        )

    if resp_data is None:
        return base_pb2.BaseResponse(type=msg_type)
    return base_pb2.BaseResponse(
        type=msg_type, data=resp_data.SerializeToString()
    )
|
post(request, context)
Handle client requests asynchronously.
Source code in iflearner/communication/homo/homo_server.py
def post(
    self, request: base_pb2.BaseRequest, context: Any
) -> base_pb2.BaseResponse:
    """Handle client requests asynchronously.

    Dispatches on ``request.type``: parameter uploads are parsed into a
    ``homo_pb2.UploadParam`` and passed to the strategy; any type registered
    in the strategy's ``custom_handlers`` is forwarded to that handler.
    Unrecognized types produce an empty response.

    Args:
        request: Incoming request; ``type``, ``party_name`` and ``data``
            are read.
        context: Unused by this method.

    Returns:
        On success, a response whose ``data`` holds the serialized handler
        result (or an empty response when the handler returned ``None``).
        On ``HomoException``, a response carrying the exception's ``code``
        and ``message``. On any other exception, a response with the
        ``InternalError`` code and ``str(e)`` as message.
    """
    # Taken before the ``try`` so the ``finally`` clause can always read it.
    start = timeit.default_timer()
    try:
        resp_data = None
        if request.type == message_type.MSG_UPLOAD_PARAM:
            req_data = homo_pb2.UploadParam()
            req_data.ParseFromString(request.data)
            resp_data = self._strategy.handler_upload_param(
                request.party_name, req_data
            )  # type: ignore
        elif request.type in self._strategy.custom_handlers:
            # NOTE(review): the handler receives only the payload here (no
            # party name) -- confirm this matches the registered handlers.
            resp_data = self._strategy.custom_handlers[request.type](request.data)
    except HomoException as e:
        # Error raised by the handling code itself: report it to the client.
        logger.info(e)
        return base_pb2.BaseResponse(code=e.code, message=e.message)
    except Exception as e:
        # Unexpected failure: keep the traceback in the log instead of only
        # the message, then report an internal error to the client.
        logger.exception(e)
        return base_pb2.BaseResponse(
            code=HomoException.HomoResponseCode.InternalError, message=str(e)
        )
    else:
        if resp_data is None:
            return base_pb2.BaseResponse()
        return base_pb2.BaseResponse(data=resp_data.SerializeToString())  # type: ignore
    finally:
        # Runs on every path, so each request gets exactly one timing line.
        stop = timeit.default_timer()
        logger.info(
            f"IN: party: {request.party_name}, message type: {request.type}, time: {1000 * (stop - start)}ms"
        )
|
send(request, context)
Handle client requests synchronously.
Source code in iflearner/communication/homo/homo_server.py
def send(
    self, request: base_pb2.BaseRequest, context: Any
) -> base_pb2.BaseResponse:
    """Handle client requests synchronously.

    Dispatches on ``request.type``: registration, client-ready and complete
    messages go to the matching strategy handler; any type registered in
    the strategy's ``custom_handlers`` is forwarded to that handler together
    with the party name. Unrecognized types produce an empty response.

    Args:
        request: Incoming request; ``type``, ``party_name`` and ``data``
            are read.
        context: Unused by this method.

    Returns:
        On success, a response whose ``data`` holds the serialized handler
        result (or an empty response when the handler returned ``None``).
        On ``HomoException``, a response carrying the exception's ``code``
        and ``message``. On any other exception, a response with the
        ``InternalError`` code and ``str(e)`` as message.
    """
    # Taken before the ``try`` so the ``finally`` clause can always read it.
    start = timeit.default_timer()
    try:
        resp_data = None
        if request.type == message_type.MSG_REGISTER:
            data = homo_pb2.RegistrationInfo()
            data.ParseFromString(request.data)
            resp_data = self._strategy.handler_register(
                request.party_name, data.sample_num, data.step_num
            )
        elif request.type == message_type.MSG_CLIENT_READY:
            resp_data = self._strategy.handler_client_ready(request.party_name)  # type: ignore
        elif request.type == message_type.MSG_COMPLETE:
            resp_data = self._strategy.handler_complete(request.party_name)  # type: ignore
        elif request.type in self._strategy.custom_handlers:
            resp_data = self._strategy.custom_handlers[request.type](
                request.party_name, request.data
            )
    except HomoException as e:
        # Error raised by the handling code itself: report it to the client.
        logger.info(e)
        return base_pb2.BaseResponse(code=e.code, message=e.message)
    except Exception as e:
        # Unexpected failure: keep the traceback in the log instead of only
        # the message, then report an internal error to the client.
        logger.exception(e)
        return base_pb2.BaseResponse(
            code=HomoException.HomoResponseCode.InternalError, message=str(e)
        )
    else:
        if resp_data is None:
            return base_pb2.BaseResponse()
        return base_pb2.BaseResponse(data=resp_data.SerializeToString())  # type: ignore
    finally:
        # Runs on every path, so each request gets exactly one timing line.
        stop = timeit.default_timer()
        logger.info(
            f"IN: party: {request.party_name}, message type: {request.type}, time: {1000 * (stop - start)}ms"
        )
|