I am using flwr framework to send a random array from client to server and then server will merge the array and sends back to each of the clients.
import numpy as np
import flwr as fl
from flwr.server.strategy import Strategy
from typing import List
class MergeArraysStrategy(fl.server.strategy.FedAvg):
def aggregate_fit(self, rnd, results, failures):
aggregated_parameters, aggregated_metrics = super().aggregate_fit(rnd, results, failures)
print(aggregated_parameters, aggregated_metrics)
# if aggregated_parameters is not None:
# # Convert `Parameters` to `List[np.ndarray]`
# aggregated_ndarrays: List[np.ndarray] = fl.common.parameters_to_ndarrays(aggregated_parameters)
# # Save aggregated_ndarrays
# print(f"Saving round {rnd} aggregated_ndarrays...")
# np.savez(f"round-{rnd}-weights.npz", *aggregated_ndarrays)
# return aggregated_parameters, aggregated_metrics
print(results)
self.arrays = []
for result in results:
self.arrays.append(result.parameters)
if self.arrays:
merged_array = np.concatenate(self.arrays)
self.arrays = []
return merged_array, {}
else:
# Return some default or empty array if there are no results
return np.array([]), {}
def configure_evaluate(self, server_round, parameters, client_manager):
pass
def evaluate(self, value, parameters):
pass
# Create a Flower server
strategy = MergeArraysStrategy(min_available_clients=3, min_fit_clients=3)
client_manager = fl.server.SimpleClientManager()
server = fl.server.Server(client_manager=client_manager, strategy=strategy)
# Start the server
fl.server.start_server(
server_address="127.0.0.1:8080",
config=fl.server.ServerConfig(num_rounds=2),
server=server
)
Client code:
import flwr as fl
import numpy as np
import flwr
from flwr.common import (
Code,
EvaluateIns,
EvaluateRes,
FitIns,
FitRes,
GetParametersIns,
GetParametersRes,
Status,
ndarrays_to_parameters,
parameters_to_ndarrays,
)
class Client(fl.client.NumPyClient):
def __init__(self, array):
self.array = array
print(self.array)
def get_parameters(self, ins: GetParametersIns) -> GetParametersRes:
#print(f"[Client {self.cid}] get_parameters")
# Get parameters as a list of NumPy ndarray's
ndarrays: np.ndarray = self.array
# Serialize ndarray's into a Parameters object
parameters = ndarrays_to_parameters(ndarrays)
# Build and return response
status = Status(code=Code.EVALUATE_NOT_IMPLEMENTED, message="Success")
return GetParametersRes(
status=status,
parameters=parameters,
)
# def fit(self, parameters):
# self.array = parameters
# fit_res = flwr.common.FitRes(status=flwr.common.Status(
# code=flwr.common.Code.EVALUATE_NOT_IMPLEMENTED ,
# message="Client does not implement `fit`",
# ),
# parameters=self.array,
# num_examples=len(self.array ),
# metrics={})
# return fit_res
def fit(self, ins: FitIns) -> FitRes:
# Deserialize parameters to NumPy ndarray's
parameters_original = ins.parameters
self.array = parameters_to_ndarrays(parameters_original)
# Update the model parameters using your training logic
# This is where you should perform the model training with the received parameters
# Serialize updated ndarray's into a Parameters object
parameters_updated = ndarrays_to_parameters(self.array)
# Build and return response
status = Status(code=Code.EVALUATE_NOT_IMPLEMENTED, message="Success") # Change the status code to SUCCESS
return FitRes(
status=status,
parameters=parameters_updated, # Return the updated model parameters
num_examples=len(self.array),
metrics={},
)
def evaluate(self, parameters):
pass
# Create a Flower client
client = Client(array=np.random.randn(2))
# Connect to the server
fl.client.start_client(server_address="127.0.0.1:8080", client=client)
# The server should handle the aggregation logic and return the merged array
# The client can retrieve the merged array from its 'array' attribute
merged_array = client.array
print(merged_array)
Scenario: Suppose two clients have [1, 2] and [3, 4] data. The server will collect these datasets and concatenate them. Then the server will have [1, 2, 3, 4] and send back to each of the clients. So after one round each client will have [1, 2, 3, 4].
But I am getting the result as my expectation. The error log on the server side is as follows:
INFO flwr 2023-10-27 18:55:29,709 | app.py:165 | Starting Flower server, config: ServerConfig(num_rounds=2, round_timeout=None)
INFO flwr 2023-10-27 18:55:29,745 | app.py:179 | Flower ECE: gRPC server running (2 rounds), SSL is disabled
INFO flwr 2023-10-27 18:55:29,746 | server.py:89 | Initializing global parameters
INFO flwr 2023-10-27 18:55:29,746 | server.py:277 | Requesting initial parameters from one random client
INFO flwr 2023-10-27 18:55:37,053 | server.py:281 | Received initial parameters from one random client
INFO flwr 2023-10-27 18:55:37,054 | server.py:91 | Evaluating initial parameters
None
INFO flwr 2023-10-27 18:55:37,054 | server.py:105 | FL starting
DEBUG flwr 2023-10-27 18:55:47,744 | server.py:228 | fit_round 1: strategy sampled 3 clients (out of 3)
DEBUG flwr 2023-10-27 18:55:47,759 | server.py:242 | fit_round 1 received 0 results and 3 failures
None {}
[]
INFO flwr 2023-10-27 18:55:47,760 | server.py:172 | evaluate_round 1: no clients selected, cancel
DEBUG flwr 2023-10-27 18:55:47,760 | server.py:228 | fit_round 2: strategy sampled 3 clients (out of 3)
DEBUG flwr 2023-10-27 18:55:47,766 | server.py:242 | fit_round 2 received 0 results and 3 failures
None {}
[]
INFO flwr 2023-10-27 18:55:47,766 | server.py:172 | evaluate_round 2: no clients selected, cancel
INFO flwr 2023-10-27 18:55:47,767 | server.py:154 | FL finished in 10.712144899999998
INFO flwr 2023-10-27 18:55:47,767 | app.py:225 | app_fit: losses_distributed []
INFO flwr 2023-10-27 18:55:47,768 | app.py:226 | app_fit: metrics_distributed_fit {}
INFO flwr 2023-10-27 18:55:47,768 | app.py:227 | app_fit: metrics_distributed {}
INFO flwr 2023-10-27 18:55:47,768 | app.py:228 | app_fit: losses_centralized []
INFO flwr 2023-10-27 18:55:47,769 | app.py:229 | app_fit: metrics_centralized {}