Skip to content

homo_server

HomoServer(strategy)

Bases: base_server.BaseServer

Implements the homogeneous server, based on base_server.BaseServer.

Source code in iflearner/communication/homo/homo_server.py
def __init__(self, strategy: strategy_server.StrategyServer) -> None:
    """Store the server-side strategy and start with no callback messages.

    Args:
        strategy: The strategy object that the request handlers of this
            server delegate to.
    """
    self._strategy = strategy
    # Starts empty; nothing in this constructor populates it.
    self._callback_messages: dict = {}

callback(request, context)

The channel through which the server proactively pushes messages to clients.

Source code in iflearner/communication/homo/homo_server.py
def callback(
    self, request: base_pb2.BaseRequest, context: Any
) -> base_pb2.BaseResponse:
    """Return the notification the strategy currently holds for a party.

    The strategy is asked for the notification addressed to
    ``request.party_name``; its message type and, when present, its
    serialized payload are wrapped in the response.

    Args:
        request: The incoming request; only ``party_name`` is read.
        context: The RPC context; unused here.

    Returns:
        A response carrying the notification type, plus the serialized
        payload when the strategy supplied one.
    """

    began = timeit.default_timer()
    party = request.party_name
    msg_type, payload = self._strategy.get_client_notification(party)

    # Only log when there is an actual notification to deliver.
    if msg_type is not None:
        elapsed_ms = 1000 * (timeit.default_timer() - began)
        logger.info(
            f"OUT: party: {party}, message type: {msg_type}, time: {elapsed_ms}ms"
        )

    if payload is not None:
        return base_pb2.BaseResponse(
            type=msg_type, data=payload.SerializeToString()
        )
    return base_pb2.BaseResponse(type=msg_type)

post(request, context)

Handle client requests asynchronously.

Source code in iflearner/communication/homo/homo_server.py
def post(
    self, request: base_pb2.BaseRequest, context: Any
) -> base_pb2.BaseResponse:
    """Handle client requests asynchronously.

    Parameter uploads are parsed and passed to the strategy; any other
    message type is routed to a matching entry in the strategy's
    ``custom_handlers``, if one exists. Unrecognised types produce an empty
    response.

    Args:
        request: The incoming request (``type``, ``party_name``, ``data``).
        context: The RPC context; unused here.

    Returns:
        An empty response, a response with the handler's serialized result,
        or a response with an error code and message when handling raised.
    """

    began = timeit.default_timer()
    try:
        result = None
        if request.type == message_type.MSG_UPLOAD_PARAM:
            upload = homo_pb2.UploadParam()
            upload.ParseFromString(request.data)
            result = self._strategy.handler_upload_param(
                request.party_name, upload
            )  # type: ignore
        elif request.type in self._strategy.custom_handlers:
            # NOTE(review): custom handlers receive only the raw data here,
            # while `send` passes (party_name, data) — confirm this is intended.
            handler = self._strategy.custom_handlers[request.type]
            result = handler(request.data)
    except HomoException as err:
        # Domain errors carry their own response code and message.
        logger.info(err)
        return base_pb2.BaseResponse(code=err.code, message=err.message)
    except Exception as err:
        # Anything else is reported to the client as an internal error.
        logger.info(err)
        return base_pb2.BaseResponse(
            code=HomoException.HomoResponseCode.InternalError, message=str(err)
        )
    else:
        # Serialization stays outside the try body so its errors propagate.
        if result is not None:
            return base_pb2.BaseResponse(data=result.SerializeToString())  # type: ignore
        return base_pb2.BaseResponse()
    finally:
        # Runs on every exit path, so each request is timed and logged.
        elapsed_ms = 1000 * (timeit.default_timer() - began)
        logger.info(
            f"IN: party: {request.party_name}, message type: {request.type}, time: {elapsed_ms}ms"
        )

send(request, context)

Handle client requests synchronously.

Source code in iflearner/communication/homo/homo_server.py
def send(
    self, request: base_pb2.BaseRequest, context: Any
) -> base_pb2.BaseResponse:
    """Handle client requests synchronously.

    Registration, client-ready and completion messages are passed to the
    corresponding strategy handler; any other message type is routed to a
    matching entry in the strategy's ``custom_handlers``, if one exists.
    Unrecognised types produce an empty response.

    Args:
        request: The incoming request (``type``, ``party_name``, ``data``).
        context: The RPC context; unused here.

    Returns:
        An empty response, a response with the handler's serialized result,
        or a response with an error code and message when handling raised.
    """

    began = timeit.default_timer()
    try:
        result = None
        # Built-in message types are checked first, so they take precedence
        # over a custom handler registered under the same type.
        if request.type == message_type.MSG_REGISTER:
            registration = homo_pb2.RegistrationInfo()
            registration.ParseFromString(request.data)
            result = self._strategy.handler_register(
                request.party_name, registration.sample_num, registration.step_num
            )
        elif request.type == message_type.MSG_CLIENT_READY:
            result = self._strategy.handler_client_ready(request.party_name)  # type: ignore
        elif request.type == message_type.MSG_COMPLETE:
            result = self._strategy.handler_complete(request.party_name)  # type: ignore
        elif request.type in self._strategy.custom_handlers:
            handler = self._strategy.custom_handlers[request.type]
            result = handler(request.party_name, request.data)
    except HomoException as err:
        # Domain errors carry their own response code and message.
        logger.info(err)
        return base_pb2.BaseResponse(code=err.code, message=err.message)
    except Exception as err:
        # Anything else is reported to the client as an internal error.
        logger.info(err)
        return base_pb2.BaseResponse(
            code=HomoException.HomoResponseCode.InternalError, message=str(err)
        )
    else:
        # Serialization stays outside the try body so its errors propagate.
        if result is not None:
            return base_pb2.BaseResponse(data=result.SerializeToString())  # type: ignore
        return base_pb2.BaseResponse()
    finally:
        # Runs on every exit path, so each request is timed and logged.
        elapsed_ms = 1000 * (timeit.default_timer() - began)
        logger.info(
            f"IN: party: {request.party_name}, message type: {request.type}, time: {elapsed_ms}ms"
        )