Skip to content

strategy_server

StrategyServer(num_clients, total_epoch)

Bases: ABC

Implement the strategy of server.

Parameters:

Name Type Description Default
num_clients int

client number

required
total_epoch int

the epoch number of client trainning

required

Attributes:

Name Type Description
_num_clients int)

client number

_total_epoch int

the epoch number of client trainning

_custom_handlers Dict[str, Any]
_complete_num int

the number of client that has completed federated learning

_clients Dict[str, ClientStatus])

a dict storage the client status

_training_clients dict)

a dict storage the training clients

_server_param dict

the server model

_ready_num int

the number of ready client

_uploaded_num int

the number of client that has uploaded its model parameters

_aggregated_num int

the number of client that has aggregated its model parameters

_on_aggregating bool

whether the server is in aggregating stage

_params dict

the server model parameters

Source code in iflearner/business/homo/strategy/strategy_server.py
def __init__(self, num_clients, total_epoch) -> None:
    self._num_clients = num_clients
    self._total_epoch = total_epoch
    self._custom_handlers: Dict[str, Any] = dict()
    self._complete_num: int = 0
    self._clients: Dict[str, ClientStatus] = {}
    self._training_clients: dict = {}
    self._server_param = None
    self._ready_num = 0
    self._uploaded_num = 0
    self._aggregated_num = 0
    self._on_aggregating = False
    self._params: dict = {}
    self._metric = Metric(logdir="metric")

clients_to_json()

save clients to json file.

Returns:

Name Type Description
str str

json string

Source code in iflearner/business/homo/strategy/strategy_server.py
def clients_to_json(self) -> str:
    """save clients to json file.

    Returns:
        str: json string
    """

    tmp = dict()
    for k, v in self._clients.items():
        print(k, v)
        tmp[k] = v.__dict__

    return json.dumps(tmp)

get_client_notification(party_name)

Get the notification information of the specified client.

Parameters:

Name Type Description Default
party_name str

client name

required

Returns:

Type Description
Tuple[str, Any]

Tuple[str, Any]: the notification message type and notification data

Source code in iflearner/business/homo/strategy/strategy_server.py
def get_client_notification(self, party_name: str) -> Tuple[str, Any]:
    """Get the notification information of the specified client.

    Args:
        party_name (str): client name

    Returns:
        Tuple[str, Any]: the notification message type and notification data
    """
    if party_name in self._training_clients:
        if self._on_aggregating:
            if not self._training_clients[party_name].get("aggregating", False):
                self._training_clients[party_name]["aggregating"] = True
                result = homo_pb2.AggregateResult(parameters=self._server_param)

                self._aggregated_num += 1
                if self._aggregated_num == self._num_clients:
                    self._aggregated_num = 0
                    self._on_aggregating = False
                    self._training_clients.clear()

                return message_type.MSG_AGGREGATE_RESULT, result
        elif not self._training_clients[party_name].get("training", False):
            self._training_clients[party_name]["training"] = True
            return message_type.MSG_NOTIFY_TRAINING, None
    return "", None

handler_client_ready(party_name)

Handle the message of MSG_CLIENT_READY from the client.

Source code in iflearner/business/homo/strategy/strategy_server.py
def handler_client_ready(self, party_name: str) -> None:
    """Handle the message of MSG_CLIENT_READY from the client."""
    logger.info(f"Client ready: {party_name}")
    if party_name not in self._clients:
        raise HomoException(
            HomoException.HomoResponseCode.Unauthorized, "Unregistered client."
        )

    self._clients[party_name].ready = True
    self._clients[party_name].current_epoch += 1
    self._ready_num += 1
    if self._ready_num == self._num_clients:
        logger.info("Clients are all ready.")
        self._ready_num = 0
        for k in self._clients.keys():
            self._training_clients[k] = dict()

handler_complete(party_name)

Handle the message of MSG_COMPLETE from the client.

Parameters:

Name Type Description Default
party_name str

client name

required

Raises:

Type Description
HomoException

if party_name not in the register list, raise the Unauthorized error

Source code in iflearner/business/homo/strategy/strategy_server.py
def handler_complete(self, party_name: str) -> None:
    """Handle the message of MSG_COMPLETE from the client.

    Args:
        party_name (str): client name

    Raises:
        HomoException: if party_name not in the register list, raise the Unauthorized error
    """
    logger.info(f"Client complete: {party_name}")
    if party_name not in self._clients:
        raise HomoException(
            HomoException.HomoResponseCode.Unauthorized, "Unregistered client."
        )

    self._complete_num += 1
    self._clients[party_name].complete = True

handler_register(party_name, sample_num=0, step_num=0)

Handle the message of MSG_REGISTER from the client.

Parameters:

Name Type Description Default
party_name str

client name

required
sample_num Optional[int]

the total sample number of client party_name . Defaults to 0.

0
step_num int

The number a client epoch needs to be optimized, always equals to the batch number of client. Defaults to 0.

0

Raises:

Type Description
HomoException

description

Returns:

Type Description
homo_pb2.RegistrationResponse

homo_pb2.RegistrationResponse: if party_name not in the register list, raise the Unauthorized error

Source code in iflearner/business/homo/strategy/strategy_server.py
def handler_register(
    self, party_name: str, sample_num: Optional[int] = 0, step_num: int = 0
) -> homo_pb2.RegistrationResponse:
    """Handle the message of MSG_REGISTER from the client.

    Args:
        party_name (str): client name
        sample_num (Optional[int], optional): the total sample number of client `party_name` . Defaults to 0.
        step_num (int, optional): The number a client epoch needs to be optimized, always equals to the batch number of client. Defaults to 0.

    Raises:
        HomoException: _description_

    Returns:
        homo_pb2.RegistrationResponse: if party_name not in the register list, raise the Unauthorized error
    """
    logger.info(f"Client register: {party_name}")
    if len(self._clients) >= self._num_clients:
        raise HomoException(
            HomoException.HomoResponseCode.Unauthorized,
            "Registered clients are full.",
        )

    self._clients[party_name] = ClientStatus(self._total_epoch)

handler_upload_param(party_name, data)

Handle the message of MSG_UPLOAD_PARAM from the client.

Parameters:

Name Type Description Default
party_name str

client name

required
data homo_pb2.UploadParam

the data uploaded from party_name, with grpc format

required

Raises:

Type Description
HomoException

if party_name not in the training_clients list, raise the Forbidden error

Source code in iflearner/business/homo/strategy/strategy_server.py
def handler_upload_param(self, party_name: str, data: homo_pb2.UploadParam) -> None:
    """Handle the message of MSG_UPLOAD_PARAM from the client.

    Args:
        party_name (str): client name
        data (homo_pb2.UploadParam): the data uploaded from `party_name`, with grpc format

    Raises:
        HomoException:  if party_name not in the training_clients list, raise the Forbidden error
    """
    logger.info(f"Client: {party_name}, epoch: {data.epoch}")
    if party_name not in self._training_clients:
        raise HomoException(
            HomoException.HomoResponseCode.Forbidden, "Client not notified."
        )

    self._training_clients[party_name]["param"] = data.parameters
    self._uploaded_num += 1
    if self._params is None:
        self._params = dict()
        for param_name, param_info in data.parameters.items():
            self._params[param_name] = np.array(param_info.values).reshape(
                param_info.shape
            )

    if data.metrics is not None:
        for k, v in data.metrics.items():
            self._metric.add(k, party_name, data.epoch, v)